基于 C++ 手写 Muduo 高性能网络库

大纲

前言

本文将基于 C++ 开发一个类似 Muduo 的高性能网络库,项目代码大部分都是从 Muduo 移值过来,同时去掉 Boost 依赖,并使用 C++ 11 进行代码重构,重点是学习 Muduo 的底层设计思想(尤其是 Multiple Reactors 模型)。

学习目标

  • 1、理解阻塞、非阻塞、同步、异步
  • 2、理解 Unix/Linux 上的五种 I/O 模型
  • 3、epoll 的原理以及优势
  • 4、深刻理解 Reactor 模型(基于 I/O 的事件驱动模型)
  • 5、从开源 C++ 网络库 Muduo 的源码中,学习优秀的代码设计
  • 6、掌握基于事件驱动和事件回调的 epoll + 线程池的面向对象编程
  • 7、通过深入理解 Muduo 源码,加深对于网络相关项目的深刻理解
  • 8、改造 Muduo,不依赖 Boost,使用 C++ 11 进行代码重构

知识储备

在使用 C++ 开发高性能的网路库之前,要求先掌握以下前置知识:

  • 1、TCP 协议和 UDP 协议
  • 2、Linux 的 TCP 网络编程和 UDP 网络编程
  • 3、I/O 多路复用编程,包括 select、poll、epoll 库的使用
  • 4、Linux 的多线程编程(pthread)、进程和线程模型
  • 5、C++ 20 标准新加入的协程支持

推荐阅读的书籍

《UNIX 环境高级编程》、《Linux 高性能服务器编程》、《 Linux 多线程服务端编程 - 使用 Muduo C++ 网络库》《鸟哥的 Linux 私房菜》

开发工具

软件版本说明
C++ 标准11C++ 标准的版本
G++(GCC)12.2.0建议使用 9 版本的 G++(GCC) 编译器
CMake3.25.1C/C++ 项目构建工具
LinuxDebian 12Muduo 库不支持 Windows 平台
Visual Studio Code1.100.2使用 VSCode 远程开发特性

基础概念

阻塞、非阻塞、同步、异步

提示

  • 下面提到的 "I/O 操作" 并不局限于网络 I/O,而是一个广义的 I/O 概念,它既包含网络 I/O(Socket 读写),也包含磁盘 I/O(文件读写)等所有涉及内核态与用户态之间数据交换的操作。
  • I/O 模型(如阻塞、非阻塞、同步、异步)更多用于讨论网络 I/O,原因是磁盘 I/O 的异步化由操作系统内核自动管理(页缓存 + 异步调度),应用层很少直接干预。
  • 同步与异步的区别

    • 同步:
      • 请求方 A 发起 I/O 调用后,由 A 自身完成数据的读写;
      • 无论阻塞与否,A 都要亲自执行数据的读写,将数据从内核缓冲区拷贝到用户空间(或反之)。
    • 异步:
      • 请求方 A 发起 I/O 调用后,仅仅发出请求,并由操作系统内核来完成数据的读写;
      • A 不需要等待操作完成,可以继续做其他事情;当操作系统内核完成读写操作后,会通过回调、事件通知等机制通知 A 结果。
  • 阻塞与非阻塞的区别

    • 阻塞:
      • 调用未完成前,调用线程会一直等待;
    • 非阻塞:
      • 调用立即返回,即使操作未完成,也会返回错误码或状态提示(例如 EAGAIN)。
  • 典型的一次 I/O 操作可以分为两个阶段

    • 数据准备(阶段一):该阶段取决于系统 I/O 操作的就绪状态,即数据是否已经可以被读写
      • 阻塞:调用会等待数据准备好后再继续执行。
      • 非阻塞:调用会立即返回,无论数据是否就绪。
    • 数据读写(阶段二):该阶段取决于应用程序与操作系统内核之间的交互方式
      • 同步:由应用程序主动完成数据的读写,将数据从内核缓冲区拷贝到用户空间(或反之)。
      • 异步:由操作系统内核完成数据的读写,并在操作完成后通知应用程序。

总结

  • 同步 / 异步区分的是谁来完成 I/O 读写(调用方自己还是操作系统内核来完成数据读写)。
  • 阻塞 / 非阻塞区分的是调用方等待的方式(是否挂起等待处理结果)。

常见的四种 I/O 模型

I/O 模型数据准备阶段数据读写阶段调用方行为示例说明
同步阻塞阻塞等待数据准备好调用方执行读写整个过程会阻塞当前线程int size = recv(fd, buf, 1024, 0);(若无数据则阻塞等待)
同步非阻塞非阻塞轮询数据准备好调用方执行读写调用立即返回,但需要反复尝试调用设置 O_NONBLOCK,多次调用 recv() 检查是否有数据可读
异步阻塞阻塞等待事件完成操作系统内核完成读写等待通知,但数据读写由操作系统内核完成例如 Windows OVERLAPPED I/O + GetOverlappedResult 阻塞等待
异步非阻塞非阻塞提交请求操作系统内核完成读写并通知完全不阻塞,结果通过回调 / 事件返回例如 Linux aio_read()io_uring 提交请求后立即返回

陈硕大神的原话:在处理 I/O 的时候,阻塞和非阻塞都是同步 I/O,只有使用了特殊的 API 才是异步 I/O(如下图所示)。

特别注意

  • select / poll / epoll 本身只是事件就绪通知机制,它们并不直接完成数据读写,调用它们的线程仍然需要自己去 read()write() 数据。
  • 因此,从严格意义上看,它们属于同步 I/O 实现方式,因为最终的 I/O 读写(即数据读写)是由调用线程自己完成的。
  • 但它们提供了非阻塞的事件等待,使得一个线程可以同时监听多个 fd,而不用一个线程阻塞在一个 fd 上。
  • 真正的异步 I/O 实现,在 Linux 上需要使用 aio_* 系列系统函数或者使用 io_uring

Unix/Linux 的五种 I/O 模型

Unix/Linux 支持以下五种 I/O 模型:

I/O 模型阻塞 / 非阻塞事件通知方式适用场景
阻塞 I/O 阻塞同步返回简单程序,低并发
非阻塞 I/O 非阻塞轮询少量 I/O,CPU 可支撑
I/O 多路复用阻塞或非阻塞操作系统内核返回就绪事件列表高并发网络服务器
信号驱动 I/O 非阻塞信号小规模异步通知
异步 I/O 非阻塞回调 / 事件高并发、对延迟敏感场景

阻塞 I/O(Blocking I/O)

  • 特征:应用程序调用 I/O 函数后,如果数据未就绪,调用线程会被阻塞,直到数据准备完成。
  • 优点:编程实现简单、逻辑直观。
  • 缺点:线程无法同时处理多个 I/O,吞吐量受限。

非阻塞 I/O(Non-Blocking I/O)

  • 特征:I/O 调用立即返回,即使数据未就绪也不会阻塞。应用程序需要通过轮询(Polling)或循环检查,目的是不断检测数据是否已经就绪,以便及时进行数据读写操作。
  • 优点:单线程可以处理多个 I/O。
  • 缺点:轮询会浪费 CPU 资源,逻辑较复杂。

I/O 多路复用(I/O Multiplexing)

  • 典型机制:selectpollepoll
  • 特征:单个线程可以同时监听多个 fd,通过操作系统内核返回就绪事件列表,再进行读写操作。
  • 优点:高效管理大量并发连接,避免轮询浪费。
  • 缺点:处理非常大量 fd 时,某些实现(如 selectpoll)效率有限。
  • 注意:在 I/O 多路复用中,复用的线程而不是 TCP 连接。由于最终的 I/O 读写(即数据读写)是由调用线程自己完成的,因此从严格意义上看,I/O 多路复用属于同步 I/O 实现方式。

信号驱动 I/O(Signal-Driven I/O)

  • 特征:应用程序注册信号处理函数(如 SIGIO),当 fd 可读或可写时,操作系统内核发送信号通知。
  • 优点:异步通知,无需轮询。
  • 缺点:信号处理复杂,信号丢失或竞态问题较多,不易大规模使用。
  • 注意:操作系统内核在第一个阶段(数据准备)是异步,在第二个阶段(数据读写)是同步;与非阻塞 I/O 的区别在于它提供了消息通知机制,不需要用户进程不断的轮询检查,减少了系统 API 的调用次数,提高了效率。

异步 I/O(Asynchronous I/O)

  • 特征:应用程序发起 I/O 调用后,立即返回;当数据准备好后,由操作系统内核完成数据读写;当数据读写操作完成后,通过信号、回调函数或事件机制通知应用程序。
  • 优点:真正的异步,高效利用 CPU,可处理大量并发 I/O。
  • 缺点:编程复杂,Linux 支持有限(传统 AIO 对网络 I/O 支持不好,io_uring 是新方案)。
  • 注意:这是真正的异步 I/O 实现,在 Linux 上需要使用 aio_* 系列系统函数或者使用 io_uring,Node.js 采用了该 I/O 模型。

优秀的网络服务器设计

在这个 CPU 多核时代,服务端网络编程如何选择线程模型呢?赞同 libev 作者的观点:”one loop perthread is usually a good model”,这样多线程服务端编程的问题就转换为如何设计一个高效且易于使用的 Event Loop, 然后每个线程运行一个 Event Loop 就行了(当然,线程间的同步、互斥少不了,还有其它的耗时事件需要起另外的线程来做)。Event Loop 是 Non-Blocking 网络编程的核心,可以简单理解为 Non-Blocking + epoll + thread-pool 的结合。在实际应用中,Non-Blocking 几乎总是与 I/O Multiplexing 一起使用,原因有以下两点:

  • 实际上没有人会采用轮询(Busy-Polling)方式不断检查某个 Non-Blocking I/O 操作是否完成,因为这会严重浪费 CPU 资源。
  • I/O Multiplexing 通常无法与 Blocking I/O 一起使用,因为在 Blocking I/O 中,accept()connect()read()write() 等调用都有可能阻塞当前线程,从而导致线程无法继续处理其他 Socket 上的 I/O 事件。

所以,当日常提到 Non-Blocking I/O 时,实际上指的是 Non-Blocking + I/O Multiplexing(如 epoll + thread-pool) 的组合,如何单独使用其中任意一种,都无法很好地实现高效的网络 I/O。


在网络编程领域中,主流的网络 I/O 模型有以下几种(不限于),Muduo 采用的是第四种(reactors in threads - one loop per thread)。

  • (1) accept + read/write

    • 不适用于并发服务器
  • (2) accept + fork - process-pre-connection

    • 适合并发连接数不大,计算任务工作量大于 Fork 的开销。
  • (3) accept + thread - thread-pre-connection

    • 比第二种网络 I/O 模型的开销小了一点,但是并发造成的线程堆积过多。
  • (4) reactors in threads - one loop per thread

    • 这是 Muduo 库的网络设计方案,底层实质上是基于 Linux 的 epoll + pthread 线程池实现,且依赖了 Boost 库,适用于并发连接数较大的场景。
    • 有一个 Main Reactor 负载 Accept 连接,然后将连接分发给某个 SubReactor(采用轮询的方式来选择 SubReactor),该连接的所用操作都在那个 SubReactor 所处的线程中完成。多个连接可能被分派到多个线程中被处理,以充分利用 CPU。
    • Main Reactor 中有一个 Base I/O Thread 负责 Accept 新的连接,接收到新的连接以后,使用轮询的方式在 Reactor Pool 中找到合适的 SubReactor 将这个连接挂载上去,这个连接上的所有任务都在这个 SubReactor 所处的线程中完成。
    • Reactor Poll 的大小是固定的,根据 CPU 的核心数量来确定。如果有过多的耗费 CPU 资源的计算任务,可以提交到 ThreadPool 线程池中专门处理耗时的计算任务。
  • (5) reactors in process - one loop pre process

    • 这是 Nginx 服务器的网络设计方案,基于进程设计,采用多个 Reactors 充当 I/O 进程和工作进程,通过一个 accept 锁,完美解决多个 Reactors 之间的 “惊群现象”。

reactors in process + fork 不如 reactors in threads 吗?

答案肯定是否定的,强大的 Nginx 服务器采用了 reactors in process 模型作为网络模块的架构设计,实现了简单好用的负载算法,使各个 fork 网络进程不会忙的越忙、闲的越闲,并且通过引入一把乐观锁解决了该模型导致的服务器惊群现象,功能十分强大。

Reactor 网络 I/O 模型

Reactor 模型的介绍

维基百科对 Reactor 的描述

The reactor design pattern is an event handling pattern for handling service requestsdelivered concurrently to a service handler by one or more inputs. The service handlerthen demultiplexes the incoming requests and dispatches them synchronously to theassociated request handlers. 翻译后:Reactor(反应器)设计模式是一种事件处理模式,用于处理由一个或多个输入并发传递到服务处理器的服务请求。然后,服务处理器对传入的请求进行多路分解,并同步地将它们分派给相应的请求处理器。

  • Reactor 是一种基于事件驱动(Event Driven)的网络 I/O 模型,核心思想是:

    • 主线程(或 I/O 线程)通过 I/O 多路复用(I/O Multiplexing)机制(如 selectpollepoll),监听多个连接的 I/O 事件。
    • 当某个事件就绪后,再分发(Dispatch)给对应的事件处理器(EventHandler)进行处理。
  • Reactor 虽然是网络 I/O 模型,但它通常与线程模型结合使用:

    • 单线程 Reactor:所有 I/O 事件的监听与处理都在同一个线程中完成。
    • 多线程 Reactor:I/O 事件的监听与业务处理分离,通常用线程池来处理业务逻辑。
    • 主从 Reactor:主 Reactor(即 MainReactor)负责连接建立,从 Reactor(即 SubReactor)负责 I/O 读写(即数据读写),结合多线程提升并发性能。
  • Reactor 的五大核心组件:

    核心组件作用 Muduo 网络库中对应的核心类
    Event(事件)表示 I/O 事件的抽象,如连接建立、可读、可写等,用于描述发生了什么类型的网络事件。Channel
    Demultiplexer(事件分离器)负责监听并检测多个 I/O 事件的就绪状态(通常由 selectpollepoll 等系统调用实现),并将已就绪的事件返回给 Reactor。PollerEPollPoller
    Reactor(反应堆)事件分发器,负责从 Demultiplexer 获取就绪事件,并将事件分发给对应的 EventHandler 处理。EventLoop
    EventHandler(事件处理器)负责具体的事件处理逻辑,如读、写、连接、业务处理等,是应用层的回调逻辑。回调函数 + TcpConnectionhandleRead() / handleWrite()
    Acceptor(连接接收器)负责监听服务器端口并接收新的客户端连接,在多 Reactor 模型中通常独立运行,仅负责建立连接并将连接交给子 Reactor 处理。Acceptor
  • Reactor 核心组件的工作流程:

  • Muduo 库的 Multiple Reactors 模型:

Reactor 模型与 Proactor 模型的区别

  • Reactor 模型与 Proactor 模型的主要区别
模型内核通知的事件谁负责实际 I/O 读写用户线程需要做什么
Reactor 可以读 / 可以写用户线程做读写用户线程收到可读 / 可写通知后,调用 read / write,并处理数据
Proactor 读完了 / 写完了内核做读写 (异步完成读 / 写后,再通知用户线程)用户线程收到读 / 写完成通知后,直接处理已读 / 已写的数据
  • 常见库 / 系统采用模型对比
库 / 系统模型平台
MuduoReactorLinux
Netty(NIO)ReactorLinux
libevent / libevReactorLinux
Boost.Asio(Linux)ReactorLinux
IOCPProactorWindows
Boost.Asio(Windows)ProactorWindows
  • 为什么 Linux 基本用不到 Proactor?
    • 因为 Linux 的 aio 不是真正意义上的内核异步 I/O:
      • 文件 I/O 是异步的
      • 网络 I/O 仍然是阻塞式的(内核不自动读取)
    • 所以 Linux 上的高性能网络库几乎都是:
      • epoll(Reactor)
      • epoll + thread pool(高级 Reactor)

I/O 多路复用技术概述

跨平台特性的对比

技术是否支持跨平台支持的平台特点
select✅ 广泛跨平台 Linux / macOS / BSD / Windows / Unix 最老的接口,POSIX 标准定义
poll⚠️ 支持类 Unix 跨平台(不支持 Windows)Linux /macOS/ BSD / Solaris 等select 的改进版 ,无 fd 数量限制
epoll❌ Linux 独有仅 Linux(2.6+)高性能 I/O 多路复用技术
kqueue❌ BSD /macOS 独有 FreeBSD / macOS / NetBSD / OpenBSDepoll 的 BSD 平台对应物

select 与 poll 的缺点

I/O 多路复用技术 select 有以下缺点:

  • (1) 文件描述符数量限制:

    • 单个进程可监视的文件描述符数量存在上限,通常为 1024(可修改)。但由于 select 采用轮询扫描方式检查文件描述符,随着监视数量的增加,性能会明显下降。
    • 在 Linux 内核头文件中有如下定义:#define __FD_SETSIZE 1024
  • (2) 内核与用户空间的数据拷贝开销大:

    • 每次调用 select 都需要在内核空间与用户空间之间复制大量的文件描述符集合,这会造成显著的性能开销。
  • (3) 结果集遍历效率低:

    • select 返回的是一个包含所有文件描述符的数组,应用程序需要遍历整个数组才能判断哪些描述符处于就绪状态,效率较低。
  • (4) 水平触发机制(Level Trigger):

    • select 采用水平触发方式,如果应用程序没有及时处理已就绪的文件描述符,那么在后续的每次 select 调用中,这些描述符仍会被重复通知。

I/O 多路复用技术 pollselect 相比,使用链表来保存文件描述符,不再受文件描述符数量上限的限制,但仍然存在与 select 相同的其他三个缺点(数据拷贝开销大、结果集遍历效率低、水平触发),这里不再累述。

select 无法支持高并发连接

select 为例,若服务器需支持 100 万并发连接,在 __FD_SETSIZE 为 1024 的情况下,至少需要创建约 1000 个进程才能满足要求。如此不仅会带来大量的进程上下文切换开销,还会因频繁的内核空间 / 用户空间句柄拷贝与数组遍历操作,导致系统性能急剧下降。因此,基于 select 模型的服务器要实现百万级并发几乎是不可能的。

epoll 的原理以及优势

设想这样一个场景:有 100 万个客户端同时与一个服务器进程保持 TCP 连接,但在任意时刻,通常只有几百到上千个连接是活跃的(这也是现实中最常见的情况)。如何高效地支撑如此庞大的并发连接呢?在 select / poll 时代,服务器每次调用都需要将这 100 万个连接的文件描述符从用户态复制到内核态,让内核轮询这些套接字上是否有事件发生;轮询完成后,再将结果从内核态复制回用户态,供应用程序继续遍历处理。这种方式带来了巨大的内存拷贝和遍历开销,因此基于 select / poll 通常只能处理几千个并发连接。


epoll 的设计思想与 select 完全不同,因此它们的缺点在 epoll 中已不复存在。epoll 在 Linux 内核中引入了一种专用的事件管理机制,通过红黑树(用于管理所有已注册的文件描述符)和就绪链表(用于管理已触发事件的文件描述符)来组织事件,大幅降低了事件查找和分发的开销,使大规模并发连接的事件管理更加高效。

  • (1) epoll_create():创建一个 epoll 对象(内核在 epoll 文件系统中为该对象分配资源)。
  • (2) epoll_ctl():向 epoll 对象中添加、修改或删除需要监听的套接字(例如 100 万个 TCP 连接)。
  • (3) epoll_wait():等待并收集有事件发生的文件描述符。

其中 epoll_create() 在内核上创建的 eventpoll 结构如下:

1
2
3
4
5
6
7
8
9
10
11
struct eventpoll {
....(省略)

/* 红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件 */
struct rb_root rbr;

/* 双链表中则存放着将要通过 epoll_wait() 返回给用户的满足条件的事件 */
struct list_head rdlist;

....(省略)
}

得益于这种设计,只需在服务器启动时创建一次 epoll 对象,然后在连接建立或关闭时动态地添加或移除对应的套接字即可。更重要的是,epoll_wait() 的调用效率极高:

  • 它不需要在每次调用时复制所有文件描述符。
  • 内核也无需遍历全部连接,而是通过回调机制主动将就绪的文件描述符加入到就绪队列中。

因此,epoll 能够在单进程中轻松支撑数十万甚至上百万级的并发连接,这正是它区别于 select / poll 的根本优势所在。

epoll 的 LT 模式与 ET 模式

epoll 支持 LT(水平触发)与 ET(边缘触发),而 selectpoll 在设计上只支持 LT(水平触发),没有 ET(边缘触发)的概念。

  • LT 模式(Level Triggered,水平触发)

    • 语义:只要 fd 上有数据未被读取完,就会一直被 epoll 通知。
    • 特点:更 “宽松”,即使一次没读完,下次还会被提醒。
    • 行为示例:
      • 缓冲区有 100 字节可读;
      • 应用程序只读了 60 字节;
      • 下次 epoll_wait() 还会再次返回该 fd
    • 优点:编程简单、不易漏数据。
    • 缺点:频繁触发,效率略低。
  • ET 模式(Edge Triggered,边缘触发)

    • 语义:只有当状态发生变化(从无到有)时才触发一次事件。
    • 特点:仅在 “边缘” 通知,比如缓冲区从空变为非空。
    • 行为示例:
      • 缓冲区变为可读时触发;
      • 应用程序必须一次性读完所有数据(直到返回 EAGAIN);
      • 如果应用程序没读完,下次不会再收到通知。
    • 优点:减少系统调用次数,效率高。
    • 缺点:编程复杂,稍有疏忽就可能会 “丢事件”。
  • Muduo 采用的是 LT(水平触发)模式

    • 不会丢失数据或者消息
      • 应用程序没有读取完数据,内核是会不断上报数据的
    • 低延迟处理
      • 每次读数据只需要一次系统调用,照顾了多个连接的公平性,不会因为某个连接上的数据量过大而影响其他连接处理消息
    • 跨平台处理
      • select 一样可以跨平台使用

项目介绍

项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
c++-project-mymuduo
├── autobuild.sh
├── bin
├── build
├── CMakeLists.txt
├── example
│   ├── CMakeLists.txt
│   ├── epoll
│   │   ├── CMakeLists.txt
│   │   └── main.cc
│   └── mymuduo
│   ├── ChatClient.cc
│   ├── ChatClient.h
│   ├── ChatServer.cc
│   ├── ChatServer.h
│   ├── CMakeLists.txt
│   └── main.cc
├── lib
├── README.md
├── src
│   ├── include
│   │   ├── Acceptor.h
│   │   ├── Buffer.h
│   │   ├── Callbacks.h
│   │   ├── Channel.h
│   │   ├── Connector.h
│   │   ├── copyable.h
│   │   ├── CurrentThread.h
│   │   ├── EPollPoller.h
│   │   ├── EventLoop.h
│   │   ├── EventLoopThread.h
│   │   ├── EventLoopThreadPool.h
│   │   ├── InetAddress.h
│   │   ├── Logger.h
│   │   ├── noncopyable.h
│   │   ├── Poller.h
│   │   ├── Socket.h
│   │   ├── SocketsOps.h
│   │   ├── TcpClient.h
│   │   ├── TcpConnection.h
│   │   ├── TcpServer.h
│   │   ├── Thread.h
│   │   └── Timestamp.h
│   ├── Acceptor.cc
│   ├── Buffer.cc
│   ├── Channel.cc
│   ├── CMakeLists.txt
│   ├── Connector.cc
│   ├── CurrentThread.cc
│   ├── DefaultPoller.cc
│   ├── EPollPoller.cc
│   ├── EventLoop.cc
│   ├── EventLoopThread.cc
│   ├── EventLoopThreadPool.cc
│   ├── InetAddress.cc
│   ├── Logger.cc
│   ├── Poller.cc
│   ├── Socket.cc
│   ├── SocketsOps.cc
│   ├── TcpClient.cc
│   ├── TcpConnection.cc
│   ├── TcpServer.cc
│   ├── Thread.cc
│   └── Timestamp.cc
└── test
├── CMakeLists.txt
└── main.cc
目录名称目录说明
buildCMake 编译构建项目的目录(项目首次编译后才会有)
bin存放项目编译生成的可执行文件的目录(项目首次编译后才会有)
lib存放项目编译生成的 MyMuduo 动态链接库的目录(项目首次编译后才会有)
srcMyMuduo 网络库的源码
src/includeMyMuduo 网络库的头文件
testMyMuduo 网络库的的测试代码
example各种案例代码
example/epollepoll 的使用案例代码
example/mymuduoMyMuduo 网络库的使用案例代码
autobuild.sh项目一键编译构建的脚本文件

项目技术栈

基于 C++ 开发网络库时,使用到以下技术:

  • 单例设计模式
  • epoll 等 I/O 多路复用技术
  • Linux 网络编程基础(socket()bind()listen()accept()readv()write() 等)
  • C++ 11 多线程编程(std::threadstd::unique_lockstd::mutexstd::condition_variable 等)
  • 使用 CMake 构建与集成项目的编译环境

项目整体架构

架构图说明

在上述的架构图中,mainLoop 运行在主线程,负责监听新 TCP 连接并分发给 subLoop;而 subLoop(也称 ioLoop)运行在子线程,负责处理 TCP 连接的具体 I/O 事件(比如,读和写等)。mainLoop 与 subLoop 通过 pendingFunctors 异步任务队列进行线程间通信,禁止直接跨线程操作,这是为了保证某个 TCP 连接的所有 I/O 事件和连接状态操作都在同一个线程中执行,从而保证线程安全。在 Muduo 库的 Multiple Reactors 模型,mainLoop 对应的就是 mainReactor(主 Reactor),而 subLoop 对应的就是 subReactor(子 Reactor)。

项目代码

代码下载

本文开发的 MyMuduo 网络库只实现了 Muduo 的核心功能,并不支持 Muduo 的定时事件机制(TimerQueue)、IPV6 / DNS / HTTP / RPC 协议等,完整的项目代码可以在 这里 下载得到。

copyable

  • copyable.h
1
2
3
4
5
6
7
8
9
10
#pragma once

/**
* copyable 类被继承以后,派生类对象可以正常地执行构造和析构操作,同时派生类对象还可以进行拷贝构造和赋值操作
*/
class copyable {
protected:
copyable() = default;
~copyable() = default;
};

noncopyable

  • noncopyable.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#pragma once

/**
* noncopyable 类被继承以后,派生类对象可以正常地执行构造和析构操作,但是派生类对象不能进行拷贝构造和赋值操作
*/
class noncopyable {
public:
noncopyable(const noncopyable&) = delete;
void operator=(const noncopyable&) = delete;

protected:
noncopyable() = default;
~noncopyable() = default;
};

Logger

  • Logger.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#pragma once

#include <string>
#include <thread>

#include "CurrentThread.h"
#include "noncopyable.h"

// 定义宏
#define LOG_DEBUG(logmsgformat, ...) \
do { \
Logger& logger = Logger::instance(); \
if (logger.getLogLevel() <= DEBUG) { \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
int tid = CurrentThread::tid(); \
LogMessage msg = {DEBUG, c, tid}; \
logger.log(msg); \
} \
} while (0)

#define LOG_INFO(logmsgformat, ...) \
do { \
Logger& logger = Logger::instance(); \
if (logger.getLogLevel() <= INFO) { \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
int tid = CurrentThread::tid(); \
LogMessage msg = {INFO, c, tid}; \
logger.log(msg); \
} \
} while (0)

#define LOG_WARN(logmsgformat, ...) \
do { \
Logger& logger = Logger::instance(); \
if (logger.getLogLevel() <= WARN) { \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
int tid = CurrentThread::tid(); \
LogMessage msg = {WARN, c, tid}; \
logger.log(msg); \
} \
} while (0)

#define LOG_ERROR(logmsgformat, ...) \
do { \
Logger& logger = Logger::instance(); \
if (logger.getLogLevel() <= ERROR) { \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
int tid = CurrentThread::tid(); \
LogMessage msg = {ERROR, c, tid}; \
logger.log(msg); \
} \
} while (0)

#define LOG_FATAL(logmsgformat, ...) \
do { \
Logger& logger = Logger::instance(); \
if (logger.getLogLevel() <= FATAL) { \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
int tid = CurrentThread::tid(); \
LogMessage msg = {FATAL, c, tid}; \
logger.log(msg); \
std::this_thread::sleep_for(std::chrono::seconds(1)); \
exit(-1); \
} \
} while (0)

// 日志级别(DEBUG < INFO < WARN < ERROR < FATAL)
enum LogLevel {
DEBUG, // 调试日志信息
INFO, // 普通日志信息
WARN, // 警告日志信息
ERROR, // 错误日志信息
FATAL // 致命错误信息
};

// 日志信息
struct LogMessage {
LogLevel logLevel_; // 日志级别
std::string logContent_; // 日志内容
int threadId_; // 打印日志的线程的 ID
};

// 日志类(单例模式)
class Logger : noncopyable {
public:
// 获取单例对象
static Logger& instance();

// 输出日志信息
void log(const LogMessage& message);

// 获取日志级别
LogLevel getLogLevel();

// 设置日志级别
void setLogLevel(LogLevel level);

private:
// 私有构造函数
Logger();

// 私有析构函数
~Logger();

// 获取日志级别的名称
std::string logLevelToString(LogLevel level);

LogLevel logLevel_; // 记录日志级别
};
  • Logger.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include "Logger.h"

#include <sstream>

#include "Timestamp.h"

// 定义宏(设置 Debug 模式)
#ifdef MYMUDUO_DEBUG
constexpr bool kIsDebugMode = true;
#else
constexpr bool kIsDebugMode = false;
#endif

// 定义宏(跨平台获取当前调用的函数名称)
#if defined(__GNUC__) || defined(__clang__)
#define FUNC_NAME __PRETTY_FUNCTION__
#elif defined(_MSC_VER)
#define FUNC_NAME __FUNCSIG__
#else
#define FUNC_NAME __func__
#endif

// 构造函数
Logger::Logger() {
// 设置默认的日志级别
this->logLevel_ = !kIsDebugMode ? INFO : DEBUG;
}

// 析构函数
Logger::~Logger() {
}

// 获取单例对象
Logger& Logger::instance() {
// 局部静态变量(线程安全)
static Logger logger;
return logger;
}

// 输出日志信息
void Logger::log(const LogMessage& message) {
// 首先在外面构建好完整的字符串(避免多次 << 竞争)
std::ostringstream oss;
oss << Timestamp::now().toString() << " => " << message.threadId_ << " [" << logLevelToString(message.logLevel_)
<< "] " << message.logContent_ << '\n';

std::string s = oss.str();

// 然后一次性写入,不使用 std::endl(避免隐式 flush)
std::fwrite(s.data(), 1, s.size(), stdout);
}

// 设置日志级别
void Logger::setLogLevel(LogLevel level) {
this->logLevel_ = level;
}

// 获取日志级别
LogLevel Logger::getLogLevel() {
return this->logLevel_;
}

// 获取日志级别的名称
std::string Logger::logLevelToString(LogLevel level) {
switch (level) {
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARN:
return "WARN";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}

Timestamp

  • Timestamp.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#pragma once

#include <iostream>

#include "copyable.h"

// 时间戳类
class Timestamp : public copyable {
public:
// 默认构造函数,初始化为 0 微秒
Timestamp();

// 构造函数,使用微秒数进行初始化
explicit Timestamp(int64_t microSecondsSinceEpochArg);

// 将时间戳转换为字符串表示(比如 2025-11-16 17:45:30)
std::string toString() const;

// 获取当前时间戳
static Timestamp now();

private:
int64_t microSecondsSinceEpoch_; // 自纪元(1970年1月1日)以来的微秒数
};
  • Timestamp.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "Timestamp.h"

#include <time.h>

// 默认构造函数,初始化为 0 微秒
Timestamp::Timestamp()
: microSecondsSinceEpoch_(0){

};

// 构造函数,使用微秒数进行初始化
Timestamp::Timestamp(int64_t microSecondsSinceEpochArg)
: microSecondsSinceEpoch_(microSecondsSinceEpochArg){

};

// 将时间戳转换为字符串表示(比如 2025-11-16 17:45:30)
std::string Timestamp::toString() const {
char buf[128] = {0};
tm *tm_time = localtime(&microSecondsSinceEpoch_);
snprintf(buf, 128, "%4d-%02d-%02d %02d:%02d:%02d", tm_time->tm_year + 1900, tm_time->tm_mon + 1, tm_time->tm_mday,
tm_time->tm_hour, tm_time->tm_min, tm_time->tm_sec);
return buf;
}

// 获取当前时间戳
Timestamp Timestamp::now() {
return Timestamp(time(NULL));
}

InetAddress

  • InetAddress.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#pragma once

#include <netinet/in.h>

#include <iostream>

#include "copyable.h"

// 网络地址类
class InetAddress : public copyable {
public:
// 构造函数
explicit InetAddress(uint16_t port = 0, std::string ip = "127.0.0.1");

// 构造函数
explicit InetAddress(const sockaddr_in& addr);

// 获取 IP 地址字符串
std::string toIp() const;

// 获取 IP 地址和端口号字符串(比如 127.0.0.1:8080)
std::string toIpPort() const;

// 获取端口号
uint16_t toPort() const;

// 获取底层的 sockaddr_in 结构体指针
const sockaddr_in* getSockAddr() const;

// 设置底层的 sockaddr_in 结构体
void setSockAddr(const sockaddr_in& addr);

private:
sockaddr_in addr_; // 底层的 sockaddr_in 结构体
};
  • InetAddress.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include "InetAddress.h"

#include <arpa/inet.h>
#include <string.h>
#include <strings.h>

// 构造函数
InetAddress::InetAddress(uint16_t port, std::string ip) {
bzero(&addr_, sizeof addr_);
addr_.sin_family = AF_INET;
addr_.sin_port = htons(port);
addr_.sin_addr.s_addr = inet_addr(ip.c_str());
}

// 构造函数
InetAddress::InetAddress(const sockaddr_in& addr) {
this->addr_ = addr;
}

// 获取 IP 地址字符串
std::string InetAddress::toIp() const {
char buf[64] = {0};
::inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);
return buf;
}

// 获取 IP 地址和端口号字符串(比如 127.0.0.1:8080)
std::string InetAddress::toIpPort() const {
char buf[64] = {0};
::inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);
size_t end = strlen(buf);
uint16_t port = ntohs(addr_.sin_port);
sprintf(buf + end, ":%u", port);
return buf;
}

// 获取端口号
uint16_t InetAddress::toPort() const {
return ntohs(addr_.sin_port);
}

// 获取底层的 sockaddr_in 结构体指针
const sockaddr_in* InetAddress::getSockAddr() const {
return &addr_;
}

// 设置底层的 sockaddr_in 结构体
void InetAddress::setSockAddr(const sockaddr_in& addr) {
addr_ = addr;
}

SocketsOps

  • SocketsOps.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#pragma once

#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include "Logger.h"

// 创建非阻塞的 Socket
int createNonblockingSocket();

// 获取 Socket 错误码
int getSocketError(int sockfd);

// 判断是否为自连接
bool isSelfConnect(int sockfd);

// 获取本端地址
sockaddr_in getLocalAddr(int sockfd);

// 获取对端地址
sockaddr_in getPeerAddr(int sockfd);
  • SocketsOps.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include "SocketsOps.h"

#include <strings.h>

#include "Logger.h"

// 创建非阻塞的 Socket
int createNonblockingSocket() {
int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
if (sockfd < 0) {
LOG_FATAL("%s => create nonblock sockfd failed, errno:%d", __PRETTY_FUNCTION__, errno);
}
return sockfd;
}

// 获取 Socket 错误码
int getSocketError(int sockfd) {
int optval;
socklen_t optlen = sizeof optval;
if (::getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0) {
return errno;
} else {
return optval;
}
}

// 获取本端地址
sockaddr_in getLocalAddr(int sockfd) {
sockaddr_in localaddr;
bzero(&localaddr, sizeof(localaddr));
socklen_t addrlen = sizeof(localaddr);
if (::getsockname(sockfd, (sockaddr*)(&localaddr), &addrlen) < 0) {
LOG_ERROR("%s => get socket name failed, errno:%d", __PRETTY_FUNCTION__, errno);
}
return localaddr;
}

// 获取对端地址
sockaddr_in getPeerAddr(int sockfd) {
sockaddr_in peeraddr;
bzero(&peeraddr, sizeof(peeraddr));
socklen_t addrlen = sizeof(peeraddr);
if (::getpeername(sockfd, (sockaddr*)(&peeraddr), &addrlen) < 0) {
LOG_ERROR("%s => get peer name failed, errno:%d", __PRETTY_FUNCTION__, errno);
}
return peeraddr;
}

// 判断是否为自连接
bool isSelfConnect(int sockfd) {
sockaddr_in localaddr;
sockaddr_in peeraddr;
socklen_t addrlen = sizeof(sockaddr_in);

// 获取本端地址
if (getsockname(sockfd, (sockaddr*)&localaddr, &addrlen) < 0) {
return false;
}

// 获取对端地址
if (getpeername(sockfd, (sockaddr*)&peeraddr, &addrlen) < 0) {
return false;
}

// 必须都是 IPv4
if (localaddr.sin_family != AF_INET || peeraddr.sin_family != AF_INET) {
return false;
}

// 检查 IP + 端口是否完全相同
return (localaddr.sin_port == peeraddr.sin_port) && (localaddr.sin_addr.s_addr == peeraddr.sin_addr.s_addr);
}

Channel

  • Channel.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#pragma once

#include <functional>
#include <iostream>
#include <memory>

#include "Timestamp.h"
#include "noncopyable.h"

// 类前置声明
class EventLoop;

/**
* Channel 可以理解为通道,封装了 socket fd 和其感兴趣的 event(事件),比如 EPOLLIN、EPOLLOUT 事件,还绑定了 Poller,返回的具体事件
*/
class Channel : noncopyable {
public:
// 事件回调函数类型定义
using EventCallback = std::function<void()>;

// 读事件的回调函数类型定义
using ReadEventCallback = std::function<void(Timestamp)>;

// 构造函数
Channel(EventLoop* loop, int fd);

// 析构函数
~Channel();

// fd 得到 poller 通知以后,处理事件的函数
void handleEvent(Timestamp receiveTime);

/********** 设置事件的回调操作 **********/

void setReadCallback(ReadEventCallback cb) {
readCallback_ = std::move(cb);
}

void setWriteCallback(EventCallback cb) {
writeCallback_ = std::move(cb);
}

void setCloseCallback(EventCallback cb) {
closeCallback_ = std::move(cb);
}

void setErrorCallback(EventCallback cb) {
errorCallback_ = std::move(cb);
}

/********** 获取和设置 fd 和 events **********/

// 获取 socket 的 fd
int fd() const {
return fd_;
}

// 获取 fd 感兴趣的事件
int events() {
return events_;
}

// 设置 fd 上发生的具体事件
void set_revents(int revent) {
revents_ = revent;
}

/********** 设置 fd 相应的事件状态 **********/

// 开启监听 fd 上的读事件
void enableReading() {
events_ |= kReadEvent;
update();
}

// 关闭监听 fd 上的读事件
void disableReading() {
events_ &= ~kReadEvent;
update();
}

// 开启监听 fd 上的写事件
void enableWriting() {
events_ |= kWriteEvent;
update();
}

// 关闭监听 fd 上的写事件
void disableWriting() {
events_ &= ~kWriteEvent;
update();
}

// 禁止监听 fd 上的所有事件(读 + 写)
void disableAll() {
events_ = kNoneEvent;
update();
}

/********** 获取 fd 当前的事件状态 **********/

// 判断当前是否没有监听任何事件(既不读也不写)
bool isNoneEvent() const {
return events_ == kNoneEvent;
}

// 判断当前是否正在监听写事件
bool isWriting() const {
return events_ & kWriteEvent;
}

// 判断当前是否正在监听读事件
bool isReading() const {
return events_ & kReadEvent;
}

// 返回当前 Channel 在 Poller 中的状态
int index() {
return index_;
}

// 设置当前 Channel 在 Poller 中的状态
void set_index(int index) {
index_ = index;
}

// 防止当 Channel 被手动 remove 掉后,Channel 还在执行事件的回调操作
void tie(const std::shared_ptr<void>& obj);

// 从 Poller 中删除当前 Channel
void remove();

private:
// 更新 Channel 状态到 Poller 中
void update();

// 处理事件,有了 guard 之后,Channel 就不会在被手动 remove 掉后还继续执行事件的回调操作
void handleEventWithGuard(Timestamp receiveTime);

// 定义 Channel 支持的事件类型
static const int kNoneEvent; // 无事件
static const int kReadEvent; // 读事件
static const int kWriteEvent; // 写事件

EventLoop* loop_; // Channel 所属的事件循环
const int fd_; // fd,是 Poller 监听的对象
int events_; // 注册 fd 上感兴趣的事件
int revents_; // poller 返回的 fd 上具体发生的事件
int index_; // 标记 Channel 在 Poller 中的状态

std::weak_ptr<void> tie_; // 用于防止 Channel 被手动 remove 掉后,Channel 还在执行事件的回调操作
bool tied_; // 标记是否已绑定 tie_

// Channel 里面能够获知 fd 上最终发生的具体事件(revents_),所以由它负责调用具体事件的回调操作(即事件分发)
ReadEventCallback readCallback_; // 读事件的回调函数
EventCallback writeCallback_; // 写事件的回调函数
EventCallback closeCallback_; // 关闭事件的回调函数
EventCallback errorCallback_; // 错误事件的回调函数
};
  • Channel.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#include "Channel.h"

#include <sys/epoll.h>

#include "EventLoop.h"
#include "Logger.h"

// 定义 Channel 支持的事件类型(与 Epoll 兼容)
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;
const int Channel::kWriteEvent = EPOLLOUT;

// 构造函数
Channel::Channel(EventLoop* loop, int fd) : loop_(loop), fd_(fd), events_(0), revents_(0), index_(-1), tied_(false) {
}

// 析构函数
Channel::~Channel() {
}

// 防止当 Channel 被手动 remove 掉后,Channel 还在执行事件的回调操作
void Channel::tie(const std::shared_ptr<void>& obj) {
tie_ = obj;
tied_ = true;
}

// 从 Poller 中删除当前 Channel
void Channel::remove() {
// 通过 Channel 所属的 EventLoop,将当前的 Channel 删除掉
loop_->removeChannel(this);
}

// 更新 Channel 状态到 Poller 中
void Channel::update() {
// 通过 Channel 所属的 EventLoop,调用 Poller 相应的方法,注册 fd 的感兴趣的事件(events_)
loop_->updateChannel(this);
}

// fd 得到 poller 通知以后,处理事件的函数
void Channel::handleEvent(Timestamp receiveTime) {
if (tied_) {
std::shared_ptr<void> guad = tie_.lock();
if (guad) {
handleEventWithGuard(receiveTime);
}
} else {
handleEventWithGuard(receiveTime);
}
}

/**
* 处理事件,有了 guard 之后,Channel 就不会在被手动 remove 掉后还继续执行事件的回调操作了
*
* EPOLLIN:可读,文件描述符有数据可读且读取不会阻塞(如 socket 或 pipe 有数据)。
* EPOLLOUT:可写,文件描述符可以写入且不会阻塞(如 socket 可发送数据、pipe 可写入)。
* EPOLLERR:错误,文件描述符发生错误,无法正常读写(如 TCP reset、I/O 错误)。
* EPOLLHUP:挂断,文件描述符被挂断(如对端关闭连接)。注意:通常与 EPOLLIN 一起出现。
* EPOLLPRI:紧急数据,文件描述符有优先数据(TCP OOB 或特殊设备的紧急数据)。
*/
void Channel::handleEventWithGuard(Timestamp receiveTime) {
LOG_DEBUG("channel handle event, revents: %d", revents_);

// 发生挂断事件且没有读事件发生
if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)) {
if (closeCallback_) {
closeCallback_();
}
}

// 发生错误事件
if (revents_ & EPOLLERR) {
if (errorCallback_) {
errorCallback_();
}
}

// 发生读事件
if (revents_ & (EPOLLIN | EPOLLPRI | EPOLLHUP)) {
if (readCallback_) {
readCallback_(receiveTime);
}
}

// 发生写事件
if (revents_ & EPOLLOUT) {
if (writeCallback_) {
writeCallback_();
}
}
}

Poller

  • Poller.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#pragma once

#include <iostream>
#include <unordered_map>
#include <vector>

#include "Timestamp.h"
#include "noncopyable.h"

// 类前置声明
class Channel;
class EventLoop;

/**
* I/O 多路复用器抽象类
*/
class Poller : noncopyable {
public:
// Channel 列表类型定义
using ChannelList = std::vector<Channel*>;

// 构造函数
Poller(EventLoop* loop);

// 虚析构函数
virtual ~Poller();

/********** 统一定义所有 I/O 多路复用器的接口 **********/

// 监听就绪事件,返回活跃的 Channel 列表
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;

// 更新 Channel
virtual void updateChannel(Channel* channel) = 0;

// 移除 Channel
virtual void removeChannel(Channel* channel) = 0;

// 判断 Poller 中是否存在某个 Channel
virtual bool hasChannel(Channel* channel) const;

/********** 创建 I/O 多路复用器实例 **********/

static Poller* newDefaultPoller(EventLoop* loop);

protected:
// Channel 集合的类型定义,key 是 fd,而 value 是 fd 所属的 Channel
using ChannelMap = std::unordered_map<int, Channel*>;

ChannelMap channels_; // 保存所有的 Channel

private:
EventLoop* owerLoop_; // Poller 所属的事件循环
};
  • Poller.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include "Poller.h"

#include "Channel.h"

// 构造函数
Poller::Poller(EventLoop* loop) : owerLoop_(loop) {
}

// 虚析构函数
Poller::~Poller() {
}

// 判断 Poller 中是否存在某个 Channel
bool Poller::hasChannel(Channel* channel) const {
auto iterator = channels_.find(channel->fd());
return iterator != channels_.end() && iterator->second == channel;
}
  • DefaultPoller.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <stdlib.h>

#include "EPollPoller.h"
#include "Logger.h"
#include "Poller.h"

// 创建默认的 I/O 多路复用器
Poller* Poller::newDefaultPoller(EventLoop* loop) {
if (::getenv("MYMUDUO_USE_POLL")) {
// 创建 Poll 的实例
LOG_FATAL("not support poll, only support epoll");
return nullptr;
} else {
// 创建 Epoll 的实例
return new EPollPoller(loop);
}
}

Epoller

  • EPollPoller.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#pragma once

#include <sys/epoll.h>

#include <vector>

#include "EventLoop.h"
#include "Poller.h"
#include "Timestamp.h"

// 基于 Epoll 的 I/O 多路复用器
class EPollPoller : public Poller {
public:
// 构造函数
EPollPoller(EventLoop* loop);

// 析构函数
~EPollPoller() override;

// 监听就绪事件,返回活跃的 Channel 列表
Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;

// 更新 Channel
void updateChannel(Channel* channel) override;

// 移除 Channel
void removeChannel(Channel* channel) override;

private:
// Epoll 事件列表的初始大小
static const int kInitEventListSize;

// 填充活跃的 Channel 列表
void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;

/**
* 更新 Channel,其中 operation 参数的值有以下几种
* EPOLL_CTL_ADD 添加 fd 到 Epoll 实例
* EPOLL_CTL_DEL 从 Epoll 实例中删除 fd
* EPOLL_CTL_MOD 修改 fd 的监听事件
*/
void update(int operation, Channel* channel);

// Epoll 事件列表类型定义
using EventList = std::vector<::epoll_event>;

int epollfd_; // Epoll 文件描述符(Epoll 监听的对象)
EventList events_; // Epoll 事件列表
};
  • EPollPoller.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#include "EPollPoller.h"

#include <strings.h>

#include "Channel.h"
#include "Logger.h"
#include "error.h"
#include "unistd.h"

// 定义 Epoll 事件列表的初始大小
const int EPollPoller::kInitEventListSize = 16;

// 定义 Channel 在 Epoll 中的状态
const int kNew = -1; // 新创建的 Channel
const int kAdded = 1; // 已经添加到 Epoll 中的 Channel
const int kDeleted = 2; // 已经从 Epoll 中移除的 Channel

// 构造函数
EPollPoller::EPollPoller(EventLoop* loop)
: Poller(loop), epollfd_(::epoll_create1(EPOLL_CLOEXEC)), events_(kInitEventListSize) {
// 如果创建 Epoll 文件描述符失败,则记录日志并终止程序
if (epollfd_ < 0) {
LOG_FATAL("%s => epoll_create1() error:%d", __PRETTY_FUNCTION__, errno);
}
}

// 析构函数
EPollPoller::~EPollPoller() {
// 关闭 Epoll 文件描述符
::close(epollfd_);
}

// 监听就绪事件,返回活跃的 Channel 列表
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels) {
// 打印日志信息
LOG_DEBUG("%s => fd total count:%lu", __PRETTY_FUNCTION__, channels_.size());

// 监听就绪事件,会阻塞当前线程,超时等待返回 0(表示本次等待期间没有任何就绪事件发生)
int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);

// 保存错误码
int savedErrno = errno;

// 获取当前时间戳
Timestamp now(Timestamp::now());

// 如果有就绪事件发生
if (numEvents > 0) {
// 打印日志信息
LOG_DEBUG("%s => epoll happend %d events", __PRETTY_FUNCTION__, numEvents);

// 填充活跃的 Channel 列表
fillActiveChannels(numEvents, activeChannels);

// 如果本次监听返回的就绪事件数量等于当前 Epoll 事件列表的大小,则将 Epoll 事件列表的容量扩大一倍
if (numEvents == events_.size()) {
events_.resize(events_.size() * 2);
}
}
// 如果监听超时没有任何就绪事件发生
else if (numEvents == 0) {
LOG_DEBUG("%s => epoll wait timeout, nothing happened", __PRETTY_FUNCTION__);
}
// 如果监听出错
else {
// 只有在错误码不是 EINTR(系统调用被中断)时,才记录错误日志
if (savedErrno != EINTR) {
// 恢复错误码
errno = savedErrno;
// 打印日志信息
LOG_ERROR("%s => epoll wait error", __PRETTY_FUNCTION__);
}
}

return now;
}

// 填充活跃的 Channel 列表
void EPollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const {
// 遍历所有就绪的事件
for (int i = 0; i < numEvents; ++i) {
// 获取就绪的 Channel
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
// 设置 Channel 上发生的具体事件
channel->set_revents(events_[i].events);
// 将就绪的 Channel 添加到活跃的 Channel 列表中
activeChannels->push_back(channel);
}
}

// 更新 Channel
void EPollPoller::updateChannel(Channel* channel) {
// 获取 Channel 在 Epoll 中的状态
const int index = channel->index();

// 打印日志信息
LOG_DEBUG("%s => fd=%d events=%d index=%d", __PRETTY_FUNCTION__, channel->fd(), channel->events(), index);

if (index == kNew || index == kDeleted) {
if (index == kNew) {
// 获取 socket 的 fd
int fd = channel->fd();
// 将 Channel 添加到 Channel 集合中
channels_[fd] = channel;
}
// 更新 Channel 在 Epoll 中的状态
channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
} else {
// 获取 socket 的 fd
int fd = channel->fd();
// 如果当前没有任何事件感兴趣,则将 Channel 从 Epoll 中删除
if (channel->isNoneEvent()) {
update(EPOLL_CTL_DEL, channel);
channel->set_index(kDeleted);
}
// 否则,更新 Channel 的状态
else {
update(EPOLL_CTL_MOD, channel);
}
}
}

// 更新 Channel
void EPollPoller::update(int operation, Channel* channel) {
// 获取 socket 的 fd
int fd = channel->fd();

// Epoll 事件
::epoll_event event;
bzero(&event, sizeof event);
event.data.ptr = channel;
event.events = channel->events();

// 设置 fd 相应的 Epoll 事件(使用 Channel 中记录的 interests)
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) {
if (operation == EPOLL_CTL_DEL) {
LOG_ERROR("epoll_ctl delete error:%d", errno);
} else {
LOG_FATAL("epoll_ctl add or mod error:%d", errno);
}
}
}

// 移除 Channel
void EPollPoller::removeChannel(Channel* channel) {
// 获取 socket 的 fd
int fd = channel->fd();

// 从 Channel 集合中将 fd 对应的 Channel 移除掉
channels_.erase(fd);

// 打印日志信息
LOG_DEBUG("%s => fd=%d", __PRETTY_FUNCTION__, fd);

// 获取 Channel 在 Epoll 中的状态
int index = channel->index();
if (index == kAdded) {
// 更新 Channel
update(EPOLL_CTL_DEL, channel);
}

// 更新 Channel 在 Epoll 中的状态
channel->set_index(kNew);
}

EventLoop

  • EventLoop.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#pragma once

#include <atomic>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <vector>

#include "Timestamp.h"
#include "noncopyable.h"

// 类前置声明
class Channel;
class Poller;

// 事件循环类
class EventLoop : noncopyable {
public:
// 回调函数类型定义
using Functor = std::function<void()>;

// 构造函数
EventLoop();

// 析构函数
~EventLoop();

// 开启事件循环
void loop();

// 退出事件循环
void quit();

// 获取 Poller 返回发生事件的时间点
Timestamp pollReturnTime() const;

// 在当前 EventLoop 所在的线程执行回调操作
void runInLoop(Functor cb);

// 将回调操作添加到队列中,唤醒 EventLoop 所在的线程执行回调操作
void queueInLoop(Functor cb);

// 唤醒 EventLoop 所在的线程
void wakeup();

// 更新 Channel
void updateChannel(Channel* channel);

// 移除 Channel
void removeChannel(Channel* channel);

// 判断 EventLoop 中是否存在某个 Channel
bool hasChannel(Channel* channel);

// 判断当前线程是否是 EventLoop 所在的线程
bool isInLoopThread() const;

// 如果当前线程不是 EventLoop 所在的线程,则触发断言失败
void assertInLoopThread();

// 如果当前线程不是 EventLoop 所在的线程,则中止程序运行
void abortNotInLoopThread();

private:
// 处理 Wakeup Channel 的读事件
void handleRead();

// 执行当前 EventLoop 需要执行的回调操作
void doPendingFunctors();

// Channel 列表的类型定义
using ChannelList = std::vector<Channel*>;

std::atomic_bool looping_; // 事件循环状态
std::atomic_bool quit_; // 标识退出 EventLoop 循环

const pid_t threadId_; // 记录当前 EventLoop 所在的线程的 ID
Timestamp pollReturnTime_; // 记录 Poller 返回发生事件的时间点
std::unique_ptr<Poller> poller_; // EventLoop 使用的 Poller(I/O 多路复用器)

int wakeupFd_; // 用于唤醒 EventLoop 所在的线程的 fd
std::unique_ptr<Channel> wakeupChannel_; // 用于唤醒 EventLoop 所在的线程的 Channel

ChannelList activeChannels_; // 保存 Poller 返回的活跃的 Channel 列表

std::atomic_bool callingPendingFunctors_; // 标识当前 EventLoop 是否正在执行回调操作
std::vector<Functor> pendingFunctors_; // 保存当前 EventLoop 需要执行的所有回调操作
std::mutex mutex_; // 保证 pendingFunctors_ 容器线程安全的互斥锁
};
  • EventLoop.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#include "EventLoop.h"

#include <sys/eventfd.h>
#include <unistd.h>

#include <memory>

#include "Channel.h"
#include "CurrentThread.h"
#include "Logger.h"
#include "Poller.h"

// 定义线程局部变量(thread-local),用于防止一个线程创建多个 EventLoop
__thread EventLoop* t_loopInThisThread = nullptr;

// 定义 Poller(I/O 多路复用器)的默认超时时间,比如 10 秒
const int kPollTimeMs = 10000;

// 创建 wakeupFd,用来 Notify(唤醒)SubReactor 处理新来的 Channel
int createEventFd() {
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0) {
LOG_FATAL("%s => eventfd error:%d", __PRETTY_FUNCTION__, errno);
}
return evtfd;
}

// 构造函数
EventLoop::EventLoop()
: looping_(false),
quit_(false),
callingPendingFunctors_(false),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
wakeupFd_(createEventFd()),
wakeupChannel_(new Channel(this, wakeupFd_)) {
// 打印日志信息
LOG_DEBUG("%s => EventLoop created %p in thread %d", __PRETTY_FUNCTION__, this, threadId_);

// 防止一个线程创建多个 EventLoop
if (t_loopInThisThread) {
LOG_FATAL("%s => Another EventLoop existed in this thread %d", __PRETTY_FUNCTION__, threadId_);
} else {
// 将当前 EventLoop 对象赋值给线程局部变量
t_loopInThisThread = this;
}

// 设置 Wakeup Channel 的读事件回调函数
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));

// 启用 Wakeup Channel 的读事件监听
wakeupChannel_->enableReading();
}

// 析构函数
EventLoop::~EventLoop() {
LOG_DEBUG("%s => EventLoop %p of thread %d destructs in thread", __PRETTY_FUNCTION__, this, CurrentThread::tid());
// 关闭 Wakeup Channel
wakeupChannel_->disableAll();
// 移除 Wakeup Channel
wakeupChannel_->remove();
// 关闭 wakeupFd_
::close(wakeupFd_);
// 重置线程局部变量
t_loopInThisThread = nullptr;
}

// 开启事件循环
void EventLoop::loop() {
// 标记事件循环开始
looping_ = true;

// 标记退出事件循环的状态
quit_ = false;

// 打印日志信息
LOG_DEBUG("%s => EventLoop %p start looping", __PRETTY_FUNCTION__, this);

while (!quit_) {
activeChannels_.clear();
// Poller 会阻塞监听有哪些 Channel 发生了事件,然后上报给 EventLoop,通知 Channel 处理相应的事件
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel* channel : activeChannels_) {
channel->handleEvent(pollReturnTime_);
}
// 执行当前 EventLoop 需要处理的回调操作
doPendingFunctors();
}

// 打印日志信息
LOG_DEBUG("%s => EventLoop %p stop looping", __PRETTY_FUNCTION__, this);

// 标记事件循环结束
looping_ = false;
}

// 退出事件循环
void EventLoop::quit() {
// 标记退出事件循环的状态
quit_ = true;

// 如果不是在当前 EventLoop 所在的线程上调用的 quit() 方法,则需要唤醒 EventLoop 所在的线程
if (!isInLoopThread()) {
wakeup();
}
}

// 唤醒 EventLoop 所在的线程
void EventLoop::wakeup() {
uint64_t one = 1;
// 向 wakeupFd_ 写一个数据,wakeupChannel_ 就会发生读事件,当前的 EventLoop 就会被唤醒
ssize_t n = ::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one) {
LOG_ERROR("%s write %zd bytes instead of 8", __PRETTY_FUNCTION__, n);
}
}

// 获取 Poller 返回发生事件的时间点
Timestamp EventLoop::pollReturnTime() const {
return pollReturnTime_;
}

// 判断当前线程是否是 EventLoop 所在的线程
bool EventLoop::isInLoopThread() const {
return threadId_ == CurrentThread::tid();
}

// 如果当前线程不是 EventLoop 所在的线程,则触发断言失败
void EventLoop::assertInLoopThread() {
if (!isInLoopThread()) {
abortNotInLoopThread();
}
}

// 如果当前线程不是 EventLoop 所在的线程,则中止程序运行
void EventLoop::abortNotInLoopThread() {
LOG_FATAL("%s => EventLoop %p was created in threadId_ = %d, current thread id = %d", __PRETTY_FUNCTION__, this,
threadId_, CurrentThread::tid());
}

// 在当前 EventLoop 所在的线程上执行回调操作
void EventLoop::runInLoop(Functor cb) {
// 如果在 EventLoop 所在的线程上执行回调操作
if (isInLoopThread()) {
// 则直接执行回调操作
cb();
} else {
// 否则,将回调操作添加到队列中,并唤醒 EventLoop 所在的线程执行回调操作
queueInLoop(std::move(cb));
}
}

// 将回调操作添加到队列中,并唤醒 EventLoop 所在的线程执行回调操作
void EventLoop::queueInLoop(Functor cb) {
{
// 将回调操作添加到队列中(需要保证线程安全)
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}

// 如果不是在当前 EventLoop 所在的线程上执行回调操作,或者当前 EventLoop 正在执行回调操作
if (!isInLoopThread() || callingPendingFunctors_) {
// 则唤醒当前 EventLoop 所在的线程去执行回调操作
wakeup();
}
}

// 更新 Channel
void EventLoop::updateChannel(Channel* channel) {
poller_->updateChannel(channel);
}

// 移除 Channel
void EventLoop::removeChannel(Channel* channel) {
poller_->removeChannel(channel);
}

// 判断 EventLoop 中是否存在某个 Channel
bool EventLoop::hasChannel(Channel* channel) {
return poller_->hasChannel(channel);
}

// 处理 Wakeup Channel 的读事件
void EventLoop::handleRead() {
uint64_t one = 1;
ssize_t n = ::read(wakeupFd_, &one, sizeof one);
if (n != sizeof one) {
LOG_ERROR("%s reads %zd bytes instead of 8", __PRETTY_FUNCTION__, n);
}
}

// 执行当前 EventLoop 需要执行的回调操作
void EventLoop::doPendingFunctors() {
std::vector<Functor> functors;

// 标记当前 EventLoop 正在执行回调操作
callingPendingFunctors_ = true;

{
std::unique_lock<std::mutex> lock(mutex_);
// 将需要执行的回调操作交换到局部变量 functors 中,以减少锁的持有时间,提高运行效率
functors.swap(pendingFunctors_);
}

// 执行当前 EventLoop 需要执行的回调操作
for (const Functor& functor : functors) {
functor();
}

// 标记当前 EventLoop 已经执行完回调操作
callingPendingFunctors_ = false;
}

Thread

  • Thread.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#pragma once

#include <atomic>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>

#include "noncopyable.h"

// 线程类
class Thread : noncopyable {
public:
// 线程执行函数的类型定义
using ThreadFunc = std::function<void()>;

// 构造函数
explicit Thread(ThreadFunc func, const std::string& name = std::string());

// 析构函数
~Thread();

// 启动线程
void start();

// 等待线程执行结束
void join();

// 获取线程 ID
pid_t tid();

// 获取线程名称
const std::string& name() const;

// 获取已创建的线程数量
static int numCreated();

private:
// 设置线程的默认名称
void setDefaultName();

bool started_; // 标记线程是否已启动
bool joined_; // 标记线程是否已经 join,防止重复 join 或析构时未 join
std::shared_ptr<std::thread> thread_; // 线程对象
pid_t tid_; // 线程 ID
ThreadFunc func_; // 线程执行函数
std::string name_; // 线程名称
static std::atomic_int numCreated_; // 已创建的线程数量
};
  • Thread.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include "Thread.h"

#include <semaphore.h>

#include <string>

#include "CurrentThread.h"

std::atomic_int Thread::numCreated_(0);

// 构造函数
Thread::Thread(ThreadFunc func, const std::string& name)
: started_(false), joined_(false), tid_(0), func_(std::move(func)), name_(name) {
}

// 析构函数
Thread::~Thread() {
// 如果线程已启动且未被 join
if (started_ && !joined_) {
// 设置分离线程(避免资源泄露)
thread_->detach();
}
}

// 启动线程
void Thread::start() {
// 标记线程为已启动
started_ = true;

// 声明信号量
sem_t sem;

// 初始化信号量
sem_init(&sem, false, 0);

// 启动新的线程
thread_ = std::shared_ptr<std::thread>(new std::thread([&]() {
// 获取新线程的 ID
tid_ = CurrentThread::tid();

// 通知主线程已获取新线程的 ID
sem_post(&sem);

// 新线程执行线程函数
func_();
}));

// 阻塞等待新线程获取线程 ID
sem_wait(&sem);
}

// 等待线程执行结束
void Thread::join() {
// 如果线程已启动且未被 join
if (started_ && !joined_) {
// 标记线程已 join
joined_ = true;
// 等待线程执行结束
thread_->join();
}
}

// 获取线程 ID
pid_t Thread::tid() {
return tid_;
}

// 获取线程名称
const std::string& Thread::name() const {
return name_;
}

// 获取已创建的线程数量
int Thread::numCreated() {
return numCreated_.load();
}

// 设置线程的默认名称
void Thread::setDefaultName() {
int num = ++numCreated_;
if (name_.empty()) {
char buf[32] = {0};
snprintf(buf, sizeof buf, "Thread%d", num);
name_ = buf;
}
}

CurrentThread

  • CurrentThread.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#pragma once

namespace CurrentThread {

// 声明线程局部变量(thread-local),用于缓存当前线程的 ID
extern __thread int t_cachedTid;

// 声明缓存当前线程的 ID 的函数
void cacheTid();

// 获取当前线程的 ID
inline int tid() {
if (__builtin_expect(t_cachedTid == 0, 0)) {
cacheTid();
}
return t_cachedTid;
}

}
  • CurrentThread.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include "CurrentThread.h"

#include <sys/syscall.h>
#include <unistd.h>

namespace CurrentThread {

// 定义线程局部变量(thread-local),用于缓存当前线程的 ID
__thread int t_cachedTid = 0;

// 定义缓存当前线程的 ID 的函数
void cacheTid() {
if (t_cachedTid == 0) {
// 通过 Linux 系统调用,获取当前线程的 ID
t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
}
}

}

EventLoopThread

  • EventLoopThread.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#pragma once

#include <condition_variable>
#include <functional>
#include <mutex>

#include "Thread.h"
#include "noncopyable.h"

// 类前置声明
class EventLoop;

// 事件循环线程类,封装了 EventLoop 与 Thread
class EventLoopThread : noncopyable {
public:
// 线程初始化回调操作的类型定义
using ThreadInitCallback = std::function<void(EventLoop *)>;

// 构造函数
EventLoopThread(const ThreadInitCallback &cb = ThreadInitCallback(), const std::string &name = std::string());

// 析构函数
~EventLoopThread();

// 在对应的线程中启动事件循环
EventLoop *startLoop();

private:
// 线程执行函数
void threadFunc();

EventLoop *loop_; // 事件循环
bool exiting_; // 标记线程是否正在退出
Thread thread_; // 线程对象(EventLoop 所在的线程)
std::mutex mutex_; // 互斥锁
std::condition_variable cond_; // 条件变量
ThreadInitCallback callback_; // 线程初始化回调操作
};
  • EventLoopThread.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include "EventLoopThread.h"

#include <EventLoop.h>

#include <memory>

// 构造函数
EventLoopThread::EventLoopThread(const ThreadInitCallback &cb, const std::string &name)
: loop_(nullptr),
exiting_(false),
thread_(std::bind(&EventLoopThread::threadFunc, this), name),
mutex_(),
cond_(),
callback_(cb) {
}

// 析构函数
EventLoopThread::~EventLoopThread() {
// 标记线程正在退出
exiting_ = true;

if (loop_ != nullptr) {
// 退出线程循环
loop_->quit();
// 等待线程执行结束
thread_.join();
}
}

// 在对应的线程中启动事件循环
EventLoop *EventLoopThread::startLoop() {
// 启动底层新创建的线程
thread_.start();

EventLoop *loop = nullptr;
{
// 等待线程函数 threadFunc() 创建好 EventLoop 对象
std::unique_lock<std::mutex> lock(mutex_);
while (loop_ == nullptr) {
cond_.wait(lock);
}
loop = loop_;
}

return loop;
}

// 线程执行函数
void EventLoopThread::threadFunc() {
// 新创建一个独立的事件循环,和上面底层新创建的线程一一对应
EventLoop loop;

// 执行线程初始化回调操作
if (callback_) {
callback_(&loop);
}

{
// 将新创建的事件循环对象赋值给成员变量 loop_,需要保证线程安全
std::unique_lock<std::mutex> lock(mutex_);
loop_ = &loop;

// 通知 startLoop() 成员函数,成员变量 loop_ 已经赋值完毕
cond_.notify_one();
}

// 开启事件循环
loop.loop();

// 事件循环退出后,重置成员变量 loop_
std::unique_lock<std::mutex> lock(mutex_);
loop_ = nullptr;
}

EventLoopThreadPool

  • EventLoopThreadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#pragma once

#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "noncopyable.h"

// 类前置声明
class EventLoop;
class EventLoopThread;

// 事件循环线程池类
class EventLoopThreadPool : noncopyable {
public:
// 线程初始化回调操作的类型定义
using ThreadInitCallback = std::function<void(EventLoop*)>;

// 构造函数
EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg);

// 析构函数
~EventLoopThreadPool();

// 设置线程池的线程数量
void setThreadNum(int numThreads);

// 启动线程池
void start(const ThreadInitCallback& cb = ThreadInitCallback());

// 获取下一个被选中的事件循环(如果工作在多线程中,baseLoop 默认以轮询的方式分配 Channel 给 subLoop)
EventLoop* getNextLoop();

// 返回所有事件循环
std::vector<EventLoop*> getAllLoops();

// 返回线程池是否已启动
bool started() const;

// 返回线程池的名称
const std::string& name() const;

private:
EventLoop* baseLoop_; // 基础事件循环(通常是主线程上的事件循环,也称作 mainLoop)
std::string name_; // 线程池名称
bool started_; // 标记线程池是否已启动
int numThreads_; // 线程数量
int next_; // 下一个被选中的事件循环的索引
std::vector<std::unique_ptr<EventLoopThread>> threads_; // 事件循环线程对象的集合
std::vector<EventLoop*> loops_; // 事件循环对象的集合
};
  • EventLoopThreadPool.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include "EventLoopThreadPool.h"

#include "EventLoopThread.h"

// 构造函数
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg)
: baseLoop_(baseLoop), name_(nameArg), started_(false), numThreads_(0), next_(0) {
}

// 析构函数
EventLoopThreadPool::~EventLoopThreadPool() {
// 析构时不需要删除 loop,因为它是栈变量
}

// 设置线程池的线程数量
void EventLoopThreadPool::setThreadNum(int numThreads) {
numThreads_ = numThreads;
}

// 启动线程池
void EventLoopThreadPool::start(const ThreadInitCallback& cb) {
// 标记线程池已启动
started_ = true;

// 当整个服务端有多个线程(负责运行一个 baseLoop 和多个 subLoop)
for (int i = 0; i < numThreads_; ++i) {
// 拼接线程的名称
std::string tname = name_ + std::to_string(i);
// 创建事件循环线程
EventLoopThread* t = new EventLoopThread(cb, tname);
// 将事件循环线程添加到线程池中
threads_.push_back(std::unique_ptr<EventLoopThread>(t));
// 启动事件循环线程,并获取该线程对应的事件循环对象,将其添加到事件循环对象的集合中
loops_.push_back(t->startLoop());
}

// 当整个服务端只有一个线程(负责运行 baseLoop),就执行初始化回调操作
if (numThreads_ == 0 && cb) {
cb(baseLoop_);
}
}

// 获取下一个被选中的事件循环(如果工作在多线程中,baseLoop 默认以轮询的方式分配 Channel 给 subLoop)
EventLoop* EventLoopThreadPool::getNextLoop() {
EventLoop* loop = baseLoop_;

// 通过轮询方式获取一下个处理事件的 EventLoop
if (!loops_.empty()) {
loop = loops_[next_];
++next_;
if (next_ >= loops_.size()) {
next_ = 0;
}
}

return loop;
}

// 返回所有事件循环
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops() {
if (loops_.empty()) {
return std::vector<EventLoop*>(1, baseLoop_);
} else {
return loops_;
}
}

// 返回线程池是否已启动
bool EventLoopThreadPool::started() const {
return started_;
}

// 返回线程池的名称
const std::string& EventLoopThreadPool::name() const {
return name_;
}

Socket

  • Socket.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#pragma once

#include "noncopyable.h"

// 类前置声明
class InetAddress;

// 套接字类
class Socket : noncopyable {
public:
// 构造函数
explicit Socket(int sockFd);

// 析构函数
~Socket();

// 获取 socket 的文件描述符
int fd() const;

// 绑定地址
void bindAddress(const InetAddress& localaddr);

// 监听连接请求
void listen();

// 接受连接请求
int accept(InetAddress* peeraddr);

// 关闭写入
void shutdownWrite();

// 是否开启 TCP_NODELAY,开启后关闭 Nagle 算法,减少延迟
void setTcpNoDelay(bool on);

// 是否开启地址重用,允许端口在短时间内被重复绑定
void setReuseAddr(bool on);

// 是否开启端口重用,让多个进程/线程可以绑定同一端口
void setReusePort(bool on);

// 是否开启 TCP 保活,用于检测对端是否还存活
void setKeepAlive(bool on);

private:
const int sockFd_; // socket 的文件描述符
};
  • Socket.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include "Socket.h"

#include <errno.h>
#include <netinet/tcp.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include "InetAddress.h"
#include "Logger.h"

// 构造函数
Socket::Socket(int sockFd) : sockFd_(sockFd) {
}

// 析构函数
Socket::~Socket() {
::close(sockFd_);
}

// 获取 socket 的文件描述符
int Socket::fd() const {
return sockFd_;
}

// 绑定地址
void Socket::bindAddress(const InetAddress& localaddr) {
if (0 != ::bind(sockFd_, (sockaddr*)localaddr.getSockAddr(), sizeof(sockaddr_in))) {
LOG_FATAL("%s => bind socketFd:%d failed, errno:%d", __PRETTY_FUNCTION__, sockFd_, errno);
}
}

// 监听连接请求
void Socket::listen() {
if (0 != ::listen(sockFd_, SOMAXCONN)) {
LOG_FATAL("%s => listen socketFd:%d failed, errno:%d", __PRETTY_FUNCTION__, sockFd_, errno);
}
}

// 接受连接请求
int Socket::accept(InetAddress* peeraddr) {
sockaddr_in addr;
socklen_t len = sizeof addr;
bzero(&addr, sizeof addr);
// 接受客户端新连接,返回新连接对应的 socket fd(非阻塞的),用来和客户端进行读写
int connfd = ::accept4(sockFd_, (sockaddr*)&addr, &len, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (connfd >= 0) {
peeraddr->setSockAddr(addr);
}
return connfd;
}

// 关闭写入
void Socket::shutdownWrite() {
if (::shutdown(sockFd_, SHUT_WR) < 0) {
LOG_FATAL("%s => shutdown write socketFd:%d failed, errno:%d", __PRETTY_FUNCTION__, sockFd_, errno);
}
}

// 是否开启 TCP_NODELAY,开启后关闭 Nagle 算法,减少延迟
void Socket::setTcpNoDelay(bool on) {
int optval = on ? 1 : 0;
::setsockopt(sockFd_, IPPROTO_TCP, TCP_NODELAY, &optval, static_cast<socklen_t>(sizeof optval));
}

// 是否开启地址重用,允许端口在短时间内被重复绑定
void Socket::setReuseAddr(bool on) {
int optval = on ? 1 : 0;
::setsockopt(sockFd_, SOL_SOCKET, SO_REUSEADDR, &optval, static_cast<socklen_t>(sizeof optval));
}

// 是否开启端口重用,让多个进程/线程可以绑定同一端口
void Socket::setReusePort(bool on) {
int optval = on ? 1 : 0;
int ret = ::setsockopt(sockFd_, SOL_SOCKET, SO_REUSEPORT, &optval, static_cast<socklen_t>(sizeof optval));
if (ret < 0 && on) {
LOG_FATAL("%s => set reuse port failed, errno:%d", __PRETTY_FUNCTION__, sockFd_, errno);
}
}

// 是否开启 TCP 保活,用于检测对端是否还存活
void Socket::setKeepAlive(bool on) {
int optval = on ? 1 : 0;
::setsockopt(sockFd_, SOL_SOCKET, SO_KEEPALIVE, &optval, static_cast<socklen_t>(sizeof optval));
}

Buffer

  • Buffer.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#pragma once

#include <iostream>
#include <string>
#include <vector>

#include "copyable.h"

/// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer
///
/// @code
/// +-------------------+------------------+------------------+
/// | prependable bytes | readable bytes | writable bytes |
/// | | (CONTENT) | |
/// +-------------------+------------------+------------------+
/// | | | |
/// 0 <= readerIndex <= writerIndex <= size
/// @endcode

class Buffer : public copyable {
public:
static const size_t kCheapPrepend = 8; // 预留空间大小
static const size_t kInitialSize = 1024; // 初始缓冲区大小

// 构造函数
explicit Buffer(size_t initialSize = kInitialSize);

// 析构函数
~Buffer();

// 获取缓冲区中可读的字节数
size_t readableBytes() const;

// 获取缓冲区中可写的字节数
size_t writableBytes() const;

// 获取缓冲区中可预留的字节数
size_t prependableBtes() const;

// 返回缓冲区中可读数据的起始地址
const char* peek() const;

// 移动读指针
void retrieve(size_t len);

// 重置读指针与写指针
void retrieveAll();

// 将缓冲区中所有可读数据以字符串形式返回
std::string retrieveAllAsString();

// 将缓冲区中指定长度的可读数据以字符串形式返回
std::string retrieveAsString(size_t len);

// 确保缓冲区有足够的可写空间
void ensureWritableBytes(size_t len);

// 扩容缓冲区以容纳更多数据
void makeSpace(size_t len);

// 向缓冲区追加数据
void append(const char* data, size_t len);

// 通知缓冲区已写入数据
void hasWritten(size_t len);

// 返回缓冲区中可写数据的起始地址
char* beginWrite();

// 返回缓冲区中可写数据的起始地址
const char* beginWrite() const;

// 从 fd 上读取数据,并写到缓冲区中(返回值:n > 0:读取成功;n == 0:连接关闭;n < 0:读取出错)
ssize_t readFd(int fd, int* saveErrno);

// 从缓冲区中读取数据,并写到 fd 上(返回值:n > 0:写入成功;n == 0:没有数据可写入;n < 0:写入出错)
ssize_t writeFd(int fd, int* saveErrno);

private:
// 返回 vector 底层数组的首元素地址(即数组的起始地址)
char* begin();

// 返回 vector 底层数组的首元素地址(即数组的起始地址)
const char* begin() const;

std::vector<char> buffer_; // 底层缓冲区
size_t readerIndex_; // 读指针位置
size_t writerIndex_; // 写指针位置
};
  • Buffer.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#include "Buffer.h"

#include <assert.h>
#include <errno.h>
#include <sys/uio.h>
#include <unistd.h>

// 构造函数
Buffer::Buffer(size_t initialSize)
: buffer_(kCheapPrepend + initialSize), readerIndex_(kCheapPrepend), writerIndex_(kCheapPrepend) {
}

// 析构函数
Buffer::~Buffer() {
}

// 获取缓冲区中可读的字节数
size_t Buffer::readableBytes() const {
return writerIndex_ - readerIndex_;
}

// 获取缓冲区中可写的字节数
size_t Buffer::writableBytes() const {
return buffer_.size() - writerIndex_;
}

// 获取缓冲区中可预留的字节数
size_t Buffer::prependableBtes() const {
return readerIndex_;
}

// 返回缓冲区中可读数据的起始地址
const char* Buffer::peek() const {
return begin() + readerIndex_;
}

// 移动读指针
void Buffer::retrieve(size_t len) {
assert(len <= readableBytes());
if (len < readableBytes()) {
readerIndex_ += len;
} else {
retrieveAll();
}
}

// 重置读指针与写指针
void Buffer::retrieveAll() {
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}

// 将缓冲区中所有可读数据以字符串形式返回
std::string Buffer::retrieveAllAsString() {
return retrieveAsString(readableBytes());
}

// 将缓冲区中指定长度的可读数据以字符串形式返回
std::string Buffer::retrieveAsString(size_t len) {
assert(len <= readableBytes());
// 构造字符串
std::string result(peek(), len);
// 移动读指针
retrieve(len);
return result;
}

// 确保缓冲区有足够的可写空间
void Buffer::ensureWritableBytes(size_t len) {
if (writableBytes() < len) {
// 缓冲区扩容
makeSpace(len);
}
assert(writableBytes() >= len);
}

// 扩容缓冲区以容纳更多数据
void Buffer::makeSpace(size_t len) {
// 判断是否需要通过移动数据来腾出空间
if (writableBytes() + prependableBtes() < len + kCheapPrepend) {
// 没有空闲的空间,直接扩容
buffer_.resize(writerIndex_ + len);
} else {
// 有空闲的空间,通过移动数据来腾出空间
assert(kCheapPrepend < readerIndex_);
size_t readable = readableBytes();
std::copy(begin() + readerIndex_, begin() + writerIndex_, begin() + kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
assert(readable == readableBytes());
}
}

// 向缓冲区追加数据
void Buffer::append(const char* data, size_t len) {
ensureWritableBytes(len);
std::copy(data, data + len, beginWrite());
hasWritten(len);
}

// 通知缓冲区已写入数据
void Buffer::hasWritten(size_t len) {
assert(len <= writableBytes());
writerIndex_ += len;
}

// 返回缓冲区中可写数据的起始地址
char* Buffer::beginWrite() {
return begin() + writerIndex_;
}

// 返回缓冲区中可写数据的起始地址
const char* Buffer::beginWrite() const {
return begin() + writerIndex_;
}

// 从 fd 上读取数据,并写到缓冲区中(返回值:n > 0:读取成功;n == 0:连接关闭;n < 0:读取出错)
ssize_t Buffer::readFd(int fd, int* saveErrno) {
// 在栈上分配内存空间(64KB)
char extrabuf[65536] = {0};

// 主缓冲区可写的字节数
const size_t writable = writableBytes();

// 采用 scatter-gather 读技术,同时将数据读入主缓冲区和 extrabuf
struct iovec vec[2];
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;

// 当主缓冲区 writable 小于 extrabuf(64KB)时,说明主缓冲区的空间可能不够装下数据,
// 需要使用两个 iovec:第一个写入 buffer_,第二个写入 extrabuf,从而尽可能读完内核中的数据。
// 否则,如果主缓冲区足够大,只需一个 iovec。
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;

// 读取数据
const ssize_t n = ::readv(fd, vec, iovcnt);

// 如果发生错误
if (n < 0) {
*saveErrno = errno;
}
// 如果只写入了主缓冲区,没有写入了 extrabuf
else if (n <= writable) {
writerIndex_ += n;
}
// 如果不仅写入了主缓冲区,还写入了 extrabuf
else {
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}

return n;
}

// 从缓冲区中读取数据,并写到 fd 上(返回值:n > 0:写入成功;n == 0:没有数据可写入;n < 0:写入出错)
ssize_t Buffer::writeFd(int fd, int* saveErrno) {
ssize_t n = ::write(fd, peek(), readableBytes());
if (n < 0) {
// 写入出错,记录错误码
*saveErrno = errno;
}
return n;
}

// 返回 vector 底层数组的首元素地址(即数组的起始地址)
char* Buffer::begin() {
return &*buffer_.begin();
}

// 返回 vector 底层数组的首元素地址(即数组的起始地址)
const char* Buffer::begin() const {
return &*buffer_.begin();
}

TcpConnection

  • TcpConnection.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#pragma once

#include <atomic>
#include <iostream>
#include <memory>
#include <string>

#include "Buffer.h"
#include "Callbacks.h"
#include "InetAddress.h"
#include "noncopyable.h"

// 类前置声明
class EventLoop;
class Channel;
class Socket;

// TCP 连接类
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection> {
public:
// 构造函数
TcpConnection(EventLoop* loop, const std::string& nameArg, int sockfd, const InetAddress& localAddr,
const InetAddress& peerAddr);

// 析构函数
~TcpConnection();

// 获取 TCP 连接所在的事件循环
EventLoop* getLoop() const;

// 获取 TCP 连接的名称
const std::string& name() const;

// 获取 TCP 连接的本地网络地址
const InetAddress& localAddress() const;

// 获取 TCP 连接的远程网络地址
const InetAddress& peerAddress() const;

// 判断 TCP 连接是否处于已连接状态
bool connected() const;

// 判断 TCP 连接是否处于断开状态
bool disconnected() const;

// 发送数据到输出缓冲区
void send(const std::string& message);

// 关闭 TCP 连接
void shutdown();

// 强制关闭连接
void forceClose();

// 设置连接建立/关闭时的回调操作
void setConnectionCallback(const ConnectionCallback& cb);

// 设置有数据到来时的回调操作
void setMessageCallback(const MessageCallback& cb);

// 设置数据发送完成时的回调操作
void setWriteCompleteCallback(const WriteCompleteCallback& cb);

// 设置触发高水位时的回调操作
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark);

// 设置连接关闭时的回调操作
void setCloseCallback(const CloseCallback& cb);

// 获取输入缓冲区
Buffer* inputBuffer();

// 获取输出缓冲区
Buffer* outputBuffer();

// 连接建立
void connectEstablished();

// 连接销毁
void connectDestroyed();

private:
// TCP 连接的状态
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };

// 处理读事件
void handleRead(Timestamp receiveTime);

// 处理写事件
void handleWrite();

// 处理关闭事件
void handleClose();

// 处理错误事件
void handleError();

// 在事件循环(EventLoop)中发送数据到输出缓冲区
void sendInLoop(const void* message, size_t len);

// 在事件循环(EventLoop)中关闭 TCP 连接
void shutdownInLoop();

// 在事件循环(EventLoop)中强制关闭 TCP 连接
void forceCloseInLoop();

// 设置 TCP 连接的状态
void setState(StateE state);

// 将 TCP 连接的状态转换为字符串
const char* stateToString() const;

EventLoop* loop_; // TCP 连接所在的事件循环,TCP 连接运行在 subLoop 中
const std::string name_; // TCP 连接的名称
std::atomic_int state_; // TCP 连接的状态
bool reading_; // 标记是否正在读数据

std::unique_ptr<Socket> socket_; // TCP 连接对应的 Socket 对象
std::unique_ptr<Channel> channel_; // TCP 连接对应的 Channel 对象

const InetAddress localAddr_; // TCP 连接的本地网络地址
const InetAddress peerAddr_; // TCP 连接的远程网络地址

ConnectionCallback connectionCallback_; // 连接建立/关闭时的回调操作
MessageCallback messageCallback_; // 有数据到来时的回调操作
WriteCompleteCallback writeCompleteCallback_; // 数据发送完成时的回调操作
HighWaterMarkCallback highWaterMarkCallback_; // 触发高水位时的回调操作
CloseCallback closeCallback_; // 连接关闭时的回调操作

size_t highWaterMark_; // 高水位的大小(默认 64M)
Buffer inputBuffer_; // 输入缓冲区(用于接收数据的缓冲区)
Buffer outputBuffer_; // 输出缓冲区(用于发送数据的缓冲区)
};
  • TcpConnection.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
#include "TcpConnection.h"

#include <assert.h>
#include <error.h>
#include <unistd.h>

#include "Channel.h"
#include "EventLoop.h"
#include "Logger.h"
#include "Socket.h"
#include "SocketsOps.h"

// 检查 EventLoop 指针是否为空
static EventLoop* CheckLoopNotNull(EventLoop* loop) {
if (loop == nullptr) {
LOG_FATAL("%s => eventloop is null", __PRETTY_FUNCTION__);
}
return loop;
}

// 默认连接建立/关闭时的回调操作
void defaultConnectionCallback(const TcpConnectionPtr& conn) {
LOG_DEBUG("%s => %s -> %s is %s", __PRETTY_FUNCTION__, conn->localAddress().toIpPort().c_str(),
conn->peerAddress().toIpPort().c_str(), (conn->connected() ? "UP" : "DOWN"));
}

// 默认有数据到来时的回调操作
void defaultMessageCallback(const TcpConnectionPtr&, Buffer* buf, Timestamp) {
buf->retrieveAll();
}

// 构造函数
TcpConnection::TcpConnection(EventLoop* loop, const std::string& nameArg, int sockfd, const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CheckLoopNotNull(loop)),
name_(nameArg),
state_(kConnecting),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
highWaterMark_(64 * 1024 * 1024) {
// 给 Channel 设置相应的回调函数,Poller 会通知 Channel 它感兴趣的事件发生了,然后 Channel 会回调相应的操作函数
channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));
// 打印日志信息
LOG_DEBUG("%s => create tcp connection [%s] at %p, fd=%d", __PRETTY_FUNCTION__, name_.c_str(), this, sockfd);
// 开启 TCP 保活机制
socket_->setKeepAlive(true);
}

// 析构函数
TcpConnection::~TcpConnection() {
// 打印日志信息
LOG_DEBUG("%s => destruct tcp connection [%s] at %p, fd=%d, state=%s", __PRETTY_FUNCTION__, name_.c_str(), this,
channel_->fd(), stateToString());
}

// 获取 TCP 连接所在的事件循环
EventLoop* TcpConnection::getLoop() const {
return loop_;
}

// 获取 TCP 连接的名称
const std::string& TcpConnection::name() const {
return name_;
}

// 获取 TCP 连接的本地网络地址
const InetAddress& TcpConnection::localAddress() const {
return localAddr_;
}

// 获取 TCP 连接的远程网络地址
const InetAddress& TcpConnection::peerAddress() const {
return peerAddr_;
}

// 判断 TCP 连接是否处于已连接状态
bool TcpConnection::connected() const {
return state_ == kConnected;
}

// 判断 TCP 连接是否处于断开状态
bool TcpConnection::disconnected() const {
return state_ == kDisconnected;
}

// 发送数据到输出缓冲区
void TcpConnection::send(const std::string& message) {
if (state_ == kConnected) {
// 如果当前线程是 loop_ 所在的线程
if (loop_->isInLoopThread()) {
// 直接将数据发送到输出缓冲区
sendInLoop(message.c_str(), message.size());
} else {
// 唤醒 loop_ 对应的线程将数据发送到输出缓冲区
loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, message.c_str(), message.size()));
}
}
}

// 关闭 TCP 连接
void TcpConnection::shutdown() {
if (state_ == kConnected) {
// 设置 TCP 连接的状态
setState(kDisconnecting);
// 唤醒 loop_ 对应的线程去关闭 TCP 连接
loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
}
}

// 强制关闭连接
void TcpConnection::forceClose() {
// 判断 TCP 连接的状态
if (state_ == kConnected || state_ == kDisconnecting) {
// 设置连接状态
setState(kDisconnecting);
// 唤醒 loop_ 对应的线程去强制关闭 TCP 连接
loop_->queueInLoop(std::bind(&TcpConnection::forceCloseInLoop, shared_from_this()));
}
}

// 连接建立/关闭时的回调操作
void TcpConnection::setConnectionCallback(const ConnectionCallback& cb) {
connectionCallback_ = cb;
}

// 设置有数据到来时的回调操作
void TcpConnection::setMessageCallback(const MessageCallback& cb) {
messageCallback_ = cb;
}

// 设置数据发送完成时的回调操作
void TcpConnection::setWriteCompleteCallback(const WriteCompleteCallback& cb) {
writeCompleteCallback_ = cb;
}

// 设置触发高水位时的回调操作
void TcpConnection::setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark) {
highWaterMarkCallback_ = cb;
highWaterMark_ = highWaterMark;
}

// 设置连接关闭时的回调操作
void TcpConnection::setCloseCallback(const CloseCallback& cb) {
closeCallback_ = cb;
}

// 获取输入缓冲区
Buffer* TcpConnection::inputBuffer() {
return &inputBuffer_;
}

// 获取输出缓冲区
Buffer* TcpConnection::outputBuffer() {
return &outputBuffer_;
}

// 连接建立
void TcpConnection::connectEstablished() {
assert(state_ == kConnecting);
// 设置 TCP 连接的状态
setState(kConnected);
// Channel 绑定 TCP 连接
channel_->tie(shared_from_this());
// Channel 开启监听 fd 上的读事件
channel_->enableReading();
// 调用用户设置的回调操作
connectionCallback_(shared_from_this());
}

// 连接销毁
void TcpConnection::connectDestroyed() {
if (state_ == kConnected) {
// 设置 TCP 连接的状态
setState(kDisconnected);
// Channel 禁止监听 fd 上的所有事件
channel_->disableAll();
// 调用用户设置的回调操作
connectionCallback_(shared_from_this());
}
// 从 Poller 中删除 Channel
channel_->remove();
}

// 处理读事件
void TcpConnection::handleRead(Timestamp receiveTime) {
// 临时错误码
int saveErrno = 0;

// 从 fd 上读取数据,并写入到输入缓冲区中
ssize_t n = inputBuffer_.readFd(channel_->fd(), &saveErrno);

if (n > 0) {
// 已建立连接的客户端,有可读事件发生了,调用用户设置的回调操作
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
} else if (n == 0) {
// 处理连接关闭
handleClose();
} else {
// 设置错误码
errno = saveErrno;
// 打印日志信息
LOG_ERROR("%s => read fd error, fd=%d, errno=%d", __PRETTY_FUNCTION__, channel_->fd(), errno);
// 处理连接错误
handleError();
}
}

// 处理写事件
void TcpConnection::handleWrite() {
// 判断 Channel 是否正在监听写事件
if (channel_->isWriting()) {
// 临时错误码
int saveErrno = 0;

// 从输出缓冲区读取数据,并写入到 fd 上
ssize_t n = outputBuffer_.writeFd(channel_->fd(), &saveErrno);

if (n > 0) {
// 移动输出缓冲区的读指针(标记有哪些数据被发送了)
outputBuffer_.retrieve(n);

// 如果输出缓冲区中的所有数据都发送完了
if (outputBuffer_.readableBytes() == 0) {
// 关闭监听 fd 上的写事件
channel_->disableWriting();
// 调用用户设置的回调操作
if (writeCompleteCallback_) {
// 唤醒 loop_ 所在的线程去执行用户设置的回调操作
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
// 如果正在断开 TCP 连接,则关闭 TCP 连接
if (state_ == kDisconnecting) {
shutdownInLoop();
}
}
} else if (n < 0) {
// 打印日志信息
LOG_ERROR("%s => write fd error, fd=%d, errno=%d", __PRETTY_FUNCTION__, channel_->fd(), errno);
}
} else {
// 打印日志信息
LOG_DEBUG("%s => tcp connection [%s] is down, no more writing, fd=%d", __PRETTY_FUNCTION__, name_.c_str(),
channel_->fd());
}
}

// 处理关闭事件
void TcpConnection::handleClose() {
// 打印日志信息
LOG_DEBUG("%s => tcp connection [%s] is close, fd=%d, state=%s", __PRETTY_FUNCTION__, name_.c_str(), channel_->fd(),
stateToString());

// 设置 TCP 连接的状态
setState(kDisconnected);

// 禁止 Channel 监听 fd 上的所有事件
channel_->disableAll();

// 获取当前的 TCP 连接
TcpConnectionPtr guardThis(shared_from_this());

// 调用用户设置的连接建立/关闭时的回调操作
connectionCallback_(guardThis);

// 调用用户设置的连接关闭时的回调操作
if (closeCallback_) {
closeCallback_(guardThis);
}
}

// 处理错误事件
void TcpConnection::handleError() {
// 获取 Socket 错误码
int savedErrno = getSocketError(channel_->fd());

// 打印日志信息
LOG_ERROR("%s => tcp connection [%s] occurred error, fd=%d, SO_ERROR:%d", __PRETTY_FUNCTION__, name_.c_str(),
channel_->fd(), savedErrno);
}

// 在事件循环(EventLoop)中发送数据到输出缓冲区
void TcpConnection::sendInLoop(const void* message, size_t len) {
loop_->assertInLoopThread();

// 已发送数据的字节数
ssize_t nwrote = 0;

// 剩下未发送数据的字节数
size_t remaining = len;

// 是否发生致命错误
bool faultError = false;

// 如果 TCP 连接已断开,则放弃发送数据
if (state_ == kDisconnected) {
LOG_ERROR("%s => tcp connection [%s] disconnected, give up writing", __PRETTY_FUNCTION__, name_.c_str());
return;
}

// 如果 Channel 是第一次写入数据,且输出缓冲区里面没有待发送的数据
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
// 直接发送数据(成功:返回已发送的字节数,失败:返回小于零的数字)
nwrote = ::write(channel_->fd(), message, len);
// 发送数据成功
if (nwrote >= 0) {
// 剩下未发送的字节数
remaining = len - nwrote;
// 如果所有数据都发送完
if (remaining == 0 && writeCompleteCallback_) {
// 唤醒 loop_ 所在的线程去执行用户设置的回调操作
loop_->runInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
// 发送数据失败
else {
nwrote = 0;
if (errno != EWOULDBLOCK) {
LOG_ERROR("%s => occurred error", __PRETTY_FUNCTION__);
if (errno == EPIPE || errno == ECONNRESET) {
faultError = true;
}
}
}
}

assert(remaining <= len);

// 如果发送数据没有发生致命错误,且有剩下的数据未发送
if (!faultError && remaining > 0) {
// 输出缓冲区中原先未发送数据的字节数
size_t oldLen = outputBuffer_.readableBytes();
// 判断所有未发送数据的大小是否触及了高水位线
if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) {
// 唤醒 loop_ 所在的线程去执行用户设置的回调操作
loop_->runInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
// 往输出缓冲区中写入上面未发送完的数据
outputBuffer_.append(static_cast<const char*>(message) + nwrote, remaining);
// 让 Channel 开启监听 fd 上的写事件
if (!channel_->isWriting()) {
channel_->enableWriting();
}
}
}

// 在事件循环(EventLoop)中关闭 TCP 连接
void TcpConnection::shutdownInLoop() {
loop_->assertInLoopThread();
// 如果输出缓冲区中的所有数据都发送完
if (!channel_->isWriting()) {
// Socket 关闭写入
socket_->shutdownWrite();
}
}

// 在事件循环(EventLoop)中强制关闭 TCP 连接
void TcpConnection::forceCloseInLoop() {
loop_->assertInLoopThread();
// 判断 TCP 连接的状态
if (state_ == kConnected || state_ == kDisconnecting) {
// 处理关闭事件
handleClose();
}
}

// 设置 TCP 连接的状态
void TcpConnection::setState(StateE state) {
state_ = state;
}

// 将 TCP 连接的状态转换为字符串
const char* TcpConnection::stateToString() const {
switch (state_) {
case kDisconnected:
return "kDisconnected";
case kConnecting:
return "kConnecting";
case kConnected:
return "kConnected";
case kDisconnecting:
return "kDisconnecting";
default:
return "unknown state";
}
}

Acceptor

  • Acceptor.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#pragma once

#include <functional>

#include "Channel.h"
#include "Socket.h"
#include "noncopyable.h"

// 类前置声明
class EventLoop;
class InetAddress;

// TCP 连接接受器类
class Acceptor : noncopyable {
public:
// 有新连接到来时的回调操作类型定义
using NewConnectionCallback = std::function<void(int sockFd, const InetAddress&)>;

// 构造函数
Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);

// 析构函数
~Acceptor();

// 设置有新连接到来时的回调操作
void setNewConnectionCallback(const NewConnectionCallback& cb);

// 监听连接请求(即监听有新的客户端连接进来)
void listen();

// 获取是否正在监听连接请求
bool listenning() const;

private:
// 处理读事件(即处理有新客户端连接进来)
void handleRead();

EventLoop* loop_; // Acceptor 使用的就是用户自定义的那个 baseLoop,也称作 mainLoop
Socket acceptSocket_; // 用于监听的 socket
Channel acceptChannel_; // 用于监听 acceptSocket_ 上的可读事件(即有新连接到来)
NewConnectionCallback newConnectionCallback_; // 有新连接到来时的回调操作
bool listenning_; // 标记是否正在监听连接请求
};
  • Acceptor.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include "Acceptor.h"

#include <errno.h>
#include <sys/socket.h>
#include <unistd.h>

#include "InetAddress.h"
#include "Logger.h"
#include "SocketsOps.h"

// 构造函数
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(createNonblockingSocket()),
acceptChannel_(loop, acceptSocket_.fd()),
listenning_(false) {
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport);
acceptSocket_.bindAddress(listenAddr);
// 设置 acceptChannel_ 的读事件回调操作为 Acceptor::handleRead 方法
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

// 析构函数
Acceptor::~Acceptor() {
// 关闭 acceptChannel_ 上的所有事件监听
acceptChannel_.disableAll();
// 从 Poller 中删除 acceptChannel_
acceptChannel_.remove();
}

// 设置有新连接到来时的回调操作
void Acceptor::setNewConnectionCallback(const NewConnectionCallback& cb) {
newConnectionCallback_ = cb;
}

// 监听连接请求(即监听有新的客户端连接进来)
void Acceptor::listen() {
listenning_ = true;
// 监听客户端的连接请求
acceptSocket_.listen();
// 启用 acceptChannel_ 的读事件监听(即监听有新连接到来)
acceptChannel_.enableReading();
}

// 获取是否正在监听连接请求
bool Acceptor::listenning() const {
return listenning_;
}

// 处理读事件(即处理有新客户端连接进来)
void Acceptor::handleRead() {
InetAddress peerAddr;
// 接受客户端新连接,返回新连接对应的 socket fd,用来和客户端进行读写
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0) {
// 有客户端新连接到来,执行回调操作(如果存在)
if (newConnectionCallback_) {
// 回调操作的职责:轮询找到 subLoop,将新客户端的 fd 分发给 subLoop,然后唤醒 subLoop 以处理该新客户端的连接
newConnectionCallback_(connfd, peerAddr);
} else {
::close(connfd);
}
} else {
LOG_ERROR("%s => accept failed, errno:%d", __PRETTY_FUNCTION__, errno);
if (errno == EMFILE) {
LOG_ERROR("%s => sockfd reached limit", __PRETTY_FUNCTION__);
}
}
}

TcpServer

  • TcpServer.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#pragma once

#include <functional>
#include <iostream>
#include <memory>
#include <string>

#include "Acceptor.h"
#include "Callbacks.h"
#include "EventLoop.h"
#include "EventLoopThreadPool.h"
#include "InetAddress.h"
#include "TcpConnection.h"
#include "atomic"
#include "noncopyable.h"
#include "unordered_map"

// TCP 服务器类
class TcpServer : noncopyable {
public:
// 线程初始化回调操作类型定义
using ThreadInitCallback = std::function<void(EventLoop*)>;

// 端口复用选项枚举类型定义
enum Option {
kNoReusePort,
kReusePort,
};

// 构造函数
TcpServer(EventLoop* loop, const InetAddress& listenAddr, const std::string nameArg, Option option = kNoReusePort);

// 析构函数
~TcpServer();

// 获取服务器监听的 IP 和端口信息
const std::string& ipPort() const;

// 获取服务器名称
const std::string& name() const;

// 获取服务器的事件循环
EventLoop* getLoop() const;

// 设置线程池的线程数量(即底层 subLoop 的数量)
void setThreadNum(int numThreads);

// 启动服务器(线程安全)
void start();

// 设置线程初始化回调操作
void setThreadInitCallback(const ThreadInitCallback& cb);

// 设置有新连接到来时的回调操作
void setConnectionCallback(const ConnectionCallback& cb);

// 设置有数据到来时的回调操作
void setMessageCallback(const MessageCallback& cb);

// 设置数据发送完成时的回调操作
void setWriteCompleteCallback(const WriteCompleteCallback& cb);

private:
// TCP 连接集合类型定义
using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;

// 创建 TCP 连接(在 baseLoop 上执行)
void newConnection(int sockfd, const InetAddress& peerAddr);

// 移除 TCP 连接
void removeConnection(const TcpConnectionPtr& conn);

// 移除 TCP 连接(在 baseLoop 上执行)
void removeConnectionInLoop(const TcpConnectionPtr& conn);

EventLoop* loop_; // 用户自定义的 EventLoop(即 baseLoop,也称作 mainLoop,运行在主线程上)

const std::string name_; // 服务器名称
const std::string ipPort_; // 服务器监听的 IP 和端口信息
std::unique_ptr<Acceptor> acceptor_; // 用于监听新连接的 Acceptor 对象,运行在 baseLoop 上

std::shared_ptr<EventLoopThreadPool> threadPool_; // 事件循环线程池

ConnectionCallback connectionCallback_; // 有新连接到来时的回调操作
MessageCallback messageCallback_; // 有数据到来时的回调操作
WriteCompleteCallback writeCompleteCallback_; // 数据发送完成时的回调操作
ThreadInitCallback threadInitCallback_; // 线程初始化回调操作

std::atomic_int started_; // 标记服务器是否已经启动
int nextConnId_; // 下一个 TCP 连接的 ID
ConnectionMap connections_; // 保存所有的 TCP 连接
};
  • TcpServer.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#include "TcpServer.h"

#include <assert.h>
#include <string.h>

#include "Logger.h"
#include "TcpConnection.h"

static EventLoop* CheckLoopNotNull(EventLoop* loop) {
if (loop == nullptr) {
LOG_FATAL("%s => baseLoop is null", __PRETTY_FUNCTION__);
}
return loop;
}

// 构造函数
TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr, const std::string nameArg, Option option)
: loop_(CheckLoopNotNull(loop)),
ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1),
started_(0) {
// 当有新客户端连接进来时,会调用 TcpServer::newConnection() 函数
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));
}

// 析构函数
TcpServer::~TcpServer() {
// 打印日志信息
LOG_DEBUG("%s => tcp server [%s] destructing", __PRETTY_FUNCTION__, name_.c_str());

// 遍历所有 TCP 连接
for (auto& item : connections_) {
// 这个局部的智能指针对象出了右括号后,会自动释放掉对应的 TcpConnection 资源
TcpConnectionPtr conn(item.second);
// 重置原有的智能指针
item.second.reset();
// 唤醒 TCP 连接所在的 EventLoop 去执行 TcpConnection::connectDestroyed() 函数
conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
}

// 获取服务器监听的 IP 和端口信息
const std::string& TcpServer::ipPort() const {
return ipPort_;
};

// 获取服务器名称
const std::string& TcpServer::name() const {
return name_;
};

// 获取服务器的事件循环
EventLoop* TcpServer::getLoop() const {
return loop_;
}

// 设置线程池的线程数量(即底层 subLoop 的数量)
void TcpServer::setThreadNum(int numThreads) {
threadPool_->setThreadNum(numThreads);
}

// 启动服务器(线程安全)
void TcpServer::start() {
// 防止 TcpServer 被多次启动
if (started_++ == 0) {
// 启动多个子线程,并各自运行一个 subLoop
threadPool_->start(threadInitCallback_);
// 在 baseLoop(运行在主线程)上监听连接请求(即监听有新的客户端连接进来)
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
}
}

// 设置线程初始化回调操作
void TcpServer::setThreadInitCallback(const ThreadInitCallback& cb) {
threadInitCallback_ = cb;
}

// 设置有新连接到来时的回调操作
void TcpServer::setConnectionCallback(const ConnectionCallback& cb) {
connectionCallback_ = cb;
}

// 设置有数据到来时的回调操作
void TcpServer::setMessageCallback(const MessageCallback& cb) {
messageCallback_ = cb;
}

// 设置数据发送完成时的回调操作
void TcpServer::setWriteCompleteCallback(const WriteCompleteCallback& cb) {
writeCompleteCallback_ = cb;
}

// 创建 TCP 连接(在 baseLoop 上执行)
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
// 通过轮询算法,获取下一个 subLoop(也称作 ioLoop)
EventLoop* ioLoop = threadPool_->getNextLoop();

// 拼接 TCP 连接的名称
char buf[64] = {0};
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;

// 打印日志信息
LOG_DEBUG("%s => tcp server [%s] new connection [%s] from %s", __PRETTY_FUNCTION__, name_.c_str(), connName.c_str(),
ipPort_.c_str());

// 获取本地网络地址
sockaddr_in local;
::bzero(&local, sizeof local);
socklen_t addrlen = sizeof local;
if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0) {
LOG_ERROR("%s => fail to get local internet address", __PRETTY_FUNCTION__);
}
InetAddress localAddr(local);

// 根据连接成功的 sockfd,创建 TCP 连接对象
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));

// 将新创建的 TCP 连接对象放进集合中
connections_[connName] = conn;

// 设置 TCP 连接的回调操作(由用户自定义)
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));

// 唤醒 ioLoop 所在的线程去执行 TcpConnection::connectEstablished() 函数
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

// 移除 TCP 连接
void TcpServer::removeConnection(const TcpConnectionPtr& conn) {
// 唤醒 baseLoop 所在的线程去执行 TcpServer::removeConnectionInLoop() 函数
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

// 移除 TCP 连接(在 baseLoop 上执行)
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn) {
loop_->assertInLoopThread();
// 打印日志信息
LOG_DEBUG("%s => tcp server [%s] remove connection [%s]", __PRETTY_FUNCTION__, name_.c_str(), conn->name().c_str());

// 移除 TCP 连接
size_t n = connections_.erase(conn->name());

// 唤醒 TCP 连接所在的 EventLoop 去执行 TcpConnection::connectDestroyed() 函数
assert(n == 1);
EventLoop* ioLoop = conn->getLoop();
ioLoop->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}

Connector

  • Connector.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#pragma once

#include <atomic>
#include <functional>
#include <memory>

#include "InetAddress.h"
#include "noncopyable.h"

// 类前置声明
class Channel;
class EventLoop;

// TCP 连接器类
class Connector : noncopyable, public std::enable_shared_from_this<Connector> {
public:
// 有新连接建立时的回调操作类型定义
using NewConnectionCallback = std::function<void(int sockfd)>;

// 构造函数
Connector(EventLoop* loop, const InetAddress& serverAddr);

// 析构函数
~Connector();

// 设置有新连接建立时的回调操作类型定义
void setNewConnectionCallback(const NewConnectionCallback& cb);

// 启动连接器
void start();

// 重启连接器(必须在 EventLoop 所处的线程上执行)
void restart();

// 停止连接器
void stop();

// 获取服务器地址
const InetAddress& serverAddress() const;

private:
// 连接器的状态
enum States { kDisconnected, kConnecting, kConnected };
// 最大重试延迟时间(毫秒)
static const int kMaxRetryDelayMs;
// 初始重试延迟时间(毫秒)
static const int kInitRetryDelayMs;

// 设置连接状态
void setState(States s);

// 在 EventLoop 所处的线程上启动连接器
void startInLoop();

// 在 EventLoop 所处的线程上停止连接器
void stopInLoop();

// 发起连接操作
void connect();

// 处理正在连接的 Socket
void connecting(int sockfd);

// 处理写事件
void handleWrite();

// 处理错误事件
void handleError();

// 重试连接
void retry(int sockfd);

// 移除并重置 Channel
int removeAndResetChannel();

// 重置 Channel
void resetChannel();

EventLoop* loop_; // 连接器所在的事件循环
InetAddress serverAddr_; // 服务器地址
std::atomic_int connect_; // 标记是否需要连接
States state_; // 连接状态
std::unique_ptr<Channel> channel_; // 连接器对应的 Channel
NewConnectionCallback newConnectionCallback_; // 新连接建立时的回调操作
int retryDelayMs_; // 重试连接的延迟时间(毫秒)
};
  • Connector.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#include "Connector.h"

#include <assert.h>
#include <error.h>
#include <sys/socket.h>
#include <unistd.h>

#include <chrono>
#include <thread>

#include "Channel.h"
#include "EventLoop.h"
#include "Logger.h"
#include "SocketsOps.h"

// 定义初始重试延迟时间(毫秒)
const int Connector::kInitRetryDelayMs = 500;
// 定义最大重试延迟时间(毫秒)
const int Connector::kMaxRetryDelayMs = 30 * 1000;

// 构造函数
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
: loop_(loop), serverAddr_(serverAddr), connect_(false), state_(kDisconnected), retryDelayMs_(kInitRetryDelayMs) {
// 打印日志信息
LOG_DEBUG("%s => create connector at %p", __PRETTY_FUNCTION__, this);
}

// 析构函数
Connector::~Connector() {
// 打印日志信息
LOG_DEBUG("%s => destruct connector at %p", __PRETTY_FUNCTION__, this);
}

// 设置有新连接建立时的回调操作类型定义
void Connector::setNewConnectionCallback(const NewConnectionCallback& cb) {
newConnectionCallback_ = cb;
}

// 启动连接器
void Connector::start() {
// 标记需要连接
connect_ = true;
// 唤醒 loop_ 对应的线程去启动连接器
loop_->runInLoop(std::bind(&Connector::startInLoop, this));
}

// 在 EventLoop 所处的线程上启动连接器
void Connector::startInLoop() {
loop_->assertInLoopThread();
assert(state_ == kDisconnected);
// 判断是否需要连接
if (connect_) {
// 发起连接操作
connect();
} else {
LOG_DEBUG("%s => do not connect");
}
}

// 停止连接器
void Connector::stop() {
// 标记不再连接
connect_ = false;
// 唤醒 loop_ 对应的线程去关闭连接器
loop_->queueInLoop(std::bind(&Connector::stopInLoop, this));
}

// 在 EventLoop 所处的线程上停止连接器
void Connector::stopInLoop() {
loop_->assertInLoopThread();
if (state_ == kConnecting) {
// 设置连接状态为已断开
setState(kDisconnected);
// 移除并重置 Channel
int sockfd = removeAndResetChannel();
// 重试连接
retry(sockfd);
}
}

// 重启连接器(必须在 EventLoop 所处的线程上执行)
void Connector::restart() {
loop_->assertInLoopThread();
// 设置连接状态
setState(kDisconnected);
// 重置重试延迟时间
retryDelayMs_ = kInitRetryDelayMs;
// 标记需要连接
connect_ = true;
// 启动连接器
startInLoop();
}

// 获取服务器地址
const InetAddress& Connector::serverAddress() const {
return serverAddr_;
}

// 设置连接状态
void Connector::setState(States s) {
state_ = s;
}

// 发起连接操作
void Connector::connect() {
// 创建非阻塞的 Socket
int sockfd = createNonblockingSocket();
// 连接 TCP 服务器
int ret = ::connect(sockfd, (sockaddr*)serverAddr_.getSockAddr(), sizeof(sockaddr_in));
// 处理连接结果
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno) {
case 0:
case EINPROGRESS:
case EINTR:
case EISCONN:
// 处理正在连接的 Socket
connecting(sockfd);
break;

case EAGAIN:
case EADDRINUSE:
case EADDRNOTAVAIL:
case ECONNREFUSED:
case ENETUNREACH:
// 重新连接
retry(sockfd);
break;

case EACCES:
case EPERM:
case EAFNOSUPPORT:
case EALREADY:
case EBADF:
case EFAULT:
case ENOTSOCK:
// 打印日志信息
LOG_ERROR("%s => connect error, errno:%d", __PRETTY_FUNCTION__, errno);
// 关闭连接
::close(sockfd);
break;

default:
// 打印日志信息
LOG_ERROR("%s => unexpected error, errno:%d", __PRETTY_FUNCTION__, errno);
// 关闭连接
::close(sockfd);
break;
}
}

// 处理正在连接的 Socket
void Connector::connecting(int sockfd) {
// 设置连接状态为正在连接
setState(kConnecting);
// 创建 Channel 并注册写事件和错误事件的回调操作
channel_.reset(new Channel(loop_, sockfd));
channel_->setWriteCallback(std::bind(&Connector::handleWrite, this));
channel_->setErrorCallback(std::bind(&Connector::handleError, this));
// Channel 开启监听 fd 上的写事件
channel_->enableWriting();
}

// 处理写事件
void Connector::handleWrite() {
// 打印日志信息
LOG_DEBUG("%s => state:%d", __PRETTY_FUNCTION__, state_);

if (state_ == kConnecting) {
// 移除并重置 Channel
int sockfd = removeAndResetChannel();
// 获取 Socket 错误码
int savedErrno = getSocketError(sockfd);

// 发生错误
if (savedErrno) {
// 打印日志信息
LOG_WARN("%s => SO_ERROR=%d", __PRETTY_FUNCTION__, savedErrno);
// 重新连接
retry(sockfd);
}
// 发生自连接
else if (isSelfConnect(sockfd)) {
// 打印日志信息
LOG_WARN("%s => self connect", __PRETTY_FUNCTION__);
// 重新连接
retry(sockfd);
}
// 连接成功
else {
// 设置连接状态为已连接
setState(kConnected);
// 判断是否需要连接
if (connect_) {
// 需要连接,执行有新连接建立时的回调操作
newConnectionCallback_(sockfd);
} else {
// 不需要连接,关闭该连接
::close(sockfd);
}
}
} else {
assert(state_ == kDisconnected);
}
}

// 处理错误事件
void Connector::handleError() {
// 打印日志信息
LOG_ERROR("%s => occurred error, state:%d", __PRETTY_FUNCTION__, state_);

if (state_ == kConnecting) {
// 移除并重置 Channel
int sockfd = removeAndResetChannel();
// 获取 Socket 错误码
int savedErrno = getSocketError(sockfd);
// 打印日志信息
LOG_DEBUG("%s => SO_ERROR:%d", __PRETTY_FUNCTION__, savedErrno);
// 重新连接
retry(sockfd);
}
}

// 重试连接
void Connector::retry(int sockfd) {
// 关闭连接
::close(sockfd);

// 设置连接状态
setState(kDisconnected);

// 判断是否需要连接
if (connect_) {
// 获取当前的重试延迟时间
int delay = retryDelayMs_;

// 获取 shared_ptr 指向的自身对象
auto self = shared_from_this();

// 打印日志信息
LOG_INFO("%s => retry connecting to %s in %d milliseconds", __PRETTY_FUNCTION__, serverAddr_.toIpPort().c_str(),
delay);

// 在一个独立的线程中等待一段时间后启动连接器
std::thread([self, delay]() {
// 等待一段时间
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
// 唤醒 loop_ 对应的线程去启动连接器
self->loop_->queueInLoop([self]() { self->startInLoop(); });
}).detach();

// 指数退避算法,增加重试延迟时间
retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
} else {
LOG_DEBUG("%s => do not connect", __PRETTY_FUNCTION__);
}
}

// 移除并重置 Channel
int Connector::removeAndResetChannel() {
// 禁用 Channel 的所有事件监听
channel_->disableAll();
// 从 Poller 中删除 Channel
channel_->remove();
// 获取 Channel 对应的 sockfd
int sockfd = channel_->fd();
// 唤醒 loop_ 对应的线程去重置 Channel
loop_->queueInLoop(std::bind(&Connector::resetChannel, this));
return sockfd;
}

// 重置 Channel
void Connector::resetChannel() {
channel_.reset();
}

TcpClient

  • TcpClient.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#pragma once

#include <atomic>
#include <mutex>

#include "EventLoop.h"
#include "TcpConnection.h"
#include "noncopyable.h"

// 类前置声明
class Connector;

// TCP 连接器智能指针类型定义
using ConnectorPtr = std::shared_ptr<Connector>;

// TCP 客户端
class TcpClient : noncopyable {
public:
// 构造函数
TcpClient(EventLoop* loop, const InetAddress& serverAddr, const std::string& nameArg);

// 析构函数
~TcpClient();

// 发起连接
void connect();

// 断开连接
void disconnect();

// 关闭客户端
void stop();

// 获取当前的 TCP 连接
TcpConnectionPtr connection();

// 获取事件循环
EventLoop* getLoop() const;

// 是否允许重试连接
bool retry() const;

// 允许重试连接
void enableRetry();

// 获取客户端名称
const std::string& name() const;

// 设置连接建立/关闭时的回调操作
void setConnectionCallback(ConnectionCallback cb);

// 设置有数据到来时的回调操作
void setMessageCallback(MessageCallback cb);

// 设置数据发送完成时的回调操作
void setWriteCompleteCallback(WriteCompleteCallback cb);

private:
// 创建新连接
void newConnection(int sockfd);

// 移除连接
void removeConnection(const TcpConnectionPtr& conn);

EventLoop* loop_; // 事件循环
ConnectorPtr connector_; // 连接器
const std::string name_; // 客户端名称
ConnectionCallback connectionCallback_; // 连接建立/关闭时的回调操作
MessageCallback messageCallback_; // 有数据到来时的回调操作
WriteCompleteCallback writeCompleteCallback_; // 数据发送完成时的回调操作
std::atomic_bool retry_; // 是否允许重试连接(即断线重连)
std::atomic_bool connect_; // 是否需要连接
int nextConnId_; // 下一个 TCP 连接的 ID
std::mutex mutex_; // 互斥锁
TcpConnectionPtr connection_; // TCP 连接
};
  • TcpClient.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#include "TcpClient.h"

#include <assert.h>

#include <chrono>
#include <functional>
#include <memory>
#include <thread>

#include "Connector.h"
#include "EventLoop.h"
#include "Logger.h"
#include "SocketsOps.h"

// 检查 EventLoop 指针是否为空
static EventLoop* CheckLoopNotNull(EventLoop* loop) {
if (loop == nullptr) {
LOG_FATAL("%s => eventloop is null", __PRETTY_FUNCTION__);
}
return loop;
}

namespace detail {

// 移除 TCP 连接
void removeConnection(EventLoop* loop, const TcpConnectionPtr& conn) {
loop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}

// 移除连接器
void removeConnector(const ConnectorPtr& connector) {
}

} // namespace detail

// 构造函数
TcpClient::TcpClient(EventLoop* loop, const InetAddress& serverAddr, const std::string& nameArg)
: loop_(CheckLoopNotNull(loop)),
connector_(new Connector(loop_, serverAddr)),
name_(nameArg),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
retry_(false),
connect_(true),
nextConnId_(1) {
// 设置有新连接建立时的回调操作
connector_->setNewConnectionCallback(std::bind(&TcpClient::newConnection, this, std::placeholders::_1));
// 打印日志信息
LOG_INFO("%s => crate tcp client [%s] - connector %p", __PRETTY_FUNCTION__, name_.c_str(), connector_.get());
}

// 析构函数
TcpClient::~TcpClient() {
// 打印日志信息
LOG_INFO("%s => destruct tcp client [%s] - connector %p", __PRETTY_FUNCTION__, name_.c_str(), connector_.get());

// 获取当前 TcpConnection 的智能指针副本,并判断它是否是唯一拥有者
TcpConnectionPtr conn;
bool unique = true;
{
std::unique_lock<std::mutex> lock(mutex_);
unique = connection_.unique();
conn = connection_;
}

if (conn) {
assert(loop_ == conn->getLoop());
// 设置 TCP 连接关闭时的回调操作
CloseCallback cb = std::bind(&detail::removeConnection, loop_, std::placeholders::_1);
loop_->runInLoop(std::bind(&TcpConnection::setCloseCallback, conn, cb));
// 如果 TCP 连接唯一
if (unique) {
// 强制关闭 TCP 连接
conn->forceClose();
}
} else {
// 关闭连接器
connector_->stop();
// 获取当前的连接器
auto connector = connector_;
// 唤醒 loop_ 所在的线程去移除连接器
loop_->runInLoop([connector]() { detail::removeConnector(connector); });
}
}

// 发起连接
void TcpClient::connect() {
// 打印日志信息
LOG_INFO("%s => connect to %s", __PRETTY_FUNCTION__, connector_->serverAddress().toIpPort().c_str());
// 标记需要连接
connect_ = true;
// 启动连接器
connector_->start();
}

// 断开连接
void TcpClient::disconnect() {
// 标记不需要连接
connect_ = false;
// 关闭当前 TCP 连接
{
std::unique_lock<std::mutex> lock(mutex_);
if (connection_) {
connection_->shutdown();
}
}
}

// 关闭客户端
void TcpClient::stop() {
// 标记不需要连接
connect_ = false;
// 关闭连接器
connector_->stop();
}

// 获取当前的 TCP 连接
TcpConnectionPtr TcpClient::connection() {
std::unique_lock<std::mutex> lock(mutex_);
return connection_;
}

// 获取事件循环
EventLoop* TcpClient::getLoop() const {
return loop_;
}

// 是否允许重试连接
bool TcpClient::retry() const {
return retry_;
}

// 允许重试连接
void TcpClient::enableRetry() {
retry_ = true;
}

// 获取客户端名称
const std::string& TcpClient::name() const {
return name_;
}

// 设置连接建立/关闭时的回调操作
void TcpClient::setConnectionCallback(ConnectionCallback cb) {
connectionCallback_ = std::move(cb);
}

// 设置有数据到来时的回调操作
void TcpClient::setMessageCallback(MessageCallback cb) {
messageCallback_ = std::move(cb);
}

// 设置数据发送完成时的回调操作
void TcpClient::setWriteCompleteCallback(WriteCompleteCallback cb) {
writeCompleteCallback_ = std::move(cb);
}

// 创建新连接
void TcpClient::newConnection(int sockfd) {
loop_->assertInLoopThread();

// 远端地址
InetAddress peerAddr(getPeerAddr(sockfd));

// 拼接 TCP 连接的名称
char buf[32] = {0};
snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;

// 本端地址
InetAddress localAddr(getLocalAddr(sockfd));

// 创建 TCP 连接对象
TcpConnectionPtr conn(new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr));

// 设置 TCP 连接的回调操作
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpClient::removeConnection, this, std::placeholders::_1));

// 设置当前的 TCP 连接
{
std::unique_lock<std::mutex> lock(mutex_);
connection_ = conn;
}

// 建立连接
conn->connectEstablished();
}

// 移除连接
void TcpClient::removeConnection(const TcpConnectionPtr& conn) {
loop_->assertInLoopThread();
assert(loop_ == conn->getLoop());

// 重置当前的 TCP 连接
{
std::unique_lock<std::mutex> lock(mutex_);
assert(connection_ == conn);
connection_.reset();
}

// 唤醒 loop_ 所在的线程去销毁 TCP 连接
loop_->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));

// 如果允许重试连接,且需要连接
if (retry_ && connect_) {
// 打印日志信息
LOG_INFO("%s => tcp client [%s] reconnecting to %s", __PRETTY_FUNCTION__, name_.c_str(),
connector_->serverAddress().toIpPort().c_str());
// 重启连接器
connector_->restart();
}
}

项目测试

测试代码

  • ChatClient.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 基于 MyMuduo 网络库开发 TCP 客户端程序
*/

#pragma once

#include <iostream>

#include "TcpClient.h"

// 聊天客户端
class ChatClient {
public:
// 构造函数
ChatClient(EventLoop* loop, const InetAddress& serverAddr, const std::string& nameArg);

// 析构函数
~ChatClient();

// 连接服务器
void connect();

private:
// 客户端绑定连接回调函数,当连接或者断开服务器时调用
void onConnection(const TcpConnectionPtr& conn);

// 客户端绑定消息回调函数,当有数据接收时调用
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time);

// TCP 客户端
TcpClient client_;

// EventLoop 事件循环
EventLoop* loop_;
};
  • ChatClient.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 基于 MyMuduo 网络库开发 TCP 客户端程序
*/

#include "ChatClient.h"

#include "Logger.h"

// 构造函数
ChatClient::ChatClient(EventLoop* loop, const InetAddress& serverAddr, const std::string& nameArg)
: client_(loop, serverAddr, nameArg), loop_(loop) {
// 允许重试连接
client_.enableRetry();
// 设置客户端TCP连接的回调
client_.setConnectionCallback(std::bind(&ChatClient::onConnection, this, std::placeholders::_1));
// 设置客户端接收数据的回调
client_.setMessageCallback(
std::bind(&ChatClient::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
}

// 析构函数
ChatClient::~ChatClient() {
// 发起断开连接
client_.disconnect();
// 停止内部 Connector 的重连机制,避免异步行为
client_.stop();
}

// 连接服务器
void ChatClient::connect() {
client_.connect();
}

// 客户端绑定连接回调函数,当连接或者断开服务器时调用
void ChatClient::onConnection(const TcpConnectionPtr& conn) {
// 连接创建
if (conn->connected()) {
// 打印日志信息
LOG_INFO("ChatClient - new connection [%s] -> [%s], state: connected", conn->localAddress().toIpPort().c_str(),
conn->peerAddress().toIpPort().c_str());
// 发送消息
conn->send("I'm " + client_.name());
}
// 连接断开
else {
// 打印日志信息
LOG_INFO("ChatClient - close connection [%s] -> [%s], state: disconnected",
conn->localAddress().toIpPort().c_str(), conn->peerAddress().toIpPort().c_str());
}
}

// 客户端绑定消息回调函数,当有数据接收时调用
void ChatClient::onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time) {
// 获取服务器发送的消息
std::string message = buf->retrieveAllAsString();

// 去掉消息末尾的 '\r' 和 '\n' 字符(nc 命令会发送 CRLF)
while (!message.empty() && (message.back() == '\n' || message.back() == '\r')) {
message.pop_back();
}

LOG_INFO("ChatClient - receive message: [%s], time: %s", message.c_str(), time.toString().c_str());
}
  • ChatServer.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 基于 MyMuduo 网络库开发 TCP 服务器程序
*/

#pragma once

#include <iostream>

#include "TcpServer.h"

// 聊天服务器
class ChatServer {
public:
// 构造函数
ChatServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg);

// 析构函数
~ChatServer();

// 启动服务器
void start();

private:
// 处理用户的连接创建和断开
void onConnection(const TcpConnectionPtr &conn);

// 服务器绑定消息回调函数,当有数据接收时调用
void onMessage(const TcpConnectionPtr &conn, Buffer *buffer, Timestamp time);

// TCP 服务器
TcpServer server_;

// EventLoop 事件循环
EventLoop *loop_;
};
  • ChatServer.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 基于 MyMuduo 网络库开发 TCP 服务器程序
*/

#include "ChatServer.h"

#include "Logger.h"

// 构造函数
ChatServer::ChatServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg)
: server_(loop, listenAddr, nameArg), loop_(loop) {
// 设置服务器注册用户连接的创建和断开回调
server_.setConnectionCallback(std::bind(&ChatServer::onConnection, this, std::placeholders::_1));

// 设置服务器注册用户读写事件的回调
server_.setMessageCallback(
std::bind(&ChatServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

// 设置线程池的线程数量(比如:1个I/O线程,3个Worker线程)
server_.setThreadNum(4);
}

// 析构函数
ChatServer::~ChatServer() {
}

// 启动服务器
void ChatServer::start() {
// 开启事件循环处理
server_.start();
// 打印日志信息
LOG_INFO("ChatServer - start success, listening on %s", server_.ipPort().c_str());
}

// 处理用户的连接创建和断开
void ChatServer::onConnection(const TcpConnectionPtr &conn) {
// 连接创建
if (conn->connected()) {
LOG_INFO("ChatServer - Connection UP : %s", conn->peerAddress().toIpPort().c_str());
}
// 连接断开
else {
LOG_INFO("ChatServer - Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());
}
}

// 处理用户读写事件(比如接收客户端发送的数据)
void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buffer, Timestamp time) {
// 获取客户端发送的消息
std::string message = buffer->retrieveAllAsString();

// 去掉消息末尾的 '\r' 和 '\n' 字符(telnet 命令会发送 CRLF)
while (!message.empty() && (message.back() == '\n' || message.back() == '\r')) {
message.pop_back();
}

// 打印日志信息
LOG_INFO("ChatServer - receive message: [%s], time: %s, ip: %s", message.c_str(), time.toString().c_str(),
conn->peerAddress().toIpPort().c_str());

// 发送数据给客户端
conn->send("You just said: " + message + "\n");
}
  • main.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* MyMuduo 网络库的使用案例
*
* Linux 上运行程序:./bin/mymuduo_example
*/

#include <chrono>
#include <iostream>
#include <thread>

#include "ChatClient.h"
#include "ChatServer.h"
#include "Logger.h"

// 启动聊天服务器
void startChatServer() {
// 创建服务器
EventLoop loop;
InetAddress addr(6000, "127.0.0.1");
ChatServer server(&loop, addr, "ChatServer");

// 启动服务器
server.start();

// 以阻塞方式等待新客户端的连接、已连接客户端的读写事件等
loop.loop();
}

// 启动聊天客户端
void startChatClient() {
// 创建客户端
EventLoop loop;
InetAddress addr(6000, "127.0.0.1");
ChatClient client(&loop, addr, "ChatClient");

// 连接服务器
client.connect();

// 以阻塞方式等待服务器发送过来的数据
loop.loop();
}

int main() {
// 设置日志级别
Logger::instance().setLogLevel(LogLevel::INFO);

// 在独立的线程上启动聊天服务器
std::thread serverThread([]() { startChatServer(); });
serverThread.detach();

// 等待一段时间,让聊天服务器先启动(可选,因为聊天客户端会自动重连)
std::this_thread::sleep_for(std::chrono::milliseconds(200));

// 在独立的线程上启动聊天客户端
std::thread clientThrad([]() { startChatClient(); });
clientThrad.detach();

// 阻塞等待用户按下任意键,然后结束程序运行
getchar();

return 0;
}

测试步骤

  • 编译项目代码
1
2
3
4
5
# 进入项目根目录
cd c++-project-mymuduo

# 执行项目自动构建脚本
./autobuild.sh
  • 运行测试程序
1
2
3
4
5
# 执行 MyMuduo 网络库使用案例的可执行文件
./bin/mymuduo_example

# 执行 telnet 命令连接 TCP 服务器(成功连接后,输入任意字符,按回车键即可发送消息给服务器,之后服务器会返回相应的消息内容)
telnet 127.0.0.1 6000
  • 测试程序输出的日志信息如下:
1
2
3
4
5
6
7
2025-11-15 22:10:01 => 6609 [INFO] ChatServer - start success, listening on 127.0.0.1:6000
2025-11-15 22:10:01 => 6614 [INFO] TcpClient::TcpClient(EventLoop*, const InetAddress&, const std::string&) => crate tcp client [ChatClient] - connector 0x7f52b8000e20
2025-11-15 22:10:01 => 6614 [INFO] void TcpClient::connect() => connect to 127.0.0.1:6000
2025-11-15 22:10:01 => 6614 [INFO] ChatClient - new connection [127.0.0.1:42170] -> [127.0.0.1:6000], state: connected
2025-11-15 22:10:01 => 6610 [INFO] ChatServer - Connection UP : 127.0.0.1:42170
2025-11-15 22:10:01 => 6610 [INFO] ChatServer - receive message: [I'm ChatClient], time: 2025-11-15 22:10:01, ip: 127.0.0.1:42170
2025-11-15 22:10:01 => 6614 [INFO] ChatClient - receive message: [You just said: I'm ChatClient], time: 2025-11-15 22:10:01

项目扩展

上面的 MyMuduo 网络库代码只实现了 Muduo 的核心功能,并不支持 Muduo 的定时事件机制(TimerQueue)、IPV6 / DNS / HTTP / RPC 协议等,日后可以从以下几方面继续对其进行扩展:

  • (1) 定时事件机制

    • TimerQueue:支持 EventLoop 内的定时任务调度,常见实现方式包括:
      • 链表队列:实现简单,但不适合大量定时器场景(需要线性扫描)。
      • 红黑树(如 nginx):按照到期时间排序,可快速找到最早到期的定时器,插入 / 删除的时间复杂度为 O(logN)
      • 时间轮(如 libevent):适合大量、定时精度要求不高的场景,插入 / 删除的时间复杂度为 O(1),整体性能出色。
  • (2) IPV6 / DNS / HTTP / RPC 协议支持

    • IPV6:支持 IPv6 套接字、地址解析与双栈接入,确保网络库的所有连接与事件处理流程均可透明兼容 IPv6。
    • DNS:实现异步域名解析(如 getaddrinfo_a),将域名解析和网络事件循环结合,避免阻塞 I/O。
    • HTTP:构建基础的 HTTP 请求解析、响应封装,可扩展为简单的 Web 服务器或客户端;需要支持 Keep-Alive、Chunked 等机制。
    • RPC:在已有 TCP 框架上封装请求 / 响应协议,实现序列化、服务注册、方法调用、超时与重试等功能(可仿照 gRPC 实现)。
  • (3) 服务器性能测试

    • 为了验证网络库的性能,需要进行专业的性能压测和系统配置优化:
    • 系统性能优化
      • Linux 最大文件描述符数设置:包括
        • /proc/sys/fs/file-max(系统级限制)
        • /etc/security/limits.conf(用户 / 进程级限制)
        • ulimit -n(当前会话限制)
    • 性能测试工具
      • JMeter:可压测 HTTP 服务与自定义 TCP 服务,能够生成聚合报告和可视化图表。
      • wrk:高性能 HTTP 压测工具,支持多线程 + epoll,需要手动编译安装,仅支持 HTTP 协议。

项目问答

新 TCP 连接的派发问题

在 Muduo 网络库中,mainLoop 是如何将新来的 TCP 连接派发给 subLoop 的,同时还让新 TCP 连接的所有 I/O 事件回调操作都在 subLoop 所在的线程上执行?

  • (1) Acceptor 在 mainLoop(运行在主线程)上监听 listenfd
  • (2) mainLoop 在收到新连接事件时,会调用 Acceptor::handleRead(),得到 connfd(新连接的文件描述符)
  • (3) mainLoop 选择一个 subLoop(通过 EventLoopThreadPool 的轮询)
  • (4) mainLoop 创建 TcpConnection,并把它的所有回调操作注册到 subLoop
  • (5) mainLoop 调用 subLoop->runInLoop(),将注册 connfd 读写事件到 subLoop 的 Poller 的任务丢给 subLoop
  • (6) subLoop 线程最终向自己的 Poller 注册事件,使得 connfd 的所有读写事件(包括 I/O 事件、回调处理等)永远在 subLoop 上执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Acceptor::listen()
| |
| 1-件 |
v
Acceptor::handleRead()
| |
| 2-) |
v
TcpServer::newConnection(connfd)
| |
| 3-) |
v
选中一个 subLoop (ioLoop)
| |
| 4-r |
v
创建 TcpConnection(subLoop)
| |
| 5-去 |
v
subLoop->runInLoop(connectEstablished)
| |
| 6-p |
v
----------------------------------------------------
↓ subLoop (I/O 线程) 被唤醒后执行 connectEstablished()
----------------------------------------------------
| |
| 7-行 |
v
channel_->enableReading()
| |
| 8-) |
v
事件到来 → Poller 触发 → 执行 TcpConnection 的回调操作 (全部都会在 subLoop 线程上执行)

特别注意

  • 在 Muduo 中,新连接的建立仅发生在 mainLoop:它负责监听 listenfd,并在有新连接到来时调用 accept()。mainLoop 只负责接受连接,不参与任何与该连接相关的后续 I/O 操作(读和写等)。在 mainLoop 完成 accept() 后,Muduo 会将得到的新连接文件描述符 connfd 分发给某个 subLoop(由 EventLoopThreadPool 按轮询算法选择)。之后,该新连接的所有读写事件(包括 I/O 事件、回调处理等)都由对应的 subLoop 独立处理,与 mainLoop 无关。

EventLoop 之间的通信问题

mainLoop 与 subLoop 分别运行在不同的线程上,它们之间是如何进行通信的,也就是说 mainLoop 是如何将新来的 TCP 连接派发给 subLoop 的,还有 mainLoop 是如何唤醒 subLoop 的?

  • (1) mainLoop 与 subLoop 分别运行在不同线程中,每个 EventLoop 拥有自己独立的线程与 Poller。
  • (2) 它们之间通过 EventLoop 的异步任务队列(pendingFunctors)进行通信,任何跨线程的操作,都会封装成回调函数投递到目标 EventLoop 的异步任务队列中。
  • (3) mainLoop 接收(accept())到新连接后,调用 subLoop->runInLoop(),将 TcpConnection 的初始化任务(如 connectEstablished())投递给指定的 subLoop 执行。
  • (4) mainLoop 向 subLoop 的任务队列中插入新任务后,会向 subLoop 的 wakeupFd 写入一个字节,目的是唤醒 subLoop 去执行 pendingFunctors 队列中的任务。
  • (5) 写入 wakeupFd 会触发 subLoop 的 wakeupChannel 可读事件,wakeupChannel 是注册在 subLoop 上的一个 Channel,用来专门处理 “被唤醒” 事件。
  • (6) 被唤醒的 subLoop 从阻塞的 epoll_wait() 中立即返回,然后执行 wakeupChannel 的读事件回调。
  • (7) subLoop 随后继续执行其 pendingFunctors 队列中的任务,包括由 mainLoop 投递过来的 TcpConnection 初始化操作。
  • (8) 从此以后,该 TcpConnection 的所有 I/O 事件都由该 subLoop 负责处理,包括读写事件回调、关闭回调、错误回调等全部在 subLoop 所在线程执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
Acceptor::listen()  
| |
| 1-件 |
v
Acceptor::handleRead()
| |
| 2-) |
v
TcpServer::newConnection(connfd)
| |
| 3-p |
v
EventLoopThreadPool::getNextLoop()
| |
| 4-p |
v
new TcpConnection(subLoop, connfd)
| |
| 5-p |
v
subLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn))
| |
| 6-) |
v
EventLoop::queueInLoop(cb)
| |
| 7-p |
v
EventLoop::wakeup()
| |
| 8-件 |
v
----------------------------------------------
↓ subLoop 所在线程(I/O 线程)被唤醒执行
----------------------------------------------
| |
| 9-件 |
v
wakeupChannel->handleEvent()
| |
| 1-) |
v
EventLoop::handleRead()
| |
| 1-务 |
v
EventLoop::doPendingFunctors()
| |
| 1-) |
v
TcpConnection::connectEstablished()
| |
| 1-r |
v
Channel::enableReading()
| |
| 1-r |
v
Poller::updateChannel(channel)
| |
| 1-发 |
v
事件到来 → Poller 触发 → 返回活跃事件 → 调用 channel->handleEvent() 处理活跃事件
| |
| 1-) |
v
回调操作在 subLoop 线程执行,保证线程安全

参考资料