基于 C++ 开发集群聊天服务器

大纲

前言

本文将基于 C++ 实现集群聊天服务器,使用了 Json 库和 Muduo 网络库,并引入了 Redis、MySQL、Nginx 等中间件。

开发工具

软件版本说明
C++ 标准11
Boost1.74.0.3Muduo 库依赖 Boost 库
Muduo2.0.3Muduo 库,基于 C++ 开发,用于网络编程
hiredis1.3.0Reids 库 ,基于 C 语言开发,用于操作 Redis
nlohmann/json3.12.0Json 库,基于 C++ 开发,用于 Json 序列化和反序列化
Redis7.0.15Redis 服务器
MySQL8.4.5MySQL 服务器
Nginx1.28.0Nginx 服务器
G++(GCC)12.2.0建议使用 5.57.5 版本的 G++(GCC) 编译器
CMake3.25.1C/C++ 项目构建工具
LinuxDebian 12Muduo 库不支持 Windows 平台
Visual Studio Code1.100.2使用 VSCode 远程开发特性

平台兼容性说明

由于使用了 Muduo 库,且 Muduo 库仅支持 Linux 平台;因此本文提供的所有 C++ 集群聊天服务器代码支持在 Linux 平台运行,不支持 Windows 平台,默认是基于 Debian 12 进行远程开发。

开发环境

使用 C++ 开发 Linux 应用时,常见的开发环境有以下几种:

提示

上面介绍的三种开发环境,任意选择一种自己熟悉的就可以;如果日常使用的是 Windows 系统,建议选择第四种开发环境(VSCode 远程开发);如果习惯使用 Linux 系统,强烈建议选择第一种开发环境(Linux 本地开发)。

准备工作

安装工具

  • 安装常用的工具
1
2
3
4
5
# 安装开发工具
sudo apt-get install -y vim git

# 安装网络工具
sudo apt-get install -y wget curl telnet netcat-openbsd socat tcpdump hping3

安装 GCC

1
2
3
4
5
6
7
8
# 安装 GCC、G++、GDB
sudo apt-get install -y build-essential gdb

# 查看 GCC 版本
gcc --version

# 查看 G++ 版本
g++ --version

安装 CMake

1
2
3
4
5
# 安装 CMake
sudo apt-get -y install cmake

# 查看 CMake 版本
cmake -version

安装 Redis

  • 安装 Redis
1
2
3
4
5
6
7
8
# 安装 Redis
sudo apt-get install -y redis-server

# 开机自启动 Redis
sudo systemctl enable redis-server

# 查看 Redis 的运行状态
sudo systemctl status redis-server
  • 验证安装
1
2
3
4
5
6
7
8
# 查看 Redis 版本
redis-server --version

# Redis 客户端执行 Ping 操作
redis-cli ping

# 当 Redis 服务器响应以下内容,则说明 Redis 服务器正常运行
PONG

Redis 默认配置文件的路径

通过 APT 安装 Redis 服务器后,其主配置文件的路径为 /etc/redis/redis.conf

安装 Nginx

  • 安装依赖包
1
2
# 安装依赖软件(比如 pcre、zlib、ssl)
sudo apt install -y build-essential libpcre3 libpcre3-dev zlib1g zlib1g-dev libssl-dev
  • 编译安装 Nginx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 下载源码
wget https://nginx.org/download/nginx-1.28.0.tar.gz

# 解压源码
tar -xvf nginx-1.28.0.tar.gz

# 进入源码目录
cd nginx-1.28.0

# 生成构建文件
./configure --with-stream --with-http_ssl_module

# 编译源码
make -j$(nproc)

# 执行安装
sudo make install
  • Nginx 安装说明
安装说明路径
Nginx 默认安装路径/usr/local/nginx
Nginx 主配置文件路径/usr/local/nginx/conf/nginx.conf
Nginx 二进制可执行文件路径/usr/local/nginx/sbin/nginx
  • Nginx 管理命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 启动 Nginx
sudo /usr/local/nginx/sbin/nginx

# 优雅关闭 Nginx
sudo /usr/local/nginx/sbin/nginx -s quit

# 立刻关闭 Nginx
sudo /usr/local/nginx/sbin/nginx -s stop

# 检查 Nginx 配置文件
sudo /usr/local/nginx/sbin/nginx -t

# 重载 Nginx 配置文件
sudo /usr/local/nginx/sbin/nginx -s reload
  • 添加 Systemd 服务(实现 Nginx 服务自启动)
1
2
# 创建 Systemd 服务配置文件,添加以下配置内容
sudo vi /etc/systemd/system/nginx.service
1
2
3
4
5
6
7
8
9
10
11
12
13
[Unit]
Description=The NGINX HTTP and reverse proxy server
After=network.target

[Service]
ExecStart=/usr/local/nginx/sbin/nginx
ExecReload=/usr/local/nginx/sbin/nginx -s reload
ExecStop=/usr/local/nginx/sbin/nginx -s quit
PIDFile=/usr/local/nginx/logs/nginx.pid
Restart=on-failure

[Install]
WantedBy=multi-user.target
1
2
3
4
5
# 重载系统配置文件
sudo systemctl daemon-reload

# 开机自启动 Nginx 服务
sudo systemctl enable nginx
1
2
3
4
5
6
7
8
9
10
11
# 启动 Nginx 服务
sudo systemctl start nginx

# 查看 Nginx 服务的运行状态
sudo systemctl status nginx

# 优雅关闭 Nginx 服务
sudo systemctl stop nginx

# 重载 Nginx 服务的配置文件
sudo systemctl reload nginx

特别注意

在 Nginx 1.9.0 版本之前,Nginx 仅支持基于 HTTP 协议 的 Web 服务器负载均衡,无法处理 TCP 层的流量转发。自 Nginx 1.9.0 版本开始,官方引入了名为 stream 的新模块,使 Nginx 能够支持基于 TCP 和 UDP 的四层负载均衡,从而扩展了其在数据库代理、邮件服务、消息中间件等非 HTTP 场景中的应用能力。值得一提的是,尽管 stream 模块在 1.9.0 版本中开始被引入,但在官方源码中该模块默认并未启用。因此,在编译 Nginx 源码时,如果希望使用 stream 模块的功能,则必须显式添加 --with-stream 编译参数,这样才能将其集成进最终构建的二进制可执行文件中。

安装 MySQL

  • 添加 MySQL 官方 APT 源
1
2
3
4
5
6
7
8
9
10
# 下载 APT 配置包(访问 https://dev.mysql.com/downloads/repo/apt/ 可以获取最新版本)
wget https://dev.mysql.com/get/mysql-apt-config_0.8.34-1_all.deb

# 安装 APT 配置包
# 在安装过程中会弹出配置界面,通常默认选择的是 MySQL 8.4 版本,如果需要安装其他版本(如 8.0),可以按回车键进入子菜单选择其他版本
# 由于集群聊天服务器需要使用到 MySQL 客户端,因此还需要安装 MySQL Connectors(不需要改动选择,因为默认已经选择安装)
sudo dpkg -i mysql-apt-config_0.8.34-1_all.deb

# 更新 APT 索引
sudo apt-get update
  • 安装 MySQL
1
2
3
4
5
6
7
8
9
10
11
# 安装 MySQL(安装过程中会提示输入 MySQL 的 root 用户的密码)
sudo apt-get install -y mysql-server

# 安装 MySQL 开发包(包含了 MySQL C API 的头文件和动态库文件)
sudo apt-get install -y libmysqlclient-dev

# 开机自启动 MySQL
sudo systemctl enable mysql

# 查看 MySQL 的运行状态
sudo systemctl status mysql

MySQL 默认配置文件的路径

通过 APT 安装 MySQL 服务器后,其主配置文件的路径为 /etc/mysql/my.cnf

  • 验证 MySQL 安装
1
2
# 登录 MySQL
mysql -u root -p
1
2
-- 查看 MySQL 版本
select version();
  • 验证 MySQL 开发包的安装
1
sudo find /usr -iname libmysqlclient*
1
2
3
4
/usr/lib/x86_64-linux-gnu/libmysqlclient.so
/usr/lib/x86_64-linux-gnu/libmysqlclient.a
/usr/lib/x86_64-linux-gnu/libmysqlclient.so.24
/usr/lib/x86_64-linux-gnu/libmysqlclient.so.24.0.5

MySQL C API 库

本文使用 libmysqlclient.so 库来操作 MySQL 数据库,该库称为 MySQL C API(也叫 Connector/C),它是基于 C 语言实现的。C++ 项目也可以使用这个库,只要用 extern "C" 来链接(或者直接使用 MySQL 提供的头文件中已经加好的处理)。

安装 Boost 库

1
2
3
4
5
# 安装 Boost 的所有组件和头文件
sudo apt-get install -y libboost-all-dev

# 查看 Bootst 版本
sudo dpkg -s libboost-all-dev | grep Version

提示

由于 Muduo 使用了 Boost 库(如 boost::any),因此需要安装 Boost 库。

安装 Muduo 库

  • 编译安装 Muduo 库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Git 克隆代码
git clone https://github.com/chenshuo/muduo.git

# 进入代码目录
cd muduo

# 创建构建目录
mkdir -p build

# 进入构建目录
cd build

# 生成构建文件
cmake ..

# 编译源码
make -j$(nproc)

# 执行安装
sudo make install

# 更新系统的共享库缓存
sudo ldconfig /usr/local/lib/
  • 验证安装
1
2
3
4
5
# 查看 Muduo 库的头文件
ls -al /usr/local/include/muduo

# 查看 Muduo 库的静态库
ls -al /usr/local/lib | grep muduo

提示

  • Muduo 的编译依赖 CMake 和 Boost 库,默认编译生成的是静态库(.a),如果需要编译生成共享库(.so),可以自行修改 CMakeLists.txt 中的配置。
  • Muduo 支持 C++ 11,仅支持 Linux 平台,不支持 Windows 平台,建议使用 7.x 及以后版本的 g++ 编译器。

安装 Hiredis 库

  • 编译安装 Hiredis 库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Git 克隆代码
git clone https://github.com/redis/hiredis.git

# 进入源码目录
cd hiredis

# 编译源码
make -j$(nproc)

# 执行安装
sudo make install

# 更新系统的共享库缓存
sudo ldconfig /usr/local/lib/
  • 验证安装
1
2
3
4
5
# 查看 Hiredis 库的头文件
ls -al /usr/local/include/hiredis

# 查看 Hiredis 库的静态库和动态库
ls -al /usr/local/lib | grep hiredis

项目介绍

项目需求

  • 客户端新用户注册
  • 客户端用户登录
  • 添加好友和添加群组
  • 好友聊天
  • 群组聊天
  • 离线消息
  • Nginx 配置 TCP 负载均衡
  • 集群聊天系统支持客户端跨服务器通信

项目目标

  • 掌握 Json 的编程应用
  • 掌握 CMake 构建自动化编译环境
  • 掌握 Muduo 网络库的编程以及实现原理
  • 掌握 Nginx 配置部署 TCP 负载均衡器的应用以及原理
  • 掌握服务器的网络 I/O 模块、业务模块、数据模块分层的设计思想
  • 掌握 Redis 发布 - 订阅的编程实践以及应用原理

项目架构

在集群聊天服务器项目中,使用 Nginx 作为 TCP 负载均衡器,同时使用 Redis 的发布 - 订阅特性来解决客户端跨服务器通信问题。整体工作流程如下图所示:

提示

Nginx 单机作为负载均衡器时,经过合理的系统参数优化和配置(如 ulimit、内核参数调整等),通常可以稳定支撑 5 万~6 万个并发 TCP 连接。但是,由于 Nginx 本质上是属于应用层的四层代理,其性能仍受限于单机的 CPU、内存、网络带宽和文件描述符等资源限制。若需支持 超过 10 万甚至上百万的并发 TCP 连接,可采用 LVS + Keepalived + Nginx 的分层架构实现更高性能、更高可用性、更强伸缩性的负载均衡方案,该方案的部署拓扑结构 如图 所示。

项目技术栈

  • 单例设计模式
  • Muduo 网络库
  • MySQL 数据库编程
  • CMake 构建编译环境
  • Json 序列化和反序列化
  • Nginx 的 TCP 负载均衡器使用
  • Redis 的发布 - 订阅编程实践

项目开发

Nginx 负载均衡配置

在集群聊天服务器项目中,由于使用了 Nginx 作为 TCP 负载均衡器,因此需要在 Nginx 的配置文件中(nginx.conf)添加 stream 模块的配置内容,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
events {
........
}

# nginx tcp loadbalance config
stream {
# 集群聊天的服务器列表
upstream chat_server {
server 127.0.0.1:6000 weight=1 max_fails=3 fail_timeout=30s;
server 127.0.0.1:6002 weight=1 max_fails=3 fail_timeout=30s;
}

# TCP 负载均衡器(负责转发流量)
server {
listen 8000;
proxy_pass chat_server;
proxy_connect_timeout 1s;
tcp_nodelay on;
}
}

http {
......
}

Redis 发布 - 订阅使用

在集群聊天服务器项目中,使用 Redis 的发布 - 订阅特性来解决客户端跨服务器通信问题。Reids 的发布 - 订阅功能,主要使用以下几个 Redis 命令来实现:

  • 订阅指定的 Channel
1
subscribe news
  • 发布消息到指定的 Channel
1
publish news "hello"
  • 取消订阅指定的 Channel
1
unsubscribe news

提示

在集群聊天服务器项目中,为了方便操作 Redis,会使用到 Hiredis 库。

MySQL 数据库初始化

数据库设计

C++ 集群聊天服务器项目的数据库表设计如下:

数据库初始化

  • 创建数据库
1
CREATE DATABASE `chat` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
  • 切换数据库
1
USE `chat`;
  • 创建用户表
1
2
3
4
5
6
7
8
CREATE TABLE `user` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(50) DEFAULT NULL,
`password` varchar(50) DEFAULT NULL,
`state` enum('online','offline') DEFAULT 'offline',
PRIMARY KEY (`id`),
UNIQUE KEY `name` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8mb4;
  • 创建好友表
1
2
3
4
5
CREATE TABLE `friend` (
`userid` int NOT NULL,
`friendid` int NOT NULL,
UNIQUE KEY `userid` (`userid`,`friendid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  • 创建用户组表
1
2
3
4
5
6
7
CREATE TABLE `allgroup` (
`id` int NOT NULL AUTO_INCREMENT,
`groupname` varchar(50) NOT NULL,
`groupdesc` varchar(200) DEFAULT '',
PRIMARY KEY (`id`),
UNIQUE KEY `groupname` (`groupname`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
  • 创建用户与用户组关联表
1
2
3
4
5
6
CREATE TABLE `groupuser` (
`groupid` int NOT NULL,
`userid` int NOT NULL,
`grouprole` enum('creator','normal') DEFAULT NULL,
UNIQUE KEY `groupid` (`groupid`,`userid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  • 创建离线消息表
1
2
3
4
5
CREATE TABLE `offlinemessage` (
`userid` int NOT NULL,
`message` varchar(500) NOT NULL,
`createtime` bigint NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

集群聊天服务器开发

项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
├── CMakeLists.txt
├── README.md
├── autobuild.sh
├── sql
│   └── db.sql
├── include
│ ├── public
│ │ ├── config.hpp
│ │ ├── public.hpp
│ │ └── times.hpp
│ └── server
│ ├── chatserver.hpp
│ ├── chatservice.hpp
│ ├── dao
│ │ ├── friendmodel.hpp
│ │ ├── groupmodel.hpp
│ │ ├── groupusermodel.hpp
│ │ ├── offlinemessagemodel.hpp
│ │ └── usermodel.hpp
│ ├── db
│ │ └── db.hpp
│ ├── domain
│ │ ├── friend.hpp
│ │ ├── group.hpp
│ │ ├── groupuser.hpp
│ │ ├── offlinemessage.hpp
│ │ └── user.hpp
│ └── redis
│ └── redis.hpp
├── src
│ ├── CMakeLists.txt
│ ├── client
│ │ ├── CMakeLists.txt
│ │ └── main.cpp
│ └── server
│ ├── CMakeLists.txt
│ ├── chatserver.cpp
│ ├── chatservice.cpp
│ ├── dao
│ │ ├── friendmodel.cpp
│ │ ├── groupmodel.cpp
│ │ ├── groupusermodel.cpp
│ │ ├── offlinemessagemodel.cpp
│ │ └── usermodel.cpp
│ ├── db
│ │ └── db.cpp
│ ├── main.cpp
│ └── redis
│ └── redis.cpp
└── thirdparty
└── json.hpp

项目代码

下载完整的项目代码

由于篇幅有限,下面只给出集群聊天服务端和客户端的部分核心代码,完整的项目代码可以在 这里 下载得到。

服务端核心代码

提示

在集群聊天服务端中,使用了 Json 库和 Muduo 网络库,并引入了 MySQL、Redis。

Redis 代码
  • include/server/redis/redis.hpp 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#ifndef REDIS_H
#define REDIS_H

#include <hiredis/hiredis.h>

#include <functional>
#include <iostream>
#include <thread>

using namespace std;

// Redis 操作类
class Redis {
public:
// 构造函数
Redis();

// 析构函数
~Redis();

// 连接redis服务器
bool connect();

// 向redis指定的通道subscribe消息
bool subscribe(int channel);

// 向redis指定的通道unsubscribe消息
bool unsubscribe(int channel);

// 在独立线程中异步接收订阅通道中的消息
void observer_channel_message();

// 向redis指定的通道publish消息
bool publish(int channel, string message);

// 初始化向业务层上报通道消息的回调对象
void init_notify_handler(function<void(int, string)> fn);

private:
// hiredis同步上下文对象(即redis客户端),负责publish消息
redisContext *_publish_context;

// hiredis同步上下文对象(即redis客户端),负责subscribe消息
redisContext *_subcribe_context;

// 回调操作,收到订阅的消息,给service层上报
function<void(int, string)> _notify_message_handler;
};

#endif
  • src/server/redis/redis.cpp 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "redis.hpp"

#include "config.hpp"

// 构造函数
Redis::Redis() : _publish_context(nullptr), _subcribe_context(nullptr) {
}

// 析构函数
Redis::~Redis() {
if (_publish_context != nullptr) {
redisFree(_publish_context);
}

if (_subcribe_context != nullptr) {
redisFree(_subcribe_context);
}
}

// 连接redis服务器
bool Redis::connect() {
// 负责publish发布消息的上下文对象(即redis客户端)
_publish_context = redisConnect(REDIS_IP.c_str(), REDIS_PORT);
if (nullptr == _publish_context || _publish_context->err) {
cerr << "connect redis failed!" << endl;
if (_publish_context) {
cerr << "connect redis error: " << _publish_context->errstr << endl;
redisFree(_publish_context);
}
return false;
}

// 负责subscribe订阅消息的上下文对象(即redis客户端)
_subcribe_context = redisConnect(REDIS_IP.c_str(), REDIS_PORT);
if (nullptr == _subcribe_context || _subcribe_context->err) {
cerr << "connect redis failed!" << endl;
if (_subcribe_context) {
cerr << "connect redis error: " << _subcribe_context->errstr << endl;
redisFree(_subcribe_context);
}
return false;
}

// 在单独的线程中,监听通道上的事件,有消息就给业务层进行上报
thread t([&]() { observer_channel_message(); });
t.detach();

cout << "connect redis success!" << endl;

return true;
}

// 向redis指定的通道subscribe消息
bool Redis::subscribe(int channel) {
// SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息
// 通道消息的接收专门在observer_channel_message()函数中的独立线程中进行
// 这里只负责发送订阅命令,不阻塞接收Redis服务器的响应消息,否则会和notifyMsg线程抢占响应资源
if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)) {
cerr << "subscribe command execute failed!" << endl;
return false;
}

// redisBufferWrite()函数可以循环发送缓冲区中的数据,直到缓冲区数据发送完毕(done被置为1)
int done = 0;
while (!done) {
if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)) {
cerr << "subscribe command execute failed!" << endl;
return false;
}
}

return true;
}

// 向redis指定的通道unsubscribe消息
bool Redis::unsubscribe(int channel) {
if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel)) {
cerr << "unsubscribe command execute failed!" << endl;
return false;
}

// redisBufferWrite()函数可以循环发送缓冲区中的数据,直到缓冲区数据发送完毕(done被置为1)
int done = 0;
while (!done) {
if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)) {
cerr << "unsubscribe command execute failed!" << endl;
return false;
}
}

return true;
}

// 在独立线程中异步接收订阅通道中的消息
void Redis::observer_channel_message() {
redisReply *reply = nullptr;
while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply)) {
// 订阅收到的消息是一个带三个元素的数组
if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) {
// 给业务层上报通道上发生的消息
_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);
}

// 释放资源
freeReplyObject(reply);
}

cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl;
}

// 向redis指定的通道publish消息
bool Redis::publish(int channel, string message) {
// 发布消息
redisReply *reply = (redisReply *)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str());
if (nullptr == reply) {
cerr << "publish command execute failed!" << endl;
return false;
}

// 释放资源
freeReplyObject(reply);
return true;
}

// 初始化向业务层上报通道消息的回调对象
void Redis::init_notify_handler(function<void(int, string)> fn) {
this->_notify_message_handler = fn;
}
MySQL 代码
  • include/server/db/db.hpp 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#ifndef DB_H
#define DB_H

#include <mysql/mysql.h>

#include <iostream>
#include <string>

using namespace std;

// 数据库操作类
class MySQL {
public:
// 初始化数据库连接
MySQL();

// 释放数据库连接
~MySQL();

// 连接数据库
bool connect();

// 更新操作
bool update(string sql);

// 查询操作
MYSQL_RES *query(string sql);

// 获取数据库连接
MYSQL *getConnection();

private:
MYSQL *_conn; // 数据库连接
};

#endif // DB_H
  • src/server/db/db.cpp 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include "db.hpp"

#include <muduo/base/Logging.h>

#include "config.hpp"

// 初始化数据库连接
MySQL::MySQL() {
_conn = mysql_init(nullptr);
}

// 释放数据库连接
MySQL::~MySQL() {
if (_conn != nullptr) {
mysql_close(_conn);
}
}

// 连接数据库
bool MySQL::connect() {
MYSQL *p = mysql_real_connect(_conn, DB_IP.c_str(), DB_USER.c_str(), DB_PASSWORD.c_str(), DB_NAME.c_str(), DB_PORT,
nullptr, 0);
if (p != nullptr) {
// C和C++代码默认的编码字符是ASCII,如果不设置,从MySQL查询到的中文内容可能会显示?乱码
mysql_query(_conn, "set names utf8mb4");
LOG_INFO << "connect mysql success!";
} else {
LOG_ERROR << "connect mysql failed!";
LOG_ERROR << mysql_error(_conn);
}

return p;
}

// 更新操作
bool MySQL::update(string sql) {
LOG_DEBUG << sql;
if (mysql_query(_conn, sql.c_str())) {
LOG_ERROR << __FILE__ << ":" << __LINE__ << " " << sql << " execute failed!";
LOG_ERROR << mysql_error(_conn);
return false;
}

return true;
}

// 查询操作
MYSQL_RES *MySQL::query(string sql) {
LOG_DEBUG << sql;
if (mysql_query(_conn, sql.c_str())) {
LOG_ERROR << __FILE__ << ":" << __LINE__ << " " << sql << " execute failed!";
LOG_ERROR << mysql_error(_conn);
return nullptr;
}

return mysql_store_result(_conn);
}

// 获取数据库连接
MYSQL *MySQL::getConnection() {
return _conn;
}
聊天服务端代码

聊天服务器的代码

  • include/server/chatserver.hpp 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#ifndef CHATSERVER_H
#define CHATSERVER_H

/**
* 聊天服务器的头文件
*/

#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpServer.h>

using namespace muduo;
using namespace muduo::net;

// 聊天服务器类
class ChatServer {
public:
// 构造函数
ChatServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg);

// 析构函数
~ChatServer();

// 启动服务器
void start();

private:
// 处理用户的连接创建和断开
void onConnection(const TcpConnectionPtr& conn);

// 处理用户读写事件(比如接收客户端发送的数据)
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time);

TcpServer _server; // TCP 服务器对象
EventLoop* _loop; // 指向事件回环的指针
};

#endif // CHATSERVER_H
  • src/server/chatserver.cpp 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
* 聊天服务器的实现
*/

#include "chatserver.hpp"

#include <muduo/base/Logging.h>

#include <iostream>
#include <string>

#include "chatservice.hpp"
#include "json.hpp"
#include "public.hpp"

using namespace std;
using namespace muduo;

// 类型重定义
using json = nlohmann::json;

// 构造函数
ChatServer::ChatServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg)
: _server(loop, listenAddr, nameArg), _loop(loop) {
// 设置服务端注册用户连接的创建和断开回调
_server.setConnectionCallback(bind(&ChatServer::onConnection, this, placeholders::_1));

// 设置服务端注册用户读写事件的回调
_server.setMessageCallback(
bind(&ChatServer::onMessage, this, placeholders::_1, placeholders::_2, placeholders::_3));

// 设置EventLoop的线程数量(比如:1个I/O线程,3个Worker线程)
_server.setThreadNum(4);
}

// 析构函数
ChatServer::~ChatServer() {
}

// 启动服务器
void ChatServer::start() {
// 开启事件循环处理
_server.start();
}

// 处理用户的连接创建和断开
void ChatServer::onConnection(const TcpConnectionPtr& conn) {
if (!conn->connected()) {
// 处理用户连接关闭的情况
ChatService::instance()->clientConnClose(conn);
// 断开连接(释放资源)
conn->shutdown();
}
}

// 处理用户读写事件(比如接收客户端发送的数据)
void ChatServer::onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time) {
// 获取客户端发送的数据
string message = buf->retrieveAllAsString();

// 打印日志信息
LOG_DEBUG << "server received message : " << message;

// JSON 字符串反序列化
json jsonObj = json::parse(message);

// 非法消息直接忽略处理
if (!jsonObj.contains("msgType")) {
// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::INVALID_MESSAGE_TYPE;
response["errMsg"] = "消息类型无效";
conn->send(response.dump());
return;
}

// 消息类型
int msgType = jsonObj["msgType"].get<int>();

// 获取消息处理器
auto msgHandler = ChatService::instance()->getMsgHandler(msgType);

// 调用消息处理器,执行相应的业务处理
msgHandler(conn, make_shared<json>(jsonObj), time);
}

聊天核心业务的代码

  • include/server/chatservice.hpp 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#ifndef CHATSERVICE_H
#define CHATSERVICE_H

/**
* 聊天核心业务的头文件
*/

#include <muduo/net/TcpConnection.h>

#include <functional>
#include <iostream>
#include <mutex>
#include <unordered_map>

#include "friendmodel.hpp"
#include "groupmodel.hpp"
#include "groupusermodel.hpp"
#include "json.hpp"
#include "offlinemessagemodel.hpp"
#include "redis.hpp"
#include "usermodel.hpp"

using namespace std;
using namespace muduo;
using namespace muduo::net;

// 类型重定义
using json = nlohmann::json;

// 处理消息的回调类型(使用智能指针是为了兼容低版本的G++编译器)
using MsgHandler = function<void(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time)>;

// 聊天服务器的业务类(单例对象)
class ChatService {
public:
// 获取单例对象
static ChatService* instance();

// 处理登录业务
void login(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time);

// 处理注册业务
void reg(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time);

// 处理一对一聊天消息
void singleChat(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time);

// 处理添加好友消息
void addFriend(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time);

// 处理创建群组消息
void createGroup(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time);

// 处理加入群组消息
void joinGroup(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time);

// 处理群聊天消息
void groupChat(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time);

// 处理退出登录消息
void loginOut(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time);

// 获取消息对应的处理器
MsgHandler getMsgHandler(int msgType);

// 处理Redis订阅通道中发生的消息
void handleRedisSubScribeMessage(int userid, string msg);

// 处理用户连接关闭的情况
void clientConnClose(const TcpConnectionPtr& conn);

// 处理服务器退出(Ctrl+C)后的业务重置
void reset();

private:
// 私有构造函数
ChatService();

// 删除拷贝构造函数
ChatService(const ChatService&) = delete;

// 删除赋值运算符
ChatService& operator=(const ChatService&) = delete;

// 关联消息ID和消息处理器(用于解耦业务代码)
unordered_map<int, MsgHandler> _msgHandlerMap;

// 存储在线用户的通信连接(操作时必须自行保证线程安全)
unordered_map<int, TcpConnectionPtr> _userConnMap;

// 互斥锁,保证 _userConnMap 的线程安全
mutex _connMapmutex;

// User 表的数据操作对象
UserModel _userModel;

// OfflineMessage 表的数据操作对象
OfflineMessageModel _offflineMessageModel;

// Friend 表的数据操作对象
FriendModel _friendModel;

// Group 表的数据操作对象
GroupModel _groupModel;

// GroupUser 表的数据操作对象
GroupUserModel _groupUserModel;

// Redis 操作对象
Redis _redis;
};

#endif // CHATSERVICE_H
  • src/server/chatservice.cpp 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
/**
* 聊天服务器的业务实现
*/

#include "chatservice.hpp"

#include <muduo/base/Logging.h>

#include <iostream>
#include <vector>

#include "friendmodel.hpp"
#include "offlinemessagemodel.hpp"
#include "public.hpp"
#include "times.hpp"
#include "usermodel.hpp"

using namespace std;
using namespace muduo;

// 构造函数
ChatService::ChatService() {
// 连接Redis服务器
if (_redis.connect()) {
// 设置Redis订阅通道的回调对象(负责处理Redis订阅消息)
_redis.init_notify_handler(
bind(&ChatService::handleRedisSubScribeMessage, this, placeholders::_1, placeholders::_2));
}

// 关联登录业务
_msgHandlerMap.insert(
{MsgType::LOGIN_MSG, bind(&ChatService::login, this, placeholders::_1, placeholders::_2, placeholders::_3)});
// 关联注册业务
_msgHandlerMap.insert(
{MsgType::REGISTER_MSG, bind(&ChatService::reg, this, placeholders::_1, placeholders::_2, placeholders::_3)});
// 关联一对一聊天业务
_msgHandlerMap.insert({MsgType::SINGLE_CHAT_MSG,
bind(&ChatService::singleChat, this, placeholders::_1, placeholders::_2, placeholders::_3)});
// 关联添加好友业务
_msgHandlerMap.insert({MsgType::ADD_FRIEND_MSG,
bind(&ChatService::addFriend, this, placeholders::_1, placeholders::_2, placeholders::_3)});
// 关联添加群组业务
_msgHandlerMap.insert({MsgType::CREATE_GROUP_MSG, bind(&ChatService::createGroup, this, placeholders::_1,
placeholders::_2, placeholders::_3)});
// 关联加入群组业务
_msgHandlerMap.insert({MsgType::JOIN_GROUP_MSG,
bind(&ChatService::joinGroup, this, placeholders::_1, placeholders::_2, placeholders::_3)});
// 关联群聊天业务
_msgHandlerMap.insert({MsgType::GROUP_CHAT_MSG,
bind(&ChatService::groupChat, this, placeholders::_1, placeholders::_2, placeholders::_3)});
// 关联退出登录业务
_msgHandlerMap.insert({MsgType::LOGIN_OUT_MSG,
bind(&ChatService::loginOut, this, placeholders::_1, placeholders::_2, placeholders::_3)});
}

// 获取单例对象
ChatService* ChatService::instance() {
// 局部静态变量(线程安全)
static ChatService instance;
return &instance;
}

// 获取消息对应的处理器
MsgHandler ChatService::getMsgHandler(int msgType) {
// 查找消息处理器
auto it = _msgHandlerMap.find(msgType);
// 如果消息处理器不存在
if (it == _msgHandlerMap.end()) {
// 返回一个默认的消息处理器(空操作)
return [=](const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time) {
// 打印日志信息
LOG_ERROR << "not found message handler by message type " << msgType;
};
}
return _msgHandlerMap[msgType];
}

// 处理Redis订阅通道中发生的消息
void ChatService::handleRedisSubScribeMessage(int userid, string msg) {
// JSON 反序列化
json js = json::parse(msg.c_str());

// 消息发送的时间
long timestamp = js["fromTimestamp"].get<long>();

// 获取互斥锁
unique_lock<mutex> lock(_connMapmutex);

// 获取消息接收者的连接信息
auto it = _userConnMap.find(userid);
if (it != _userConnMap.end()) {
// 消息接收者在线(指在当前聊天服务器中),直接转发消息
it->second->send(js.dump());
} else {
// 当接收到Redis订阅消息时,如果消息接收者刚好下线,则存储离线消息
OfflineMessage msg(userid, js.dump(), timestamp);
_offflineMessageModel.insert(msg);
}
}

// 处理登录业务
void ChatService::login(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time) {
string name = (*data)["name"].get<string>();
string password = (*data)["password"].get<string>();

// 查询用户信息
User user = _userModel.selectByName(name);

// 登录成功
if (user.getId() != -1 && user.getPassword() == password) {
// 重复登录
if (user.getState() == "online") {
// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::REPEAT_LOGIN;
response["errMsg"] = "该账号在其他设备已登录";
response["msgType"] = MsgType::LOGIN_MSG_ACK;
conn->send(response.dump());
}
// 登录成功
else {
// 获取互斥锁
unique_lock<mutex> lock(_connMapmutex);

// 存储在线用户的通信连接
_userConnMap.insert({user.getId(), conn});

// 释放互斥锁
lock.unlock();

// 向Redis订阅Channel
_redis.subscribe(user.getId());

// 更新用户登录状态
user.setState("online");
_userModel.updateState(user);

// 返回给客户端的数据
json response;
response["errNum"] = ErrorCode::SUCCESS;
response["userId"] = user.getId();
response["userName"] = user.getName();
response["msgType"] = MsgType::LOGIN_MSG_ACK;

// 查询该用户是否有离线消息
vector<OfflineMessage> messages = _offflineMessageModel.select(user.getId());
if (!messages.empty()) {
// 返回该用户的所有离线消息
response["offlinemsg"] = messages;
// 读取该用户的离线消息后,将该用户的离线消息全部删除掉
_offflineMessageModel.remove(user.getId());
}

// 查询该用户的好友列表
vector<User> friends = _friendModel.select(user.getId());
if (!friends.empty()) {
// 返回该用户的好友列表
response["friends"] = friends;
}

// 查询该用户的群组列表
vector<Group> groups = _groupUserModel.select(user.getId());
if (!groups.empty()) {
response["groups"] = groups;
}

// 返回数据给客户端
conn->send(response.dump());
}
}
// 登录失败
else {
// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::LOGIN_AUTH_FAIL;
response["errMsg"] = "用户名或密码不正确";
response["msgType"] = MsgType::LOGIN_MSG_ACK;
conn->send(response.dump());
}
}

// 处理注册业务
void ChatService::reg(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time) {
// 创建用户对象
string name = (*data)["name"].get<string>();
string password = (*data)["password"].get<string>();

// 查询用户名是否已被注册
User oldUser = _userModel.selectByName(name);
if (oldUser.getId() != -1) {
// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::REPEAT_REGISTER;
response["errMsg"] = "用户名已被注册";
response["msgType"] = MsgType::REGISTER_MSG_ACK;
conn->send(response.dump());
return;
}

// 插入用户记录
User newUser(name, password);
bool result = _userModel.insert(newUser);

// 插入用户记录成功
if (result) {
// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::SUCCESS;
response["userId"] = newUser.getId();
response["userName"] = newUser.getName();
response["msgType"] = MsgType::REGISTER_MSG_ACK;
conn->send(response.dump());
}
// 插入用户记录失败
else {
// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::REGISTER_FAIL;
response["errMsg"] = "用户注册失败";
response["msgType"] = MsgType::REGISTER_MSG_ACK;
conn->send(response.dump());
}
}

// 处理一对一聊天消息
void ChatService::singleChat(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time) {
// 消息发送者的用户ID
int fromId = (*data)["fromId"].get<int>();

// 消息发送者的用户名称
string fromName = (*data)["fromName"].get<string>();

// 消息发送者的消息内容
string fromMsg = (*data)["fromMsg"].get<string>();

// 消息发送的时间戳
long fromTimestamp = (*data)["fromTimestamp"].get<long>();

// 消息接收者的用户ID
int toId = (*data)["toId"].get<int>();

// 消息接收者是否在当前聊天服务器中
bool toExisted = false;

// 判断是否已经添加消息接收者为好友
Friend friendRel = _friendModel.select(fromId, toId);
if (friendRel.getUserId() == -1 || friendRel.getFriendId() == -1) {
json response;
response["errNum"] = ErrorCode::SINGLE_CHAT_FAIL;
response["errMsg"] = "未添加对方好友, 无法进行一对一聊天";
response["msgType"] = MsgType::SINGLE_CHAT_MSG_ACK;
conn->send(response.dump());
return;
}

// 获取互斥锁
unique_lock<mutex> lock(_connMapmutex);

// 获取消息接收者的连接信息
auto it = _userConnMap.find(toId);
if (it != _userConnMap.end()) {
// 记录消息接收者在线(指在当前聊天服务器中)
toExisted = true;
// 消息接收者在线(指在当前聊天服务器中),直接转发消息
it->second->send((*data).dump());
}

// 释放互斥锁
lock.unlock();

// 消息接收者不在当前聊天服务器中
if (!toExisted) {
User toUser = _userModel.select(toId);
// 判断消息接收者是否在线(指在其他聊天服务器中)
if (toUser.getState() == "online") {
// 消息接收者在线,通过Redis发布消息
_redis.publish(toId, (*data).dump());
} else {
// 消息接收者不在线,存储离线消息
OfflineMessage msg(toId, (*data).dump(), fromTimestamp);
_offflineMessageModel.insert(msg);
}
}

// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::SUCCESS;
response["msgType"] = MsgType::SINGLE_CHAT_MSG_ACK;
conn->send(response.dump());
}

// 处理添加好友消息
void ChatService::addFriend(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time) {
// 当前用户的ID
int userId = (*data)["userId"].get<int>();

// 好友的用户ID
int friendId = (*data)["friendId"].get<int>();

// 控制不允许添加自己为好友
if (userId == friendId) {
// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::ADD_FRIEND_FAIL;
response["errMsg"] = "不允许添加自己为好友";
response["msgType"] = MsgType::ADD_FRIEND_MSG_ACK;
conn->send(response.dump());
return;
}

// 判断好友是否真实存在
User friendUser = _userModel.select(friendId);
if (friendUser.getId() == -1) {
// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::ADD_FRIEND_FAIL;
response["errMsg"] = "好友ID不存在";
response["msgType"] = MsgType::ADD_FRIEND_MSG_ACK;
conn->send(response.dump());
return;
}

// 新增好友关系
_friendModel.insert(userId, friendId);

// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::SUCCESS;
response["msgType"] = MsgType::ADD_FRIEND_MSG_ACK;
conn->send(response.dump());
}

// 处理创建群组消息
void ChatService::createGroup(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time) {
// 当前用户的ID
int userId = (*data)["userId"].get<int>();

// 群组名称
string groupName = (*data)["groupName"].get<string>();

// 群组描述
string groupDesc = (*data)["groupDesc"].get<string>();

// 新增群组
Group group(groupName, groupDesc);
bool result = _groupModel.insert(group);

// 添加群组的创建人信息
if (result && group.getId() != -1) {
GroupUser groupUser;
groupUser.setGroupId(group.getId());
groupUser.setUserId(userId);
groupUser.setGroupRole("creator");
// 新增群组和用户的关联信息
_groupUserModel.insert(groupUser);
}

// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::SUCCESS;
response["groupId"] = group.getId();
response["msgType"] = MsgType::CREATE_GROUP_MSG_ACK;
conn->send(response.dump());
}

// 处理加入群组消息
void ChatService::joinGroup(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time) {
// 当前用户的ID
int userId = (*data)["userId"].get<int>();

// 群组ID
int groupId = (*data)["groupId"].get<int>();

// 判断群组是否真实存在
Group group = _groupModel.select(groupId);
if (group.getId() == -1) {
// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::JOIN_GROUP_FAIL;
response["errMsg"] = "群组ID不存在";
response["msgType"] = MsgType::JOIN_GROUP_MSG_ACK;
conn->send(response.dump());
return;
}

// 新增群组和用户的关联信息
GroupUser groupUser(groupId, userId, "normal");
_groupUserModel.insert(groupUser);

// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::SUCCESS;
response["msgType"] = MsgType::JOIN_GROUP_MSG_ACK;
conn->send(response.dump());
}

// 处理群聊天消息
void ChatService::groupChat(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time) {
// 消息发送者的用户ID
int fromId = (*data)["fromId"].get<int>();

// 消息发送者的用户名称
string fromName = (*data)["fromName"].get<string>();

// 消息发送的时间戳
long fromTimestamp = (*data)["fromTimestamp"].get<long>();

// 群组的ID
int groupId = (*data)["groupId"].get<int>();

// 群组消息的内容
string groupMsg = (*data)["groupMsg"].get<string>();

// 判断用户是否已经加入群组
GroupUser groupUser = _groupUserModel.select(groupId, fromId);
if (groupUser.getGroupId() == -1 || groupUser.getUserId() == -1) {
// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::JOIN_GROUP_FAIL;
response["errMsg"] = "未加入该群组, 无法进行群聊";
response["msgType"] = MsgType::JOIN_GROUP_MSG_ACK;
conn->send(response.dump());
return;
}

// 查询群组内的用户(除了发送群组消息的用户)
vector<User> users = _groupUserModel.selectGroupUsers(groupId, fromId);

// 处理群聊消息
if (!users.empty()) {
// 获取互斥锁
unique_lock<mutex> lock;

// 遍历群组内的用户
for (User& user : users) {
// 获取用户的连接信息
auto it = _userConnMap.find(user.getId());
if (it != _userConnMap.end()) {
// 用户在线(指在当前聊天服务器中),直接转发群聊消息
it->second->send((*data).dump());
} else {
User toUser = _userModel.select(user.getId());
// 判断用户是否在线(指在其他聊天服务器中)
if (toUser.getState() == "online") {
// 用户在线,通过Redis发布群聊消息
_redis.publish(user.getId(), (*data).dump());
} else {
// 用户不在线,存储离线群聊消息
OfflineMessage message;
message.setUserId(user.getId());
message.setCreateTime(fromTimestamp);
message.setMessage((*data).dump());
_offflineMessageModel.insert(message);
}
}
}
}

// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::SUCCESS;
response["msgType"] = MsgType::GROUP_CHAT_MSG_ACK;
conn->send(response.dump());
}

// 处理退出登录消息
void ChatService::loginOut(const TcpConnectionPtr& conn, const shared_ptr<json>& data, Timestamp time) {
// 当前用户的ID
int userId = (*data)["userId"].get<int>();

// 获取互斥锁
unique_lock<mutex> lock(_connMapmutex);

// 移除连接信息
auto it = _userConnMap.find(userId);
if (it != _userConnMap.end()) {
_userConnMap.erase(it->first);
}

// 释放互斥锁
lock.unlock();

// 往Redis取消订阅Channel
_redis.unsubscribe(userId);

// 更新用户的登录状态
User user;
user.setId(userId);
user.setState("offline");
_userModel.updateState(user);

// 返回数据给客户端
json response;
response["errNum"] = ErrorCode::SUCCESS;
response["msgType"] = MsgType::LOGIN_OUT_MSG_ACK;
conn->send(response.dump());
}

// 处理用户连接关闭的情况
void ChatService::clientConnClose(const TcpConnectionPtr& conn) {
// 用户信息
User user;

// 获取互斥锁
unique_lock<mutex> lock(_connMapmutex);

// 从Map表中删除用户对应的连接信息
for (auto it = _userConnMap.begin(); it != _userConnMap.end(); ++it) {
if (it->second == conn) {
// 记录用户ID
user.setId(it->first);
// 移除连接信息
_userConnMap.erase(it->first);
break;
}
}

// 释放互斥锁
lock.unlock();

if (user.getId() != -1) {
// 往Redis取消订阅Channel
_redis.unsubscribe(user.getId());

// 更新用户的登录状态
user.setState("offline");
_userModel.updateState(user);
}
}

// 处理服务器退出(Ctrl+C)后的业务重置
void ChatService::reset() {
// 重置所有用户的登录状态
_userModel.resetState();
}

数据库操作的代码

  • include/server/dao/friendmodel.hpp 头文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#ifndef FRIENDMODEL_H
#define FRIENDMODEL_H

#include <vector>

#include "friend.hpp"
#include "user.hpp"

// Friend 表的数据操作类
class FriendModel {
public:
// 添加好友关系
bool insert(int userid, int friendid);

// 查找好友列表
vector<User> select(int userid);

// 查找好友关系
Friend select(int userid, int friendid);
};

#endif // FRIENDMODEL_H
  • src/server/dao/friendmodel.cpp 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include <friendmodel.hpp>
#include <iostream>
#include <string>

#include "db.hpp"

using namespace std;

// 添加好友关系
bool FriendModel::insert(int userid, int friendid) {
char sql[1024] = {0};

// 拼接 SQL 语句
sprintf(sql, "insert into friend(userid, friendid) values(%d, %d)", userid, friendid);

// 执行 SQL 语句
MySQL mysql;
if (mysql.connect() && mysql.update(sql)) {
return true;
}

return false;
}

// 查找好友列表
vector<User> FriendModel::select(int userid) {
char sql[1024] = {0};

// 查询结果
vector<User> result;

// 拼接 SQL 语句
sprintf(sql,
"select u.id, u.name, u.state from friend f inner join user u on f.friendid = u.id where f.userid = %d",
userid);

// 执行 SQL 语句
MySQL mysql;
if (mysql.connect()) {
MYSQL_RES* res = mysql.query(sql);
if (res != nullptr && mysql_num_rows(res) > 0) {
// 获取所有查询结果
MYSQL_ROW row;
while ((row = mysql_fetch_row(res)) != nullptr) {
int id = atoi(row[0]);
string name = row[1];
string state = row[2];
result.emplace_back(id, name, state);
}
}
// 释放资源
mysql_free_result(res);
}

// 返回查询结果
return result;
}

// 查找好友关系
Friend FriendModel::select(int userid, int friendid) {
char sql[1024] = {0};

// 查询结果
Friend result;

// 拼接 SQL 语句
sprintf(sql, "select userid, friendid from friend where userid = %d and friendid = %d", userid, friendid);

// 执行 SQL 语句
MySQL mysql;
if (mysql.connect()) {
MYSQL_RES* res = mysql.query(sql);
if (res != nullptr && mysql_num_rows(res) > 0) {
// 获取所有查询结果
MYSQL_ROW row = mysql_fetch_row(res);
result.setUserId(atoi(row[0]));
result.setFriendId(atoi(row[1]));
}
// 释放资源
mysql_free_result(res);
}

// 返回查询结果
return result;
}
客户端核心代码

提示

在集群聊天客户端中,使用了 Linux 的 socketsemaphore,并没有引入 Muduo 网络库。

  • src/client/main.cpp 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
#include <arpa/inet.h>
#include <netinet/in.h>
#include <semaphore.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <ctime>
#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "group.hpp"
#include "groupmodel.hpp"
#include "json.hpp"
#include "offlinemessage.hpp"
#include "offlinemessagemodel.hpp"
#include "public.hpp"
#include "times.hpp"
#include "user.hpp"
#include "usermodel.hpp"

using namespace std;

// 类型重定义
using json = nlohmann::json;

// 记录当前登录用户的基本信息
User g_currentUser;
// 记录当前登录用户的好友列表信息
vector<User> g_currentUserFriendList;
// 记录当前登录用户的群组列表信息
vector<Group> g_currentUserGroupList;

// 用于读/写线程之间的通信
sem_t rwsem;
// 控制主菜单程序运行
bool isMainMenuRunning = false;
// 记录用户的登录状态
atomic_bool g_isLoginSuccess{false};

// 主菜单程序
void mainMenu(int);
// 子线程接收到消息后的处理逻辑
void readTaskHandler(int clientfd);

// 显示当前登录用户的基本信息
void showCurrentUserData();

/////////////////////////////////////////////////////首页功能/////////////////////////////////////////////////////

// 聊天客户端程序实现, 主线程用作消息发送线程, 子线程用作消息接收线程
int main(int argc, char **argv) {
if (argc < 3) {
cerr << "command invalid, example: ./chat_client 127.0.0.1 8000" << endl;
exit(-1);
}

// 解析通过命令行参数传递的IP和端口号
char *ip = argv[1];
uint16_t port = atoi(argv[2]);

// 创建client端的socket
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == clientfd) {
cerr << "socket create failed" << endl;
exit(-1);
}

// 填写client需要连接的服务器信息(IP和端口号)
sockaddr_in server;
memset(&server, 0, sizeof(sockaddr_in));

server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr.s_addr = inet_addr(ip);

// client和server进行连接
if (-1 == connect(clientfd, (sockaddr *)&server, sizeof(sockaddr_in))) {
cerr << "connect server failed" << endl;
close(clientfd);
exit(-1);
}

// 初始化读写线程通信用的信号量
sem_init(&rwsem, 0, 0);

// 连接服务器成功, 启动一个接收消息的子线程
thread readTask(readTaskHandler, clientfd);
readTask.detach();

// 主线程用于接收用户输入, 负责发送数据
for (;;) {
// 显示首页面菜单: 登录、注册、退出程序
cout << "========================" << endl;
cout << "1. login" << endl;
cout << "2. register" << endl;
cout << "3. quit" << endl;
cout << "========================" << endl;

int choice = 0;

// 用户输入验证循环
while (true) {
cout << "choice: ";
cin >> choice;
// 判断输入是否合法
if (cin.fail()) {
// 输入不是整数,清除错误标志
cin.clear();
// 清空输入缓冲区
cin.ignore(10000, '\n');
cerr << "invalid choice!" << endl;
} else {
// 清除残留的换行符
cin.ignore(10000, '\n');
break;
}
}

// 根据用户输入执行操作
switch (choice) {
// 登录业务
case 1: {
char name[50] = {0};
char password[50] = {0};
cout << "user name: ";
cin.getline(name, 50);
cout << "user password: ";
cin.getline(password, 50);

json js;
js["msgType"] = LOGIN_MSG;
js["name"] = name;
js["password"] = password;
string request = js.dump();

g_isLoginSuccess = false;

int len = send(clientfd, request.c_str(), strlen(request.c_str()) + 1, 0);
if (len == -1) {
cerr << "send login msg error: " << request << endl;
}

// 等待信号量, 由子线程处理完登录的响应消息后, 通知主线程继续执行
sem_wait(&rwsem);

// 用户登录成功
if (g_isLoginSuccess) {
// 进入聊天主菜单
isMainMenuRunning = true;
mainMenu(clientfd);
}
break;
}
// 注册业务
case 2: {
char name[50] = {0};
char pwd[50] = {0};
cout << "user name: ";
cin.getline(name, 50);
cout << "user password: ";
cin.getline(pwd, 50);

json js;
js["msgType"] = REGISTER_MSG;
js["name"] = name;
js["password"] = pwd;
string request = js.dump();

int len = send(clientfd, request.c_str(), strlen(request.c_str()) + 1, 0);
if (len == -1) {
cerr << "send reg msg error: " << request << endl;
}

// 等待信号量, 由子线程处理完注册的响应消息后, 通知主线程继续执行
sem_wait(&rwsem);
break;
}
// 退出程序业务
case 3:
close(clientfd);
sem_destroy(&rwsem);
exit(0);
default:
cerr << "invalid choice!" << endl;
break;
}
}

return 0;
}

// 处理注册的响应逻辑
void doRegResponse(json &responsejs) {
// 注册失败
if (0 != responsejs["errNum"].get<int>()) {
cerr << "注册失败: " << responsejs["errMsg"].get<string>() << endl;
}
// 注册成功
else {
cout << "注册成功, 用户ID: " << responsejs["userId"] << " , 用户名称: " << responsejs["userName"].get<string>()
<< endl;
}
}

// 处理登录的响应逻辑
void doLoginResponse(json &responsejs) {
// 登录失败
if (0 != responsejs["errNum"].get<int>()) {
cerr << "登录失败: " << responsejs["errMsg"].get<string>() << endl;
g_isLoginSuccess = false;
}
// 登录成功
else {
// 记录当前登录用户的基本信息
g_currentUser.setId(responsejs["userId"].get<int>());
g_currentUser.setName(responsejs["userName"].get<string>());

// 记录当前用户的好友列表信息
if (responsejs.contains("friends")) {
// 初始化好友列表
g_currentUserFriendList.clear();

vector<User> vec = responsejs["friends"];
for (User &user : vec) {
g_currentUserFriendList.push_back(user);
}
}

// 记录当前用户的群组列表信息
if (responsejs.contains("groups")) {
// 初始化群组列表
g_currentUserGroupList.clear();

vector<Group> vec = responsejs["groups"];
for (Group &group : vec) {
g_currentUserGroupList.push_back(group);
}
}

// 显示登录用户的基本信息
showCurrentUserData();

// 显示当前用户的离线消息(个人聊天信息或者群组消息)
if (responsejs.contains("offlinemsg")) {
vector<OfflineMessage> vec = responsejs["offlinemsg"];
for (OfflineMessage &message : vec) {
// 离线消息的内容(JSON字符串)
json content = json::parse(message.getMessage());
// 离线消息的发送时间
string datetime = formatTimestampLocal(message.getCreateTime(), "%Y-%m-%d %H:%M:%S");
// 打印一对一聊天消息
if (SINGLE_CHAT_MSG == content["msgType"].get<int>()) {
cout << "好友消息[" << content["fromId"] << "] " << datetime << " "
<< content["fromName"].get<string>() << " said: " << content["fromMsg"].get<string>() << endl;
}
// 打印群组聊天消息
else {
cout << "群聊消息[" << content["groupId"] << "] " << datetime << " [" << content["fromId"] << "] "
<< content["fromName"].get<string>() << " said: " << content["groupMsg"].get<string>() << endl;
}
}
}

g_isLoginSuccess = true;
}
}

// 子线程(接收消息的线程)执行的业务逻辑
void readTaskHandler(int clientfd) {
for (;;) {
char buffer[1024] = {0};
int len = recv(clientfd, buffer, 1024, 0); // 阻塞等待消息
if (-1 == len || 0 == len) {
close(clientfd);
exit(-1);
}

// 接收ChatServer转发的数据, 反序列化生成JSON数据对象
json js = json::parse(buffer);

// 消息类型
int msgType = js["msgType"].get<int>();

// 错误编码
int errNum = js.contains("errNum") ? js["errNum"].get<int>() : -1;

// 处理接收到的一对一聊天消息
if (SINGLE_CHAT_MSG == msgType) {
string datetime = formatTimestampLocal(js["fromTimestamp"].get<long>(), "%Y-%m-%d %H:%M:%S");
cout << "好友消息[" << js["fromId"] << "] " << datetime << " " << js["fromName"].get<string>()
<< " said: " << js["fromMsg"].get<string>() << endl;
continue;
}

// 处理接收到的群组聊天消息
if (GROUP_CHAT_MSG == msgType) {
string datetime = formatTimestampLocal(js["fromTimestamp"].get<long>(), "%Y-%m-%d %H:%M:%S");
cout << "群聊消息[" << js["groupId"] << "] " << datetime << " [" << js["fromId"] << "] "
<< js["fromName"].get<string>() << " said: " << js["groupMsg"].get<string>() << endl;
continue;
}

// 处理登录响应的业务逻辑
if (LOGIN_MSG_ACK == msgType) {
doLoginResponse(js);
sem_post(&rwsem); // 通知主线程, 登录结果处理完成
continue;
}

// 处理注册响应的业务逻辑
if (REGISTER_MSG_ACK == msgType) {
doRegResponse(js);
sem_post(&rwsem); // 通知主线程, 注册结果处理完成
continue;
}

// 处理成功创建群组响应的业务逻辑
if (CREATE_GROUP_MSG_ACK == msgType && SUCCESS == errNum) {
cout << "群组创建成功, 群组ID: " << js["groupId"].get<int>() << endl;
continue;
}

// 处理成功加入群组响应的业务逻辑
if (JOIN_GROUP_MSG_ACK == msgType && SUCCESS == errNum) {
cout << "加入群组成功" << endl;
continue;
}

// 处理成功添加好友响应的业务逻辑
if (ADD_FRIEND_MSG_ACK == msgType && SUCCESS == errNum) {
cout << "好友添加成功" << endl;
continue;
}

// 处理其他业务的错误响应
if (SUCCESS != errNum && js.contains("errMsg")) {
cerr << "操作失败: " << js["errMsg"].get<string>() << endl;
continue;
}
}
}

// 显示当前登录成功用户的基本信息
void showCurrentUserData() {
cout << "======================login user======================" << endl;
cout << "current login user => id:" << g_currentUser.getId() << " name:" << g_currentUser.getName() << endl;
cout << "----------------------friend list---------------------" << endl;
if (!g_currentUserFriendList.empty()) {
for (User &user : g_currentUserFriendList) {
cout << user.getId() << " " << user.getName() << " " << user.getState() << endl;
}
}
cout << "----------------------group list----------------------" << endl;
if (!g_currentUserGroupList.empty()) {
for (Group &group : g_currentUserGroupList) {
cout << group.getId() << " " << group.getGroupName() << " " << group.getGroupDesc() << endl;
for (User &user : group.getUsers()) {
cout << user.getId() << " " << user.getName() << " " << user.getState() << endl;
}
}
}
cout << "======================================================" << endl;
}

/////////////////////////////////////////////////////主菜单功能/////////////////////////////////////////////////////

// "help" command handler
void help(int fd = 0, string str = "");
// "singlechat" command handler
void singlechat(int, string);
// "addfriend" command handler
void addfriend(int, string);
// "creategroup" command handler
void creategroup(int, string);
// "joingroup" command handler
void joingroup(int, string);
// "groupchat" command handler
void groupchat(int, string);
// "loginout" command handler
void loginout(int, string);

// 系统支持的客户端命令列表
unordered_map<string, string> commandMap = {{"help", "显示所有支持的命令, 格式 help"},
{"singlechat", "一对一聊天, 格式 singlechat:friendid:message"},
{"addfriend", "添加好友, 格式 addfriend:friendid"},
{"creategroup", "创建群组, 格式 creategroup:groupname:groupdesc"},
{"joingroup", "加入群组, 格式 joingroup:groupid"},
{"groupchat", "群组聊天, 格式 groupchat:groupid:message"},
{"loginout", "退出登录, 格式 loginout"}};

// 注册系统支持的客户端命令处理
unordered_map<string, function<void(int, string)>> commandHandlerMap = {
{"help", help}, {"singlechat", singlechat}, {"addfriend", addfriend}, {"creategroup", creategroup},
{"joingroup", joingroup}, {"groupchat", groupchat}, {"loginout", loginout}};

// 主菜单程序
void mainMenu(int clientfd) {
help();

char buffer[1024] = {0};
while (isMainMenuRunning) {
// 存储用户选择执行的命令
string command;
cin.getline(buffer, 1024);
string commandbuf(buffer);
int idx = commandbuf.find(":");
if (-1 == idx) {
command = commandbuf;
} else {
command = commandbuf.substr(0, idx);
}

// 查找相应命令的事件处理器
auto it = commandHandlerMap.find(command);
if (it == commandHandlerMap.end()) {
cerr << "invalid input command!" << endl;
continue;
}

// 调用相应命令的事件处理回调函数
it->second(clientfd, commandbuf.substr(idx + 1, commandbuf.size() - idx));
}
}

// "help" command handler
void help(int, string) {
cout << ">>> show command list >>> " << endl;
for (auto &p : commandMap) {
cout << p.first << " : " << p.second << endl;
}
cout << endl;
}

// "addfriend" command handler
void addfriend(int clientfd, string str) {
// 数据格式: friendid
int friendId = atoi(str.c_str());
if (friendId <= 0) {
cerr << "add friend command invalid!" << endl;
return;
}

// 请求参数
json request;
request["msgType"] = ADD_FRIEND_MSG;
request["userId"] = g_currentUser.getId();
request["friendId"] = friendId;

// 发送数据
string buffer = request.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len) {
cerr << "send add friend msg error -> " << buffer << endl;
}
}

// "singlechat" command handler
void singlechat(int clientfd, string str) {
// 数据格式: friendid:message
int idx = str.find(":");
if (-1 == idx) {
cerr << "single chat command invalid!" << endl;
return;
}

int friendId = atoi(str.substr(0, idx).c_str());
string message = str.substr(idx + 1, str.size() - idx);

// 请求参数
json request;
request["msgType"] = SINGLE_CHAT_MSG;
request["fromId"] = g_currentUser.getId();
request["fromName"] = g_currentUser.getName();
request["fromMsg"] = message;
request["fromTimestamp"] = getTimestampMs();
request["toId"] = friendId;

// 发送数据
string buffer = request.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len) {
cerr << "send single chat msg error -> " << buffer << endl;
}
}

// "creategroup" command handler
void creategroup(int clientfd, string str) {
// 数据格式: groupname:groupdesc
int idx = str.find(":");
if (-1 == idx) {
cerr << "create group command invalid!" << endl;
return;
}

string groupName = str.substr(0, idx);
string groupDesc = str.substr(idx + 1, str.size() - idx);

// 请求参数
json request;
request["msgType"] = CREATE_GROUP_MSG;
request["userId"] = g_currentUser.getId();
request["groupName"] = groupName;
request["groupDesc"] = groupDesc;

// 发送数据
string buffer = request.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len) {
cerr << "send create group msg error -> " << buffer << endl;
}
}

// "joingroup" command handler
void joingroup(int clientfd, string str) {
// 数据格式:groupid
int groupId = atoi(str.c_str());
if (groupId <= 0) {
cerr << "join group command invalid!" << endl;
return;
}

// 请求参数
json request;
request["msgType"] = JOIN_GROUP_MSG;
request["userId"] = g_currentUser.getId();
request["groupId"] = groupId;

// 发送数据
string buffer = request.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len) {
cerr << "send join group msg error -> " << buffer << endl;
}
}

// "groupchat" command handler
void groupchat(int clientfd, string str) {
// 数据格式:groupid:message
int idx = str.find(":");
if (-1 == idx) {
cerr << "group chat command invalid!" << endl;
return;
}

int groupId = atoi(str.substr(0, idx).c_str());
string groupMsg = str.substr(idx + 1, str.size() - idx);

// 请求参数
json request;
request["msgType"] = GROUP_CHAT_MSG;
request["fromId"] = g_currentUser.getId();
request["fromName"] = g_currentUser.getName();
request["fromTimestamp"] = getTimestampMs();
request["groupId"] = groupId;
request["groupMsg"] = groupMsg;

// 发送数据
string buffer = request.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len) {
cerr << "send group chat msg error -> " << buffer << endl;
}
}

// "loginout" command handler
void loginout(int clientfd, string) {
// 请求参数
json request;
request["msgType"] = LOGIN_OUT_MSG;
request["userId"] = g_currentUser.getId();

// 发送数据
string buffer = request.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len) {
cerr << "send login out msg error -> " << buffer << endl;
} else {
isMainMenuRunning = false;
}
}

项目输出

输出求职简历

  • 项目名称

    • 集群聊天服务器
    • 基于 Muduo 网络库实现的集群聊天服务器
  • 开发工具

    • VSCode 远程 Linux 开发
    • CMake 构建 C/C++ 项目
    • Linux Shell 编写项目自动编译脚本
  • 项目内容

    • 使用 Muduo 网络库实现项目的网络核心模块,提供高并发网络 I/O 服务,解耦网络和业务模块的代码
    • 使用 Json 序列化和反序列化消息作为私有通信协议
    • 配置 Nginx 基于 TCP 的负载均衡,实现聊天服务器的集群功能,提高后端服务的并发能力
    • 基于 Redis 的发布 - 订阅功能,实现客户端跨服务器通信
    • 使用 MySQL 关系型数据库作为项目数据的落地存储
    • 使用数据库连接池提高数据库的访问性能
  • 项目收获

    • 熟悉了基于 Muduo 网络库进行服务端程序开发
    • 掌握了 Nginx 的 TCP 负载均衡配置
    • 掌握了 MySQL 和服务端中间件 Redis 的应用
  • 项目问题

    • 问题描述
      • 通过代码脚本或者专业的压测工具(比如 JMeter)测试聊天服务器的并发性能
    • 问题解决
      • 设置进程可使用文件描述符(fd)资源的上限数量,提高聊天服务器的并发性能

特别注意

在面试流程中描述项目内容时,切忌详细罗列项目中的业务,重点是介绍项目用到什么技术,突出技术点 。

常见的面试题

为什么要使用 Redis

  • 问题描述

    • 为什么要使用 Redis 来实现客户端跨服务器通信?各个聊天服务器之间能不能直接进行通信呢?
  • 问题解答

    • 这里的设计,会在各个 ChatServer 服务器互相之间直接建立 TCP 连接进行通信,相当于在服务器网络之间进行广播。这样的设计使得各个服务器之间耦合度太高,不利于系统扩展,并且会占用系统大量的 Socket 资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,因此绝对不是一个好的设计。
    • 集群部署的服务器之间进行通信,最好的方式就是引入消息队列中间件,解耦各个服务器,使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,整体的设计应该 如此
    • 在集群环境中,经常使用的消息队列中间件有 ActiveMQ、RabbitMQ、Kafka、RocketMQ 等,它们都是应用场景广泛并且性能很好的消息队列,供集群服务器、分布式服务之间进行消息通信。限于集群聊天服务器项目的业务并不是非常复杂,并且对并发性能也没有太高的要求,因此消息队列选型的是 - Redis 发布 - 订阅。

Redis 实现的功能不稳定

  • 问题描述

    • 当消息的生产速度大于消息的消费速度时,随着时间的推移,会造成 Redis 积压消息;如果消息积压的数量太大,会导致内存占用激增,Redis 最终可能会宕机。
  • 问题解答

    • 使用消息队列中间件替代 Redis 的发布 - 订阅功能,比如 Kafka、RabbitMQ、RocketMQ 等。

特别注意

Redis 的主要功能有:缓存数据库(支持持久化)、分布式锁、发布 - 订阅,其中发布 - 订阅功能只适用于非核心业务、流量不是很大的业务。

如何保证消息的可靠传输

  • 问题描述

    • 如何保证客户端发送出去的消息,一定能够被服务端接收到,从而保证消息不丢失呢?
  • 问题解答

    • 在业务层中,可以通过消息序号 + ACK 应答机制实现消息的可靠传输,实现步骤如下:
    • (1) 客户端发送的每条消息都附加一个递增的 seq 序号(比如 1、2、3)。
    • (2) 客户端将未被确认的消息保存在本地的缓存队列中,用于后续重发和确认处理。
    • (3) 服务端收到消息后,返回 ACK 应答给客户端,标明确认的消息序号(比如 seq:1)。
    • (4) 客户端处理 ACK 响应:客户端接收到 ACK 响应后,从本地缓存队列中移除对应 seq 的消息,表示消息已被成功确认。
    • (5) 消息重发机制:客户端启动一个定时器线程,定时扫描缓存队列中未被确认的消息(比如每 3 秒扫描一次)。如果某条消息在一定时间内(比如 5 秒)未收到 ACK 确认,则自动重发该消息,直到收到服务端的 ACK 确认或者超过最大重试次数。

如何保证数据传输的安全性

  • 问题描述

    • 由于数据(比如聊天消息)是明文传输的,存在一定的数据安全问题,如何解决?
  • 问题解答

    • 使用对称加密算法(如 AES)和非对称加密算法(如 RSA)来保证数据传输的安全性,实现步骤如下:
    • (1) 客户端登录时,使用服务端的 RSA 公钥加密数据,其中的数据包含一个随机生成的 AES 密钥和登录信息,然后将加密后的数据发送给服务端。
    • (2) 服务端收到数据后,使用自己的 RSA 私钥解密数据,获得客户端的 AES 密钥和登录信息,并完成身份验证。
    • (3) 后续通信阶段,客户端与服务端使用同一个 AES 密钥对数据进行对称加密和解密,从而实现高效且安全的数据传输。
    • (4) AES 密钥应为一次性生成的会话密钥(Session Key)。RSA 加解密只用于密钥交换和登录信息保护,后续通信使用高性能的 AES 加解密。

加密算法介绍

  • 在大型的 IM 软件(比如 QQ)中,会同时使用到对称加密算法和非对称加密算法,兼顾考虑数据安全性和加解密效率。
  • 对称加密算法:使用同一个密钥进行加密和解密,速度快,但需要考虑安全地共享密钥。常见的对称加密算法包括 DES、AES、3DES 等。
  • 非对称加密算法:使用一对公钥和私钥,公钥负责加密、私钥负责解密,安全性高,速度慢,适用于密钥交换与身份验证。常见的非对称加密算法包括 DSA、RSA、ECC 等。

客户端消息如何按顺序显示

  • 问题描述

    • 客户端接收到服务端发送的消息,如何按顺序显示?
  • 问题解答

    • (1) 服务端消息加序号:服务端发送的每条消息都附加一个递增的 seq 序号,每个用户或会话维护独立的 seq 序号。
    • (2) 客户端缓存乱序消息:客户端接收到消息后,放入本地的有序缓存中,并使用 expected_seq 表示下一条应该显示的消息的序号。
    • (3) 按序显示 + 缓存清理:若客户端接收到消息序号等于 expected_seq,则立即显示,并从缓存中继续查找后续的连续消息,依次显示。
    • (4) 处理乱序和丢包:如果客户端接收到的是 seq > expected_seq,则先缓存消息,暂时不显示。若某条消息长时间未到达,可发起消息重传请求或跳过处理。

历史聊天消息应该如何存储

  • 问题描述

    • 历史聊天消息,有哪些存储方案?
  • 问题解答

    • 本地消息存储
      • 使用 SQLite 等嵌入式数据库:轻量、便于查询,适合单设备离线存储。
      • 使用本地文件系统:以用户或群组为单位创建文件夹,按日期或大小分多个文件保存,适合大批量存储,读取简单但查询不方便。
    • 云端消息存储
      • 使用关系型数据库(如 MySQL):结构化存储,支持高效查询和分页加载,适合结构清晰的聊天记录。
      • 使用文件存储系统(如对象存储或分布式文件系统):适合存储大批量的聊天原始记录或备份数据,读取顺序性强,但查询性能较弱。

大规模系统中的历史聊天消息存储

  • (1) 消息队列 + 消息落库架构,提升写入性能。
  • (2) Elasticsearch 作为全文索引系统,用于历史聊天记录搜索。
  • (3) 冷热数据分离:近期消息存储在数据库,历史消息转存在文件存储系统(如对象存储或分布式文件系统)。

如何感知客户端在线还是掉线

  • 问题描述

    • 如果网络拥堵严重,ChatServer(聊天服务端)如何感知 ChatClient(聊天客户端)在线还是掉线呢?
  • 问题解答

    • 在 ChatServer(聊天服务端)和 ChatClient(聊天客户端)之间实现心跳保持机制,实现步骤如下:
      • (1) 客户端定期发送心跳包(比如:每秒发一次 MSG_TYPE: heartbeat
      • (2) 服务端为每个客户端维护一个心跳计数器,每秒自动加一
      • (3) 服务端每收到一次客户端发送的心跳包,就将该客户端的心跳计数器归零
      • (4) 若客户端的心跳计数器超过 5(即 5 秒内未收到心跳),则判断客户端已掉线
      • (5) 客户端掉线后,服务端开始清理该客户端的连接和资源

特别注意

  • TCP 传输层有 Keepalive 机制,可以通过 Linux 内核参数来调整(如下表所示)。但是,ChatServer 不能依赖该机制来实现心跳保持机制,因为 ChatServer 检测到 ChatClient 掉线后,需要主动清理相应的连接和资源。
参数名作用默认值(一般情况)
net.ipv4.tcp_keepalive_timeTCP 连接空闲多久后开始发送 Keepalive 探测包(单位:秒)7200 秒(2 小时)
net.ipv4.tcp_keepalive_intvl发送 Keepalive 探测包之间的时间间隔(单位:秒)75 秒
net.ipv4.tcp_keepalive_probesKeepalive 最大探测次数,超过则认定连接失效(断开)9 次

参考资料