前言 本文将基于 C++ 手写一个线程池,并分别提供 C++ 11 和 C++ 17 两种版本的线程池实现。线程池作为五大池之一(内存池、连接池、协程池、线程池、进程池),应用非常广泛,不管是客户端程序,还是后台服务程序,都是提高业务处理能力的必备模块。有很多开源的线程池实现,虽然各自接口在使用上稍有区别,但是其核心实现原理都是基本相同的。
知识背景 在基于 C++ 手写线程池之前,应该熟悉并掌握以下技术内容:
熟练基于 C++ 11 的面向对象编程 熟悉组合和继承、继承多态、STL 容器、智能指针、函数对象、绑定器、lambda
表达式、可变参数模板编程等。 熟悉 C++ 11 多线程编程,比如线程互斥、线程同步、原子操作、CAS 等。 熟悉 thread
、mutex
、unique_lock
、condition_variable
、atomic
等。 熟悉 C++ 17 和 C++ 20 的新特性,比如 C++ 17 的 any
类型和 C++ 20 的 counting_semaphore
信号量类型等。 开发工具 本文使用以下工具来开发 C++ 线程池项目:
C++ 11 / C++ 17 Visual Studio 2019 CMake 构建编译环境 GDB 调试分析定位线程死锁问题 Linux 编译项目生成动态链接库(.so
) 版本特性 C++ 各版本支持的特性如下:
特性名称 对应头文件 最低支持的 C++ 标准版本 说明 std::make_unique
<memory>
C++ 14 C++ 14 引入,用于简洁安全地创建 unique_ptr
智能指针。 std::any
<any>
C++ 17 提供类型安全的类型擦除容器。 std::counting_semaphore
<semaphore>
C++ 20 C++ 20 引入的信号量机制。 std::packaged_task
<future>
C++ 11 将可调用对象封装起来,并用于异步执行。 std::future
<future>
C++ 11 异步操作的结果获取机制,可以与 std::async
、std::promise
搭配使用。 可变参数模板 语言特性 C++ 11 支持模板中参数数量可变,用于构建灵活函数模板,如递归参数展开等。
提示
如果使用的是 GCC、Clang 或 MSVC 编译器,需确保编译器版本也支持相应的 C++ 标准。若需进一步查询编译器支持的标准或特性情况,可以参考 C/C++ 参考手册 。
兼容平台 本文提供的所有 C++ 线程池代码都可以兼容 Windows 和 Linux 平台,并分别提供 C++ 11 和 C++ 17 两种版本的线程池实现。
核心概念 并发与并行 并发(Concurrency)
在单核处理器上,多个线程之间通过操作系统的调度机制交替执行,每个线程轮流占用 CPU 的时间片(例如每 10 毫秒切换一次)。由于每个线程执行的时间片非常短,人们在宏观上感受到这些线程像是 “同时” 在执行一样,虽然它们在物理层面上是串行执行的。这种 “看起来同时执行” 的场景被称为并发。 并行(Parallelism)
在多核处理器或多 CPU 系统中,多个线程可以被分配到不同的核心上同时执行,彼此之间互不抢占 CPU 时间资源,是真正意义上的 “同时执行”。这种多个任务在物理层面并行的执行方式称为并行。 总结
简而言之,并发是 "逻辑上的同时发生",并行是 "物理上的同时进行"。
多线程的适用场景 多线程程序的性能就一定好吗?不一定,要看具体的应用场景:
I/O 密集型场景
对于涉及大量 I/O 操作(如磁盘读写、网络通信、数据库访问等)的程序,这些操作通常会导致线程阻塞,从而释放 CPU 时间片。此时,通过多线程并发处理,可以有效利用 CPU 的空闲时间,提高程序的吞吐量和响应速度。因此,无论是 CPU 单核、CPU 多核、多 CPU,多线程通常都能带来较好的性能提升,因此适合运行多线程程序。 CPU 密集型场景
CPU 单核在 CPU 单核中,多个线程无法真正并行执行,只能通过 CPU 时间片轮转实现 “伪并发”。多线程会导致频繁的线程上下文切换,增加 CPU 调度开销,甚至可能因为线程数量过多而拖慢整体性能。在这种情况下,使用单线程可能反而更高效,因此不适合运行多线程程序 。 CPU 多核、多 CPU 多个线程可以被调度到多个核心上并行运行,从而充分利用计算资源,提高程序的运行效率。对于 CPU 密集型任务(如图像处理、大规模计算等),多线程可以显著提升性能,尤其是在合理控制线程数量、避免过度竞争的前提下,因此适合运行多线程程序。 多线程的协作机制 在 C++ 中,为了实现多线程之间的正确协作与安全访问共享资源,通常需要使用线程互斥与线程同步机制。
线程互斥 线程互斥用于防止多个线程同时访问共享资源,从而避免数据竞争和数据不一致的问题。常用机制包括:
mutex
互斥锁
提供基本的加锁与解锁操作(lock()
/ unlock()
或 lock_guard
/ unique_lock
自动管理),确保同一时间只有一个线程能够访问临界区(共享资源)。 recursive_mutex
递归互斥锁
支持同一个线程多次对同一个互斥锁加锁,适用于递归函数调用场景。 atomic
原子类型
提供无锁的并发访问机制,支持原子操作,如自增、比较交换等,适用于简单共享变量的并发访问,性能较高。 线程同步 线程同步用于协调多个线程之间的执行顺序,例如一个线程等待另一个线程完成某项任务后再继续执行。常用机制包括:
多线程的额外开销 为了完成任务,创建很多的线程可以吗?线程真的是越多越好吗?答案是否定的。
线程的创建和销毁都是非常 “重” 的操作(涉及用户空间和内核空间的切换); 线程栈(通常为 8MB)本身占用大量内存空间; 线程的上下文切换需要占用大量时间; 大量线程同时唤醒,会导致操作系统经常出现锯齿状负载或者瞬间负载量很大,造成系统宕机。 特别注意
在 Linux 平台下,通过 pthread
库默认最多可以创建约 380
个线程。若希望创建更多的线程,可以降低线程栈的大小(比如 8KB),还可以增大 Linux 系统的最大可打开文件描述符数(ulimit -n
),或者最大可创建的用户进程数 / 线程数(ulimit -u
)。
线程池的介绍 线程池的使用优势 在操作系统中,线程的创建和销毁都是较为 “昂贵” 的操作,不仅耗时,而且会消耗较多的系统资源。如果在服务运行过程中,每当有任务到来就动态创建线程来执行,任务完成后又立即销毁线程,那么在高并发或大流量场景下,频繁的线程创建与销毁会显著降低系统的实时响应能力,增加 CPU 开销,从而影响整体业务的处理效率。线程池的出现正是为了解决这一问题。在线程池机制中,服务进程在启动阶段就会预先创建好一组可复用的线程(即线程池),这些线程会在后台处于等待状态。当业务任务到来时,系统无需重新创建线程,而是直接从线程池中取出一个空闲线程来执行任务(Task)。任务执行完毕后,该线程不会被销毁,而是归还给线程池,等待下一次任务(Task)的分配。
线程池带来了多种优势:减少线程创建和销毁的开销,提高系统性能; 避免线程资源耗尽,通过线程池大小限制线程总数,控制系统并发量; 提升响应速度,线程可立即复用,减少任务启动延迟; 便于线程管理和监控,统一由线程池控制线程生命周期和运行状态。 总结
线程池是一种非常重要的并发编程工具,特别适用于高并发、高吞吐、对实时性有要求的业务场景。
线程池的两种模式 Fixed 模式线程池(固定大小线程池)
在这种模式下,线程池中的线程数量在创建时就被固定,且在整个线程池的生命周期内保持不变。一般在初始化线程池时,会根据当前机器的 CPU 核心数量或业务需求设定一个合理的线程数。 当有任务到来时,如果线程池中有空闲线程,则立即分配执行;如果线程都处于忙碌状态,则任务会被放入等待队列中,排队等待有空闲的线程。 Fixed 模式适合任务量较为稳定、对系统资源可控性要求较高的场景,有助于防止线程数量膨胀导致系统负载过高。 Cached 模式线程池(缓存线程池)
在这种模式下,线程池的线程数量不是固定的,而是可以根据实际任务量动态增长。当有新的任务到来时,如果线程池中没有空闲线程,会临时创建新的线程来处理任务,并在任务完成后将其归还到线程池中。 为了防止线程无限增长,通常会设置一个线程最大数量的阀值。此外,如果某个线程在空闲超过指定时间(如 60 秒)后仍无新任务可执行,它将被自动销毁,从而释放资源。线程池会始终保留初始的核心线程数,用于处理正常业务流量。 Cached 模式适用于任务处理量波动较大、流量突发性强的场景,可以在短时间内快速扩展线程池以应对高并发流量,但需要注意合理设置线程上限以避免资源耗尽。 线程池的实现 整体设计
功能描述 (1) 基于 C++ 可变参数模板编程和引用折叠原理,实现线程池 submitTask
接口,支持任意任务函数和任意参数的传递 (2) 使用 future
类型定制 submitTask
接口提交任务的返回值 (3) 使用 map
和 queue
容器管理线程对象和任务 (4) 基于条件变量 condition_variable
和互斥锁 mutex
实现任务提交线程和任务执行线程间的同步通信机制 (5) 支持 fixed
模式和 cached
模式的线程池定制 代码实现 C++ 11 版本 这里基于 C++ 11 手写一个线程池,并自行实现 make_unique()
、any
类型、信号量类型。
代码下载
C++ 11 实现线程池的完整案例代码可以从 这里 下载得到,所有案例代码都兼容 Windows 和 Linux 平台。
核心代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 #ifndef EXTEND_H #define EXTEND_H #include <memory> #include <utility> #include <type_traits> namespace extend { template <typename T, typename ... Args> typename std::enable_if<!std::is_array<T>::value, std::unique_ptr<T>>::type make_unique (Args &&... args) { return std::unique_ptr<T>(new T (std::forward<Args>(args)...)); } template <typename T> typename std::enable_if<std::is_array<T>::value && std::extent<T>::value == 0 , std::unique_ptr<T>>::type make_unique (std::size_t size) { using ElementType = typename std::remove_extent<T>::type; return std::unique_ptr<T>(new ElementType[size]()); } template <typename T, typename ... Args> typename std::enable_if<(std::extent<T>::value != 0 ), void >::type make_unique (Args &&...) = delete ;} class Any {public : Any () = default ; ~Any () = default ; Any (const Any &) = delete ; Any &operator =(const Any &) = delete ; Any (Any&& other) = default ; Any &operator =(Any &&other) = default ; template <typename T> Any (T&& data) : base_(extend::make_unique<Derive<typename std::decay<T>::type>>(std::forward<T>(data))) { } template <typename T> T cast () { if (base_ == nullptr ) { throw std::runtime_error ("Any is empty" ); } Derive <T> *p = dynamic_cast <Derive <T> *>(base_.get ()); if (p == nullptr ) { throw std::runtime_error ("type is unmatch!" ); } return p->getData (); } private : class Base { public : virtual ~Base () = default ; }; template <typename T> class Derive : public Base { public : template <typename U> Derive (U &&data) : data_(std::forward<U>(data)) { } ~Derive () { } T getData () const { return data_; } private : T data_; }; private : std::unique_ptr<Base> base_; }; class Semaphore {public : Semaphore (int limit = 0 ) : limit_ (limit), isDestroyed (false ) { } ~Semaphore () { isDestroyed = true ; }; void wait () { if (!isDestroyed) { std::unique_lock<std::mutex> lock (mtx_) ; cond_.wait (lock, [this ]() { return limit_ > 0 ; }); limit_--; } } void post () { if (!isDestroyed) { std::unique_lock<std::mutex> lock (mtx_) ; limit_++; cond_.notify_all (); } } private : int limit_; std::mutex mtx_; std::condition_variable cond_; std::atomic_bool isDestroyed; }; #endif
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 #ifndef THREAD_POOL_H #define THREAD_POOL_H #include <iostream> #include <functional> #include <vector> #include <queue> #include <memory> #include <atomic> #include <mutex> #include <condition_variable> #include <stdexcept> #include <unordered_map> #include <climits> #include "extend.h" const int INIT_THREAD_SIZE = std::thread::hardware_concurrency ();const int TASK_MAX_THRESHHOLD = INT_MAX;const int THREAD_SIZE_MAX_THRESHHOLD = 1024 ;const int THREAD_MAX_IDLE_TIME = 60 ;enum class PoolMode { MODE_FIXED, MODE_CACHED }; class Result ;class Task {public : Task (); ~Task () = default ; virtual Any run () = 0 ; void exec () ; void setResult (Result *p) ; private : Result *result_; }; class Result {public : Result (std::shared_ptr<Task> task, bool isValid = true ); ~Result () = default ; Any get () ; void setVal (Any data) ; bool isValid () const ; bool isFinished () const ; private : Any data_; Semaphore sem_; std::atomic_bool isValid_; std::shared_ptr<Task> task_; std::atomic_bool isFinished_; }; class Thread {public : using ThreadHandler = std::function<void (int )>; Thread (ThreadHandler handler); ~Thread (); void start () ; int getId () const ; private : int threadId_; static int generateId_; ThreadHandler threadHandler_; }; class ThreadPool {public : ThreadPool (); ~ThreadPool (); void setMode (PoolMode mode) ; void setThreadSizeMaxThreshHold (int threshhold) ; void setTaskQueMaxThreshHold (size_t threshhold) ; void start (int initThreadSize = INIT_THREAD_SIZE) ; std::shared_ptr<Result> submitTask (std::shared_ptr<Task> task) ; ThreadPool (const ThreadPool &) = delete ; ThreadPool &operator =(const ThreadPool &) = delete ; private : void threadHandler (int threadId) ; bool checkRunningState () const ; void cleanTaskResult () ; private : std::unordered_map<int , std::unique_ptr<Thread>> threads_; PoolMode poolMode_; std::atomic_bool isPoolRuning_; size_t initThreadSize_; std::atomic_int idleThreadSize_; std::atomic_int curThreadSize_; int threadSizeMaxThreshHold_; std::queue<std::shared_ptr<Task>> taskQueue_; std::atomic_uint taskSize_; size_t taskQueMaxThreshHold_; std::mutex taskQueMtx_; std::condition_variable notFull_; std::condition_variable notEmpty_; std::condition_variable allExit_; std::vector<std::shared_ptr<Result>> taskResults_; std::mutex taskResultsMtx_; }; #endif
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 #include <iostream> #include <algorithm> #include <functional> #include <thread> #include <memory> #include <mutex> #include "thread_pool.h" Task::Task () : result_ (nullptr ) { } void Task::exec () { Any data = run (); if (result_ != nullptr ) { result_->setVal (std::move (data)); } } void Task::setResult (Result *p) { result_ = p; } Result::Result (std::shared_ptr<Task> task, bool isValid) : task_ (task), isValid_ (isValid), isFinished_ (false ) { task->setResult (this ); } Any Result::get () { if (!isValid_) { return "" ; } sem_.wait (); return std::move (data_); } void Result::setVal (Any data) { data_ = std::move (data); isFinished_ = true ; sem_.post (); } bool Result::isValid () const { return isValid_; } bool Result::isFinished () const { return isFinished_; } Thread::Thread (ThreadHandler handler) : threadHandler_ (handler), threadId_ (generateId_++) { } Thread::~Thread () { } void Thread::start () { std::thread t (threadHandler_, threadId_) ; t.detach (); } int Thread::getId () const { return threadId_; } int Thread::generateId_ = 0 ;ThreadPool::ThreadPool () { idleThreadSize_ = 0 ; curThreadSize_ = INIT_THREAD_SIZE; initThreadSize_ = INIT_THREAD_SIZE; threadSizeMaxThreshHold_ = THREAD_SIZE_MAX_THRESHHOLD; taskSize_ = 0 ; taskQueMaxThreshHold_ = TASK_MAX_THRESHHOLD; poolMode_ = PoolMode::MODE_FIXED; isPoolRuning_ = false ; } ThreadPool::~ThreadPool () { isPoolRuning_ = false ; std::unique_lock<std::mutex> lock (taskQueMtx_) ; notEmpty_.notify_all (); allExit_.wait (lock, [this ]() { return threads_.size () == 0 ; }); } void ThreadPool::setMode (PoolMode mode) { if (!checkRunningState ()) { poolMode_ = mode; } } void ThreadPool::setThreadSizeMaxThreshHold (int threshhold) { if (PoolMode::MODE_CACHED == poolMode_ && !checkRunningState ()) { threadSizeMaxThreshHold_ = threshhold; } } void ThreadPool::setTaskQueMaxThreshHold (size_t threshhold) { if (!checkRunningState ()) { taskQueMaxThreshHold_ = threshhold; } } bool ThreadPool::checkRunningState () const { return isPoolRuning_; } void ThreadPool::cleanTaskResult () { std::lock_guard<std::mutex> resultLock (taskResultsMtx_) ; auto new_end = std::remove_if (taskResults_.begin (), taskResults_.end (), [](const std::shared_ptr<Result> &res) { return res->isFinished (); }); taskResults_.erase (new_end, taskResults_.end ()); } void ThreadPool::start (int initThreadSize) { isPoolRuning_ = true ; initThreadSize_ = initThreadSize; curThreadSize_ = initThreadSize; for (int i = 0 ; i < initThreadSize_; i++) { std::unique_ptr<Thread> thread = extend::make_unique<Thread>(std::bind (&ThreadPool::threadHandler, this , std::placeholders::_1)); threads_.emplace (thread->getId (), std::move (thread)); } for (int i = 0 ; i < initThreadSize_; i++) { threads_[i]->start (); idleThreadSize_++; } } void ThreadPool::threadHandler (int threadId) { auto lastTime = std::chrono::high_resolution_clock ().now (); for (;;) { std::unique_lock<std::mutex> lock (taskQueMtx_) ; std::cout << "thread " << std::this_thread::get_id () << " 等待获取任务..." << std::endl; while (taskQueue_.size () == 0 ) { if (!checkRunningState ()) { threads_.erase (threadId); allExit_.notify_all (); std::cout << "thread pool destroy, thread " << std::this_thread::get_id () << " exited." << std::endl; return ; } if (PoolMode::MODE_CACHED == poolMode_) { std::cv_status waitResult = notEmpty_.wait_for (lock, std::chrono::seconds (1 )); if (std::cv_status::timeout == waitResult) { auto nowTime = std::chrono::high_resolution_clock ().now (); auto duration = std::chrono::duration_cast<std::chrono::seconds>(nowTime - lastTime); if (duration.count () > THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_) { threads_.erase (threadId); idleThreadSize_--; curThreadSize_--; std::cout << "idle thread " << std::this_thread::get_id () << " exited." << std::endl; return ; } } } else { notEmpty_.wait (lock); } } idleThreadSize_--; std::shared_ptr<Task> task = taskQueue_.front (); taskQueue_.pop (); taskSize_--; std::cout << "thread " << std::this_thread::get_id () << " 成功获取任务..." << std::endl; if (taskQueue_.size () > 0 ) { notEmpty_.notify_all (); } notFull_.notify_all (); lock.unlock (); if (task != nullptr ) { task->exec (); } idleThreadSize_++; cleanTaskResult (); lastTime = std::chrono::high_resolution_clock ().now (); } } std::shared_ptr<Result> ThreadPool::submitTask (std::shared_ptr<Task> task) { std::unique_lock<std::mutex> lock (taskQueMtx_) ; bool waitResult = notFull_.wait_for (lock, std::chrono::seconds (1 ), [this ]() { return taskQueue_.size () < taskQueMaxThreshHold_; }); if (!waitResult) { std::cerr << "task queue is full, submit task failed." ; std::shared_ptr<Result> result = std::make_shared<Result>(task, false ); std::unique_lock<std::mutex> resultLock (taskResultsMtx_) ; taskResults_.emplace_back (result); return result; } taskQueue_.emplace (task); taskSize_++; notEmpty_.notify_all (); if (PoolMode::MODE_CACHED == poolMode_ && taskSize_ > idleThreadSize_ && curThreadSize_ < threadSizeMaxThreshHold_) { std::cout << "expand and create new thread." << std::endl; std::unique_ptr<Thread> thread = extend::make_unique<Thread>(std::bind (&ThreadPool::threadHandler, this , std::placeholders::_1)); int threadId = thread->getId (); threads_.emplace (threadId, std::move (thread)); threads_[threadId]->start (); idleThreadSize_++; curThreadSize_++; } std::shared_ptr<Result> result = std::make_shared<Result>(task); std::unique_lock<std::mutex> resultLock (taskResultsMtx_) ; taskResults_.emplace_back (result); return result; }
测试代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 #include <iostream> #include <vector> #include <thread> #include "thread_pool.h" using ULong = unsigned long long ;class CalculateTask : public Task {public : CalculateTask (ULong begin, ULong end) : begin_ (begin), end_ (end) { } virtual Any run () override { std::cout << "execute task by thread " << std::this_thread::get_id () << std::endl; ULong sum = 0 ; for (ULong i = begin_; i <= end_; ++i) { sum += i; } std::this_thread::sleep_for (std::chrono::seconds (5 )); return Any (sum); } private : ULong begin_; ULong end_; }; int main () { int poolMode = -1 ; while (true ) { std::cout << "请选择线程池的工作模式,0 - Fixed,1 - Cached" << std::endl; std::cin >> poolMode; if (std::cin.fail ()) { std::cin.clear (); std::cin.ignore (std::numeric_limits<std::streamsize>::max (), '\n' ); std::cout << "输入无效,请重新输入一个合法的数字!\n" << std::endl; } else { std::cin.ignore (std::numeric_limits<std::streamsize>::max (), '\n' ); break ; } } { ThreadPool pool; if (poolMode == 0 ) { pool.setMode (PoolMode::MODE_FIXED); std::cout << "线程池工作模式:Fixed" << std::endl; } else { pool.setMode (PoolMode::MODE_CACHED); pool.setThreadSizeMaxThreshHold (8 ); std::cout << "线程池工作模式:Cached" << std::endl; } pool.start (4 ); std::vector<std::shared_ptr<Result>> results; ULong begin = 0 ; ULong end = 0 ; ULong step = 100000 ; for (int i = 0 ; i < 10 ; i++) { begin = end + 1 ; end = begin + step - 1 ; std::shared_ptr<Task> task = std::make_shared<CalculateTask>(begin, end); std::shared_ptr<Result> result = pool.submitTask (task); if (result->isValid ()) { results.emplace_back (result); } } ULong sum = 0 ; for (int i = 0 ; i < results.size (); i++) { ULong result = results[i]->get ().cast<ULong>(); sum += result; } std::cout << "==> 计算结果:1 + 2 + ... + " << end << " = " << sum << std::endl; if (poolMode == 1 ) { std::cout << "==> 等待空闲线程被回收(默认的最大空闲时间是60秒)..." << std::endl; std::this_thread::sleep_for (std::chrono::seconds (THREAD_MAX_IDLE_TIME + 5 )); } } char c = getchar (); return 0 ; }
Linux 平台编译测试代码,生成并运行可执行测试程序(使用的 g++
版本是 4.8.5
) 1 2 3 4 5 6 7 8 cd c++-11-thread-poolg++ -Iinclude src/thread_pool.cpp src/test.cpp -o thread_pool_test -pthread -std=c++11 ./thread_pool_test
程序运行的输出结果(线程池使用 Fixed 模式) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 线程池工作模式:Fixed thread 11872 等待获取任务... thread 11872 成功获取任务... execute task by thread 11872 thread 480 等待获取任务... thread 480 成功获取任务... execute task by thread 480 thread 9140 等待获取任务... thread 9140 成功获取任务... execute task by thread 9140 thread 2620 等待获取任务... thread 2620 成功获取任务... execute task by thread 2620 thread 480 等待获取任务... thread 480 成功获取任务... execute task by thread 480 thread 9140 等待获取任务... thread 9140 成功获取任务... execute task by thread 9140 thread 11872 等待获取任务... thread 11872 成功获取任务... execute task by thread 11872 thread 2620 等待获取任务... thread 2620 成功获取任务... execute task by thread 2620 thread 9140 等待获取任务... thread 9140 成功获取任务... execute task by thread 9140 thread 11872 等待获取任务... thread 11872 成功获取任务... execute task by thread 11872 thread 2620 等待获取任务... thread 480 等待获取任务... thread 11872 等待获取任务... thread 9140 等待获取任务... ==> 计算结果:1 + 2 + ... + 1000000 = 500000500000 thread pool destroy, thread 480 exited. thread pool destroy, thread 11872 exited. thread pool destroy, thread 2620 exited. thread pool destroy, thread 9140 exited. >>> 按下任意键,然后再按下回车键结束程序
程序运行的输出结果(线程池使用 Cached 模式) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 线程池工作模式:Cached expand and create new thread. expand and create new thread. expand and create new thread. expand and create new thread. thread 15144 等待获取任务... thread 15144 成功获取任务... execute task by thread 15144 thread 11812 等待获取任务... thread 11812 成功获取任务... execute task by thread 11812 thread 20864 等待获取任务... thread 20864 成功获取任务... execute task by thread 20864 thread 20812 等待获取任务... thread 20812 成功获取任务... execute task by thread 20812 thread 3904 等待获取任务... thread 3904 成功获取任务... execute task by thread 3904 thread 15616 等待获取任务... thread 15616 成功获取任务... execute task by thread 15616 thread 9800 等待获取任务... thread 9800 成功获取任务... execute task by thread 9800 thread 480 等待获取任务... thread 480 成功获取任务... execute task by thread 480 thread 11812 等待获取任务... thread 11812 成功获取任务... execute task by thread 11812 thread 20864 等待获取任务... thread 20864 成功获取任务... execute task by thread 20864 thread 20812 等待获取任务... thread 15144 等待获取任务... thread 15616 等待获取任务... thread 9800 等待获取任务... thread 3904 等待获取任务... thread 480 等待获取任务... thread 11812 等待获取任务... thread 20864 等待获取任务... ==> 计算结果:1 + 2 + ... + 1000000 = 500000500000 ==> 等待空闲线程被回收(默认的最大空闲时间是60秒)... idle thread 15616 exited. idle thread 9800 exited. idle thread 480 exited. idle thread 15144 exited. thread pool destroy, thread 20864 exited. thread pool destroy, thread 11812 exited. thread pool destroy, thread 20812 exited. thread pool destroy, thread 3904 exited. >>> 按下任意键,然后再按下回车键结束程序
动态链接库 为了便于将线程池库提供给第三方使用,可以将线程池项目的核心源码编译成动态链接库。下面将以 Linux 平台为例,介绍使用不同的方式将线程池项目编译为动态链接库(.so
文件)。值得一提的是,C++ 线程池项目的目录结构如下:
1 2 3 4 5 6 7 8 c++-11-thread-pool ├── CMakeLists.txt ├── include │ ├── extend.h │ └── thread_pool.h └── src ├── test.cpp └── thread_pool.cpp
GCC 编译 版本说明
本文使用的 g++
版本是 4.8.5
,若版本过低,可能会出现线程池代码编译失败的问题。
1 2 3 4 5 6 7 8 cd c++-11-thread-poolg++ -fPIC -Iinclude -shared src/thread_pool.cpp -o libthread_pool.so -pthread -std=c++11 g++ -Iinclude src/thread_pool.cpp src/test.cpp -o thread_pool_test -pthread -std=c++11
编译参数 参数说明 -fPIC
生成位置无关代码,.so
文件必须使用 -Iinclude
指定头文件目录为 include
-shared
指定编译生成共享库(动态链接库) src/thread_pool.cpp
指定源文件路径 -o libthread_pool.so
指定编译输出的文件名 -pthread
链接 pthread
库 -std=c++11
指定 C++ 版本
动态链接库的安装(可选),如果不安装,第三方程序在运行时可能会遇到找不到动态链接库的问题 1 2 3 4 5 6 7 8 9 10 11 12 13 14 cd c++-11-thread-poolsudo cp ./include/* /usr/local /include sudo cp libthread_pool.so /usr/local /lib sudo echo "/usr/local/lib" > /etc/ld.so.conf.d/custom.conf sudo ldconfig
CMake 编译 版本说明
本文使用的 g++
版本是 4.8.5
,CMake 版本是 3.25.1
,若版本过低,可能会出现线程池代码编译失败的问题。值得一提的是,CMake 的详细使用教程可以参考 这里 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 cmake_minimum_required(VERSION 3.15) # 项目名称和语言 project(cxx_11_thread_pool VERSION 1.0 LANGUAGES CXX) # 设置 C++ 版本 set(CMAKE_CXX_STANDARD 11) # 设置构建模式 set(CMAKE_BUILD_TYPE Debug CACHE STRING "" FORCE) # 设置头文件目录 include_directories(${CMAKE_SOURCE_DIR}/include) # 设置源文件 file(GLOB SRC_FILES src/*.cpp) # 编译生成动态链接库 add_library(thread_pool SHARED ${SRC_FILES}) # 设置动态链接库的文件名称 set_target_properties(thread_pool PROPERTIES OUTPUT_NAME "thread_pool" PREFIX "lib" SUFFIX ".so" POSITION_INDEPENDENT_CODE ON ) # 设置动态链接库的链接标志 if (CMAKE_SYSTEM_NAME STREQUAL "Linux") target_link_libraries(thread_pool PRIVATE pthread) endif () # 编译生成可执行测试程序 add_executable(thread_pool_test ${SRC_FILES}) # 设置可执行测试程序的链接标志 if (CMAKE_SYSTEM_NAME STREQUAL "Linux") target_link_libraries(thread_pool_test PRIVATE pthread) endif () # 设置默认的安装前缀路径 if(CMAKE_INSTALL_PREFIX_INITIALIZED_TO_DEFAULT) set(CMAKE_INSTALL_PREFIX "/usr/local" CACHE PATH "Install path prefix." FORCE) endif() # 将头文件和编译生成的动态链接库安装到指定位置(可选) install(DIRECTORY include/ DESTINATION include) install(TARGETS thread_pool DESTINATION lib)
1 2 3 4 5 cd c++-11-thread-poolcmake -S . -B build && cmake --build build
编译参数 参数说明 -S .
指定源码目录为当前目录(包含 CMakeLists.txt
) -B build
指定构建目录为 build
,不存在时会自动创建 cmake --build build
执行编译(相当于 make
)
动态链接库的安装(可选),如果不安装,第三方程序在运行时可能会遇到找不到动态链接库的问题 1 2 3 4 5 6 7 8 9 10 11 cd c++-11-thread-poolsudo cmake --install build sudo echo "/usr/local/lib" > /etc/ld.so.conf.d/custom.conf sudo ldconfig
C++ 17 版本 这里基于 C++ 17 手写一个线程池,使用了可变参数模板、make_unique()
、packaged_task
、future
特性。
代码下载
C++ 17 实现线程池的完整案例代码可以从 这里 下载得到,所有案例代码都兼容 Windows 和 Linux 平台。
核心代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 #ifndef THREAD_POOL_H #define THREAD_POOL_H #include <iostream> #include <functional> #include <vector> #include <queue> #include <unordered_map> #include <memory> #include <future> #include <atomic> #include <mutex> #include <condition_variable> #include <stdexcept> #include <climits> const int INIT_THREAD_SIZE = std::thread::hardware_concurrency ();const int TASK_MAX_THRESHHOLD = INT_MAX;const int THREAD_SIZE_MAX_THRESHHOLD = 1024 ;const int THREAD_MAX_IDLE_TIME = 60 ;enum class PoolMode { MODE_FIXED, MODE_CACHED }; class Thread {public : using ThreadHandler = std::function<void (int )>; Thread (ThreadHandler handler) : threadHandler_ (handler), threadId_ (generateId_++) { } ~Thread () { } void start () { std::thread t (threadHandler_, threadId_) ; t.detach (); } int getId () const { return threadId_; } private : int threadId_; static int generateId_; ThreadHandler threadHandler_; }; int Thread::generateId_ = 0 ;class ThreadPool {public : ThreadPool () { idleThreadSize_ = 0 ; curThreadSize_ = INIT_THREAD_SIZE; initThreadSize_ = INIT_THREAD_SIZE; threadSizeMaxThreshHold_ = THREAD_SIZE_MAX_THRESHHOLD; taskSize_ = 0 ; taskQueMaxThreshHold_ = TASK_MAX_THRESHHOLD; poolMode_ = PoolMode::MODE_FIXED; isPoolRuning_ = false ; } ~ThreadPool () { isPoolRuning_ = false ; std::unique_lock<std::mutex> lock (taskQueMtx_) ; notEmpty_.notify_all (); allExit_.wait (lock, [this ]() { return threads_.size () == 0 ; }); } void setMode (PoolMode mode) { if (!checkRunningState ()) { poolMode_ = mode; } } void setThreadSizeMaxThreshHold (int threshhold) { if (PoolMode::MODE_CACHED == poolMode_ && !checkRunningState ()) { threadSizeMaxThreshHold_ = threshhold; } } void setTaskQueMaxThreshHold (size_t threshhold) { if (!checkRunningState ()) { taskQueMaxThreshHold_ = threshhold; } } void start (int initThreadSize) { isPoolRuning_ = true ; initThreadSize_ = initThreadSize; curThreadSize_ = initThreadSize; for (int i = 0 ; i < initThreadSize_; i++) { std::unique_ptr<Thread> thread = std::make_unique<Thread>(std::bind (&ThreadPool::threadHandler, this , std::placeholders::_1)); threads_.emplace (thread->getId (), std::move (thread)); } for (int i = 0 ; i < initThreadSize_; i++) { threads_[i]->start (); idleThreadSize_++; } } template <typename Func, typename ... Args> auto submitTask (Func &&func, Args &&...args) -> std::future<decltype (func(args...)) > { using RType = decltype (func (args...)); auto task = std::make_shared<std::packaged_task<RType ()>>( std::bind (std::forward<Func>(func), std::forward<Args>(args)...)); std::future<RType> future = task->get_future (); std::unique_lock<std::mutex> lock (taskQueMtx_) ; bool waitResult = notFull_.wait_for (lock, std::chrono::seconds (1 ), [this ]() { return taskQueue_.size () < taskQueMaxThreshHold_; }); if (!waitResult) { std::cerr << "task queue is full, submit task failed." ; auto task = std::make_shared<std::packaged_task<RType ()>>([]() -> RType { return RType (); }); (*task)(); return task->get_future (); } taskQueue_.emplace ([task]() { (*task)(); }); taskSize_++; notEmpty_.notify_all (); if (PoolMode::MODE_CACHED == poolMode_ && taskSize_ > idleThreadSize_ && curThreadSize_ < threadSizeMaxThreshHold_) { std::cout << "expand and create new thread." << std::endl; std::unique_ptr<Thread> thread = std::make_unique<Thread>(std::bind (&ThreadPool::threadHandler, this , std::placeholders::_1)); int threadId = thread->getId (); threads_.emplace (threadId, std::move (thread)); threads_[threadId]->start (); idleThreadSize_++; curThreadSize_++; } return future; } ThreadPool (const ThreadPool &) = delete ; ThreadPool &operator =(const ThreadPool &) = delete ; private : void threadHandler (int threadId) { auto lastTime = std::chrono::high_resolution_clock ().now (); for (;;) { std::unique_lock<std::mutex> lock (taskQueMtx_) ; std::cout << "thread " << std::this_thread::get_id () << " 等待获取任务..." << std::endl; while (taskQueue_.size () == 0 ) { if (!checkRunningState ()) { threads_.erase (threadId); allExit_.notify_all (); std::cout << "thread pool destroy, thread " << std::this_thread::get_id () << " exited." << std::endl; return ; } if (PoolMode::MODE_CACHED == poolMode_) { std::cv_status waitResult = notEmpty_.wait_for (lock, std::chrono::seconds (1 )); if (std::cv_status::timeout == waitResult) { auto nowTime = std::chrono::high_resolution_clock ().now (); auto duration = std::chrono::duration_cast<std::chrono::seconds>(nowTime - lastTime); if (duration.count () > THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_) { threads_.erase (threadId); idleThreadSize_--; curThreadSize_--; std::cout << "idle thread " << std::this_thread::get_id () << " exited." << std::endl; return ; } } } else { notEmpty_.wait (lock); } } idleThreadSize_--; Task task = taskQueue_.front (); taskQueue_.pop (); taskSize_--; std::cout << "thread " << std::this_thread::get_id () << " 成功获取任务..." << std::endl; if (taskQueue_.size () > 0 ) { notEmpty_.notify_all (); } notFull_.notify_all (); lock.unlock (); if (task != nullptr ) { task (); } idleThreadSize_++; lastTime = std::chrono::high_resolution_clock ().now (); } } bool checkRunningState () const { return isPoolRuning_; } private : std::unordered_map<int , std::unique_ptr<Thread>> threads_; PoolMode poolMode_; std::atomic_bool isPoolRuning_; size_t initThreadSize_; std::atomic_int idleThreadSize_; std::atomic_int curThreadSize_; int threadSizeMaxThreshHold_; using Task = std::function<void ()>; std::queue<Task> taskQueue_; std::atomic_uint taskSize_; size_t taskQueMaxThreshHold_; std::mutex taskQueMtx_; std::condition_variable notFull_; std::condition_variable notEmpty_; std::condition_variable allExit_; }; #endif
测试代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 #include <iostream> #include <vector> #include <thread> #include "thread_pool.h" using ULong = unsigned long long ;ULong sum (ULong begin, ULong end) { std::cout << "execute task by thread " << std::this_thread::get_id () << std::endl; ULong sum = 0 ; for (ULong i = begin; i <= end; ++i) { sum += i; } std::this_thread::sleep_for (std::chrono::seconds (5 )); return sum; } int main () { int poolMode = -1 ; while (true ) { std::cout << "请选择线程池的工作模式,0 - Fixed,1 - Cached" << std::endl; std::cin >> poolMode; if (std::cin.fail ()) { std::cin.clear (); std::cin.ignore (std::numeric_limits<std::streamsize>::max (), '\n' ); std::cout << "输入无效,请重新输入一个合法的数字!\n" << std::endl; } else { std::cin.ignore (std::numeric_limits<std::streamsize>::max (), '\n' ); break ; } } { ThreadPool pool; if (poolMode == 0 ) { pool.setMode (PoolMode::MODE_FIXED); std::cout << "线程池工作模式:Fixed" << std::endl; } else { pool.setMode (PoolMode::MODE_CACHED); pool.setThreadSizeMaxThreshHold (8 ); std::cout << "线程池工作模式:Cached" << std::endl; } pool.start (4 ); std::vector<std::future<ULong>> results; ULong begin = 0 ; ULong end = 0 ; ULong step = 100000 ; for (int i = 0 ; i < 10 ; i++) { begin = end + 1 ; end = begin + step - 1 ; std::future<ULong> result = pool.submitTask (sum, begin, end); results.emplace_back (std::move (result)); } ULong sum = 0 ; for (int i = 0 ; i < results.size (); i++) { ULong result = results[i].get (); sum += result; } std::cout << "==> 计算结果:1 + 2 + ... + " << end << " = " << sum << std::endl; if (poolMode == 1 ) { std::cout << "==> 等待空闲线程被回收(默认的最大空闲时间是60秒)..." << std::endl; std::this_thread::sleep_for (std::chrono::seconds (THREAD_MAX_IDLE_TIME + 5 )); } } char c = getchar (); return 0 ; }
Linux 平台编译测试代码,生成并运行可执行测试程序(使用的 g++
版本是 12.2.0
) 1 2 3 4 5 6 7 8 cd c++-17-thread-poolg++ -Iinclude src/test .cpp -o thread_pool_test -pthread -std =c++17 ./thread_pool_test
程序运行的输出结果(线程池使用 Fixed 模式) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 线程池工作模式:Fixed thread 11872 等待获取任务... thread 11872 成功获取任务... execute task by thread 11872 thread 480 等待获取任务... thread 480 成功获取任务... execute task by thread 480 thread 9140 等待获取任务... thread 9140 成功获取任务... execute task by thread 9140 thread 2620 等待获取任务... thread 2620 成功获取任务... execute task by thread 2620 thread 480 等待获取任务... thread 480 成功获取任务... execute task by thread 480 thread 9140 等待获取任务... thread 9140 成功获取任务... execute task by thread 9140 thread 11872 等待获取任务... thread 11872 成功获取任务... execute task by thread 11872 thread 2620 等待获取任务... thread 2620 成功获取任务... execute task by thread 2620 thread 9140 等待获取任务... thread 9140 成功获取任务... execute task by thread 9140 thread 11872 等待获取任务... thread 11872 成功获取任务... execute task by thread 11872 thread 2620 等待获取任务... thread 480 等待获取任务... thread 11872 等待获取任务... thread 9140 等待获取任务... ==> 计算结果:1 + 2 + ... + 1000000 = 500000500000 thread pool destroy, thread 480 exited. thread pool destroy, thread 11872 exited. thread pool destroy, thread 2620 exited. thread pool destroy, thread 9140 exited. >>> 按下任意键,然后再按下回车键结束程序
程序运行的输出结果(线程池使用 Cached 模式) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 线程池工作模式:Cached expand and create new thread. expand and create new thread. expand and create new thread. expand and create new thread. thread 15144 等待获取任务... thread 15144 成功获取任务... execute task by thread 15144 thread 11812 等待获取任务... thread 11812 成功获取任务... execute task by thread 11812 thread 20864 等待获取任务... thread 20864 成功获取任务... execute task by thread 20864 thread 20812 等待获取任务... thread 20812 成功获取任务... execute task by thread 20812 thread 3904 等待获取任务... thread 3904 成功获取任务... execute task by thread 3904 thread 15616 等待获取任务... thread 15616 成功获取任务... execute task by thread 15616 thread 9800 等待获取任务... thread 9800 成功获取任务... execute task by thread 9800 thread 480 等待获取任务... thread 480 成功获取任务... execute task by thread 480 thread 11812 等待获取任务... thread 11812 成功获取任务... execute task by thread 11812 thread 20864 等待获取任务... thread 20864 成功获取任务... execute task by thread 20864 thread 20812 等待获取任务... thread 15144 等待获取任务... thread 15616 等待获取任务... thread 9800 等待获取任务... thread 3904 等待获取任务... thread 480 等待获取任务... thread 11812 等待获取任务... thread 20864 等待获取任务... ==> 计算结果:1 + 2 + ... + 1000000 = 500000500000 ==> 等待空闲线程被回收(默认的最大空闲时间是60秒)... idle thread 15616 exited. idle thread 9800 exited. idle thread 480 exited. idle thread 15144 exited. thread pool destroy, thread 20864 exited. thread pool destroy, thread 11812 exited. thread pool destroy, thread 20812 exited. thread pool destroy, thread 3904 exited. >>> 按下任意键,然后再按下回车键结束程序
特别注意 动态链接库说明
由于上面基于 C++ 17 实现的线程池,其核心源码全部都写在头文件中,也就是说 thread_pool.h
是纯 Header-Only(例如类模板 / 函数模板全部实现都在头文件中),因此该线程池项目是无法编译成动态链接库(比如 .so
)的。如果一定要编译成动态链接库,可以将线程池的核心代码逻辑移植到 .cpp
源文件,然后再编译成动态链接库。由于篇幅有限,这里不再累述。
常见问题 问题描述 (1) 基于 C++ 11 / C++ 17 实现线程池,当线程池准备结束运行(销毁),需要等待线程池中所有线程退出时,发生线程死锁问题,导致应用进程无法正常退出。 (2) 基于 C++ 11 实现线程池,在 Windows 平台下可以正常运行,但在 Linux 平台下运行时自定义实现的信号量类型 Semaphore 会出现线程死锁问题(原因是 condition_variable
的析构函数为空导致,一旦 Result 对象比 Task 对象早被析构就会出现线程死锁)。 提示
这里提到的两个线程死锁问题,在上面给出的 C++ 11 / C++ 17 线程池代码中已经解决了。
问题定位 定位死锁问题时,通常可以通过 gdb
附加(attach
)到正在运行的目标进程,然后使用以下命令进行分析:
使用 ps
命令查找目标进程的 ID; 使用 gdb attch <pid>
命令附加到目标进程; 使用 info threads
命令查看当前进程中所有线程的状态及线程 ID; 使用 thread <tid>
命令切换到指定的线程; 使用 bt
命令查看当前线程的调用堆栈信息,判断其是否被阻塞、在等待互斥锁(mutex
)、或处于循环等待状态; 重复以上操作,依次分析所有可疑线程的调用堆栈信息,找出多个线程相互等待资源的典型死锁原因。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 ps -aux|grep thread_pool $ gdb attach <pid> > (gdb) info threads > (gdb) threads <tid> > (gdb) bt
结合项目源码和线程之间的加锁、解锁逻辑,定位到具体发生死锁的代码片段后,可以进一步分析导致死锁的根本原因,常见的原因包括:
多个线程交叉持有多个锁,且锁的获取顺序不一致; 忽略了可能抛异常或提前返回,导致某个线程未能正确释放锁; 加锁和等待条件变量的时机不当等; 多个资源需要同时加锁时,未采用统一锁策略(比如没有统一使用 lock()
或 unique_lock
)。 问题解决 规范加锁顺序,避免循环依赖; 拆分锁或减少锁的粒度; 使用 try_lock()
等避免阻塞的锁操作; 引入超时机制,防止死锁的出现; 在合适的场景下,可以引入无锁并发编程模型(如基于 CAS 的算法),以提升性能并减少锁竞争带来的开销。 项目输出总结 应用到企业项目中 耗时任务处理 高并发高性能网络服务器 Master-Slave 线程模型(可用于并行计算) 输出到求职简历上 项目名称
项目描述
基于可变参数模板和引用折叠原理,实现线程池 submitTask
接口,支持任意任务函数和任意参数的传递 使用 future
类型定制 submitTask
接口提交任务的返回值 使用 map
和 queue
容器管理线程对象和任务 基于条件变量 condition_variable
和互斥锁 mutex
实现任务提交线程和任务执行线程之间的通信机制 支持 Fixed 和 Cached 模式的线程池 ……(自由发挥) 项目问题
遇到的问题基于 C++ 11 / C++ 17 实现线程池,当线程池准备结束运行(销毁),需要等待线程池中所有线程退出时,发生线程死锁问题,导致应用进程无法正常退出。 基于 C++ 11 实现线程池,在 Windows 平台下可以正常运行,但在 Linux 平台下运行时自定义实现的信号量类型会出现线程死锁问题(原因是 condition_variable
的析构函数为空导致,一旦 Result 对象比 Task 对象早被析构就会出现线程死锁)。 问题的定位使用 ps
命令查找目标进程的 ID; 使用 gdb attch <pid>
命令附加到目标进程; 使用 info threads
命令查看当前进程中所有线程的状态及线程 ID; 使用 thread <tid>
命令切换到指定的线程; 使用 bt
命令查看当前线程的调用堆栈信息,判断其是否被阻塞、在等待互斥锁(mutex
)、或处于循环等待状态; 重复以上操作,依次分析所有可疑线程的调用堆栈信息,找出多个线程相互等待资源的典型死锁原因。 问题的解决结合项目源码和线程之间的加锁、解锁逻辑,定位到具体发生死锁的代码片段后,可以进一步分析导致死锁的根本原因,常见的原因包括: (1) 多个线程交叉持有多个锁,且锁的获取顺序不一致。 (2) 忽略了可能抛异常或提前返回,导致某个线程未能正确释放锁。 (3) 加锁和等待条件变量的时机不当等。 (4) 多个资源需要同时加锁时,未采用统一锁策略(比如没有统一使用 lock()
或 unique_lock
)。 参考资料