线程池实现
- 一.线程池的本质
- 二.类内创建线程
- 三.代码实现
一.线程池的本质
线程池里面存储的都是一批已经创建好的线程,当线程池里有数据时,这批线程就会被唤醒去竞争数据,当线程池里没有数据时,这批线程就去休眠等待。
线程池的本质就是一个生产消费模型,当有生产者线程往线程池里发送任务时,线程池里的消费者线程就会竞争任务。
比如主线程往线程池里投递一个任务,线程池里的若干线程就会被立刻唤醒,然后去竞争抢任务执行。
所以一开始线程池里的线程必须早就被创建出来的,只不过线程池里没有东西,它们都去条件变量下等待了。
所以如果要实现一个线程池,那么需要些什么呢?
1.首先线程池肯定需要存储任务数据的场所,我们可以用队列来存储。也就是线程池里需要一个任务队列,用来存储接收到的任务。
2.其次线程池里存储很多已经被创建的线程,所以里面还需要能够找到这个已经被创建的线程,如何找到呢?根据线程的tid,所以我们可以用vector数组存储线程的tid,来找到所有线程。
3.然后就是我们要保证生产消费过程中的安全,比如消费者只能互斥竞争任务,不能同时竞争任务,生产者在生产,消费者不能去消费等,所以需要加锁保护。
4.最后就是要保证,生产消费之间的有序性,线程池里线程不能一直去竞争任务,如果线程池里没有任务了,那么就需要到条件变量下等待。
二.类内创建线程
在实现的过程中,有一个细节,那就是在类内部创建线程,因为线程池中的线程是早就被创建好的,所以程序一开始运行时就需要存在,只不过这时线程池里的线程都在休眠。但是在创建线程时,需要传递线程的执行函数,线程的执行函数有特定的形式,必须要返回值是void类型,参数也是void类型。
当在类内部创建时,这个线程执行函数默认是类成员函数,成员函数的参数里默认会有一个this指针。
也就是在类内创建线程时,该线程执行函数的参数是有两个的,不符合要求。就会创建失败。
那该如何解决呢?
我们可以在成员函数的前面加上static,就变成了静态成员函数了,静态成员函数是没有this指针的。
但设置成静态成员函数后,又会存在一个问题,那就是该静态成员函数是无法直接访问成员变量的。而线程的执行函数是需要去到线程池里的任务队列里去竞争任务的,所以必须要访问类成员变量。
【解决方法】
我们可以在创建线程时,将该类的this指针传给线程函数,这样线程的执行函数,就可以通过类型转换访问到类的成员变量了。
三.代码实现
#pragma once #include <pthread.h> #include <iostream> #include <queue> #include <string> #include <vector> //线程池的本质就是生产消费模型 //一个生产者往线程池里放任务,然后其他消费者者就竞争这个任务执行 //线程池里有很多线程,所以它的基本属性肯定有识别线程的id static const int defaultnum=3;//默认线程池里有3个线程 struct ThreadInfo { pthread_t tid; std::string name; }; template <class T> class ThreadPool { public: void Lock() { pthread_mutex_lock(&_mutex); } void Unlock() { pthread_mutex_unlock(&_mutex); } void Makeup() { pthread_cond_signal(&_cond); } bool isQueueEmpty() { return _task.empty(); } void ThreadSleep() { pthread_cond_wait(&_cond,&_mutex); } T Pop() { T t=_task.front(); _task.pop(); return t; } std::string GetthreadName(pthread_t id) { for(const auto&ti :_thread) { if(ti.tid==id) return ti.name; } return "None"; } public: ThreadPool(int num=defaultnum):_thread(num) { pthread_mutex_init(&_mutex,nullptr); pthread_cond_init(&_cond,nullptr); } void Push(const T& in)//往线程池里发送任务,发送是没有条件的,但一旦发送了,就说明消费条件满足了,就要唤醒线程池里的线程去执行 { Lock(); _task.push(in); Makeup();//唤醒在条件变量下等待的线程 Unlock(); } //要注意,在类内部创建线程时,线程执行的函数里,会有this指针,不满足要求,所以必须要使用静态成员函数 //这样才可以没有this指针,只有一个参数,但静态成员函数又不能访问类成员,所以在给线程函数传递参数时,我们传 //该类的this指针,这样就可以通过this指针访问类成员 static void *Handler(void* args)//去线程池里的任务队列里竞争任务 { ThreadPool<T>* td=static_cast< ThreadPool<T>*>(args); std::string name=td->GetthreadName(pthread_self());//根据tid来获取到对应的名字 //线程创建出来就去竞争任务,任务在哪里?在任务队列里,任务队列里没有怎么办?去条件变量下等待 while(true) { td->Lock(); while(td->isQueueEmpty())//防止伪唤醒 { td->ThreadSleep();//没有任务那么就去条件变量下等待 } //如果有任务,那么就将任务拿出来,并执行 T t=td->Pop(); td->Unlock(); t();//处理任务 std::cout<<name<<"run, "<<"reslut: "<<t.Getresult()<<std::endl; } } //当我们去调用这个线程池的时候,线程池就应该给我们创建若干个线程在线程池里。而当有人往线程池里发送任务时,线程 //池里的线程会立刻被唤醒,去竞争任务 void Start()//线程池里的线程刚被创建出来,就会去线程池里的队列里竞争任务,如果没有任务,那么它就会去休眠 { int num=_thread.size(); for(int i=0;i<num;i++) { _thread[i].name="thread- "+std::to_string(i+1); pthread_create(&(_thread[i].tid),nullptr,Handler,this); } } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } private: std::vector<ThreadInfo> _thread;//根据这个来找到要分配任务的线程,线程池里存储的线程 std::queue<T> _task;//线程池里存放的任务 pthread_mutex_t _mutex; pthread_cond_t _cond; };
#include <iostream> #include "ThreadPool.hpp" #include "TASK.hpp" #include <ctime> #include <unistd.h> int main() { ThreadPool<TASK> *tp = new ThreadPool<TASK>(); tp->Start(); int len = opera.size(); srand(time(nullptr)); while (true) { // 1.获取数据 int x = rand() % 10 + 1; usleep(10); int y = rand() % 10; char op = opera[rand() % len]; TASK t(x, y, op); // 2.生产数据 tp->Push(t); std::cout<<"main thread make task"<<t.GetTASK()<<std::endl; sleep(1); } }
#pragma once #include <iostream> #include <string> std::string opera="+-*/%"; class TASK { public: TASK() {} TASK(int data1, int data2, char op) : _data1(data1), _data2(data2), _oper(op) { } void run() { switch (_oper) { case '+': _result = _data1 + _data2; break; case '-': _result = _data1 - _data2; break; case '*': _result = _data1 * _data2; break; case '/': { if (_data2 == 0) _exitcode = 1; else _result = _data1 / _data2; } break; case '%': { if (_data2 == 0) _exitcode = 2; else _result = _data1 % _data2; } break; default: _exitcode=3; break; } } std::string GetTASK() { std::string r=std::to_string(_data1); r+=_oper; r+=std::to_string(_data2); r+="=?"; return r; } std::string Getresult() { std::string result=std::to_string(_data1); result+=_oper; result+=std::to_string(_data2); result+='='; result+=std::to_string(_result); result+="[code:"; result+=std::to_string(_exitcode); result+=']'; return result; } void operator()() { run(); } private: int _data1; int _data2; char _oper; int _result; int _exitcode=0; };