C++ 实现 RPC 分布式网络通信框架

大纲

前言

本文将基于 C++ 17 + Muduo 高性能网络库 + Protobuf 开发一个 RPC 框架,并使用中间件 ZooKeeper 作为注册中心。

开发工具列表

软件版本说明
C++ 标准17高版本的 Protobuf 库依赖 C++ 17
Boost1.74.0.3Muduo 库依赖 Boost 库
Muduo2.0.3Muduo 库,基于 C++ 开发,用于网络编程
Protobuf31.1Protobuf 库,核心代码主要是用 C++ 开发
ZooKeeper C API3.8.0ZooKeeper 提供的原生 C API,基于 C 语言开发
G++(GCC)12.2.0建议使用 9 版本的 G++(GCC) 编译器
CMake3.25.1C/C++ 项目构建工具
ZooKeeper3.8.0ZooKeeper 服务器
LinuxDebian 12Muduo 库不支持 Windows 平台
Visual Studio Code1.100.2使用 VSCode 远程开发特性

平台兼容性说明

由于使用了 Muduo 库,且 Muduo 库仅支持 Linux 平台;因此本文提供的所有 RPC 框架代码支持在 Linux 平台运行,不支持 Windows 平台,默认是基于 Debian 12 进行远程开发。

开源 RPC 框架

大厂开源框架

  • gRPC 是一个由 Google 开源、基于 HTTP/2 和 Protocol Buffers 的高性能 RPC 框架,使用 C++ 开发,支持多种语言(如 Go、Java、Python 等)。
  • bRPC 是百度开源的高性能、可扩展的 RPC 框架,主要用于服务器间通信,采用 C++ 编写,广泛应用于百度内部大规模分布式系统中。
  • phxrpc 是腾讯微信后台团队推出的一个非常简洁小巧的 RPC 框架,编译生成的库只有 450K。

个人开源框架

  • tinyrpc - c++ async rpc framework. 14w+ qps.
  • rocket - c++ rpc framework, simplified version of tinyrpc.
  • rest_rpc - modern c++, simple, easy to use rpc framework.
  • rpclib - a modern c++ msgpack-RPC server and client library.
  • TarsCpp - c++ language framework rpc source code implementation.

基础概念

集群和分布式

  • 集群

    • 概念
      • 每一台服务器独立运行一个工程的所有模块。
    • 优点
      • 部署简单:每台机器部署一样的工程,维护相对简单。
      • 容错性强:某台机器宕机,其它机器可以顶上,保证服务连续。
      • 扩展方便:增加机器即可横向扩展,提高并发处理能力。
      • 负载均衡容易:前端加个负载均衡器即可实现请求均衡分发。
    • 缺点
      • 资源浪费:每台机器都部署所有模块,某些模块可能资源使用率低。
      • 维护更新不灵活:改动一个模块需要重新部署整个工程。
      • 扩展粒度粗:不能单独扩展某个压力大的模块(有些模块是 CPU 密集型,有些模块是 I/O 密集型),只能整体扩容。
  • 分布式

    • 概念
      • 一个工程拆分了很多模块,每一个模块独立部署运行在一个服务器主机上,所有服务器协同工作共同提供服务。
      • 每一台服务器称作分布式的一个节点,根据节点的并发要求,对一个节点可以再做节点模块集群部署。
    • 优点
      • 资源利用率高:按模块实际资源需求进行部署,提高系统整体资源使用效率。
      • 扩展灵活:哪个模块压力大就单独扩展它,不影响其它模块。
      • 服务解耦:模块之间独立部署、独立维护,开发和运维更灵活。
      • 技术异构性好:不同模块可以使用不同的技术栈,更自由地选择工具。
    • 缺点
      • 系统复杂度高:涉及服务间通信、分布式事务、网络延迟等问题。
      • 开发运维成本高:部署、调试、监控、容错等方面的技术要求更高。
      • 依赖网络稳定性:模块之间通过网络通信,一旦网络出问题可能会引发级联故障。

特别注意

在企业的生产环境中,集群和分布式是并存的,两者并不是分开的。

RPC 通信原理

RPC(Remote Procedure Call)远程过程调用的工作流程如下图所示:

  • 黄色部分:设计 RPC 方法参数的打包和解析,也就是数据的序列化和反序列化,可以使用 Protobuf 实现。
  • 绿色部分:网络通信部分,包括寻找 RPC 服务主机、发起 RPC 调用请求和响应 RPC 调用结果,可以使用 Muduo 网络库和 ZooKeeper(用于服务注册与发现)实现 。

网络 I/O 模型

主流的网络 I/O 模型有以下几种,Muduo 采用的是第四种(reactors in threads - one loop per thread)。

  • (1) accept + read/write

    • 不适用于并发服务器
  • (2) accept + fork - process-pre-connection

    • 适合并发连接数不大,计算任务工作量大于 Fork 的开销。
  • (3) accept + thread - thread-pre-connection

    • 比第二种网络 I/O 模型的开销小了一点,但是并发造成的线程堆积过多。
  • (4) reactors in threads - one loop per thread

    • 这是 Muduo 库的网络设计方案,底层实质上是基于 Linux 的 epoll + pthread 线程池实现,且依赖了 Boost 库,适用于并发连接数较大的场景。
    • 有一个 Main Reactor 负载 Accept 连接,然后将连接分发给某个 SubReactor(采用轮询的方式来选择 SubReactor),该连接的所用操作都在那个 SubReactor 所处的线程中完成。多个连接可能被分派到多个线程中被处理,以充分利用 CPU。
    • 有一个 Base I/O Thread 负责 Accept 新的连接,接收到新的连接以后,使用轮询的方式在 Reactor Pool 中找到合适的 SubReactor 将这个连接挂载上去,这个连接上的所有任务都在这个 SubReactor 所处的线程中完成。
    • Reactor Poll 的大小是固定的,根据 CPU 的核心数量来确定。如果有过多的耗费 CPU 资源的计算任务,可以提交到 ThreadPool 线程池中专门处理耗时的计算任务。
  • (5) reactors in process - one loop pre process

    • 这是 Nginx 服务器的网络设计方案,基于进程设计,采用多个 Reactors 充当 I/O 进程和工作进程,通过一个 accept 锁,完美解决多个 Reactors 之间的 “惊群现象”。

框架介绍

本文基于 C++ 实现 RPC 框架时,使用到以下框架,主要包括 Muduo、Protobuf、ZooKeeper C API。

Muduo 的简介

Muduo 是一个用 C++ 编写的高性能、基于事件驱动的网络库,专门设计用于构建 Linux 下高并发、低延迟的网络服务,特别适合开发分布式系统、微服务、消息中间件、网络游戏服务器等后端程序。

  • 核心特性

    • 基于事件驱动模型:使用 Reactor 模式,即单线程 I/O + 多线程计算。
    • 高性能:使用 epoll I/O 多路复用技术、非阻塞 I/O、零内存拷贝技术。
    • 线程安全:网络部分是线程安全的,使用线程池和回调。
    • C++ 11 标准:需要使用支持 C++ 11 的编译器。
    • 仅支持 Linux 平台:利用 Linux 特性优化性能,不支持跨平台。
    • 可组合性强:解耦的模块设计,便于扩展和组合。
  • 核心模块

    • base(基础模块)
      • 非网络相关的通用工具
      • 如线程池、时间戳、日志系统、原子操作等
    • net(网络模块)
      • TCP 服务器 / 客户端模型
      • Reactor 事件分发器
      • Buffer、Channel、EventLoop、TcpConnection 等核心组件
  • 核心组件

    • EventLoop
      • 事件循环,是每个线程的核心对象
      • 封装了 epoll 库,处理文件描述符的读写事件
    • Channel
      • 表示一个 fd(文件描述符)及其感兴趣的事件(如读写)
      • 是 EventLoop 与具体 I/O 事件之间的桥梁
    • Poller
      • 封装 epollpoll 的接口(Muduo 默认用 epoll
    • TcpServer / TcpClient
      • 高层封装,简化服务端和客户端的使用
      • 支持多线程连接处理
    • Callback 机制
      • 所有 I/O 事件都通过用户注册的回调函数处理(高扩展性)
  • 性能优势

    • 完全采用非阻塞、异步 I/O 模型
    • 使用智能指针管理资源(如 std::shared_ptr<TcpConnection>
    • 零内存拷贝的数据缓冲机制(Buffer)
    • 合理利用多线程资源(EventLoopThreadPool)
  • 适用场景

    • 高并发 TCP 服务器(如 Redis、MQTT、游戏网关)
    • 微服务通信框架(可自定义通信协议)
    • 高性能 HTTP 服务(支持 HTTP 1.0/1.1)
    • 自研 RPC 系统

平台兼容性

  • Muduo 库只支持 Linux 平台,不兼容 Windows 平台,因为其底层使用了 Linux 平台的 pthreadepoll

Protobuf 的简介

  • Protocol Buffers(简称 Protobuf)是 Google 提出的一种高效、可扩展的结构化数据序列化格式,用于数据交换。它独立于平台和编程语言,具有良好的跨平台兼容性和扩展性。

  • Google 为多种主流编程语言提供了 Protobuf 的官方实现,包括 Java、C#、C++、Go 和 Python 等。每种语言的实现都包含相应的编译器插件(protoc)和运行时库,使得开发者可以在不同语言间无缝进行数据通信。

  • 由于 Protobuf 采用紧凑的二进制编码格式,其序列化和反序列化效率远高于基于文本的格式。相比 XML,Protobuf 的传输效率可提高约 20 倍;相比 JSON,也有近 10 倍的性能提升。这使得它特别适用于对性能要求高的场景。

  • Protobuf 广泛应用于分布式系统间的数据通信、异构平台的数据交换,也适合用作网络传输协议的数据格式、高效配置文件的载体、或用于数据持久化存储。作为一种兼具效率与可维护性的序列化方案,Protobuf 在大规模系统设计中具有极高的实用价值。

ZooKeeper C API 的简介

ZooKeeper C API 是 ZooKeeper 提供的一套原生 C 语言接口,它提供两个版本,包括 libzookeeper_mt(多线程版)和 libzookeeper_st(单线程版),可以用于 C/C++ 编写客户端程序,通常使用的是 libzookeeper_mt(多线程版)。

准备工作

安装 Boost 库

1
2
3
4
5
# 安装 Boost 的所有组件和头文件
sudo apt-get install -y libboost-all-dev

# 查看 Bootst 版本
sudo dpkg -s libboost-all-dev | grep Version

提示

由于 Muduo 使用了 Boost 库(如 boost::any),因此需要安装 Boost 库。

安裝 Muduo 库

  • 编译安装 Muduo 库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Git 克隆代码
git clone https://github.com/chenshuo/muduo.git

# 进入代码目录
cd muduo

# 创建构建目录
mkdir -p build

# 进入构建目录
cd build

# 生成构建文件
cmake ..

# 编译源码
make -j$(nproc)

# 执行安装
sudo make install

# 更新系统的共享库缓存
sudo ldconfig /usr/local/lib/
  • 验证安装
1
2
3
4
5
# 查看 Muduo 库的头文件
ls -al /usr/local/include/muduo

# 查看 Muduo 库的静态库
ls -al /usr/local/lib | grep muduo

提示

  • Muduo 的编译依赖 CMake 和 Boost 库,默认编译生成的是静态库(.a),如果需要编译生成共享库(.so),可以自行修改 CMakeLists.txt 中的配置。
  • Muduo 支持 C++ 11,仅支持 Linux 平台,不支持 Windows 平台,建议使用 7.x 及以后版本的 g++ 编译器。

安装 Protobuf 库

提示

  • Protobuf 各个版本的源码包可以从 GitHub Release 下载得到。
  • Protobuf 从 3.21 版本开始,Google 官方已经弃用了 autogen.shconfigure 构建系统,转而使用 CMake 作为主要构建系统。
  • Protobuf 从源码编译后,默认只会生成 .a 静态库文件,若希望生成 .so 动态库文件,需要在编译时添加 CMake 参数 -DBUILD_SHARED_LIBS=ON,加上参数后只会生成 .so 动态库文件。
  • 安装依赖包
1
sudo apt-get -y install cmake g++ make git wget
  • 编译安装 Protobuf 库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 下载源码
wget https://github.com/protocolbuffers/protobuf/archive/refs/tags/v31.1.tar.gz -O protobuf-v31.1.tar.gz

# 解压源码
tar -xvf protobuf-v31.1.tar.gz

# 进入解压目录
cd protobuf-31.1

# 初始化子模块
git init && git submodule update --init --recursive

# 创建构建目录
mkdir build

# 进入构建目录
cd build

# 生成构建文件(Makefile)
cmake .. -DBUILD_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr/local

# 编译源码(耗时较长)
make -j2

# 执行安装(包括可执行文件、头文件和库)
sudo make install

# 更新系统的共享库缓存
sudo ldconfig /usr/local/lib/
  • 验证 Protobuf 库安装
1
2
3
4
5
6
7
8
# 查看 Protobuf 库的版本
protoc --version

# 查看 Protobuf 库的头文件
ls -al /usr/local/include/google/protobuf

# 查看 Protobuf 库的动态库
ls -al /usr/local/lib/libproto*

安装 ZooKeeper 中间件

安装 ZooKeeper 服务器

  • 安装 ZooKeeper 服务器
1
2
# 安装 ZooKeeper 客户端和服务器
sudo apt install -y zookeeper zookeeperd
  • 添加 ZooKeeper 客户端的软链接
1
2
3
4
5
6
# 查看 ZooKeeper 客户端的安装位置
sudo dpkg -L zookeeper | grep -E 'zkCli.sh|zkEnv.sh'

# 创建 ZooKeeper 客户端的软链接(方便以后直接运行客户端)
sudo ln -s /usr/share/zookeeper/bin/zkCli.sh /usr/local/bin/zkCli.sh
sudo ln -s /usr/share/zookeeper/bin/zkEnv.sh /usr/local/bin/zkEnv.sh
  • 验证 ZooKeeper 服务器安装
1
2
3
4
5
# 查看 ZooKeeper 服务器的运行状态
sudo systemctl status zookeeper

# 通过 ZooKeeper 客户端连接 ZooKeeper 服务器
zkCli.sh -server 127.0.0.1:2181
  • ZooKeeper 服务器的安装信息
安装信息说明
默认的监听端口2181
默认的数据目录/tmp/zookeeper
默认配置文件的路径/etc/zookeeper/conf/zoo.cfg

安装 ZooKeeper C API 库

由于本文的 RPC 项目使用 ZooKeeper C API 库来实现 RPC 服务动态注册和发现,因此需要安装 ZooKeeper C API 库。值得一提的是,ZooKeeper 提供了一个 libzookeeper_mt(多线程版)或 libzookeeper_st(单线程版)原生 C API,可以用于 C/C++ 编写客户端程序,通常使用的是 libzookeeper_mt(多线程版)。在 Linux 系统上,除了可以通过 APT / YUM 包管理工具直接安装 ZooKeeper C API 库,还可以手动编译 ZooKeeper 的源码来安装 ZooKeeper C API 库,具体编译步骤为 .configure + make + makeinstall,默认会编译生成多线程版本的库(libzookeeper_mt.so)和单线程版的库(如 libzookeeper_st.so)。

  • 安装 ZooKeeper C API 库
1
2
3
4
5
# 安装 ZooKeeper 多线程版的开发包(单线程版本是:libzookeeper-st-dev)
sudo apt install -y libzookeeper-mt-dev

# 更新系统的共享库缓存
sudo ldconfig /usr/lib/
  • 验证 ZooKeeper C API 库安装
1
2
3
4
5
6
7
8
# 查看 ZooKeeper 的头文件
sudo dpkg -L libzookeeper-mt-dev | grep zookeeper.h

# 查看 ZooKeeper 的静态库
sudo dpkg -L libzookeeper-mt-dev | grep '\.a$'

# 查看 ZooKeeper 的动态库
sudo dpkg -L libzookeeper-mt-dev | grep '\.so'
  • ZooKeeper C API 库的安装信息
安装信息说明
头文件的路径/usr/include/zookeeper/zookeeper.h
静态库的路径/usr/lib/x86_64-linux-gnu/libzookeeper_mt.a
动态库的路径/usr/lib/x86_64-linux-gnu/libzookeeper_mt.so

使用原生 ZooKeeper C API 库的注意事项

  • (1) 设置监听 Watcher 是一次性的,监听事件触发后 Watch 会立即失效,ZooKeeper 不会自动重新注册。若希望持续监听同一个节点变化,需要在回调中手动再次注册 Watch。
  • (2) ZNode 节点仅支持存储简单的 byte 字节数组(最大 1MB),不具备结构化存储能力。如果需要存储结构体或对象,需要自行将其转换为字节数组,例如使用 JSON、Protobuf 等序列化方式进行编码与解码。

项目介绍

项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
c++-project-mprpc
├── CMakeLists.txt
├── README.md
├── autobuild.sh
├── bin
├── build
├── conf
│ └── rpc.conf
├── example
│ ├── CMakeLists.txt
│ ├── consumer
│ │ ├── CMakeLists.txt
│ │ └── rpcconsumer.cc
│ ├── generated
│ │ ├── friend.pb.cc
│ │ ├── friend.pb.h
│ │ ├── user.pb.cc
│ │ └── user.pb.h
│ ├── proto
│ │ ├── friend.proto
│ │ └── user.proto
│ └── provider
│ ├── CMakeLists.txt
│ └── rpcprovider.cc
├── lib
├── src
│ ├── CMakeLists.txt
│ ├── generated
│ │ ├── rpcheader.pb.cc
│ │ └── rpcheader.pb.h
│ ├── include
│ │ ├── lockqueue.h
│ │ ├── logger.h
│ │ ├── mprpccontext.h
│ │ ├── mprpcchannel.h
│ │ ├── mprpcconfig.h
│ │ ├── mprpccontroller.h
│ │ ├── mprpcprovider.h
│ │ ├── networkutil.h
│ │ └── zookeeperclient.h
│ ├── logger.cc
│ ├── mprpccontext.cc
│ ├── mprpcchannel.cc
│ ├── mprpcconfig.cc
│ ├── mprpccontroller.cc
│ ├── mprpcprovider.cc
│ ├── networkutil.cc
│ ├── proto
│ │ └── rpcheader.proto
│ └── zookeeperclient.cc
└── test
├── CMakeLists.txt
├── protobuf
│ ├── CMakeLists.txt
│ ├── generated
│ │ ├── addressbook.pb.cc
│ │ ├── addressbook.pb.h
│ │ ├── friendservice.pb.cc
│ │ ├── friendservice.pb.h
│ │ ├── groupservice.pb.cc
│ │ ├── groupservice.pb.h
│ │ ├── userservice.pb.cc
│ │ └── userservice.pb.h
│ ├── main.cc
│ └── proto
│ ├── addressbook.proto
│ ├── friendservice.proto
│ ├── groupservice.proto
│ └── userservice.proto
└── zookeeper
├── CMakeLists.txt
└── main.cc
目录名称目录说明
buildCMake 编译构建项目的目录(项目首次编译后才会有)
bin存放项目编译生成的可执行文件的目录(项目首次编译后才会有)
lib存放项目编译生成的 RPC 框架头文件和静态库的目录(项目首次编译后才会有)
conf存放 RPC 框架的配置文件
proto存放 Protobuf 的 .proto 协议文件
generated存放根据 .proto 协议文件生成的 C++ 头文件和源文件
srcRPC 框架源码
test第三方库的测试代码
test/protobuf/Protobuf 的测试代码
test/zookeeper/ZooKeeper 的测试代码
exampleRPC 框架的使用案例代码
example/providerRPC 框架(服务提供者)的使用案例代码
example/consumerRPC 框架(服务调用者)的使用案例代码
autobuild.sh项目一键编译构建的脚本文件

项目原理

项目技术栈

基于 C++ 开发 RPC 框架时,使用到以下技术:

  • 单例模式
  • Conf 配置文件读取
  • Muduo 网络库编程
  • CMake 构建项目集成编译环境
  • 异步日志记录(线程同步通信实现)
  • Protobuf 数据序列化和反序列化协议
  • ZooKeeper 分布式一致性协调服务应用以及编程

项目代码

RPC 框架核心代码

由于篇幅有限,下面只给出 RPC 框架项目的核心代码,完整的项目代码可以从 这里 获取得到。

网络工具

  • networkutil.h 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#pragma once

#include <arpa/inet.h>
#include <ifaddrs.h>
#include <net/if.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <cstring>
#include <iostream>
#include <string>

// 网络工具类(单例对象)
class NetworkUtil {
public:
// 获取单例对象
static NetworkUtil& GetInstance();

// 获取本地的 IP 地址,可指定网络接口名称(比如 eth0)
std::string FindLocalIp(const std::string& network_interface = "");

// 获取一个未被占用的端口,可指定端口区间(比如 [7000, 9000])
int FindAvailablePort(int low = 7000, int high = 9000);

private:
// 构造函数
NetworkUtil() = default;

// 删除拷贝构造函数
NetworkUtil(const NetworkUtil&) = delete;

// 删除赋值运算符
NetworkUtil& operator=(const NetworkUtil&) = delete;
};
  • networkutil.cc 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include "networkutil.h"

// 获取单例对象
NetworkUtil& NetworkUtil::GetInstance() {
// 局部静态变量(线程安全)
static NetworkUtil instance;
return instance;
}

// 获取本地的 IP 地址,可指定网络接口名称(比如 eth0)
std::string NetworkUtil::FindLocalIp(const std::string& network_interface) {
std::string result;

// 获取本地网络接口的信息链表
struct ifaddrs* ifaddr;
if (getifaddrs(&ifaddr) == -1) {
// Fallback 处理
return "127.0.0.1";
}

// 遍历本地网络接口的信息链表
for (auto* ifa = ifaddr; ifa; ifa = ifa->ifa_next) {
if (!ifa->ifa_addr || ifa->ifa_addr->sa_family != AF_INET) {
continue;
}

// 获取网络接口的名称
std::string name(ifa->ifa_name);

// 跳过 lo 接口(回环接口)
if (ifa->ifa_flags & IFF_LOOPBACK) {
continue;
}

// 如果指定了网络接口(如 eth0),且匹配上了,就直接使用
if (!network_interface.empty() && name == network_interface) {
char ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &((struct sockaddr_in*)ifa->ifa_addr)->sin_addr, ip, sizeof(ip));
result = ip;
break;
}

// 默认选择 eth0、ens33、enp0s3 等常见有线网卡
if (network_interface.empty() && (name.find("eth") == 0 || name.find("en") == 0)) {
char ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &((struct sockaddr_in*)ifa->ifa_addr)->sin_addr, ip, sizeof(ip));
result = ip;
break;
}
}

// 释放资源
freeifaddrs(ifaddr);

// 返回结果
return result.empty() ? "127.0.0.1" : result;
}

// 获取一个未被占用的端口,可指定端口区间(比如 [7000, 9000])
int NetworkUtil::FindAvailablePort(int low, int high) {
// 遍历指定区间内的所有端口
for (int port = low; port <= high; ++port) {
// 创建一个 IPv4 TCP Socket
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
// 创建 Socket 失败,尝试下一个端口
continue;
}

// 初始化 Socket 地址,绑定到任意本地地址(0.0.0.0)和当前端口
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY; // 0.0.0.0
addr.sin_port = htons(port);

// 设置 Socket 选项:SO_REUSEADDR 允许端口重复绑定(避免 TIME_WAIT 问题)
int opt = 1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 尝试将 Socket 绑定到指定端口
if (bind(sockfd, (sockaddr*)&addr, sizeof(addr)) == 0) {
// 绑定成功,释放资源(因为这里只是探测端口是否可用)
close(sockfd);
// 返回找到的可用端口
return port;
}

// 端口绑定失败,说明端口已被占用或其他错误,释放资源
close(sockfd);
}

// 遍历完指定的端口区间,没有找到可用端口,返回 -1 表示失败
return -1;
}

异步日志记录

  • lockqueue.h 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#pragma once

#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// 异步写日志的日志队列(线程安全)
template <typename T>
class LockQueue {
public:
// 往队尾插入数据
void Push(const T& data) {
// 获取互斥锁
std::lock_guard<std::mutex> lock(m_mutex);

// 插入数据
m_queue.push(data);

// 唤醒日志写入线程去消费队列中的数据
m_condvariable.notify_all();
}

// 往队头弹出数据
T Pop() {
// 获取互斥锁
std::unique_lock<std::mutex> lock(m_mutex);

// 阻塞等待,直到队列不为空或者已退出
m_condvariable.wait(lock, [this]() { return !m_queue.empty() || m_exit; });

// 视业务逻辑而定,可以返回空数据或者抛出异常
if (m_exit && m_queue.empty()) {
return {};
}

// 获取队头元素
T data = m_queue.front();

// 弹出队头元素
m_queue.pop();

return data;
}

// 关闭队列
void Stop() {
// 获取互斥锁
std::lock_guard<std::mutex> lock(m_mutex);
// 设置退出标志
m_exit = true;
// 唤醒正在等待的日志写入线程
m_condvariable.notify_all();
}

// 获取退出标志
bool isExit() const {
return m_exit;
}

private:
std::mutex m_mutex; // 互斥锁
std::queue<T> m_queue; // 队列
std::condition_variable m_condvariable; // 条件变量
bool m_exit = false; // 退出标志,用于避免发生线程死锁
};
  • logger.h 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#pragma once

#include <string>

#include "lockqueue.h"

// 定义宏
#define LOG_DEBUG(logmsgformat, ...) \
do { \
Logger& logger = Logger::GetInstance(); \
if (logger.GetLogLevel() <= DEBUG) { \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
std::thread::id thread_id = std::this_thread::get_id(); \
LogMessage msg = {DEBUG, c, thread_id}; \
logger.Log(msg); \
} \
} while (0)

#define LOG_INFO(logmsgformat, ...) \
do { \
Logger& logger = Logger::GetInstance(); \
if (logger.GetLogLevel() <= INFO) { \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
std::thread::id thread_id = std::this_thread::get_id(); \
LogMessage msg = {INFO, c, thread_id}; \
logger.Log(msg); \
} \
} while (0)

#define LOG_WARN(logmsgformat, ...) \
do { \
Logger& logger = Logger::GetInstance(); \
if (logger.GetLogLevel() <= WARN) { \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
std::thread::id thread_id = std::this_thread::get_id(); \
LogMessage msg = {WARN, c, thread_id}; \
logger.Log(msg); \
} \
} while (0)

#define LOG_ERROR(logmsgformat, ...) \
do { \
Logger& logger = Logger::GetInstance(); \
if (logger.GetLogLevel() <= ERROR) { \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
std::thread::id thread_id = std::this_thread::get_id(); \
LogMessage msg = {ERROR, c, thread_id}; \
logger.Log(msg); \
} \
} while (0)

// 日志级别(DEBUG < INFO < WARN < ERROR)
enum LogLevel {
DEBUG, // 调试日志信息
INFO, // 普通日志信息
WARN, // 警告日志信息
ERROR, // 错误日志信息
};

// 日志信息
struct LogMessage {
LogLevel m_loglevel; // 日志级别
std::string m_logcontent; // 日志内容
std::thread::id m_threadid; // 打印日志的线程的 ID
};

// Mprpc 框架提供的日志系统(单例对象,异步写入日志文件)
class Logger {
public:
// 获取单例对象
static Logger& GetInstance();

// 写入日志信息
void Log(const LogMessage& message);

// 获取日志级别
LogLevel GetLogLevel();

// 设置日志级别
void SetLogLevel(LogLevel level);

private:
LogLevel m_loglevel; // 记录日志级别
std::thread m_writeThread; // 日志写入线程
LockQueue<LogMessage> m_lckQue; // 日志缓冲队列

// 构造函数
Logger();

// 析构函数
~Logger();

// 删除拷贝构造函数
Logger(const Logger&) = delete;

// 删除赋值运算操作符
Logger& operator=(const Logger&) = delete;

// 获取日志级别的名称
std::string LogLevelToString(LogLevel level);
};
  • logger.cc 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include "logger.h"

#include <cstdlib>
#include <ctime>
#include <iostream>
#include <sstream>
#include <thread>

// 构造函数
Logger::Logger() {
// 设置默认的日志级别
this->m_loglevel = INFO;

// 启动专门写日志文件的线程
m_writeThread = std::thread([this]() {
for (;;) {
// 获取当前日期
time_t now = time(nullptr);
tm* now_tm = localtime(&now);

// 获取日志文件的名称
char file_name[128];
sprintf(file_name, "%d-%d-%d-log.txt", now_tm->tm_year + 1900, now_tm->tm_mon + 1, now_tm->tm_mday);

// 打开日志文件
FILE* pf = fopen(file_name, "a+");
if (pf == nullptr) {
std::cout << "logger file " << file_name << " open failed!" << std::endl;
// 退出程序
exit(EXIT_FAILURE);
}

// 从日志缓冲队列获取日志信息(会阻塞当前线程,直到日志队列不为空)
LogMessage message = m_lckQue.Pop();

// 检查退出标志
if (m_lckQue.isExit()) {
// 关闭日志文件
fclose(pf);
// 跳出外层 For 循环,结束日志写入线程的运行(会丢失未被写入的日志信息)
break;
}

// 获取打印日志信息的线程的 ID(由外部传入,不一定是 Linux 内核线程 ID)
std::thread::id real_thread_id = message.m_threadid;
std::ostringstream oss;
oss << real_thread_id;
std::string log_thread_id = oss.str();

// 获取日志内容和日志级别的名称
std::string& log_content = message.m_logcontent;
std::string log_level_name = LogLevelToString(message.m_loglevel);

// 获取当前时间
char time_buf[128] = {0};
sprintf(time_buf, "%d-%d-%d %d:%d:%d => %s [%s] ", now_tm->tm_year + 1900, now_tm->tm_mon + 1,
now_tm->tm_mday, now_tm->tm_hour, now_tm->tm_min, now_tm->tm_sec, log_thread_id.c_str(),
log_level_name.c_str());

// 添加当前时间到日志内容的最前面
log_content.insert(0, time_buf);

// 添加换行符到日志内容的最后面
log_content.append("\n");

// 打印日志内容到控制台
std::cout << log_content;

// 将日志内容写入日志文件
fputs(log_content.c_str(), pf);

// 关闭日志文件
fclose(pf);
}
});
}

// 析构函数
Logger::~Logger() {
// 关闭队列,通知日志写入线程停止运行,避免发生线程死锁
this->m_lckQue.Stop();
// 等待日志线程安全退出
if (m_writeThread.joinable()) {
m_writeThread.join();
}
}

// 获取单例对象
Logger& Logger::GetInstance() {
// 局部静态变量(线程安全)
static Logger logger;
return logger;
}

// 写入日志信息
void Logger::Log(const LogMessage& message) {
// 将日志信息写入缓冲队列中
this->m_lckQue.Push(message);
}

// 设置日志级别
void Logger::SetLogLevel(LogLevel level) {
this->m_loglevel = level;
}

// 获取日志级别
LogLevel Logger::GetLogLevel() {
return this->m_loglevel;
}

// 获取日志级别的名称
std::string Logger::LogLevelToString(LogLevel level) {
switch (level) {
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARN:
return "WARN";
case ERROR:
return "ERROR";
default:
return "UNKNOWN";
}
}

配置文件读取

  • rpc.conf 配置文件
1
2
3
4
5
6
# ZooKeeper的IP地址(必填)
zk_server_host=127.0.0.1
# ZooKeeper的端口号(必填)
zk_server_port=2181
# RPC服务提供者优先使用的网卡接口(可选)
rpc_network_interface=eth1
  • mprpcconfig.h 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#pragma once

#include <iostream>
#include <string>
#include <unordered_map>

#include "logger.h"

static const std::string ZK_SERVER_HOST_KEY = "zk_server_host";
static const std::string ZK_SERVER_PORT_KEY = "zk_server_port";
static const std::string RPC_NETWORK_INTERFACE_KEY = "rpc_network_interface";

// RPC 框架读取配置文件的类
class MprpcConfig {
public:
// 加载配置文件
void LoadConfigFile(const char* config_file);

// 获取配置项信息
std::string Load(const std::string& key);

private:
// 配置信息(无需考虑线程安全问题)
std::unordered_map<std::string, std::string> m_configMap;

// 去掉字符串前后的空白字符
void Trim(std::string& str);
};
  • mprpcconfig.cc 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include "mprpcconfig.h"

#include <cstdio>
#include <cstdlib>

// 加载配置文件
void MprpcConfig::LoadConfigFile(const char* config_file) {
// 判断是否已经加载过配置文件
if (!m_configMap.empty()) {
return;
}

// 打开配置文件
FILE* pf = fopen(config_file, "r");
if (nullptr == pf) {
std::cout << "config file " << config_file << " is not exist!" << std::endl;
// 退出程序
exit(EXIT_FAILURE);
}

// 解析配置文件
char buf[1024];
while (fgets(buf, sizeof(buf), pf)) {
std::string src_buf(buf);

// 去掉字符串前后的空白字符
Trim(src_buf);

// 判断注释内容
if (src_buf.empty() || src_buf[0] == '#') {
continue;
}

// 解析配置项
int idx = src_buf.find_first_of('=');
if (idx == -1) {
// 配置项不合法
continue;
}

// 获取配置项的 Key
std::string key = src_buf.substr(0, idx);
Trim(key);

// 获取配置项的 Value
std::string value = src_buf.substr(idx + 1);
Trim(value);

// 检查配置项的合法性
if (key.empty() || value.empty()) {
continue;
}

// 存储配置项
m_configMap.insert({key, value});

// 打印日志信息
LOG_DEBUG("%s=%s", key.c_str(), value.c_str());
}

// 关闭文件
fclose(pf);
}

// 获取配置项信息
std::string MprpcConfig::Load(const std::string& key) {
auto it = m_configMap.find(key);
return it != m_configMap.end() ? it->second : "";
}

// 去掉字符串前后的空白字符
void MprpcConfig::Trim(std::string& str) {
// 定义空白字符
const std::string whitespace = " \n\r\t";

// 去除字符串前面多余的空白字符
size_t start = str.find_first_not_of(whitespace);
if (start != std::string::npos) {
str = str.substr(start);
} else {
// 字符串全是空白字符
str.clear();
return;
}

// 去除字符串后面多余的空白字符
size_t end = str.find_last_not_of(whitespace);
if (end != std::string::npos) {
str = str.substr(0, end + 1);
}
}

Protobuf 协议文件

  • rpcheader.proto 协议文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Protobuf 语法的版本
syntax = "proto3";

// 定义包名,便于在生成的代码中区分不同模块(类似 C++ 的命名空间)
package mprpc;

message RpcHeader {
// RPC 调用的服务名称
bytes service_name = 1;

// RPC 调用的方法名称
bytes method_name = 2;

// RPC 调用的参数长度(参数的序列化字符串的长度)
uint32 args_size = 3;
}

RPC 框架的初始化

  • mprpccontext.h 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#pragma once

#include "mprpcchannel.h"
#include "mprpcconfig.h"

// ZooKeeper 节点的路径前缀
const static std::string ZNODE_PATH_PREFIX = "/mprpc/services";

// MPRPC 框架的上下文类(单例对象)
class MprpcContext {
public:
// 获取单例对象
static MprpcContext& GetInstance();

// 初始化 RPC 框架
static void Init(int argc, char** argv);

// 获取配置信息
static MprpcConfig& GetConfig();

private:
// 配置信息
static MprpcConfig m_config;

// 私有构造函数
MprpcContext();

// 私有析构函数
~MprpcContext();

// 删除拷贝构造函数
MprpcContext(const MprpcContext&) = delete;

// 删除赋值运算符
MprpcContext& operator=(const MprpcContext&) = delete;
};
  • mprpccontext.cc 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include "mprpccontext.h"

#include <unistd.h>

#include <cstdlib>
#include <iostream>
#include <string>

#include "logger.h"

// 初始化类静态成员变量
MprpcConfig MprpcContext::m_config;

// 构造函数
MprpcContext::MprpcContext() {
}

// 析构函数
MprpcContext::~MprpcContext() {
}

// 获取单例对象
MprpcContext& MprpcContext::GetInstance() {
// 局部静态变量(线程安全)
static MprpcContext singleton;
return singleton;
}

// 打印命令帮助内容
void ShowArgsHelp() {
std::cout << "format: command -i <configfile>" << std::endl;
}

// 初始化 RPC 框架
void MprpcContext::Init(int argc, char** argv) {
// 校验命令行参数
if (argc < 2) {
// 打印命令帮助内容
ShowArgsHelp();
// 退出程序
exit(EXIT_FAILURE);
}

// 从命令行获取配置文件的路径
int c = 0;
std::string config_file;
while ((c = getopt(argc, argv, "i:")) != -1) {
switch (c) {
case 'i':
config_file = optarg;
break;
case '?':
std::cout << "invalid command args!" << std::endl;
// 打印命令帮助内容
ShowArgsHelp();
// 退出程序
exit(EXIT_FAILURE);
case ':':
std::cout << "need <configfile>" << std::endl;
// 打印命令帮助内容
ShowArgsHelp();
// 退出程序
exit(EXIT_FAILURE);
default:
break;
}
}

// 打印日志信息
LOG_DEBUG("loading rpc config file %s", config_file.c_str());

// 读取配置文件内容
m_config.LoadConfigFile(config_file.c_str());
}

// 获取配置信息
MprpcConfig& MprpcContext::GetConfig() {
return m_config;
}

RPC 框架的服务注册

  • mprpcprovider.h 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#pragma once

#include <google/protobuf/descriptor.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/TcpServer.h>

#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

#include "google/protobuf/service.h"

// RPC 框架专门提供用来发布 RPC 服务的网络对象类
class RpcProvider {
public:
// 发布 RPC 服务
void PublishService(google::protobuf::Service* service);

// 启动 RPC 服务节点,开始对外提供 RPC 远程网络调用服务(针对 RPC 服务提供者)
void Run();

private:
// TCP 服务器的事件回环
muduo::net::EventLoop m_eventloop;

// RPC 服务信息
struct ServiceInfo {
// RPC 服务
google::protobuf::Service* m_service;
// RPC 服务拥有的方法
std::unordered_map<std::string, const google::protobuf::MethodDescriptor*> m_methodMap;
};

// 存储注册成功的 RPC 服务的集合
std::unordered_map<std::string, ServiceInfo> m_serviceMap;

// 处理 TCP 连接的创建和断开
void onConnection(const muduo::net::TcpConnectionPtr& conn);

// 处理已建立 TCP 连接的读写事件(比如接收客户端发送的数据)
void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp time);

// 用于序列化 RPC 调用的响应结果和发送网络响应数据
void SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message* response);
};
  • mprpcprovider.cc 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
#include "mprpcprovider.h"

#include "mprpccontext.h"
#include "networkutil.h"
#include "rpcheader.pb.h"
#include "zookeeperclient.h"

// 发布 RPC 服务
void RpcProvider::PublishService(google::protobuf::Service* service) {
// RPC 服务的信息
ServiceInfo servcieInfo;

// 获取 RPC 服务的描述信息
const google::protobuf::ServiceDescriptor* pserviceDesc = service->GetDescriptor();

// 获取 RPC 服务的完整名称(加上包名),比如 user.UserServiceRpc
const std::string serviceName(pserviceDesc->full_name());

// 获取 RPC 服务的方法数量
int methodCount = pserviceDesc->method_count();

// 遍历 RPC 服务的所有方法
for (int i = 0; i < methodCount; i++) {
// 获取 RPC 服务的方法的描述信息
const google::protobuf::MethodDescriptor* pmethodDesc = pserviceDesc->method(i);
// 获取 RPC 服务的方法的名称
const std::string methodName(pmethodDesc->name());
// 存储 RPC 服务的方法的描述信息
servcieInfo.m_methodMap.insert({methodName, pmethodDesc});
}

// 存储 RPC 服务的信息
servcieInfo.m_service = service;
m_serviceMap.insert({serviceName, servcieInfo});
}

// 启动 RPC 服务节点,开始对外提供 RPC 远程网络调用服务(针对 RPC 服务提供者)
void RpcProvider::Run() {
// 获取配置信息
const std::string zk_server_host = MprpcContext::GetInstance().GetConfig().Load(ZK_SERVER_HOST_KEY);
const std::string zk_server_port = MprpcContext::GetInstance().GetConfig().Load(ZK_SERVER_PORT_KEY);
const std::string rpc_network_interface =
MprpcContext::GetInstance().GetConfig().Load(RPC_NETWORK_INTERFACE_KEY);

// 获取 RPC 服务提供者的 IP 和端口
const std::string rpc_server_ip = NetworkUtil::GetInstance().FindLocalIp(rpc_network_interface);
const int rpc_server_port = NetworkUtil::GetInstance().FindAvailablePort();

// 判断 RPC 服务提供者的端口是否有效
if (-1 == rpc_server_port) {
LOG_ERROR("not found available port for rpc server!");
return;
}

// 创建 TCP 服务器
muduo::net::InetAddress address(rpc_server_ip, rpc_server_port);
muduo::net::TcpServer tcpServer(&m_eventloop, address, "RpcProvider");

// 设置 TCP 连接创建和断开的回调
tcpServer.setConnectionCallback(bind(&RpcProvider::onConnection, this, std::placeholders::_1));

// 设置已建立 TCP 连接读写事件的回调
tcpServer.setMessageCallback(
bind(&RpcProvider::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

// 设置 EventLoop 的线程数量(比如:1 个 I/O 线程,3 个 Worker 线程)
tcpServer.setThreadNum(4);

// 创建 ZK 客户端
ZkClient zkClient;

// 启动 ZK 客户端
bool started = zkClient.Start(zk_server_host, atoi(zk_server_port.c_str()));
// ZK 服务端连接失败
if (!started) {
// 停止往下继续执行,直接返回
return;
}

// 将所有已发布的 RPC 服务注册进 ZK 服务端
for (auto& service : m_serviceMap) {
// RPC 服务的 IP 和端口信息
const std::string rpc_address = rpc_server_ip + ":" + std::to_string(rpc_server_port);

// RPC 服务的名称(加上包名),比如 user.UserServiceRpc
const std::string service_name = service.first;

// ZNode 节点的路径前缀,比如 /mprpc/services/user.UserServiceRpc
const std::string path_prefix = ZNODE_PATH_PREFIX + "/" + service_name;

// ZNode 节点的完整路径,比如 /mprpc/services/user.UserServiceRpc/127.0.0.1:7070
const std::string node_full_path = path_prefix + "/" + rpc_address;

// ZNode 节点的数据,比如 127.0.0.1:7070
const char* node_data = rpc_address.c_str();

// ZNode 节点的数据长度
const int node_data_len = rpc_address.length();

// 创建 ZNode 节点(临时节点)
const std::string created_path =
zkClient.CreateRecursive(node_full_path.c_str(), node_data, node_data_len, ZOO_EPHEMERAL);

// 判断 ZNode 节点是否创建成功(即 RPC 服务是否注册成功)
if (!created_path.empty()) {
// ZNode 节点创建成功
LOG_INFO("success to register rpc service, name: %s, path: %s, data: %s", service_name.c_str(),
node_full_path.c_str(), node_data);
} else {
// ZNode 节点创建失败
LOG_ERROR("failed to register rpc service, name: %s, path: %s, data: %s", service_name.c_str(),
node_full_path.c_str(), node_data);
}
}

// 打印日志信息
LOG_INFO("rpc provider start at %s:%d", rpc_server_ip.c_str(), rpc_server_port);

// 启动 TCP 服务器
tcpServer.start();

// 以阻塞方式等待新客户端的连接、已连接客户端的读写事件等
m_eventloop.loop();
}

// 处理 TCP 连接的创建和断开
void RpcProvider::onConnection(const muduo::net::TcpConnectionPtr& conn) {
if (!conn->connected()) {
// 断开连接(释放资源)
conn->shutdown();
}
}

// 处理 TCP 连接的读写事件(比如接收客户端发送的数据)
void RpcProvider::onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp time) {
// 接收到的字符流,数据格式:header_size(4 字节) + header_str(service_name + method_name + args_size) + args_str
const std::string recv_buf = buf->retrieveAllAsString();

// 从字符流中读取前 4 个字节的内容
uint32_t header_size = 0;
recv_buf.copy((char*)&header_size, 4, 0);

// 根据 header_size 读取请求数据头的原始字符流
const std::string rpc_header_str = recv_buf.substr(4, header_size);

// RPC 调用的基础信息
std::string service_name;
std::string method_name;
uint32_t args_size;

// RPC 请求数据头的反序列化
mprpc::RpcHeader rpcHeader;
if (rpcHeader.ParseFromString(rpc_header_str)) {
// 数据反序列化成功
service_name = rpcHeader.service_name();
method_name = rpcHeader.method_name();
args_size = rpcHeader.args_size();
} else {
// 数据反序列化失败
LOG_ERROR("rpc header string %s unserialize error!", rpc_header_str.c_str());
return;
}

// 获取 RPC 调用的参数的字符流数据
const std::string rpc_args_str = recv_buf.substr(4 + header_size, args_size);

// 打印日志信息
LOG_DEBUG("===========================================");
LOG_DEBUG("header_size: %u", header_size);
LOG_DEBUG("rpc_header_str: %s", rpc_header_str.c_str());
LOG_DEBUG("service_name: %s", service_name.c_str());
LOG_DEBUG("method_name: %s", method_name.c_str());
LOG_DEBUG("args_size: %u", args_size);
LOG_DEBUG("args_str: %s", rpc_args_str.c_str());
LOG_DEBUG("===========================================");

// 查找 RPC 服务
auto sit = m_serviceMap.find(service_name);
// 如果找不到对应的 RPC 服务
if (sit == m_serviceMap.end()) {
// 打印日志信息
LOG_ERROR("rpc service %s is not exist!", service_name.c_str());
return;
}

// 查找 RPC 服务的方法
auto mit = sit->second.m_methodMap.find(method_name);
if (mit == sit->second.m_methodMap.end()) {
// 打印日志信息
LOG_ERROR("rpc method %s::%s is not exist!", service_name.c_str(), method_name.c_str());
return;
}

// 获取 RPC 调用的服务和方法
google::protobuf::Service* service = sit->second.m_service;
const google::protobuf::MethodDescriptor* method = mit->second;

// 通过反序列化生成本地 RPC 方法调用的请求参数
google::protobuf::Message* request = service->GetRequestPrototype(method).New();
if (!request->ParseFromString(rpc_args_str)) {
// 打印日志信息
LOG_ERROR("rpc request args '%s' unserialize error!", rpc_args_str.c_str());
return;
}

// 生成本地 RPC 方法调用的响应结果
google::protobuf::Message* response = service->GetResponsePrototype(method).New();

// 本地 RPC 方法调用的回调,实际上调用的是 RpcProvider::SendRpcResponse()
google::protobuf::Closure* done =
google::protobuf::NewCallback<RpcProvider, const muduo::net::TcpConnectionPtr&, google::protobuf::Message*>(
this, &RpcProvider::SendRpcResponse, conn, response);

// 调用 RPC 节点的本地方法
service->CallMethod(method, nullptr, request, response, done);
}

// 用于序列化 RPC 调用的响应结果和发送网络响应数据
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message* response) {
// 序列化 RPC 调用的响应结果
std::string response_str;
if (response->SerializeToString(&response_str)) {
// 通过网络将本地 RPC 方法的执行结果发送给 RPC 服务调用方
conn->send(response_str);
} else {
// 打印日志信息
LOG_ERROR("rpc response serialize error!");
}
// 模拟 HTTP 的短连接服务,由 RPC 服务提供方主动断开连接
conn->shutdown();
}

RPC 框架的服务调用

  • mprpccontroller.h 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#pragma once

#include <google/protobuf/service.h>

#include "logger.h"

// RPC 调用的状态控制器
class MprpcController : public google::protobuf::RpcController {
public:
MprpcController();
void Reset();
bool Failed() const;
std::string ErrorText() const;
void SetFailed(const std::string& reason);

// 目前未实现具体的功能
void StartCancel();
bool IsCanceled() const;
void NotifyOnCancel(google::protobuf::Closure* callback);

private:
bool m_failed; // RPC 方法执行过程中的状态
std::string m_errText; // RPC 方法执行过程中的错误信息
};
  • mprpccontroller.cc 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include "mprpccontroller.h"

MprpcController::MprpcController() {
m_failed = false;
m_errText = "";
}

void MprpcController::Reset() {
m_failed = false;
m_errText = "";
}

bool MprpcController::Failed() const {
return m_failed;
}

std::string MprpcController::ErrorText() const {
return m_errText;
}

void MprpcController::SetFailed(const std::string& reason) {
m_failed = true;
m_errText = reason;
LOG_ERROR(reason.c_str());
}

void MprpcController::StartCancel() {
}

bool MprpcController::IsCanceled() const {
return false;
}

void MprpcController::NotifyOnCancel(google::protobuf::Closure* callback) {
}
  • mprpcchannel.h 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#pragma once

#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>
#include <google/protobuf/service.h>

class MprpcChannel : public google::protobuf::RpcChannel {
public:
// 随机生成一个整数,范围 [0, range-1]
int randomInt(int range);

// 统一实现 RPC 方法调用的数据序列化和网络数据发送(针对 RPC 服务调用者)
void CallMethod(const google::protobuf::MethodDescriptor* method, google::protobuf::RpcController* controller,
const google::protobuf::Message* request, google::protobuf::Message* response,
google::protobuf::Closure* done);
};
  • mprpcchannel.cc 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#include "mprpcchannel.h"

#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cerrno>
#include <cstdlib>
#include <random>
#include <string>

#include "logger.h"
#include "mprpccontext.h"
#include "mprpccontroller.h"
#include "rpcheader.pb.h"
#include "zookeeperclient.h"

// 随机生成一个整数,范围 [0, range-1]
int MprpcChannel::randomInt(int range) {
if (range > 0) {
static thread_local std::mt19937 gen(std::random_device{}());
std::uniform_int_distribution<> dis(0, range - 1);
return dis(gen);
}
return 0;
}

// 统一实现 RPC 方法调用的数据序列化和网络数据发送(针对 RPC 服务调用者)
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller, const google::protobuf::Message* request,
google::protobuf::Message* response, google::protobuf::Closure* done) {
// 获取 RPC 服务的完整名称(加上包名),比如 user.UserServiceRpc
const std::string service_name(method->service()->full_name());

// 获取 RPC 调用的方法名称
const std::string method_name(method->name());

// 获取 RPC 调用参数的序列化字符串的长度
uint32_t args_size = 0;
std::string rpc_args_str;
if (request->SerializeToString(&rpc_args_str)) {
args_size = rpc_args_str.size();
} else {
// 设置 RPC 调用状态
controller->SetFailed("rpc request serialize error!");
return;
}

// 定义 RPC 调用的请求数据头
mprpc::RpcHeader rpcHeader;
rpcHeader.set_service_name(service_name);
rpcHeader.set_method_name(method_name);
rpcHeader.set_args_size(args_size);

// 获取 RPC 请求数据头的序列化字符串的长度
uint32_t header_size = 0;
std::string rpc_header_str;
if (rpcHeader.SerializeToString(&rpc_header_str)) {
header_size = rpc_header_str.size();
} else {
// 设置 RPC 调用状态
controller->SetFailed("rpc header serialize error!");
return;
}

// 通过网络发送的数据,格式:header_size(4 字节) + header_str(service_name + method_name + args_size) + args_str
std::string rpc_send_str;
rpc_send_str.insert(0, std::string((char*)&header_size, 4));
rpc_send_str += rpc_header_str;
rpc_send_str += rpc_args_str;

// 打印日志信息
LOG_DEBUG("===========================================");
LOG_DEBUG("header_size: %u", header_size);
LOG_DEBUG("rpc_header_str: %s", rpc_header_str.c_str());
LOG_DEBUG("service_name: %s", service_name.c_str());
LOG_DEBUG("method_name: %s", method_name.c_str());
LOG_DEBUG("args_size: %u", args_size);
LOG_DEBUG("args_str: %s", rpc_args_str.c_str());
LOG_DEBUG("===========================================");

// 本地创建一个 TCP 客户端
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == clientfd) {
char errtxt[512] = {0};
sprintf(errtxt, "create socket failed, errno is %d", errno);
// 设置 RPC 调用状态
controller->SetFailed(errtxt);
return;
}

// 获取 ZK 服务端的连接信息
const std::string zk_server_host = MprpcContext::GetInstance().GetConfig().Load(ZK_SERVER_HOST_KEY);
const std::string zk_server_port = MprpcContext::GetInstance().GetConfig().Load(ZK_SERVER_PORT_KEY);

// 创建 ZK 客户端
ZkClient zkClient;

// 启动 ZK 客户端
bool started = zkClient.Start(zk_server_host, atoi(zk_server_port.c_str()));

// 如果 ZK 服务端启动失败
if (!started) {
// 设置 RPC 调用状态
controller->SetFailed("zookeeper client connect failed");
return;
}

// RPC 服务对应的 ZNode 节点的路径,比如 /mprpc/services/user.UserServiceRpc
const std::string node_path = ZNODE_PATH_PREFIX + "/" + service_name;

// 获取 ZNode 子节点列表(即已注册的 RPC 服务列表),比如 127.0.0.1:7070
std::vector<std::string> child_list = zkClient.GetChildren(node_path.c_str());

// 如果 ZNode 子节点列表为空(即查找不到已注册的 RPC 服务)
if (child_list.empty()) {
char errtxt[512] = {0};
sprintf(errtxt, "not found rpc service %s", service_name.c_str());
// 设置 RPC 调用状态
controller->SetFailed(errtxt);
return;
}

// 随机获取一个 RPC 服务提供者的地址,比如 127.0.0.1:7070
const int index = child_list.size() == 1 ? 0 : randomInt(child_list.size());
const std::string rpc_provider_addr = child_list[index];

// 解析 PRC 服务提供者的 IP 和端口
const size_t pos = rpc_provider_addr.find(":");
// 如果 RPC 服务提供者的地址无效
if (std::string::npos == pos) {
char errtxt[512] = {0};
sprintf(errtxt, "invalid rpc service address %s", rpc_provider_addr.c_str());
// 设置 RPC 调用状态
controller->SetFailed(errtxt);
return;
}
const std::string rpc_provider_ip = rpc_provider_addr.substr(0, pos);
const std::string rpc_provider_port = rpc_provider_addr.substr(pos + 1);

// 打印日志信息
LOG_INFO("ready to invoke rpc service, name: %s, address: %s", service_name.c_str(), rpc_provider_addr.c_str());

// 封装 TCP 客户端的连接信息
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(atoi(rpc_provider_port.c_str()));
server_addr.sin_addr.s_addr = inet_addr(rpc_provider_ip.c_str());

// 通过 TCP 客户端连接 RPC 服务节点
if (-1 == connect(clientfd, (struct sockaddr*)&server_addr, sizeof(server_addr))) {
// 关闭连接
close(clientfd);
// 设置 RPC 调用状态
char errtxt[512] = {0};
sprintf(errtxt, "connect server failed, errno is %d", errno);
controller->SetFailed(errtxt);
return;
}

// 通过网络发送 RPC 调用的请求参数
if (-1 == send(clientfd, rpc_send_str.c_str(), rpc_send_str.size(), 0)) {
// 关闭连接
close(clientfd);
// 设置 RPC 调用状态
char errtxt[512] = {0};
sprintf(errtxt, "send rpc rquest failed, errno is %d", errno);
controller->SetFailed(errtxt);
return;
}

// 通过网络接收 RPC 调用的响应结果
int recv_size = 0;
char recv_buf[1024] = {0};
if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0))) {
// 关闭连接
close(clientfd);
// 设置 RPC 调用状态
char errtxt[512] = {0};
sprintf(errtxt, "receive rpc response failed, errno is %d", errno);
controller->SetFailed(errtxt);
return;
}

// 反序列化 RPC 调用的响应结果
if (!response->ParseFromArray(recv_buf, recv_size)) {
// 关闭连接
close(clientfd);
// 设置 RPC 调用状态
char errtxt[1024] = {0};
sprintf(errtxt, "rpc response unserialize failed, response content is %s", recv_buf);
controller->SetFailed(errtxt);
return;
}

// 关闭连接
close(clientfd);
}

ZooKeeper 的访问操作

  • zookeeperclient.h 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#pragma once

#include <semaphore.h>
#include <zookeeper/zookeeper.h>

#include <cstdlib>
#include <ctime>
#include <string>
#include <vector>

#include "logger.h"
#include "mprpccontext.h"

// ZNode 节点的状态
enum ZNodeStatus {
EXIST, // 已存在
NOTEXIST, // 不存在
UNKNOWN // 未知状态
};

// ZooKeeper 客户端的封装类
class ZkClient {
public:
// 构造函数
ZkClient();

// 析构函数
~ZkClient();

// 启动 ZK 客户端
bool Start(const std::string &host, const int port);

// 在 ZK 服务器上,根据指定的 Path 创建 ZNode 节点
std::string Create(const char *path, const char *data, int datalen = 0, int mode = ZOO_PERSISTENT);

// 在 ZK 服务器上,根据指定的 Path 递归创建 ZNode 节点
std::string CreateRecursive(const char *path, const char *data, int datalen = 0, int mode = ZOO_PERSISTENT);

// 在 ZK 服务器上,根据指定的 Path 获取子节点列表
std::vector<std::string> GetChildren(const char *path);

// 在 ZK 服务器上,根据指定的 Path 获取 ZNode 节点的数据
std::string GetData(const char *path);

// 在 ZK 服务器上,根据指定的 Path 获取 ZNode 节点的状态
Stat GetStat(const char *path);

// 在 ZK 服务器上,根据指定的 Path 判断 ZNode 节点是否存在
ZNodeStatus Exist(const char *path);

private:
zhandle_t *m_zhandle; // ZK 的客户端句柄

// 检查节点路径是否合法
bool checkPath(const char *path);
};
  • zookeeperclient.cc 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
#include "zookeeperclient.h"

/////////////////////////////////////////// ZK 客户端同步操作扩展代码 ///////////////////////////////////////////

// 同步检查 ZNode 节点是否存在的上下文结构
struct SyncExistsContext {
sem_t sem; // 信号量
int rc = ZSYSTEMERROR; // 检查结果
};

// 异步检查 ZNode 是否存在的回调
void znode_exists_completion(int rc, const struct Stat *stat, const void *data) {
SyncExistsContext *ctx = (SyncExistsContext *)data;
// 存储检查结果
ctx->rc = rc;
// 唤醒正在等待检查结果的线程
sem_post(&ctx->sem);
}

// 同步检查 ZNode 节点是否存在
int zoo_exists_sync(zhandle_t *zh, const char *path, int watch) {
// 上下文信息
SyncExistsContext ctx;

// 初始化信号量
sem_init(&ctx.sem, 0, 0);

// 发起 ZK 异步请求的调用
int ret = zoo_aexists(zh, path, watch, znode_exists_completion, &ctx);

// 这里必须判断 ZK 的异步请求调用是否正常,否则可能因为异步请求未正常发出,导致回调永不执行,最终造成线程死锁
if (ret != ZOK) {
// 销毁信号量
sem_destroy(&ctx.sem);
// ZK 的异步请求发出失败
return ret;
}

// 阻塞等待检查结果
sem_wait(&ctx.sem);

// 销毁信号量
sem_destroy(&ctx.sem);

// 返回检查结果
return ctx.rc;
}

// 同步创建 ZNode 节点的上下文结构
struct SyncCreateContext {
sem_t sem; // 信号量
int rc = ZSYSTEMERROR; // 创建结果
char path_buf[512] = {0}; // 用于返回实际创建的节点路径
};

// 异步创建 ZNode 节点的回调
void znode_create_completion(int rc, const char *path, const void *data) {
// 上下文信息
SyncCreateContext *ctx = (SyncCreateContext *)data;

// 存储创建结果
ctx->rc = rc;

// 存储实际创建的节点路径
if (ZOK == rc && path) {
strncpy(ctx->path_buf, path, sizeof(ctx->path_buf) - 1);
}

// 唤醒正在等待创建结果的线程
sem_post(&ctx->sem);
}

// 同步创建 ZNode 节点
int zoo_create_sync(zhandle_t *zh, const char *path, const char *data, int datalen, const struct ACL_vector *acl,
int mode, char *path_buf_out, int path_buf_out_len) {
// 上下文信息
SyncCreateContext ctx;

// 初始化信号量
sem_init(&ctx.sem, 0, 0);

// 发起 ZK 异步请求的调用
int ret = zoo_acreate(zh, path, data, datalen, acl, mode, znode_create_completion, &ctx);

// 这里必须判断 ZK 的异步请求调用是否正常,否则可能因为异步请求未正常发出,导致回调永不执行,最终造成线程死锁
if (ret != ZOK) {
// 销毁信号量
sem_destroy(&ctx.sem);
// ZK 的异步请求发出失败
return ret;
}

// 阻塞等待检查结果
sem_wait(&ctx.sem);

// 销毁信号量
sem_destroy(&ctx.sem);

// 返回实际创建的节点路径
if (path_buf_out && path_buf_out_len > 0) {
strncpy(path_buf_out, ctx.path_buf, path_buf_out_len - 1);
path_buf_out[path_buf_out_len - 1] = '\0';
}

// 返回检查结果
return ctx.rc;
}

// 同步获取 ZNode 节点数据和状态的上下文结构
struct SyncGetContext {
sem_t sem; // 信号量
int rc = ZSYSTEMERROR; // 操作结果
char *buf = nullptr; // 数据缓冲区
int buf_len = 0; // 数据缓冲区的大小
struct Stat *stat = nullptr; // ZNode 节点的状态
};

// 异步获取 ZNode 节点数据和状态的回调
void znode_get_completion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data) {
// 上下文信息
SyncGetContext *ctx = (SyncGetContext *)data;

// 存储操作结果
ctx->rc = rc;

// 存储 ZNode 节点的状态
if (rc == ZOK && stat && ctx->stat) {
*ctx->stat = *stat;
}

// 存储 ZNode 节点的数据
if (ZOK == rc && value && value_len > 0 && ctx->buf && ctx->buf_len > 0) {
int copy_len = (value_len < ctx->buf_len - 1) ? value_len : ctx->buf_len - 1;
memcpy(ctx->buf, value, copy_len);
ctx->buf[copy_len] = '\0';
}

// 唤醒正在等待获取结果的线程
sem_post(&ctx->sem);
}

// 同步获取 ZNode 节点的数据和状态
int zoo_get_sync(zhandle_t *zh, const char *path, int watch, char *buf_out, int buf_out_len, struct Stat *stat_out) {
// 上下文信息
SyncGetContext ctx;
ctx.stat = stat_out;
ctx.buf = buf_out;
ctx.buf_len = buf_out_len;

// 初始化信号量
sem_init(&ctx.sem, 0, 0);

// 发起 ZK 异步请求的调用
int ret = zoo_aget(zh, path, watch, znode_get_completion, &ctx);

// 这里必须判断 ZK 的异步请求调用是否正常,否则可能因为异步请求未正常发出,导致回调永不执行,最终造成线程死锁
if (ret != ZOK) {
// 销毁信号量
sem_destroy(&ctx.sem);
// ZK 的异步请求发出失败
return ret;
}

// 阻塞等待获取结果
sem_wait(&ctx.sem);

// 销毁信号量
sem_destroy(&ctx.sem);

// 返回操作结果
return ctx.rc;
}

// 同步获取 ZNode 子节点列表的上下文结构
struct SyncGetChildrenContext {
sem_t sem; // 信号量
int rc = ZSYSTEMERROR; // 操作结果
std::vector<std::string> children; // 子节点列表
};

// 异步获取 ZNode 子节点列表的回调
void zoo_get_children_completion(int rc, const struct String_vector *strings, const void *data) {
// 上下文信息
SyncGetChildrenContext *ctx = (SyncGetChildrenContext *)data;

// 存储操作结果
ctx->rc = rc;

// 存储子节点列表
if (ZOK == rc && strings) {
for (int i = 0; i < strings->count; i++) {
ctx->children.emplace_back(strings->data[i]);
}
}

// 唤醒正在等待获取结果的线程
sem_post(&ctx->sem);
}

// 同步获取 ZNode 子节点列表
std::vector<std::string> zoo_get_children_sync(zhandle_t *zh, const char *path, int watch) {
// 子节点列表
std::vector<std::string> result;

// 上下文信息
SyncGetChildrenContext ctx;

// 初始化信号量
sem_init(&ctx.sem, 0, 0);

// 发起 ZK 异步请求的调用
int ret = zoo_aget_children(zh, path, watch, zoo_get_children_completion, &ctx);

// 这里必须判断 ZK 的异步请求调用是否正常,否则可能因为异步请求未正常发出,导致回调永不执行,最终造成线程死锁
if (ret != ZOK) {
// 销毁信号量
sem_destroy(&ctx.sem);
// ZK 的异步请求发出失败
return result;
}

// 阻塞等待获取结果
sem_wait(&ctx.sem);

// 销毁信号量
sem_destroy(&ctx.sem);

// 获取子节点列表成功
if (ZOK == ctx.rc) {
// 直接转移 children 所有权给 result
result = std::move(ctx.children);
}
// 获取子节点列表失败
else {
// 打印日志信息
LOG_ERROR("failed to get children of node %s", path);
}

// 返回子节点列表
return result;
}

// 全局的 Watcher 监听器,接收 ZkServer 给 ZkClient 发送的通知
void global_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
// 判断接收到的事件类型是不是会话事件类型
if (type == ZOO_SESSION_EVENT) {
// ZK 客户端连接成功
if (state == ZOO_CONNECTED_STATE) {
// 从 ZK 客户端的上下文中获取预设置的信号量
sem_t *init_sem = (sem_t *)zoo_get_context(zh);
// 唤醒正在等待 ZK 客户端初始化完成的线程
sem_post(init_sem);
}
// ZK 客户端身份认证失败
else if (state == ZOO_AUTH_FAILED_STATE) {
// 打印日志信息
LOG_ERROR("zookeeper auth failed");
}
// ZK 客户端会话过期
else if (state == ZOO_EXPIRED_SESSION_STATE) {
// 打印日志信息
LOG_ERROR("zookeeper session expired");
}
}
}

/////////////////////////////////////////// ZK 客户端实现代码 ///////////////////////////////////////////

// 构造函数
ZkClient::ZkClient() : m_zhandle(nullptr) {
}

// 析构函数
ZkClient::~ZkClient() {
if (m_zhandle != nullptr) {
// 关闭 ZK 的客户端句柄(释放资源)
zookeeper_close(m_zhandle);
}
}

// 启动 ZK 客户端
bool ZkClient::Start(const std::string &host, const int port) {
// 拼接 ZK 服务端的连接信息
const std::string conn_str = host + ":" + std::to_string(port);

/**
* 初始化 ZK 的客户端句柄,连接 ZK 服务端(特别注意:这里是异步初始化)
* ZooKeeper C API 的多线程版本有三个线程,包括:
* (1) API 调用线程(当前调用 ZK API 的线程)
* (2) 网络 I/O 线程,基于 pthread_create() + poll 实现
* (3) Watcher 回调线程,基于 pthread_create() 实现
*/
m_zhandle = zookeeper_init(conn_str.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
if (nullptr == m_zhandle) {
// 打印日志信息
LOG_ERROR("zookeeper client init failed");
return false;
}

// 创建并初始化信号量
sem_t init_sem;
sem_init(&init_sem, 0, 0);

// 将信号量存放到 ZK 客户端的上下文中
zoo_set_context(m_zhandle, &init_sem);

// 设置等待 ZK 客户端连接的超时时间(10 秒)
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 10;

// 阻塞等待 ZK 客户端初始化完成
if (sem_timedwait(&init_sem, &ts) != 0) {
// 销毁信号量
sem_destroy(&init_sem);
// 打印日志信息
LOG_ERROR("zookeeper client connect timeout");
return false;
}

// 销毁信号量
sem_destroy(&init_sem);

// 打印日志信息
LOG_INFO("zookeeper client init success");

return true;
}

// 在 ZK 服务器上根据指定的 Path 创建 ZNode 节点
std::string ZkClient::Create(const char *path, const char *data, int datalen, int mode) {
// 检查节点路径是否合法
if (!checkPath(path)) {
// 返回空字符串
return "";
}

// 同步判断 ZNode 节点是否存在
int flag = zoo_exists_sync(m_zhandle, path, 0);

// ZNode 节点已存在
if (ZOK == flag) {
// 打印日志信息
LOG_WARN("znode %s create failed, because it existed", path);
// 返回空字符串
return "";
}
// ZNode 节点不存在
else if (ZNONODE == flag) {
// 实际创建的节点路径
const int path_buf_len = 512;
char path_buf[path_buf_len] = {0};

// 同步创建 ZNode 节点
flag = zoo_create_sync(m_zhandle, path, data, datalen, &ZOO_OPEN_ACL_UNSAFE, mode, path_buf, path_buf_len);
// 节点创建成功
if (ZOK == flag) {
// 打印日志信息
LOG_INFO("znode %s create success", path_buf);
// 返回实际创建的节点路径
return path_buf;
}
// 节点创建失败
else {
// 打印日志信息
LOG_ERROR("znode %s create failed", path);
// 返回空字符串
return "";
}
}
// 发生错误,比如会话过期、身份认证失败等
else {
// 打印日志信息
LOG_ERROR("znode %s create failed", path);
// 返回空字符串
return "";
}
}

// 在 ZK 服务器上,根据指定的 Path 递归创建 ZNode 节点
std::string ZkClient::CreateRecursive(const char *path, const char *data, int datalen, int mode) {
// 检查节点路径是否合法
if (!checkPath(path)) {
// 返回空字符串
return "";
}

std::string current_path;
std::string result_path;
size_t current_pos = 1; // 跳过第一个 '/'
std::string full_path(path); // 拷贝节点路径,避免修改原始字符串

while (current_pos <= full_path.size()) {
size_t next_pos = full_path.find('/', current_pos);
if (next_pos == std::string::npos) {
// 最后一级路径(完整路径)
current_path = full_path;
} else {
current_path = full_path.substr(0, next_pos);
}

bool is_last_path = (next_pos == std::string::npos); // 是否为最后一级路径
const char *path_data = is_last_path ? data : ""; // 父路径不写入数据
int path_data_len = is_last_path ? datalen : 0; // 父路径的数据长度为零
int path_mode = is_last_path ? mode : ZOO_PERSISTENT; // 父路径为持久节点

// 创建节点
std::string created_path = Create(current_path.c_str(), path_data, path_data_len, path_mode);

// 如果节点创建失败
if (created_path.empty()) {
// 判断节点是否存在
int flag = zoo_exists_sync(m_zhandle, current_path.c_str(), 0);
// 如果节点存在,使用(兼容)已存在的节点
if (ZOK == flag) {
created_path = current_path;
}
// 如果节点不存在或者发生错误,则直接返回空字符串
else {
// 打印日志信息
LOG_ERROR("znode %s create failed", path);
// 返回空字符串
return "";
}
}

// 如果是最后一级路径,则跳出 While 循环
if (is_last_path) {
result_path = created_path;
break;
}

current_pos = next_pos + 1;
}

return result_path;
}

// 在 ZK 服务器上,根据指定的 Path 获取子节点列表
std::vector<std::string> ZkClient::GetChildren(const char *path) {
// 检查节点路径是否合法
if (!checkPath(path)) {
// 返回空列表
return {};
}

// 同步获取子节点列表
return zoo_get_children_sync(m_zhandle, path, 0);
}

// 在 ZK 服务器上,根据指定的 Path 获取 ZNode 节点的数据
std::string ZkClient::GetData(const char *path) {
// 检查节点路径是否合法
if (!checkPath(path)) {
// 返回空字符串
return "";
}

// 节点数据
const int data_buf_len = 2048;
char data_buf[data_buf_len] = {0};

// 同步获取 ZNode 节点的数据
int flag = zoo_get_sync(m_zhandle, path, 0, data_buf, data_buf_len, nullptr);
// 获取节点数据成功
if (ZOK == flag) {
// 返回节点数据
return data_buf;
}
// 获取节点数据失败
else {
// 打印日志信息
LOG_ERROR("get znode data failed, path: %s", path);
// 返回空字符串
return "";
}
}

// 在 ZK 服务器上,根据指定的 Path 获取 ZNode 节点的状态
Stat ZkClient::GetStat(const char *path) {
// 检查节点路径是否合法
if (!checkPath(path)) {
// 返回空数据
return {};
}

// 节点状态
struct Stat stat;

// 同步获取 ZNode 节点的状态
int flag = zoo_get_sync(m_zhandle, path, 0, nullptr, 0, &stat);
// 获取节点状态成功
if (ZOK == flag) {
// 返回节点状态
return stat;
}
// 获取节点状态失败
else {
// 打印日志信息
LOG_ERROR("get znode stat failed, path: %s", path);
// 返回空数据
return {};
}
}

// 在 ZK 服务器上,根据指定的 Path 判断 ZNode 节点是否存在
ZNodeStatus ZkClient::Exist(const char *path) {
// 检查节点路径是否合法
if (!checkPath(path)) {
return UNKNOWN;
}

// 同步判断 ZNode 节点是否存在
int flag = zoo_exists_sync(m_zhandle, path, 0);

// ZNode 节点已存在
if (ZOK == flag) {
return EXIST;
}
// ZNode 节点不存在
else if (ZNONODE == flag) {
return NOTEXIST;
}
// 发生错误,比如会话过期、身份认证失败等
else {
return UNKNOWN;
}
}

// 检查节点路径是否合法
bool ZkClient::checkPath(const char *path) {
if (path == nullptr || path[0] != '/') {
// 打印日志信息
LOG_ERROR("invalid node path: %s", path);
return false;
}
return true;
}

RPC 框架测试代码

这里的项目测试代码位于 example 目录下,实质是 RPC 框架的使用案例代码,用于给 RPC 框架的第三方使用者演示如何发布和调用 RPC 服务。

Protobuf 协议文件

  • user.proto 协议文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Protobuf 语法的版本
syntax = "proto3";

// 定义包名,便于在生成的代码中区分不同模块(类似 C++ 的命名空间)
package user;

// 允许生成通用的 C++ 服务接口(可选项)
option cc_generic_services = true;

// 请求结果
message ResultCode {
uint32 errcode = 1;
bytes errmsg = 2;
}

// 登录请求
message LoginRequest {
bytes name = 1;
bytes password = 2;
}

// 登录响应
message LoginResponse {
ResultCode result = 1;
bool success = 2;
}

// 注册请求
message RegisterRequest {
bytes name = 1;
bytes password = 2;
}

// 注册响应
message RegisterResponse {
ResultCode result = 1;
bool success = 2;
}

// 定义RPC服务接口类和服务函数
service UserServiceRpc {
rpc Register(RegisterRequest) returns (RegisterResponse);

rpc Login(LoginRequest) returns (LoginResponse);
}
  • friend.proto 协议文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Protobuf 语法的版本
syntax = "proto3";

// 定义包名,便于在生成的代码中区分不同模块(类似 C++ 的命名空间)
package friends;

// 允许生成通用的 C++ 服务接口(可选项)
option cc_generic_services = true;

// 请求结果
message ResultCode {
uint32 errcode = 1;
bytes errmsg = 2;
}

// 好友信息
message Friend {
uint32 userid = 1;
bytes username = 2;

// 枚举类型
enum SEX {
MAN = 0;
WOMAN = 1;
}

SEX sex = 3;
}

// 获取好友列表的请求
message GetFriendListRequest {
uint32 userid = 1;
}

// 获取好友列表的响应
message GetFriendListResponse {
ResultCode result = 1;
// 好友列表
repeated Friend friends = 2;
}

// 定义RPC服务接口类和服务函数
service FriendServiceRpc {
rpc GetFriendList(GetFriendListRequest) returns (GetFriendListResponse);
}

使用 RPC 框架提供服务

  • rpcprovider.cc 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include <iostream>

#include "friend.pb.h"
#include "logger.h"
#include "mprpccontext.h"
#include "mprpcprovider.h"
#include "user.pb.h"
#include "vector"

// RPC 服务,继承基类 UserServiceRpc(由 Protobuf 自动生成)
class UserService : public user::UserServiceRpc {
public:
// 本地的登录函数
bool Login(std::string name, std::string password) {
LOG_INFO("invoke local Login function, name: %s, password: %s", name.c_str(), password.c_str());
return true;
}

// 本地的注册函数
bool Register(std::string name, std::string password) {
LOG_INFO("invoke local Register function, name: %s, password: %s", name.c_str(), password.c_str());
return true;
}

// 重写基类 UserServiceRpc(由 Protobuf 自动生成)的虚函数
void Login(::google::protobuf::RpcController* controller, const ::user::LoginRequest* request,
::user::LoginResponse* response, ::google::protobuf::Closure* done) override {
// 获取 RPC 请求参数
std::string name = request->name();
std::string password = request->password();

// 调用本地业务函数
bool success = Login(name, password);

// 设置 RPC 响应结果
user::ResultCode* result = response->mutable_result();
result->set_errcode(0);
result->set_errmsg("");
response->set_success(success);

// 执行回调操作,返回响应结果给 RPC 服务调用者
done->Run();
}

// 重写基类 UserServiceRpc(由 Protobuf 自动生成)的虚函数
void Register(::google::protobuf::RpcController* controller, const ::user::RegisterRequest* request,
::user::RegisterResponse* response, ::google::protobuf::Closure* done) override {
// 获取 RPC 请求参数
std::string name = request->name();
std::string password = request->password();

// 调用本地业务函数
bool success = Register(name, password);

// 设置 RPC 响应结果
user::ResultCode* result = response->mutable_result();
result->set_errcode(0);
result->set_errmsg("");
response->set_success(success);

// 执行回调操作,返回响应结果给RPC服务调用者
done->Run();
}
};

// RPC 服务,继承基类 FriendServiceRpc(由 Protobuf 自动生成)
class FriendServcie : public friends::FriendServiceRpc {
// 本地获取好友列表的函数
std::vector<friends::Friend> GetFriendList(uint32_t userid) {
LOG_INFO("invoke local GetFriendList function, userid: %u", userid);

// 返回结果
std::vector<friends::Friend> result;

friends::Friend f1;
f1.set_userid(1);
f1.set_username("Jim");
f1.set_sex(friends::Friend::MAN);
result.push_back(f1);

friends::Friend f2;
f2.set_userid(2);
f2.set_username("Tom");
f2.set_sex(friends::Friend::MAN);
result.push_back(f2);

return result;
}

// 重写基类 FriendServiceRpc(由 Protobuf 自动生成)的虚函数
void GetFriendList(::google::protobuf::RpcController* controller, const ::friends::GetFriendListRequest* request,
::friends::GetFriendListResponse* response, ::google::protobuf::Closure* done) {
// 获取 RPC 请求参数
uint32_t userid = request->userid();

// 调用本地业务函数
std::vector<friends::Friend> friends = GetFriendList(userid);

// 设置 RPC 响应结果
friends::ResultCode* result = response->mutable_result();
result->set_errcode(0);
result->set_errmsg("");

// 设置响应的数据
for (friends::Friend& item : friends) {
friends::Friend* f = response->add_friends();
f->set_sex(item.sex());
f->set_userid(item.userid());
f->set_username(item.username());
}

// 执行回调操作,返回响应结果给RPC服务调用者
done->Run();
}
};

// 测试 RPC 服务的发布
int main(int argc, char** argv) {
// 设置日志级别
Logger::GetInstance().SetLogLevel(INFO);

// 调用 RPC 框架的初始化操作(比如加载 RPC 配置文件)
LOG_INFO("init rpc framework...");
MprpcContext::GetInstance().Init(argc, argv);

// 创建用来发布 RPC 服务的网络对象类
RpcProvider provider;

// 发布 RPC 服务
provider.PublishService(new UserService());

// 发布 RPC 服务
provider.PublishService(new FriendServcie());

// 启动 RPC 服务节点,开始对外提供 RPC 远程网络调用服务(针对 RPC 服务提供者)
provider.Run();

return 0;
}

使用 RPC 框架调用服务

  • rpcconsumer.cc 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#include <iostream>
#include <memory>

#include "friend.pb.h"
#include "logger.h"
#include "mprpccontext.h"
#include "mprpccontroller.h"
#include "user.pb.h"

// 调用 RPC 注册方法
void Register() {
// RPC 调用的通道
std::unique_ptr<google::protobuf::RpcChannel> channel = std::make_unique<MprpcChannel>();

// RPC 调用的状态控制器
std::unique_ptr<google::protobuf::RpcController> controller = std::make_unique<MprpcController>();

// RPC 调用的代理对象
user::UserServiceRpc_Stub stub(channel.get());

// RPC 调用的请求参数
user::RegisterRequest request;
request.set_name("jim");
request.set_password("123456");

// RPC 调用的响应结果
user::RegisterResponse response;

// 发起 RPC 调用,底层实际上调用的是 MprpcChannel::CallMethod()
stub.Register(controller.get(), &request, &response, nullptr);

// 判断 RPC 调用是否成功
if (controller->Failed()) {
return;
}

// 获取 RPC 调用的响应结果
if (0 == response.result().errcode()) {
LOG_INFO("rpc function Register invoke success");
} else {
LOG_ERROR("rpc function Register invoke error: %s", response.result().errmsg().c_str());
}
}

// 调用 RPC 登录方法
void Login() {
// RPC 调用的通道
std::unique_ptr<google::protobuf::RpcChannel> channel = std::make_unique<MprpcChannel>();

// RPC 调用的状态控制器
std::unique_ptr<google::protobuf::RpcController> controller = std::make_unique<MprpcController>();

// RPC 调用的代理对象
user::UserServiceRpc_Stub stub(channel.get());

// RPC 调用的请求参数
user::LoginRequest request;
request.set_name("jim");
request.set_password("123456");

// RPC 调用的响应结果
user::LoginResponse response;

// 发起 RPC 调用,底层实际上调用的是 MprpcChannel::CallMethod()
stub.Login(controller.get(), &request, &response, nullptr);

// 判断 RPC 调用是否成功
if (controller->Failed()) {
return;
}

// 获取 RPC 调用的响应结果
if (0 == response.result().errcode()) {
LOG_INFO("rpc function Login invoke success");
} else {
LOG_ERROR("rpc function Login invoke error: %s", response.result().errmsg().c_str());
}
}

// 调用 RPC 获取好友列表方法
void GetFriendList() {
// RPC 调用的通道
std::unique_ptr<google::protobuf::RpcChannel> channel = std::make_unique<MprpcChannel>();

// RPC 调用的状态控制器
std::unique_ptr<google::protobuf::RpcController> controller = std::make_unique<MprpcController>();

// RPC 调用的代理对象
friends::FriendServiceRpc_Stub stub(channel.get());

// RPC 调用的请求参数
friends::GetFriendListRequest request;
request.set_userid(3);

// RPC 调用的响应结果
friends::GetFriendListResponse response;

// 发起 RPC 调用,底层实际上调用的是 MprpcChannel::CallMethod()
stub.GetFriendList(controller.get(), &request, &response, nullptr);

// 判断 RPC 调用是否成功
if (controller->Failed()) {
return;
}

// 获取 RPC 调用的响应结果
if (0 == response.result().errcode()) {
LOG_INFO("rpc function GetFriendList invoke success");

// 获取返回的数据
auto friends = response.friends();
for (auto& item : friends) {
LOG_INFO("userid: %u, username: %s, sex: %d", item.userid(), item.username().c_str(), item.sex());
}
} else {
LOG_ERROR("rpc function GetFriendList invoke error: %s", response.result().errmsg().c_str());
}
}

// 测试 RPC 服务的调用
int main(int argc, char** argv) {
// 设置日志级别
Logger::GetInstance().SetLogLevel(INFO);

// 调用 RPC 框架的初始化操作(比如加载 RPC 配置文件)
LOG_INFO("init rpc framework...");
MprpcContext::GetInstance().Init(argc, argv);

// 调用远程的 RPC 注册方法
Register();
std::cout << std::endl;

// 调用远程的 RPC 登录方法
Login();
std::cout << std::endl;

// 调用远程的 RPC 获取好友列表方法
GetFriendList();
std::cout << std::endl;

// 阻塞等待一段时间,在程序结束之前,尽量让日志信息都被写入日志文件
sleep(1);

return 0;
}

项目代码下载

完整的 RPC 框架项目代码可以在 这里 下载得到。

项目测试

  • (1) 更改 conf 目录下的 rpc.conf 配置文件,指定 ZooKeeper 服务器的 IP 和端口号等信息
1
2
3
4
5
6
# ZooKeeper的IP地址(必填)
zk_server_host=127.0.0.1
# ZooKeeper的端口号(必填)
zk_server_port=2181
# RPC服务提供者优先使用的网卡接口(可选)
rpc_network_interface=eth1
  • (2) 通过 CMake 编译构建整个项目
1
2
3
4
5
6
7
8
# 进入项目的根目录
cd c++-project-mprpc

# 授权脚本执行
chmod +x autobuild.sh

# 执行一键编译构建脚本
./autobuild.sh
  • (3) 启动 ZooKeeper 服务器
1
2
3
4
5
# 启动 ZooKeeper 服务器
sudo systemctl start zookeeper

# 查看 ZooKeeper 服务器的运行状态
sudo systemctl status zookeeper
  • (4) 启动 RPC 服务提供者的案例程序
1
2
3
4
5
# 进入项目的 bin 目录
cd c++-project-mprpc/bin

# 启动 RPC 服务提供者的案例程序(可使用相同的命令启动多个 RPC 服务提供者,不需要考虑端口冲突问题)
./example_rpc_provider -i rpc.conf
  • (5) 启动 RPC 服务调用者的案例程序
1
2
3
4
5
# 进入项目的 bin 目录
cd c++-project-mprpc/bin

# 启动 RPC 服务调用者的案例程序(可使用相同的命令启动多个 RPC 服务调用者,不需要考虑端口冲突问题)
./example_rpc_consumer -i rpc.conf
  • (6) 查看 RPC 框架运行输出的日志文件
1
2
3
4
5
# 进入项目的 bin 目录
cd c++-project-mprpc/bin

# 查看 RPC 框架运行输出的日志文件(请自行更改日志文件名)
vim 2025-06-12-log.txt

项目调试

  • GDB 调试 C/C++ 代码,可用于定位线程死锁等问题的发生
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# GDB 调试指定的可执行文件(应用程序)
gdb example_rpc_provider

# GDB 设置断点(格式:源文件名称+行号)
(gdb) break mprpcconfig.cc:19

# GDB 指定参数运行应用程序(运行后,会停留在断点处)
(gdb) run -i rpc.conf

# GDB 断点调试 - 运行下一行代码
(gdb) n

# GDB 断点调试 - 查看指定变量的值
(gdb) p src_buf

# GDB 退出调试
(gdb) quit

项目优化

在上述的 RPC 框架代码中,每当 RPC 服务端处理完成一个 RPC 请求后,都会主动断开与 RPC 客户端的 TCP 连接;同样,RPC 客户端在获取到响应结果后,也会主动断开与 RPC 服务端的 TCP 连接。这种实现方式属于短连接模式,在生产环境中通常比较少见。原因如下:

  • TCP 连接的创建和释放代价较高,每次请求都涉及:

    • 三次握手(建立连接)
    • 四次挥手(关闭连接)
  • 在高并发场景下,频繁建立和关闭 TCP 连接会导致:

    • 系统资源消耗大(文件描述符、内核网络栈负载)
    • 服务端的性能瓶颈,特别是处理大量短时请求时

以阿里巴巴开源的 RPC 框架 Dubbo 为例,Dubbo 默认采用 TCP 的单一长连接和 NIO 异步通信模型,其主要特点是:

  • 客户端与服务端之间建立持久化的长连接(通常是连接池或多路复用)
  • 异步非阻塞通信(基于 Netty 实现)
  • 复用连接发送多次请求,避免了重复的 TCP 握手和断开操作,显著降低了网络资源和 CPU 消耗

因此在生产环境中,建议使用 TCP 的单一长连接和 NIO 异步通信模型(Muduo 网络库天生支持),以此保证 RPC 框架的高并发性能。值得一提的是,使用 TCP 长连接后,需要额外处理 TCP 分包的问题,也就是将 TCP 字节流里的数据识别为一个个消息(RPC 请求消息)。

模式特点场景
短连接(请求 - 响应后立即断开)简单实现,资源浪费大一般用于简单的 HTTP/1.0 接口或调试环境
长连接(复用 TCP 连接)性能优越,适合高并发 gRPC、Dubbo、Redis、数据库连接等

参考资料