基于 C++ 手写线程池

前言

本文将基于 C++ 手写一个线程池,并分别提供 C++ 11 和 C++ 17 两种版本的线程池实现。线程池作为五大池之一(内存池、连接池、协程池、线程池、进程池),应用非常广泛,不管是客户端程序,还是后台服务程序,都是提高业务处理能力的必备模块。有很多开源的线程池实现,虽然各自接口在使用上稍有区别,但是其核心实现原理都是基本相同的。

知识背景

在基于 C++ 手写线程池之前,应该熟悉并掌握以下技术内容:

  • 熟练基于 C++ 11 的面向对象编程
  • 熟悉组合和继承、继承多态、STL 容器、智能指针、函数对象、绑定器、lambda 表达式、可变参数模板编程等。
  • 熟悉 C++ 11 多线程编程,比如线程互斥、线程同步、原子操作、CAS 等。
  • 熟悉 threadmutexunique_lockcondition_variableatomic 等。
  • 熟悉 C++ 17 和 C++ 20 的新特性,比如 C++ 17 的 any 类型和 C++ 20 的 counting_semaphore 信号量类型等。

开发工具

本文使用以下工具来开发 C++ 线程池项目:

  • C++ 11 / C++ 17
  • Visual Studio 2019
  • CMake 构建编译环境
  • GDB 调试分析定位线程死锁问题
  • Linux 编译项目生成动态链接库(.so

版本特性

C++ 各版本支持的特性如下:

特性名称对应头文件最低支持的 C++ 标准版本说明
std::make_unique<memory>C++ 14C++ 14 引入,用于简洁安全地创建 unique_ptr 智能指针。
std::any<any>C++ 17 提供类型安全的类型擦除容器。
std::counting_semaphore<semaphore>C++ 20C++ 20 引入的信号量机制。
std::packaged_task<future>C++ 11 将可调用对象封装起来,并用于异步执行。
std::future<future>C++ 11 异步操作的结果获取机制,可以与 std::asyncstd::promise 搭配使用。
可变参数模板语言特性 C++ 11 支持模板中参数数量可变,用于构建灵活函数模板,如递归参数展开等。

提示

如果使用的是 GCC、Clang 或 MSVC 编译器,需确保编译器版本也支持相应的 C++ 标准。若需进一步查询编译器支持的标准或特性情况,可以参考 C/C++ 参考手册

兼容平台

本文提供的所有 C++ 线程池代码都可以兼容 Windows 和 Linux 平台,并分别提供 C++ 11 和 C++ 17 两种版本的线程池实现。

核心概念

并发与并行

  • 并发(Concurrency)

    • 在单核处理器上,多个线程之间通过操作系统的调度机制交替执行,每个线程轮流占用 CPU 的时间片(例如每 10 毫秒切换一次)。由于每个线程执行的时间片非常短,人们在宏观上感受到这些线程像是 “同时” 在执行一样,虽然它们在物理层面上是串行执行的。这种 “看起来同时执行” 的场景被称为并发。
  • 并行(Parallelism)

    • 在多核处理器或多 CPU 系统中,多个线程可以被分配到不同的核心上同时执行,彼此之间互不抢占 CPU 时间资源,是真正意义上的 “同时执行”。这种多个任务在物理层面并行的执行方式称为并行。

总结

简而言之,并发是 "逻辑上的同时发生",并行是 "物理上的同时进行"。

多线程的适用场景

多线程程序的性能就一定好吗?不一定,要看具体的应用场景:

  • I/O 密集型场景

    • 对于涉及大量 I/O 操作(如磁盘读写、网络通信、数据库访问等)的程序,这些操作通常会导致线程阻塞,从而释放 CPU 时间片。此时,通过多线程并发处理,可以有效利用 CPU 的空闲时间,提高程序的吞吐量和响应速度。因此,无论是 CPU 单核、CPU 多核、多 CPU,多线程通常都能带来较好的性能提升,因此适合运行多线程程序。
  • CPU 密集型场景

    • CPU 单核
      • 在 CPU 单核中,多个线程无法真正并行执行,只能通过 CPU 时间片轮转实现 “伪并发”。多线程会导致频繁的线程上下文切换,增加 CPU 调度开销,甚至可能因为线程数量过多而拖慢整体性能。在这种情况下,使用单线程可能反而更高效,因此不适合运行多线程程序
    • CPU 多核、多 CPU
      • 多个线程可以被调度到多个核心上并行运行,从而充分利用计算资源,提高程序的运行效率。对于 CPU 密集型任务(如图像处理、大规模计算等),多线程可以显著提升性能,尤其是在合理控制线程数量、避免过度竞争的前提下,因此适合运行多线程程序。

多线程的协作机制

在 C++ 中,为了实现多线程之间的正确协作与安全访问共享资源,通常需要使用线程互斥与线程同步机制。

线程互斥

线程互斥用于防止多个线程同时访问共享资源,从而避免数据竞争和数据不一致的问题。常用机制包括:

  • mutex 互斥锁

    • 提供基本的加锁与解锁操作(lock() / unlock()lock_guard / unique_lock 自动管理),确保同一时间只有一个线程能够访问临界区(共享资源)。
  • recursive_mutex 递归互斥锁

    • 支持同一个线程多次对同一个互斥锁加锁,适用于递归函数调用场景。
  • atomic 原子类型

    • 提供无锁的并发访问机制,支持原子操作,如自增、比较交换等,适用于简单共享变量的并发访问,性能较高。

线程同步

线程同步用于协调多个线程之间的执行顺序,例如一个线程等待另一个线程完成某项任务后再继续执行。常用机制包括:

  • condition_variable 条件变量

    • 配合互斥锁(mutex)使用,允许线程在满足某个条件之前进入等待状态,并在条件满足后被唤醒。适合用于生产者 - 消费者模型等场景。
  • semaphore 信号量(C++ 20 引入)

    • 用于控制对某个资源的并发访问数量,即限制同一时刻最多有 N 个线程访问某个共享资源(可用作限流器),适用于如电商秒杀、停车场停车等业务场景。
  • barrier / latch(C++ 20 引入)

    • 用于多线程之间的阶段性同步。例如,所有线程都执行到某一步后再一起进入下一阶段。

多线程的额外开销

为了完成任务,创建很多的线程可以吗?线程真的是越多越好吗?答案是否定的。

  • 线程的创建和销毁都是非常 “重” 的操作(涉及用户空间和内核空间的切换);
  • 线程栈(通常为 8MB)本身占用大量内存空间;
  • 线程的上下文切换需要占用大量时间;
  • 大量线程同时唤醒,会导致操作系统经常出现锯齿状负载或者瞬间负载量很大,造成系统宕机。

特别注意

在 Linux 平台下,通过 pthread 库默认最多可以创建约 380 个线程。若希望创建更多的线程,可以降低线程栈的大小(比如 8KB),还可以增大 Linux 系统的最大可打开文件描述符数(ulimit -n),或者最大可创建的用户进程数 / 线程数(ulimit -u)。

线程池的介绍

线程池的使用优势

在操作系统中,线程的创建和销毁都是较为 “昂贵” 的操作,不仅耗时,而且会消耗较多的系统资源。如果在服务运行过程中,每当有任务到来就动态创建线程来执行,任务完成后又立即销毁线程,那么在高并发或大流量场景下,频繁的线程创建与销毁会显著降低系统的实时响应能力,增加 CPU 开销,从而影响整体业务的处理效率。线程池的出现正是为了解决这一问题。在线程池机制中,服务进程在启动阶段就会预先创建好一组可复用的线程(即线程池),这些线程会在后台处于等待状态。当业务任务到来时,系统无需重新创建线程,而是直接从线程池中取出一个空闲线程来执行任务(Task)。任务执行完毕后,该线程不会被销毁,而是归还给线程池,等待下一次任务(Task)的分配。

  • 线程池带来了多种优势:
    • 减少线程创建和销毁的开销,提高系统性能;
    • 避免线程资源耗尽,通过线程池大小限制线程总数,控制系统并发量;
    • 提升响应速度,线程可立即复用,减少任务启动延迟;
    • 便于线程管理和监控,统一由线程池控制线程生命周期和运行状态。

总结

线程池是一种非常重要的并发编程工具,特别适用于高并发、高吞吐、对实时性有要求的业务场景。

线程池的两种模式

  • Fixed 模式线程池(固定大小线程池)

    • 在这种模式下,线程池中的线程数量在创建时就被固定,且在整个线程池的生命周期内保持不变。一般在初始化线程池时,会根据当前机器的 CPU 核心数量或业务需求设定一个合理的线程数。
    • 当有任务到来时,如果线程池中有空闲线程,则立即分配执行;如果线程都处于忙碌状态,则任务会被放入等待队列中,排队等待有空闲的线程。
    • Fixed 模式适合任务量较为稳定、对系统资源可控性要求较高的场景,有助于防止线程数量膨胀导致系统负载过高。
  • Cached 模式线程池(缓存线程池)

    • 在这种模式下,线程池的线程数量不是固定的,而是可以根据实际任务量动态增长。当有新的任务到来时,如果线程池中没有空闲线程,会临时创建新的线程来处理任务,并在任务完成后将其归还到线程池中。
    • 为了防止线程无限增长,通常会设置一个线程最大数量的阀值。此外,如果某个线程在空闲超过指定时间(如 60 秒)后仍无新任务可执行,它将被自动销毁,从而释放资源。线程池会始终保留初始的核心线程数,用于处理正常业务流量。
    • Cached 模式适用于任务处理量波动较大、流量突发性强的场景,可以在短时间内快速扩展线程池以应对高并发流量,但需要注意合理设置线程上限以避免资源耗尽。

线程池的实现

整体设计

功能描述

  • (1) 基于 C++ 可变参数模板编程和引用折叠原理,实现线程池 submitTask 接口,支持任意任务函数和任意参数的传递
  • (2) 使用 future 类型定制 submitTask 接口提交任务的返回值
  • (3) 使用 mapqueue 容器管理线程对象和任务
  • (4) 基于条件变量 condition_variable 和互斥锁 mutex 实现任务提交线程和任务执行线程间的同步通信机制
  • (5) 支持 fixed 模式和 cached 模式的线程池定制

代码实现

C++ 11 版本

这里基于 C++ 11 手写一个线程池,并自行实现 make_unique()any 类型、信号量类型。

代码下载

C++ 11 实现线程池的完整案例代码可以从 这里 下载得到,所有案例代码都兼容 Windows 和 Linux 平台。

核心代码
  • extend.h 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#ifndef EXTEND_H
#define EXTEND_H

#include <memory>
#include <utility>
#include <type_traits>


///////////////////////////////////// make_unique() /////////////////////////////////////


namespace extend {
// 创建非数组类型对象
template<typename T, typename... Args>
typename std::enable_if<!std::is_array<T>::value, std::unique_ptr<T>>::type
make_unique(Args &&... args) {
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

// 创建未知大小的数组(例如 make_unique<T[]>(n))
template<typename T>
typename std::enable_if<std::is_array<T>::value && std::extent<T>::value == 0, std::unique_ptr<T>>::type
make_unique(std::size_t size) {
using ElementType = typename std::remove_extent<T>::type;
return std::unique_ptr<T>(new ElementType[size]());
}

// 禁止使用定长数组(例如 make_unique<int[10]> 是不合法的)
template<typename T, typename... Args>
typename std::enable_if<(std::extent<T>::value != 0), void>::type
make_unique(Args &&...) = delete;
}


///////////////////////////////////// Any 类型 /////////////////////////////////////


// Any 类型(可以接收任意数据类型)
class Any {

public:
// 构造函数
Any() = default;

// 析构函数
~Any() = default;

// 禁用带左值的拷贝构造函数
Any(const Any &) = delete;

// 禁用带左值的赋值运算符
Any &operator=(const Any &) = delete;

// 带右值的拷贝构造函数(移动拷贝构造)
Any(Any&& other) = default;

// 带右值的赋值运算符(移动赋值运算符)
Any &operator=(Any &&other) = default;

// 通用构造函数(让 Any 类型可以接收任意数据类型)
template<typename T>
Any(T&& data) : base_(extend::make_unique<Derive<typename std::decay<T>::type>>(std::forward<T>(data))) {

}

// 类型转换(将 Any 类型存储的数据类型提取出来)
template<typename T>
T cast() {
if (base_ == nullptr) {
throw std::runtime_error("Any is empty");
}

// 将基类指针转换为派生类指针(类型向下转换)
Derive <T> *p = dynamic_cast<Derive <T> *>(base_.get());
if (p == nullptr) {
throw std::runtime_error("type is unmatch!");
}

// 返回真实的数据类型
return p->getData();
}

private:
// 基类类型
class Base {

public:
// 虚析构函数
virtual ~Base() = default;
};

// 派生类类型
template<typename T>
class Derive : public Base {

public:
// 通用构造函数
template<typename U>
Derive(U &&data) : data_(std::forward<U>(data)) {

}

// 析构函数
~Derive() {

}

T getData() const {
return data_;
}

private:
T data_;
};

private:
std::unique_ptr<Base> base_; // 基类指针

};


///////////////////////////////////// Semaphore 信号量 /////////////////////////////////////


// 信号量类(用于线程通信)
class Semaphore {

public:
// 构造函数
Semaphore(int limit = 0) : limit_(limit), isDestroyed(false) {

}

// 析构函数
~Semaphore() {
// 标记当前对象已经被析构
isDestroyed = true;
};

// 获取一个信号量资源
void wait() {
if (!isDestroyed) {
// 获取互斥锁
std::unique_lock<std::mutex> lock(mtx_);

// 等待信号量资源
cond_.wait(lock, [this]() { return limit_ > 0; });

// 更改资源计数
limit_--;
}
}

// 增加一个信号量资源
void post() {
if (!isDestroyed) {
// 获取互斥锁
std::unique_lock<std::mutex> lock(mtx_);

// 更改资源计数
limit_++;

// 通知其他线程获取信号量资源
// 特别注意,在默认情况下,Linux 平台中 condition_variable 的析构函数什么也没做,会导致这里状态已经失效;一旦外部使用它的对象(比如 Result)提前析构,就会无故阻塞线程,造成线程死锁
cond_.notify_all();
}
}

private:
int limit_; // 资源计数
std::mutex mtx_; // 互斥锁
std::condition_variable cond_; // 条件变量
std::atomic_bool isDestroyed; // 是否已经被析构(用于解决Linux平台的兼容问题)
};

#endif // EXTEND_H
  • thread_pool.h 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include<iostream>
#include<functional>
#include<vector>
#include<queue>
#include<memory>
#include<atomic>
#include<mutex>
#include<condition_variable>
#include<stdexcept>
#include<unordered_map>
#include<climits>
#include"extend.h"


////////////////////////////////////////// 线程池核心参数 ///////////////////////////////////////////


// 初始的线程数量(默认是CPU的核心数)
const int INIT_THREAD_SIZE = std::thread::hardware_concurrency();

// 任务队列的最大任务数量
const int TASK_MAX_THRESHHOLD = INT_MAX;

// 线程池Cached模式的最大线程数量
const int THREAD_SIZE_MAX_THRESHHOLD = 1024;

// 线程允许的最大空闲时间(单位秒)
const int THREAD_MAX_IDLE_TIME = 60;

// 线程池支持的模式
enum class PoolMode {
MODE_FIXED, // 固定大小线程池
MODE_CACHED // 缓存线程池
};


////////////////////////////////////////// 任务抽象类 /////////////////////////////////////////////


// 任务执行结果类的前置声明
class Result;

class Task {

public:
// 构造函数
Task();

// 析构函数
~Task() = default;

// 纯虚函数,实现用户自定义的任务处理逻辑
virtual Any run() = 0;

// 执行任务
void exec();

// 设置任务执行结果
void setResult(Result *p);

private:
Result *result_; // 任务执行结果(使用裸指针,避免智能指针循环引用问题)
};


////////////////////////////////////////// 任务结果类 /////////////////////////////////////////////


class Result {

public:
// 构造函数
Result(std::shared_ptr<Task> task, bool isValid = true);

// 析构函数
~Result() = default;

// 获取任务执行结果
Any get();

// 设置任务执行结果
void setVal(Any data);

// 判断任务执行结果是否有效
bool isValid() const;

// 判断关联的任务是否已完成
bool isFinished() const;

private:
Any data_; // 存储任务执行的结果
Semaphore sem_; // 线程通信的信号量
std::atomic_bool isValid_; // 任务执行结果是否有效
std::shared_ptr<Task> task_; // 关联的任务
std::atomic_bool isFinished_; // 关联的任务是否已执行完成
};


////////////////////////////////////////// 线程类 /////////////////////////////////////////////


class Thread {

public:
// 线程处理函数对象的类型
using ThreadHandler = std::function<void(int)>;

// 线程构造
Thread(ThreadHandler handler);

// 线程析构
~Thread();

// 启动线程
void start();

// 获取线程ID
int getId() const;

private:
int threadId_; // 线程ID
static int generateId_; // 用于辅助生成全局唯一的线程ID
ThreadHandler threadHandler_; // 线程处理函数
};


////////////////////////////////////////// 线程池类 /////////////////////////////////////////////


class ThreadPool {

public:
// 线程池构造
ThreadPool();

// 线程池析构
~ThreadPool();

// 设置线程池的工作模式
void setMode(PoolMode mode);

// 设置线程池Cached模式的最大线程数量
void setThreadSizeMaxThreshHold(int threshhold);

// 设置任务队列的最大任务数量
void setTaskQueMaxThreshHold(size_t threshhold);

// 启动线程池
void start(int initThreadSize = INIT_THREAD_SIZE);

// 提交任务给线程池
std::shared_ptr<Result> submitTask(std::shared_ptr<Task> task);

// 禁止拷贝构造
ThreadPool(const ThreadPool &) = delete;

// 禁止赋值
ThreadPool &operator=(const ThreadPool &) = delete;

private:
// 线程处理函数(负责执行任务)
void threadHandler(int threadId);

// 检查线程池的运行状态
bool checkRunningState() const;

// 清理已完成的任务执行结果
void cleanTaskResult();

private:
std::unordered_map<int, std::unique_ptr<Thread>> threads_; // 线程集合
PoolMode poolMode_; // 线程池的模式
std::atomic_bool isPoolRuning_; // 表示线程池是否正在运行

size_t initThreadSize_; // 初始的线程数量
std::atomic_int idleThreadSize_; // 空闲线程的数量
std::atomic_int curThreadSize_; // 当前线程池的线程数量
int threadSizeMaxThreshHold_; // 线程池Cached模式的最大线程数量

std::queue<std::shared_ptr<Task>> taskQueue_; // 任务队列
std::atomic_uint taskSize_; // 当前任务队列的任务数量
size_t taskQueMaxThreshHold_; // 任务队列的最大任务数量

std::mutex taskQueMtx_; // 任务队列操作的互斥锁
std::condition_variable notFull_; // 表示任务队列不满,用于通知用户线程提交任务
std::condition_variable notEmpty_; // 表示任务队列不空,用于通知线程池中的线程执行任务
std::condition_variable allExit_; // 表示等待线程池回收所有线程

std::vector<std::shared_ptr<Result>> taskResults_; // 任务执行结果列表,用于避免任务执行结果比任务早被析构
std::mutex taskResultsMtx_; // 任务执行结果的互斥锁
};

#endif // THREAD_POOL_H
  • thread_pool.cpp 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
#include<iostream>
#include<algorithm>
#include<functional>
#include<thread>
#include<memory>
#include<mutex>
#include"thread_pool.h"


////////////////////////////////////////// 任务抽象类 /////////////////////////////////////////////


// 构造函数
Task::Task() : result_(nullptr) {

}

// 执行任务
void Task::exec() {
// 执行任务处理逻辑(发生多态调用)
Any data = run();

// 设置任务执行结果
if (result_ != nullptr) {
result_->setVal(std::move(data));
}
}

// 设置任务执行结果
void Task::setResult(Result *p) {
result_ = p;
}


////////////////////////////////////////// 任务结果类 /////////////////////////////////////////////


// 构造函数
Result::Result(std::shared_ptr<Task> task, bool isValid) : task_(task), isValid_(isValid), isFinished_(false) {
// 关联任务和任务执行结果
task->setResult(this);
}

// 获取任务执行结果
Any Result::get() {
// 如果任务执行结果无效,则直接返回
if (!isValid_) {
return "";
}

// 等待获取一个信号量资源(即让当前线程等待任务执行完成)
sem_.wait();

// 返回任务执行完成的结果
return std::move(data_);
}

// 设置任务执行结果
void Result::setVal(Any data) {
// 存储任务执行结果
data_ = std::move(data);

// 设置关联的任务已执行完成
isFinished_ = true;

// 增加一个信号量资源(即通知其他线程获取任务执行结果)
sem_.post();
}

// 判断任务执行结果是否有效
bool Result::isValid() const {
return isValid_;
}

// 判断关联的任务是否已完成
bool Result::isFinished() const {
return isFinished_;
}


////////////////////////////////////////// 线程类 /////////////////////////////////////////////


// 线程构造
Thread::Thread(ThreadHandler handler) : threadHandler_(handler), threadId_(generateId_++) {

}

// 线程析构
Thread::~Thread() {

}

// 启动线程
void Thread::start() {
// 创建一个线程,并执行线程处理函数
std::thread t(threadHandler_, threadId_);

// 将子线程设置为分离线程
t.detach();
}


// 获取线程ID
int Thread::getId() const {
return threadId_;
}

// 初始化用于辅助生成全局唯一的线程ID
int Thread::generateId_ = 0;


////////////////////////////////////////// 线程池类 ////////////////////////////////////////////


// 线程池构造
ThreadPool::ThreadPool() {
idleThreadSize_ = 0;
curThreadSize_ = INIT_THREAD_SIZE;
initThreadSize_ = INIT_THREAD_SIZE;
threadSizeMaxThreshHold_ = THREAD_SIZE_MAX_THRESHHOLD;
taskSize_ = 0;
taskQueMaxThreshHold_ = TASK_MAX_THRESHHOLD;
poolMode_ = PoolMode::MODE_FIXED;
isPoolRuning_ = false;

}

// 线程池析构
ThreadPool::~ThreadPool() {
// 设置线程池的运行状态
isPoolRuning_ = false;

// 获取互斥锁,用于等待线程池里面所有的线程结束运行(线程有两种状态:阻塞等待获取任务 & 正在执行任务中)
std::unique_lock<std::mutex> lock(taskQueMtx_);

// 必须先获取互斥锁,然后再唤醒所有正在等待获取任务的线程,避免发生线程死锁问题
notEmpty_.notify_all();

// 等待线程池里的所有线程回收完成
allExit_.wait(lock, [this]() { return threads_.size() == 0; });
}

// 设置线程池的工作模式
void ThreadPool::setMode(PoolMode mode) {
if (!checkRunningState()) {
poolMode_ = mode;
}
}

// 设置线程池Cached模式的最大线程数量
void ThreadPool::setThreadSizeMaxThreshHold(int threshhold) {
if (PoolMode::MODE_CACHED == poolMode_ && !checkRunningState()) {
threadSizeMaxThreshHold_ = threshhold;
}
}

// 设置任务队列的最大任务数量
void ThreadPool::setTaskQueMaxThreshHold(size_t threshhold) {
if (!checkRunningState()) {
taskQueMaxThreshHold_ = threshhold;
}
}

// 检查线程池的运行状态
bool ThreadPool::checkRunningState() const {
return isPoolRuning_;
}

// 清理已完成的任务执行结果
void ThreadPool::cleanTaskResult() {
// 获取互斥锁
std::lock_guard<std::mutex> resultLock(taskResultsMtx_);
// 将满足条件的元素移动到容器末尾
auto new_end = std::remove_if(taskResults_.begin(), taskResults_.end(), [](const std::shared_ptr<Result> &res) {
return res->isFinished();
});
// 删除容器末尾那段区域的所有元素
taskResults_.erase(new_end, taskResults_.end());
}

// 启动线程池
void ThreadPool::start(int initThreadSize) {
// 设置线程池的运行状态
isPoolRuning_ = true;

// 记录初始的线程数量
initThreadSize_ = initThreadSize;

// 记录当前线程池的线程数量
curThreadSize_ = initThreadSize;

// 创建初始的线程
for (int i = 0; i < initThreadSize_; i++) {
// 创建线程对象,并将线程处理函数传递给线程对象的构造函数
std::unique_ptr<Thread> thread = extend::make_unique<Thread>(std::bind(&ThreadPool::threadHandler, this, std::placeholders::_1));
// 将线程对象放入线程集合中
threads_.emplace(thread->getId(), std::move(thread));
}

// 启动初始的线程
for (int i = 0; i < initThreadSize_; i++) {
threads_[i]->start(); // 启动一个线程去执行线程处理函数
idleThreadSize_++; // 记录初始空闲线程的数量
}
}

// 线程处理函数(负责执行任务)
void ThreadPool::threadHandler(int threadId) {
// 记录当前线程首次运行的时间
auto lastTime = std::chrono::high_resolution_clock().now();

// For死循环,为了实现在线程池结束时,所有任务必须执行完成,线程池才可以回收线程
for (;;) {
// 获取互斥锁
std::unique_lock<std::mutex> lock(taskQueMtx_);

// 打印日志信息
std::cout << "thread " << std::this_thread::get_id() << " 等待获取任务..." << std::endl;

// 让当前线程等待获取任务,使用While循环避免虚假唤醒
while (taskQueue_.size() == 0) {
// 如果任务列表为空,且线程池要结束运行,则回收当前线程
if (!checkRunningState()) {
// 从线程集合中删除当前线程
threads_.erase(threadId);
// 唤醒等待线程池回收完毕的线程
allExit_.notify_all();
// 打印日志信息
std::cout << "thread pool destroy, thread " << std::this_thread::get_id() << " exited." << std::endl;
// 结束线程处理函数的执行,相当于结束当前线程
return;
}
// 线程池Cached模式的处理,由于Cached模式下有可能已经创建了很多的线程,但是空闲时间超过最大阀值,因此需要将多余的空闲线程回收掉
if (PoolMode::MODE_CACHED == poolMode_) {
std::cv_status waitResult = notEmpty_.wait_for(lock, std::chrono::seconds(1));
// 需要区分超时返回,还是线程正常被唤醒返回
if (std::cv_status::timeout == waitResult) {
auto nowTime = std::chrono::high_resolution_clock().now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(nowTime - lastTime);
// 当线程的空闲时间超过最大阀值,且当前线程池的线程数量大于初始线程数量,则开始回收线程池中的空闲线程
if (duration.count() > THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_) {
// 从线程集合中删除当前线程
threads_.erase(threadId);
// 更新空闲线程的数量
idleThreadSize_--;
// 更新当前线程池的线程数量
curThreadSize_--;
// 打印日志信息
std::cout << "idle thread " << std::this_thread::get_id() << " exited." << std::endl;
// 结束线程处理函数的执行,相当于结束当前线程
return;
}
}
}
// 线程池Fixed模式的处理
else {
// 等待任务队列不为空
notEmpty_.wait(lock);
}
}

// 更新空闲线程数量(在当前线程执行任务之前)
idleThreadSize_--;

// 从任务队列中获取需要执行的任务
std::shared_ptr<Task> task = taskQueue_.front();

// 将任务从任务队列中移除
taskQueue_.pop();
taskSize_--;

// 打印日志信息
std::cout << "thread " << std::this_thread::get_id() << " 成功获取任务..." << std::endl;

// 如果获取了任务之后,任务队列依旧不为空,则继续通知其他线程执行任务
if (taskQueue_.size() > 0) {
notEmpty_.notify_all();
}

// 因为刚获取了任务,任务队列肯定有空余位置(不满),通知用户提交任务到线程池
notFull_.notify_all();

// 释放互斥锁(在当前线程执行任务之前)
lock.unlock();

// 当前线程负责执行任务
if (task != nullptr) {
task->exec();
}

// 更新空闲线程数量(在当前线程执行完任务之后)
idleThreadSize_++;

// 清理已完成的任务执行结果(在当前线程执行完任务之后)
cleanTaskResult();

// 更新当前线程最后执行完任务的时间
lastTime = std::chrono::high_resolution_clock().now();
}
}

// 提交任务给线程池
std::shared_ptr<Result> ThreadPool::submitTask(std::shared_ptr<Task> task) {
// 获取互斥锁
std::unique_lock<std::mutex> lock(taskQueMtx_);

// 等待任务队列有空余位置(不满)
bool waitResult = notFull_.wait_for(lock, std::chrono::seconds(1), [this]() { return taskQueue_.size() < taskQueMaxThreshHold_; });
// 如果等待超时,则返回无效的任务执行结果
if (!waitResult) {
// 打印错误信息
std::cerr << "task queue is full, submit task failed.";
// 无效的任务执行结果
std::shared_ptr<Result> result = std::make_shared<Result>(task, false);
// 获取互斥锁
std::unique_lock<std::mutex> resultLock(taskResultsMtx_);
// 将任务执行结果保存起来,防止用户未使用而导致提前析构
taskResults_.emplace_back(result);
// 返回任务执行结果
return result;
}

// 如果任务队列有空余位置(不满),则将任务放入任务队列中
taskQueue_.emplace(task);
taskSize_++;

// 因为刚放入了新任务,任务队列肯定不为空,通知线程池中的线程去执行任务
notEmpty_.notify_all();

// 线程池Cached模式的处理,根据任务数量动态增加线程池的线程数量
if (PoolMode::MODE_CACHED == poolMode_ && taskSize_ > idleThreadSize_ && curThreadSize_ < threadSizeMaxThreshHold_) {
// 打印日志信息
std::cout << "expand and create new thread." << std::endl;
// 创建新线程对象,并将线程处理函数传递给线程对象的构造函数
std::unique_ptr<Thread> thread = extend::make_unique<Thread>(std::bind(&ThreadPool::threadHandler, this, std::placeholders::_1));
// 获取线程ID(必须在线程放入线程集合之前获取一次线程ID,否则后续将可能获取到空值)
int threadId = thread->getId();
// 将新线程对象放入线程集合中
threads_.emplace(threadId, std::move(thread));
// 启动新线程
threads_[threadId]->start();
// 更新空闲线程的数量
idleThreadSize_++;
// 更新当前线程池的线程数量
curThreadSize_++;
}

// 有效的任务执行结果
std::shared_ptr<Result> result = std::make_shared<Result>(task);
// 获取互斥锁
std::unique_lock<std::mutex> resultLock(taskResultsMtx_);
// 将任务执行结果保存起来,防止用户未使用而导致提前析构
taskResults_.emplace_back(result);
// 返回任务执行结果
return result;
}
测试代码
  • test.cpp 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include<iostream>
#include<vector>
#include<thread>
#include"thread_pool.h"

// 类型重定义
using ULong = unsigned long long;

// 自定义任务类(模拟并行计算)
class CalculateTask : public Task {

public:

CalculateTask(ULong begin, ULong end) : begin_(begin), end_(end) {

}

virtual Any run() override {
std::cout << "execute task by thread " << std::this_thread::get_id() << std::endl;

// 当前线程执行计算
ULong sum = 0;
for (ULong i = begin_; i <= end_; ++i) {
sum += i;
}

// 模拟任务执行耗时
std::this_thread::sleep_for(std::chrono::seconds(5));

// 返回当前线程的计算结果
return Any(sum);
}

private:
ULong begin_; // 开始计算的位置
ULong end_; // 结束计算的位置
};

int main() {
// 线程池的工作模式
int poolMode = -1;

while (true) {
// 获取用户输入
std::cout << "请选择线程池的工作模式,0 - Fixed,1 - Cached" << std::endl;
std::cin >> poolMode;

// 判断是否为非法输入
if (std::cin.fail()) {
// 清除错误标志位
std::cin.clear();
// 丢弃错误输入
std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
std::cout << "输入无效,请重新输入一个合法的数字!\n" << std::endl;
} else {
// 清空输入缓冲区
std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
break;
}
}

// 局部作用域开始
{
// 创建线程池
ThreadPool pool;

// 设置线程池的工作模式
if (poolMode == 0) {
// Fixed模式(固定大小线程池)
pool.setMode(PoolMode::MODE_FIXED);
std::cout << "线程池工作模式:Fixed" << std::endl;
} else {
// Cached模式(缓存线程池)
pool.setMode(PoolMode::MODE_CACHED);

// 设置线程池Cached模式的最大线程数量
pool.setThreadSizeMaxThreshHold(8);

std::cout << "线程池工作模式:Cached" << std::endl;
}

// 启动线程池(指定初始的线程数量)
pool.start(4);

std::vector<std::shared_ptr<Result>> results;
ULong begin = 0;
ULong end = 0;
ULong step = 100000;

// 提交多个任务
for (int i = 0; i < 10; i++) {
// 计算区间
begin = end + 1;
end = begin + step - 1;

// 创建任务
std::shared_ptr<Task> task = std::make_shared<CalculateTask>(begin, end);

// 提交任务
std::shared_ptr<Result> result = pool.submitTask(task);

// 存储任务执行结果
if (result->isValid()) {
results.emplace_back(result);
}
}

// 统计任务执行结果
ULong sum = 0;
for (int i = 0; i < results.size(); i++) {
// 阻塞等待任务执行完成,并获取任务执行结果
ULong result = results[i]->get().cast<ULong>();
sum += result;
}

// 输出并行计算结果
std::cout << "==> 计算结果:1 + 2 + ... + " << end << " = " << sum << std::endl;

// 如果是Cached模式,则等待一段时间,触发线程池回收空闲线程
if (poolMode == 1) {
std::cout << "==> 等待空闲线程被回收(默认的最大空闲时间是60秒)..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(THREAD_MAX_IDLE_TIME + 5));
}

} // 局部作用域结束,线程池自动析构,回收线程池中的所有线程

// 阻塞主线程,直到用户按下任意键才结束程序
char c = getchar();

return 0;
}
  • Linux 平台编译测试代码,生成并运行可执行测试程序(使用的 g++ 版本是 4.8.5
1
2
3
4
5
6
7
8
# 进入项目的源码根目录
cd c++-11-thread-pool

# 编译生成可执行测试程序
g++ -Iinclude src/thread_pool.cpp src/test.cpp -o thread_pool_test -pthread -std=c++11

# 运行可执行测试程序
./thread_pool_test
  • 程序运行的输出结果(线程池使用 Fixed 模式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
线程池工作模式:Fixed
thread 11872 等待获取任务...
thread 11872 成功获取任务...
execute task by thread 11872
thread 480 等待获取任务...
thread 480 成功获取任务...
execute task by thread 480
thread 9140 等待获取任务...
thread 9140 成功获取任务...
execute task by thread 9140
thread 2620 等待获取任务...
thread 2620 成功获取任务...
execute task by thread 2620
thread 480 等待获取任务...
thread 480 成功获取任务...
execute task by thread 480
thread 9140 等待获取任务...
thread 9140 成功获取任务...
execute task by thread 9140
thread 11872 等待获取任务...
thread 11872 成功获取任务...
execute task by thread 11872
thread 2620 等待获取任务...
thread 2620 成功获取任务...
execute task by thread 2620
thread 9140 等待获取任务...
thread 9140 成功获取任务...
execute task by thread 9140
thread 11872 等待获取任务...
thread 11872 成功获取任务...
execute task by thread 11872
thread 2620 等待获取任务...
thread 480 等待获取任务...
thread 11872 等待获取任务...
thread 9140 等待获取任务...
==> 计算结果:1 + 2 + ... + 1000000 = 500000500000
thread pool destroy, thread 480 exited.
thread pool destroy, thread 11872 exited.
thread pool destroy, thread 2620 exited.
thread pool destroy, thread 9140 exited.
>>> 按下任意键,然后再按下回车键结束程序
  • 程序运行的输出结果(线程池使用 Cached 模式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
线程池工作模式:Cached
expand and create new thread.
expand and create new thread.
expand and create new thread.
expand and create new thread.
thread 15144 等待获取任务...
thread 15144 成功获取任务...
execute task by thread 15144
thread 11812 等待获取任务...
thread 11812 成功获取任务...
execute task by thread 11812
thread 20864 等待获取任务...
thread 20864 成功获取任务...
execute task by thread 20864
thread 20812 等待获取任务...
thread 20812 成功获取任务...
execute task by thread 20812
thread 3904 等待获取任务...
thread 3904 成功获取任务...
execute task by thread 3904
thread 15616 等待获取任务...
thread 15616 成功获取任务...
execute task by thread 15616
thread 9800 等待获取任务...
thread 9800 成功获取任务...
execute task by thread 9800
thread 480 等待获取任务...
thread 480 成功获取任务...
execute task by thread 480
thread 11812 等待获取任务...
thread 11812 成功获取任务...
execute task by thread 11812
thread 20864 等待获取任务...
thread 20864 成功获取任务...
execute task by thread 20864
thread 20812 等待获取任务...
thread 15144 等待获取任务...
thread 15616 等待获取任务...
thread 9800 等待获取任务...
thread 3904 等待获取任务...
thread 480 等待获取任务...
thread 11812 等待获取任务...
thread 20864 等待获取任务...
==> 计算结果:1 + 2 + ... + 1000000 = 500000500000
==> 等待空闲线程被回收(默认的最大空闲时间是60秒)...
idle thread 15616 exited.
idle thread 9800 exited.
idle thread 480 exited.
idle thread 15144 exited.
thread pool destroy, thread 20864 exited.
thread pool destroy, thread 11812 exited.
thread pool destroy, thread 20812 exited.
thread pool destroy, thread 3904 exited.
>>> 按下任意键,然后再按下回车键结束程序
动态链接库

为了便于将线程池库提供给第三方使用,可以将线程池项目的核心源码编译成动态链接库。下面将以 Linux 平台为例,介绍使用不同的方式将线程池项目编译为动态链接库(.so 文件)。值得一提的是,C++ 线程池项目的目录结构如下:

1
2
3
4
5
6
7
8
c++-11-thread-pool
├── CMakeLists.txt
├── include
│   ├── extend.h
│   └── thread_pool.h
└── src
├── test.cpp
└── thread_pool.cpp
GCC 编译

版本说明

本文使用的 g++ 版本是 4.8.5,若版本过低,可能会出现线程池代码编译失败的问题。

  • 项目的编译构建
1
2
3
4
5
6
7
8
# 进入项目的源码根目录
cd c++-11-thread-pool

# 编译生成动态链接库
g++ -fPIC -Iinclude -shared src/thread_pool.cpp -o libthread_pool.so -pthread -std=c++11

# 编译生成可执行测试程序
g++ -Iinclude src/thread_pool.cpp src/test.cpp -o thread_pool_test -pthread -std=c++11
  • 编译参数的说明
编译参数参数说明
-fPIC生成位置无关代码,.so 文件必须使用
-Iinclude指定头文件目录为 include
-shared指定编译生成共享库(动态链接库)
src/thread_pool.cpp指定源文件路径
-o libthread_pool.so指定编译输出的文件名
-pthread链接 pthread
-std=c++11指定 C++ 版本
  • 动态链接库的安装(可选),如果不安装,第三方程序在运行时可能会遇到找不到动态链接库的问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 进入项目的源码根目录
cd c++-11-thread-pool

# 将头文件安装到系统中
sudo cp ./include/* /usr/local/include

# 将编译生成的动态链接库安装到系统中
sudo cp libthread_pool.so /usr/local/lib

# 创建系统的动态链接库配置文件
sudo echo "/usr/local/lib" > /etc/ld.so.conf.d/custom.conf

# 重载系统的动态链接库配置
sudo ldconfig
CMake 编译

版本说明

本文使用的 g++ 版本是 4.8.5,CMake 版本是 3.25.1,若版本过低,可能会出现线程池代码编译失败的问题。值得一提的是,CMake 的详细使用教程可以参考 这里

  • 项目的 CMake 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
cmake_minimum_required(VERSION 3.15)

# 项目名称和语言
project(cxx_11_thread_pool VERSION 1.0 LANGUAGES CXX)

# 设置 C++ 版本
set(CMAKE_CXX_STANDARD 11)

# 设置构建模式
set(CMAKE_BUILD_TYPE Debug CACHE STRING "" FORCE)

# 设置头文件目录
include_directories(${CMAKE_SOURCE_DIR}/include)

# 设置源文件
file(GLOB SRC_FILES src/*.cpp)

# 编译生成动态链接库
add_library(thread_pool SHARED ${SRC_FILES})

# 设置动态链接库的文件名称
set_target_properties(thread_pool PROPERTIES
OUTPUT_NAME "thread_pool"
PREFIX "lib"
SUFFIX ".so"
POSITION_INDEPENDENT_CODE ON
)

# 设置动态链接库的链接标志
if (CMAKE_SYSTEM_NAME STREQUAL "Linux")
target_link_libraries(thread_pool PRIVATE pthread)
endif ()

# 编译生成可执行测试程序
add_executable(thread_pool_test ${SRC_FILES})

# 设置可执行测试程序的链接标志
if (CMAKE_SYSTEM_NAME STREQUAL "Linux")
target_link_libraries(thread_pool_test PRIVATE pthread)
endif ()

# 设置默认的安装前缀路径
if(CMAKE_INSTALL_PREFIX_INITIALIZED_TO_DEFAULT)
set(CMAKE_INSTALL_PREFIX "/usr/local" CACHE PATH "Install path prefix." FORCE)
endif()

# 将头文件和编译生成的动态链接库安装到指定位置(可选)
install(DIRECTORY include/ DESTINATION include)
install(TARGETS thread_pool DESTINATION lib)
  • 项目的编译构建
1
2
3
4
5
# 进入项目的源码根目录
cd c++-11-thread-pool

# 编译生成动态链接库和可执行测试程序
cmake -S . -B build && cmake --build build
  • 编译参数的说明
编译参数参数说明
-S .指定源码目录为当前目录(包含 CMakeLists.txt
-B build指定构建目录为 build,不存在时会自动创建
cmake --build build执行编译(相当于 make
  • 动态链接库的安装(可选),如果不安装,第三方程序在运行时可能会遇到找不到动态链接库的问题
1
2
3
4
5
6
7
8
9
10
11
# 进入项目的源码根目录
cd c++-11-thread-pool

# 将头文件和编译生成的动态链接库安装到系统中
sudo cmake --install build

# 创建系统的动态链接库配置文件
sudo echo "/usr/local/lib" > /etc/ld.so.conf.d/custom.conf

# 重载系统的动态链接库配置
sudo ldconfig

C++ 17 版本

这里基于 C++ 17 手写一个线程池,使用了可变参数模板、make_unique()packaged_taskfuture 特性。

代码下载

C++ 17 实现线程池的完整案例代码可以从 这里 下载得到,所有案例代码都兼容 Windows 和 Linux 平台。

核心代码
  • thread_pool.h 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include<iostream>
#include<functional>
#include<vector>
#include<queue>
#include<unordered_map>
#include<memory>
#include<future>
#include<atomic>
#include<mutex>
#include<condition_variable>
#include<stdexcept>
#include<climits>


////////////////////////////////////////// 线程池核心参数 ///////////////////////////////////////////


// 初始的线程数量(默认是CPU的核心数)
const int INIT_THREAD_SIZE = std::thread::hardware_concurrency();

// 任务队列的最大任务数量
const int TASK_MAX_THRESHHOLD = INT_MAX;

// 线程池Cached模式的最大线程数量
const int THREAD_SIZE_MAX_THRESHHOLD = 1024;

// 线程允许的最大空闲时间(单位秒)
const int THREAD_MAX_IDLE_TIME = 60;

// 线程池支持的模式
enum class PoolMode {
MODE_FIXED, // 固定大小线程池
MODE_CACHED // 缓存线程池
};


////////////////////////////////////////// 线程类 /////////////////////////////////////////////


class Thread {

public:
// 线程处理函数对象的类型
using ThreadHandler = std::function<void(int)>;

// 线程构造
Thread(ThreadHandler handler) : threadHandler_(handler), threadId_(generateId_++) {

}

// 线程析构
~Thread() {

}

// 启动线程
void start() {
// 创建一个线程,并执行线程处理函数
std::thread t(threadHandler_, threadId_);

// 将子线程设置为分离线程
t.detach();
}

// 获取线程ID
int getId() const {
return threadId_;
}

private:
int threadId_; // 线程ID
static int generateId_; // 用于辅助生成全局唯一的线程ID
ThreadHandler threadHandler_; // 线程处理函数
};

// 初始化用于辅助生成全局唯一的线程ID
int Thread::generateId_ = 0;


////////////////////////////////////////// 线程池类 /////////////////////////////////////////////


class ThreadPool {

public:
// 线程池构造
ThreadPool() {
idleThreadSize_ = 0;
curThreadSize_ = INIT_THREAD_SIZE;
initThreadSize_ = INIT_THREAD_SIZE;
threadSizeMaxThreshHold_ = THREAD_SIZE_MAX_THRESHHOLD;
taskSize_ = 0;
taskQueMaxThreshHold_ = TASK_MAX_THRESHHOLD;
poolMode_ = PoolMode::MODE_FIXED;
isPoolRuning_ = false;
}

// 线程池析构
~ThreadPool() {
// 设置线程池的运行状态
isPoolRuning_ = false;

// 获取互斥锁,用于等待线程池里面所有的线程结束运行(线程有两种状态:阻塞等待获取任务 & 正在执行任务中)
std::unique_lock<std::mutex> lock(taskQueMtx_);

// 必须先获取互斥锁,然后再唤醒所有正在等待获取任务的线程,避免发生线程死锁问题
notEmpty_.notify_all();

// 等待线程池里的所有线程回收完成
allExit_.wait(lock, [this]() { return threads_.size() == 0; });
}

// 设置线程池的工作模式
void setMode(PoolMode mode) {
if (!checkRunningState()) {
poolMode_ = mode;
}
}

// 设置线程池Cached模式的最大线程数量
void setThreadSizeMaxThreshHold(int threshhold) {
if (PoolMode::MODE_CACHED == poolMode_ && !checkRunningState()) {
threadSizeMaxThreshHold_ = threshhold;
}
}

// 设置任务队列的最大任务数量
void setTaskQueMaxThreshHold(size_t threshhold) {
if (!checkRunningState()) {
taskQueMaxThreshHold_ = threshhold;
}
}

// 启动线程池
void start(int initThreadSize) {
// 设置线程池的运行状态
isPoolRuning_ = true;

// 记录初始的线程数量
initThreadSize_ = initThreadSize;

// 记录当前线程池的线程数量
curThreadSize_ = initThreadSize;

// 创建初始的线程
for (int i = 0; i < initThreadSize_; i++) {
// 创建线程对象,并将线程处理函数传递给线程对象的构造函数
std::unique_ptr<Thread> thread = std::make_unique<Thread>(std::bind(&ThreadPool::threadHandler, this, std::placeholders::_1));
// 将线程对象放入线程集合中
threads_.emplace(thread->getId(), std::move(thread));
}

// 启动初始的线程
for (int i = 0; i < initThreadSize_; i++) {
threads_[i]->start(); // 启动一个线程去执行线程处理函数
idleThreadSize_++; // 记录初始空闲线程的数量
}
}

// 提交任务给线程池(使用可变参数模板 + 引用折叠 + 完美转发)
template<typename Func, typename... Args>
auto submitTask(Func &&func, Args &&...args) -> std::future<decltype(func(args...))> {
// 推导任务执行结果的类型(返回值类型)
using RType = decltype(func(args...));

// 封装一个任意可调用对象(函数、函数对象、Lambda表达式等)为异步任务
auto task = std::make_shared<std::packaged_task<RType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));

// 获取与异步任务关联的Future,方便用户获取任务执行结果
std::future<RType> future = task->get_future();

// 获取互斥锁
std::unique_lock<std::mutex> lock(taskQueMtx_);

// 等待任务队列有空余位置(不满)
bool waitResult = notFull_.wait_for(lock, std::chrono::seconds(1), [this]() { return taskQueue_.size() < taskQueMaxThreshHold_; });
// 如果等待超时,则直接返回Future
if (!waitResult) {
// 打印错误信息
std::cerr << "task queue is full, submit task failed.";
// 封装一个空的任务
auto task = std::make_shared<std::packaged_task<RType()>>([]() -> RType { return RType(); });
// 执行一个空的任务
(*task)();
// 返回与任务关联的Future
return task->get_future();
}

// 如果任务队列有空余位置(不满),则将任务放入任务队列中
taskQueue_.emplace([task]() {
(*task)();
});

// 更新当前任务队列的任务数量
taskSize_++;

// 因为刚放入了新任务,任务队列肯定不为空,通知线程池中的线程去执行任务
notEmpty_.notify_all();

// 线程池Cached模式的处理,根据任务数量动态增加线程池的线程数量
if (PoolMode::MODE_CACHED == poolMode_ && taskSize_ > idleThreadSize_ && curThreadSize_ < threadSizeMaxThreshHold_) {
// 打印日志信息
std::cout << "expand and create new thread." << std::endl;
// 创建新线程对象,并将线程处理函数传递给线程对象的构造函数
std::unique_ptr<Thread> thread = std::make_unique<Thread>(std::bind(&ThreadPool::threadHandler, this, std::placeholders::_1));
// 获取线程ID(必须在线程放入线程集合之前获取一次线程ID,否则后续将可能获取到空值)
int threadId = thread->getId();
// 将新线程对象放入线程集合中
threads_.emplace(threadId, std::move(thread));
// 启动新线程
threads_[threadId]->start();
// 更新空闲线程的数量
idleThreadSize_++;
// 更新当前线程池的线程数量
curThreadSize_++;
}

// 返回与任务关联的Future
return future;
}

// 禁止拷贝构造
ThreadPool(const ThreadPool &) = delete;

// 禁止赋值
ThreadPool &operator=(const ThreadPool &) = delete;

private:
// 线程处理函数(负责执行任务)
void threadHandler(int threadId) {
// 记录当前线程首次运行的时间
auto lastTime = std::chrono::high_resolution_clock().now();

// For死循环,为了实现在线程池结束时,所有任务必须执行完成,线程池才可以回收线程
for (;;) {
// 获取互斥锁
std::unique_lock<std::mutex> lock(taskQueMtx_);

// 打印日志信息
std::cout << "thread " << std::this_thread::get_id() << " 等待获取任务..." << std::endl;

// 让当前线程等待获取任务,使用While循环避免虚假唤醒
while (taskQueue_.size() == 0) {
// 如果任务列表为空,且线程池要结束运行,则回收当前线程
if (!checkRunningState()) {
// 从线程集合中删除当前线程
threads_.erase(threadId);
// 唤醒等待线程池回收完毕的线程
allExit_.notify_all();
// 打印日志信息
std::cout << "thread pool destroy, thread " << std::this_thread::get_id() << " exited." << std::endl;
// 结束线程处理函数的执行,相当于结束当前线程
return;
}
// 线程池Cached模式的处理,由于Cached模式下有可能已经创建了很多的线程,但是空闲时间超过最大阀值,因此需要将多余的空闲线程回收掉
if (PoolMode::MODE_CACHED == poolMode_) {
std::cv_status waitResult = notEmpty_.wait_for(lock, std::chrono::seconds(1));
// 需要区分超时返回,还是线程正常被唤醒返回
if (std::cv_status::timeout == waitResult) {
auto nowTime = std::chrono::high_resolution_clock().now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(nowTime - lastTime);
// 当线程的空闲时间超过最大阀值,且当前线程池的线程数量大于初始线程数量,则开始回收线程池中的空闲线程
if (duration.count() > THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_) {
// 从线程集合中删除当前线程
threads_.erase(threadId);
// 更新空闲线程的数量
idleThreadSize_--;
// 更新当前线程池的线程数量
curThreadSize_--;
// 打印日志信息
std::cout << "idle thread " << std::this_thread::get_id() << " exited." << std::endl;
// 结束线程处理函数的执行,相当于结束当前线程
return;
}
}
}
// 线程池Fixed模式的处理
else {
// 等待任务队列不为空
notEmpty_.wait(lock);
}
}

// 更新空闲线程数量(在当前线程执行任务之前)
idleThreadSize_--;

// 从任务队列中获取需要执行的任务
Task task = taskQueue_.front();

// 将任务从任务队列中移除
taskQueue_.pop();
taskSize_--;

// 打印日志信息
std::cout << "thread " << std::this_thread::get_id() << " 成功获取任务..." << std::endl;

// 如果获取了任务之后,任务队列依旧不为空,则继续通知其他线程执行任务
if (taskQueue_.size() > 0) {
notEmpty_.notify_all();
}

// 因为刚获取了任务,任务队列肯定有空余位置(不满),通知用户提交任务到线程池
notFull_.notify_all();

// 释放互斥锁(在当前线程执行任务之前)
lock.unlock();

// 当前线程负责执行任务
if (task != nullptr) {
task();
}

// 更新空闲线程数量(在当前线程执行完任务之后)
idleThreadSize_++;

// 更新当前线程最后执行完任务的时间
lastTime = std::chrono::high_resolution_clock().now();
}
}

// 检查线程池的运行状态
bool checkRunningState() const {
return isPoolRuning_;
}

private:
std::unordered_map<int, std::unique_ptr<Thread>> threads_; // 线程集合
PoolMode poolMode_; // 线程池的模式
std::atomic_bool isPoolRuning_; // 表示线程池是否正在运行

size_t initThreadSize_; // 初始的线程数量
std::atomic_int idleThreadSize_; // 空闲线程的数量
std::atomic_int curThreadSize_; // 当前线程池的线程数量
int threadSizeMaxThreshHold_; // 线程池Cached模式的最大线程数量

using Task = std::function<void()>; // 类型重定义,使用Function类模板作为任务
std::queue<Task> taskQueue_; // 任务队列
std::atomic_uint taskSize_; // 当前任务队列的任务数量
size_t taskQueMaxThreshHold_; // 任务队列的最大任务数量

std::mutex taskQueMtx_; // 任务队列操作的互斥锁
std::condition_variable notFull_; // 表示任务队列不满,用于通知用户线程提交任务
std::condition_variable notEmpty_; // 表示任务队列不空,用于通知线程池中的线程执行任务
std::condition_variable allExit_; // 表示等待线程池回收所有线程
};

#endif // THREAD_POOL_H
测试代码
  • test.cpp 源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include<iostream>
#include<vector>
#include<thread>
#include"thread_pool.h"

// 类型重定义
using ULong = unsigned long long;

// 计算逻辑
ULong sum(ULong begin, ULong end) {
std::cout << "execute task by thread " << std::this_thread::get_id() << std::endl;

// 当前线程执行加法计算
ULong sum = 0;
for (ULong i = begin; i <= end; ++i) {
sum += i;
}

// 模拟任务执行耗时
std::this_thread::sleep_for(std::chrono::seconds(5));

// 返回计算结果
return sum;
}

int main() {
// 线程池的工作模式
int poolMode = -1;

while (true) {
// 获取用户输入
std::cout << "请选择线程池的工作模式,0 - Fixed,1 - Cached" << std::endl;
std::cin >> poolMode;

// 判断是否为非法输入
if (std::cin.fail()) {
// 清除错误标志位
std::cin.clear();
// 丢弃错误输入
std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
std::cout << "输入无效,请重新输入一个合法的数字!\n" << std::endl;
} else {
// 清空输入缓冲区
std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
break;
}
}

// 局部作用域开始
{
// 创建线程池
ThreadPool pool;

// 设置线程池的工作模式
if (poolMode == 0) {
// Fixed模式(固定大小线程池)
pool.setMode(PoolMode::MODE_FIXED);
std::cout << "线程池工作模式:Fixed" << std::endl;
} else {
// Cached模式(缓存线程池)
pool.setMode(PoolMode::MODE_CACHED);

// 设置线程池Cached模式的最大线程数量
pool.setThreadSizeMaxThreshHold(8);

std::cout << "线程池工作模式:Cached" << std::endl;
}

// 启动线程池(指定初始的线程数量)
pool.start(4);

// 存储与任务关联的Future
std::vector<std::future<ULong>> results;

ULong begin = 0;
ULong end = 0;
ULong step = 100000;

// 提交多个任务
for (int i = 0; i < 10; i++) {
// 计算区间
begin = end + 1;
end = begin + step - 1;

// 提交任务
std::future<ULong> result = pool.submitTask(sum, begin, end);

// 存储与任务关联的Future
results.emplace_back(std::move(result));
}

// 统计任务执行结果
ULong sum = 0;
for (int i = 0; i < results.size(); i++) {
// 阻塞等待任务执行完成,并获取任务执行结果
ULong result = results[i].get();
sum += result;
}

// 输出并行计算结果
std::cout << "==> 计算结果:1 + 2 + ... + " << end << " = " << sum << std::endl;

// 如果是Cached模式,则等待一段时间,触发线程池回收空闲线程
if (poolMode == 1) {
std::cout << "==> 等待空闲线程被回收(默认的最大空闲时间是60秒)..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(THREAD_MAX_IDLE_TIME + 5));
}

} // 局部作用域结束,线程池自动析构,回收线程池中的所有线程

// 阻塞主线程,直到用户按下任意键才结束程序
char c = getchar();

return 0;
}
  • Linux 平台编译测试代码,生成并运行可执行测试程序(使用的 g++ 版本是 12.2.0
1
2
3
4
5
6
7
8
# 进入项目的源码根目录
cd c++-17-thread-pool

# 编译生成可执行测试程序
g++ -Iinclude src/test.cpp -o thread_pool_test -pthread -std=c++17

# 运行可执行测试程序
./thread_pool_test
  • 程序运行的输出结果(线程池使用 Fixed 模式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
线程池工作模式:Fixed
thread 11872 等待获取任务...
thread 11872 成功获取任务...
execute task by thread 11872
thread 480 等待获取任务...
thread 480 成功获取任务...
execute task by thread 480
thread 9140 等待获取任务...
thread 9140 成功获取任务...
execute task by thread 9140
thread 2620 等待获取任务...
thread 2620 成功获取任务...
execute task by thread 2620
thread 480 等待获取任务...
thread 480 成功获取任务...
execute task by thread 480
thread 9140 等待获取任务...
thread 9140 成功获取任务...
execute task by thread 9140
thread 11872 等待获取任务...
thread 11872 成功获取任务...
execute task by thread 11872
thread 2620 等待获取任务...
thread 2620 成功获取任务...
execute task by thread 2620
thread 9140 等待获取任务...
thread 9140 成功获取任务...
execute task by thread 9140
thread 11872 等待获取任务...
thread 11872 成功获取任务...
execute task by thread 11872
thread 2620 等待获取任务...
thread 480 等待获取任务...
thread 11872 等待获取任务...
thread 9140 等待获取任务...
==> 计算结果:1 + 2 + ... + 1000000 = 500000500000
thread pool destroy, thread 480 exited.
thread pool destroy, thread 11872 exited.
thread pool destroy, thread 2620 exited.
thread pool destroy, thread 9140 exited.
>>> 按下任意键,然后再按下回车键结束程序
  • 程序运行的输出结果(线程池使用 Cached 模式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
线程池工作模式:Cached
expand and create new thread.
expand and create new thread.
expand and create new thread.
expand and create new thread.
thread 15144 等待获取任务...
thread 15144 成功获取任务...
execute task by thread 15144
thread 11812 等待获取任务...
thread 11812 成功获取任务...
execute task by thread 11812
thread 20864 等待获取任务...
thread 20864 成功获取任务...
execute task by thread 20864
thread 20812 等待获取任务...
thread 20812 成功获取任务...
execute task by thread 20812
thread 3904 等待获取任务...
thread 3904 成功获取任务...
execute task by thread 3904
thread 15616 等待获取任务...
thread 15616 成功获取任务...
execute task by thread 15616
thread 9800 等待获取任务...
thread 9800 成功获取任务...
execute task by thread 9800
thread 480 等待获取任务...
thread 480 成功获取任务...
execute task by thread 480
thread 11812 等待获取任务...
thread 11812 成功获取任务...
execute task by thread 11812
thread 20864 等待获取任务...
thread 20864 成功获取任务...
execute task by thread 20864
thread 20812 等待获取任务...
thread 15144 等待获取任务...
thread 15616 等待获取任务...
thread 9800 等待获取任务...
thread 3904 等待获取任务...
thread 480 等待获取任务...
thread 11812 等待获取任务...
thread 20864 等待获取任务...
==> 计算结果:1 + 2 + ... + 1000000 = 500000500000
==> 等待空闲线程被回收(默认的最大空闲时间是60秒)...
idle thread 15616 exited.
idle thread 9800 exited.
idle thread 480 exited.
idle thread 15144 exited.
thread pool destroy, thread 20864 exited.
thread pool destroy, thread 11812 exited.
thread pool destroy, thread 20812 exited.
thread pool destroy, thread 3904 exited.
>>> 按下任意键,然后再按下回车键结束程序
特别注意

动态链接库说明

由于上面基于 C++ 17 实现的线程池,其核心源码全部都写在头文件中,也就是说 thread_pool.h 是纯 Header-Only(例如类模板 / 函数模板全部实现都在头文件中),因此该线程池项目是无法编译成动态链接库(比如 .so)的。如果一定要编译成动态链接库,可以将线程池的核心代码逻辑移植到 .cpp 源文件,然后再编译成动态链接库。由于篇幅有限,这里不再累述。

常见问题

问题描述

  • (1) 基于 C++ 11 / C++ 17 实现线程池,当线程池准备结束运行(销毁),需要等待线程池中所有线程退出时,发生线程死锁问题,导致应用进程无法正常退出。
  • (2) 基于 C++ 11 实现线程池,在 Windows 平台下可以正常运行,但在 Linux 平台下运行时自定义实现的信号量类型 Semaphore 会出现线程死锁问题(原因是 condition_variable 的析构函数为空导致,一旦 Result 对象比 Task 对象早被析构就会出现线程死锁)。

提示

这里提到的两个线程死锁问题,在上面给出的 C++ 11 / C++ 17 线程池代码中已经解决了。

问题定位

定位死锁问题时,通常可以通过 gdb 附加(attach)到正在运行的目标进程,然后使用以下命令进行分析:

  • 使用 ps 命令查找目标进程的 ID;
  • 使用 gdb attch <pid> 命令附加到目标进程;
  • 使用 info threads 命令查看当前进程中所有线程的状态及线程 ID;
  • 使用 thread <tid> 命令切换到指定的线程;
  • 使用 bt 命令查看当前线程的调用堆栈信息,判断其是否被阻塞、在等待互斥锁(mutex)、或处于循环等待状态;
  • 重复以上操作,依次分析所有可疑线程的调用堆栈信息,找出多个线程相互等待资源的典型死锁原因。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 查看目标进程的 ID
ps -aux|grep thread_pool

# GDB 附加的目标进程
$ gdb attach <pid>

# GDB 调试操作(查看目标进程中的所有线程)
> (gdb) info threads

# GDB 调试操作(切换到指定的线程)
> (gdb) threads <tid>

# GDB 调试操作(查看当前线程的调用堆栈信息)
> (gdb) bt

结合项目源码和线程之间的加锁、解锁逻辑,定位到具体发生死锁的代码片段后,可以进一步分析导致死锁的根本原因,常见的原因包括:

  • 多个线程交叉持有多个锁,且锁的获取顺序不一致;
  • 忽略了可能抛异常或提前返回,导致某个线程未能正确释放锁;
  • 加锁和等待条件变量的时机不当等;
  • 多个资源需要同时加锁时,未采用统一锁策略(比如没有统一使用 lock()unique_lock)。

问题解决

  • 规范加锁顺序,避免循环依赖;
  • 拆分锁或减少锁的粒度;
  • 使用 try_lock() 等避免阻塞的锁操作;
  • 引入超时机制,防止死锁的出现;
  • 在合适的场景下,可以引入无锁并发编程模型(如基于 CAS 的算法),以提升性能并减少锁竞争带来的开销。

项目输出总结

应用到企业项目中

  • 耗时任务处理
  • 高并发高性能网络服务器
  • Master-Slave 线程模型(可用于并行计算)

输出到求职简历上

  • 项目名称

    • 基于可变参数模板实现线程池
  • 项目描述

    • 基于可变参数模板和引用折叠原理,实现线程池 submitTask 接口,支持任意任务函数和任意参数的传递
    • 使用 future 类型定制 submitTask 接口提交任务的返回值
    • 使用 mapqueue 容器管理线程对象和任务
    • 基于条件变量 condition_variable 和互斥锁 mutex 实现任务提交线程和任务执行线程之间的通信机制
    • 支持 Fixed 和 Cached 模式的线程池
    • ……(自由发挥)
  • 项目问题

    • 遇到的问题
      • 基于 C++ 11 / C++ 17 实现线程池,当线程池准备结束运行(销毁),需要等待线程池中所有线程退出时,发生线程死锁问题,导致应用进程无法正常退出。
      • 基于 C++ 11 实现线程池,在 Windows 平台下可以正常运行,但在 Linux 平台下运行时自定义实现的信号量类型会出现线程死锁问题(原因是 condition_variable 的析构函数为空导致,一旦 Result 对象比 Task 对象早被析构就会出现线程死锁)。
    • 问题的定位
      • 使用 ps 命令查找目标进程的 ID;
      • 使用 gdb attch <pid> 命令附加到目标进程;
      • 使用 info threads 命令查看当前进程中所有线程的状态及线程 ID;
      • 使用 thread <tid> 命令切换到指定的线程;
      • 使用 bt 命令查看当前线程的调用堆栈信息,判断其是否被阻塞、在等待互斥锁(mutex)、或处于循环等待状态;
      • 重复以上操作,依次分析所有可疑线程的调用堆栈信息,找出多个线程相互等待资源的典型死锁原因。
    • 问题的解决
      • 结合项目源码和线程之间的加锁、解锁逻辑,定位到具体发生死锁的代码片段后,可以进一步分析导致死锁的根本原因,常见的原因包括:
      • (1) 多个线程交叉持有多个锁,且锁的获取顺序不一致。
      • (2) 忽略了可能抛异常或提前返回,导致某个线程未能正确释放锁。
      • (3) 加锁和等待条件变量的时机不当等。
      • (4) 多个资源需要同时加锁时,未采用统一锁策略(比如没有统一使用 lock()unique_lock)。

参考资料