[C.C++] C++实现线程池

357 0
Honkers 2025-5-30 12:36:12 来自手机 | 显示全部楼层 |阅读模式

线程池介绍

作为五大池之一(内存池、连接池、线程池、进程池、协程池),线程池的应用非常广泛,不管事客户端程序还是后台服务端,都是提高业务处理能力的必备模块。

线程池有很多的开源实现,虽然在接口使用上优点区别,但是其核心实现原理基本相同。

线程池知识背景

  • C++面向对象的标准
    • 组合和继承、多态、STL容器、智能指针、函数对象、绑定器、可变参数模板等
  • C++11多线程编程
    • thread、mutex、atomic、condition_variable、unique_lock等
  • C++17和C++20标准
    • C++17中的any类型可以接受任何类型的参数、C++20 的semaphore信号量
  • 多线程理论
    • 多线程基本知识、线程互斥、线程同步、原子操作等。

并发和并行

  • 并发:单核上,多个线程占用不同的CPU时间片,物理上还是串行执行的,但是由于每个线程占用的CPU时间片非常短(比如10ms),看起来就像是多个线程都在共同执行一样,这样的场景称作并发(concurrent)。

  • 并行:在多核或者多CPU上,多个线程是在真正的同时执行,这样的场景称作并行(parallel)。

线程池的优势

多线程程序一定就好吗?不一定,要看具体的应用场景:

  • 多线程处理可以同时运⾏多个线程。由于多线程应⽤程序将程序划分成多个独⽴的任务,因此可以在以下⽅⾯显著提⾼性能:
    (1)多线程技术使程序的响应速度更快 ,因为⽤户界⾯可以在进⾏其它⼯作的同时⼀直处于活动状态;
    (2)当前没有进⾏处理的任务时可以将处理器时间让给其它任务;
    (3)占⽤⼤量处理时间的任务可以定期将处理器时间让给其它任务;
    (4)可以随时停⽌任务;
    (5)可以分别设置各个任务的优先级以优化性能。

  • 多线程开发的缺点:
    同样的 ,多线程也存在许多缺点 ,在考虑多线程时需要进⾏充分的考虑。多线程的主要缺点包括:
    (1)等候使⽤共享资源时造成程序的运⾏速度变慢。这些共享资源主要是独占性的资源 ,如打印机等。
    (2)对线程进⾏管理要求额外的 CPU开销。线程的使⽤会给系统带来上下⽂切换的额外负担。当这种负担超过⼀定程度时,多线程的特点主要
    表现在其缺点上,⽐如⽤独⽴的线程来更新数组内每个元素。
    (3)线程的死锁。即较长时间的等待或资源竞争以及死锁等多线程症状。
    (4)对公有变量的同时读或写。当多个线程需要对公有变量进⾏写操作时,后⼀个线程往往会修改掉前⼀个线程存放的数据,从⽽使前⼀个线程
    的参数被修改;另外 ,当公⽤变量的读写操作是⾮原⼦性时,在不同的机器上,中断时间的不确定性,会导致数据在⼀个线程内的操作产⽣错误,从⽽产⽣莫名其妙的错误,⽽这种错误是程序员⽆法预知的。

  • fixed模式线程池
    线程池里面的线程个数是固定不变的,一般是ThreadPool创建时根据当前机器的CPU核心数量进行指
    定。
    cached模式线程池
    线程池里面的线程个数是可动态增长的,根据任务的数量动态的增加线程的数量,但是会设置一个线程
    数量的阈值(线程过多的坏处上面已经讲过了),任务处理完成,如果动态增长的线程空闲了60s还没
    有处理其它任务,那么关闭线程,保持池中最初数量的线程即可。fixed模式线程池:
    线程池里面的线程个数是固定不变的,一般是ThreadPool创建时根据当前机器的CPU核心数量进行指
    定。

  • cached模式线程池:
    线程池里面的线程个数是可动态增长的,根据任务的数量动态的增加线程的数量,但是会设置一个线程
    数量的阈值(线程过多的坏处上面已经讲过了),任务处理完成,如果动态增长的线程空闲了60s还没
    有处理其它任务,那么关闭线程,保持池中最初数量的线程即可。

两个版本的线程池,第一个版本自己定义的类型较多;第二个版本多采用C++新特性提供的类型

第一个版本

线程池两个模式:

  1. //线程池支持的模式
  2. enum class PoolMode
  3. {
  4. MODE_FIXED, //固定数量的线程
  5. MODE_CACHED, //线程数量可以动态增长
  6. };
复制代码

C++17提供了Any类型 可以接受任何参数,我们第一个版本手写一个Any类型用于接受任何参数:

  1. //定义一个Any类型 可以接受任何数据类型
  2. //C++17中提供了Any类型 可以保存任何数据类型
  3. class Any
  4. {
  5. public:
  6. Any() = default;
  7. ~Any() = default;
  8. Any(const Any&) = delete;
  9. Any& operator=(const Any&) = delete;
  10. Any(Any&&) = default;
  11. Any& operator=(Any&&) = default;
  12. //构造成函数 可以接受任何数据类型
  13. template<typename T>
  14. Any(T data) :m_base(std::make_unique<Derive<T>>(data))
  15. {
  16. }
  17. template<typename T>
  18. T cast() //用来提取保存的数据
  19. {
  20. //先将保存的基类转化为子类
  21. Derive<T>* dp = dynamic_cast<Derive<T>*>(m_base.get());
  22. if (dp == nullptr)
  23. {
  24. throw "type is wrong";
  25. }
  26. return dp->m_data;
  27. }
  28. private:
  29. class Base //基类类型
  30. {
  31. public:
  32. virtual ~Base() = default;
  33. };
  34. template<typename T>
  35. class Derive :public Base //派生类类型
  36. {
  37. public:
  38. Derive(T data) :m_data(data)
  39. {
  40. }
  41. T m_data; //可以保存所有数据类型
  42. };
  43. private:
  44. //定义一个基类指针
  45. std::unique_ptr<Base> m_base;
  46. };
复制代码

C++20提供了semaphore信号量 但是之前的版本没有提供 下面我们手写一个信号量:

  1. //定义一个信号量 Semaphore在C++20中已经提供
  2. class Semaphore
  3. {
  4. public:
  5. //构造函数
  6. Semaphore(int limit = 0)
  7. :m_resLimit(limit)
  8. , m_isExit(false)
  9. {
  10. }
  11. ~Semaphore()
  12. {
  13. m_isExit = true;
  14. }
  15. //获取一个信号量资源
  16. void wait()
  17. {
  18. if (m_isExit)
  19. return;
  20. std::unique_lock<std::mutex> lock(m_mtx);
  21. //等待信号量有资源,没有资源的话 阻塞当前线程
  22. m_cond.wait(lock, [&]()->bool {return m_resLimit > 0; });
  23. m_resLimit--;
  24. }
  25. //增加一个信号量资源
  26. void post()
  27. {
  28. if (m_isExit)
  29. return;
  30. std::unique_lock<std::mutex> lock(m_mtx);
  31. m_resLimit++;
  32. m_cond.notify_all();
  33. }
  34. private:
  35. std::atomic_bool m_isExit;
  36. int m_resLimit;
  37. std::mutex m_mtx;
  38. std::condition_variable m_cond;
  39. };
复制代码

线程类型类:

  1. //线程类型
  2. class Thread
  3. {
  4. public:
  5. //线程函数对象类型
  6. using ThreadFunc = std::function<void(int)>;
  7. // 线程构造
  8. Thread(ThreadFunc func);
  9. // 线程析构
  10. ~Thread();
  11. // 启动线程
  12. void start();
  13. //获取线程ID
  14. int getId()const;
  15. private:
  16. ThreadFunc m_func;
  17. static int m_generateId;
  18. int m_threadId; //保存线程id
  19. };
复制代码

线程池类型:

  1. //线程池类型
  2. class ThreadPool
  3. {
  4. public:
  5. //线程池构造
  6. ThreadPool();
  7. //线程池析构
  8. ~ThreadPool();
  9. //设置线程池的工作模式
  10. void setMode(PoolMode mode);
  11. //设置task任务队列上线的阈值
  12. void setTaskQueMaxThrshHold(int threshhold);
  13. //给线程池提交任务
  14. Result submitTask(std::shared_ptr<Task> sp);
  15. //设置线程池cached模式下线程阈值
  16. void setThreadSizeThreshHold(int threshHold);
  17. //开启线程池
  18. void start(int initThreadSize = std::thread::hardware_concurrency());
  19. ThreadPool(const ThreadPool&) = delete;
  20. ThreadPool& operator=(const ThreadPool&) = delete;
  21. private:
  22. //定义线程函数
  23. void threadFunc(int threadId);
  24. //检查pool的运行状态
  25. bool checkRunningState() const;
  26. private:
  27. //std::vector<std::unique_ptr<Thread>> m_threads; //线程列表
  28. std::unordered_map <int, std::unique_ptr<Thread>> m_threads;
  29. //初始的线程数量
  30. int m_initThreadSize;
  31. //记录当前线程池里面线程的总数量
  32. std::atomic_int m_curThreadSize;
  33. //线程数量上限阈值
  34. int m_threadSizeThreshHold;
  35. //记录空闲线程的数量
  36. std::atomic_int m_idleThreadSize;
  37. //任务队列
  38. std::queue<std::shared_ptr<Task>> m_taskque;
  39. //任务的数量
  40. std::atomic_int m_taskSize;
  41. //任务队列数量上限的阈值
  42. int m_taskqueMaxThresHold;
  43. //包装任务队列的线程安全
  44. std::mutex m_taskQueMtx;
  45. //表示任务队列不满
  46. std::condition_variable m_notFull;
  47. //表示任务队列不空
  48. std::condition_variable m_notEmpty;
  49. //表示等待线程资源全部回收
  50. std::condition_variable m_exitCond;
  51. //当前线程池的工作模式
  52. PoolMode m_poolMode;
  53. //表示当前线程池的启动状态
  54. std::atomic_bool m_isPoolRunning;
  55. };
复制代码

测试类型(main.cpp):

  1. #include "ThreadPool.h"
  2. #include <thread>
  3. #include <chrono>
  4. #include <iostream>
  5. using ulong = unsigned long long;
  6. class MyTask:public Task
  7. {
  8. public:
  9. MyTask(int begin, int end)
  10. :m_begin(begin),
  11. m_end(end)
  12. {
  13. }
  14. Any run()
  15. {
  16. std::cout << "tid:" << std::this_thread::get_id()
  17. << "begin..." << std::endl;
  18. //std::this_thread::sleep_for(std::chrono::seconds(2));
  19. ulong sum = 0;
  20. for (int i = m_begin; i <= m_end; i++)
  21. {
  22. sum += i;
  23. }
  24. std::cout << "tid:" << std::this_thread::get_id()
  25. << "end..." << std::endl;
  26. return sum;
  27. }
  28. private:
  29. int m_begin;
  30. int m_end;
  31. };
  32. int main()
  33. {
  34. {
  35. ThreadPool pool;
  36. pool.setMode(PoolMode::MODE_CACHED);
  37. // 开始启动线程池
  38. pool.start(2);
  39. // linux上,这些Result对象也是局部对象,要析构的!!!
  40. Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 100000000));
  41. Result res2 = pool.submitTask(std::make_shared<MyTask>(100000001, 200000000));
  42. pool.submitTask(std::make_shared<MyTask>(100000001, 200000000));
  43. pool.submitTask(std::make_shared<MyTask>(100000001, 200000000));
  44. pool.submitTask(std::make_shared<MyTask>(100000001, 200000000));
  45. ulong sum1 = res1.get().cast<ulong>();
  46. std::cout << sum1 << std::endl;
  47. } // 这里Result对象也要析构!!! 在vs下,条件变量析构会释放相应资源的
  48. std::cout << "main over!" << std::endl;
  49. getchar();
  50. }
复制代码

结果:

第二个版本

线程池支持的模式:

  1. enum class PoolMode
  2. {
  3. MODE_FIXED, //固定数量的线程
  4. MODE_CACHED, //线程数量可以动态增长
  5. };
复制代码

线程类型:

  1. class Thread
  2. {
  3. public:
  4. //线程函数对象类型
  5. using ThreadFunc = std::function<void(int)>;
  6. // 线程构造
  7. Thread(ThreadFunc func);
  8. // 线程析构
  9. ~Thread() = default;
  10. // 启动线程
  11. void start();
  12. //获取线程ID
  13. int getId()const;
  14. private:
  15. ThreadFunc m_func;
  16. static int m_generateId;
  17. int m_threadId; //保存线程id
  18. };
复制代码

测试类型(main.cpp):

  1. #include "thread.h"
  2. #include <future>
  3. #include <functional>
  4. #include <thread>
  5. #include <chrono>
  6. int sum1(int a, int b)
  7. {
  8. std::this_thread::sleep_for(std::chrono::seconds(1));
  9. return a + b;
  10. }
  11. int sum2(int a, int b, int c)
  12. {
  13. std::this_thread::sleep_for(std::chrono::seconds(1));
  14. return a + b + c;
  15. }
  16. int main()
  17. {
  18. ThreadPool pool;
  19. pool.start(4);
  20. std::future<int> res1 = pool.submit(sum1, 100, 300);
  21. std::future<int> res2 = pool.submit(sum2, 1, 2, 3);
  22. std::cout << res1.get() << std::endl;
  23. std::cout << res2.get() << std::endl;
  24. }
复制代码

测试结果:


获取完整代码:https://github.com/xf-8087/ThreadPool

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Honkers

荣誉红客

关注
  • 4014
    主题
  • 36
    粉丝
  • 0
    关注
这家伙很懒,什么都没留下!

中国红客联盟公众号

联系站长QQ:5520533

admin@chnhonker.com
Copyright © 2001-2025 Discuz Team. Powered by Discuz! X3.5 ( 粤ICP备13060014号 )|天天打卡 本站已运行