POSIX信号量
- 一.进一步探析生产者消费者模型
- 二.POSIX信号量
-
- 1.基本使用
- 2.基于环形队列的生产消费模型
-
- 1.概念
- 2.模拟
- 3.一个使用例子
一.进一步探析生产者消费者模型
前面我们详解过生产者消费者模型,也了解了它的一些优点,例如:解耦,支持并发,支持忙闲不均…但其实也有人说它高效,这体现在哪呢?
需要理解的一个重要细节是:在生产者进行生产时,需要首先从其它地方获取”原材料”;在消费者消费数据后,也需要对数据进行加工。而生产者消费者模型的高效就体现在当生产者持有锁时,消费者不能进行消费但可以进行数据加个;当消费者持有锁时,生产者不能进行生产但可以获取“原材料”。
再回过头来看看上一篇博客写的模拟阻塞队列(生产者消费者模型)代码,可以发现每一个线程都是先加锁,再进行判断。这很好理解,因为判断也是临界资源。
当判断不成立时就会让该线程进入等待状态,同时注意pthread_cond_wait会自动释放锁。这样其他线程就能拿到锁,正常运行。当该线程资源充足后再将该线程唤醒,该线程又重新拿到锁,开始继续执行代码。那么一个问题,如果该线程误唤醒(伪唤醒)了呢?
什么是伪唤醒(误唤醒)
就以生产者消费者模型为例。上一篇博客里的模拟代码是单生产者和单消费者,很明显在实际中往往是多生产者和多消费者。当有多个生产者在进行等待(为什么会有多个生产者等待,因为pthread_cond_wait会自动释放锁,从而其他生产者就可以持有锁再进入等待队列)。
如果使用一个函数(例如:pthread_cond_broadcast)将它们全部唤醒,毫无疑问它们要再重新持有锁,那么就必然会进行锁的竞争。当一个生产者拿到了锁生产完毕后再释放锁,紧接着另一个生产者又抢到了锁,再进行生产…假设生产者的锁的竞争力更强,消费者一直没抢到锁,没办法进行消费(注意生产者和消费者用的是同一把锁)。当队列已经满后,生产者还在被唤醒进行生产,此时的唤醒被称为误唤醒(伪唤醒)。
为了防止这种情况,建议把if判断改成while判断。
二.POSIX信号量
前面在进程间通信里已经写过信号量了,那里主要是SystemV信号量,这里是POSIX信号量。POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 POSIX可以用于线程间同步。
信号量是保证PV操作原子性的一个计数器。申请信号量就是对计数器进行减减,释放信号量就是对计数器进行加加。 例如:我像让三个线程访问一个数组,把该数组分为三部分,让每个线程分别访问它的三个部分。那么如何保证进入的只有三个线程而不是四个线程呢,就依靠信号量这一个计数器进行计数。
1.基本使用
初始化信号量
参数:
sem:创建的信号量 pshared:0表示线程间共享,非零表示进程间共享 value:信号量初始值
销毁信号量
等待信号量
功能:等待信号量,会将信号量的值减1
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1
2.基于环形队列的生产消费模型
1.概念
循环队列相信大家都不陌生,这是属于数据结构的知识。这里的循环队列与数据结构里的也几乎一样。
我们规定按照顺时针走,P生产一个走一格,C消费一个走一格。
有三个条件:
1.当指向同一位置时,只能有一方访问,避免冲突。(空:P走;满:C走) 2.C不能超过P 3.不能发生套圈情况
这里看是空还是满,就可以直接使用信号量进行判别。
我们用信号量保证生产者和消费者间的互斥,用锁来保证消费者和消费者,生产者和生产者间的互斥。
2.模拟
RingQueue.hpp
#pragma once #include <iostream> #include <vector> #include <semaphore.h> #include <pthread.h> const static int defaultcap = 5; template<class T> class RingQueue{ private: void P(sem_t &sem) { sem_wait(&sem); } void V(sem_t &sem) { sem_post(&sem); } void Lock(pthread_mutex_t &mutex) { pthread_mutex_lock(&mutex); } void Unlock(pthread_mutex_t &mutex) { pthread_mutex_unlock(&mutex); } public: RingQueue(int cap = defaultcap) :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0) { sem_init(&cdata_sem_, 0, 0); sem_init(&pspace_sem_, 0, cap); pthread_mutex_init(&c_mutex_, nullptr); pthread_mutex_init(&p_mutex_, nullptr); } void Push(const T &in) // 生产 { P(pspace_sem_); Lock(p_mutex_); // ? ringqueue_[p_step_] = in; // 位置后移,维持环形特性 p_step_++; p_step_ %= cap_; Unlock(p_mutex_); V(cdata_sem_); } void Pop(T *out) // 消费 { P(cdata_sem_); Lock(c_mutex_); // ? *out = ringqueue_[c_step_]; // 位置后移,维持环形特性 c_step_++; c_step_ %= cap_; Unlock(c_mutex_); V(pspace_sem_); } ~RingQueue() { sem_destroy(&cdata_sem_); sem_destroy(&pspace_sem_); pthread_mutex_destroy(&c_mutex_); pthread_mutex_destroy(&p_mutex_); } private: std::vector<T> ringqueue_; int cap_; int c_step_; // 消费者下标 int p_step_; // 生产者下标 sem_t cdata_sem_; // 消费者关注的数据资源 sem_t pspace_sem_; // 生产者关注的空间资源 pthread_mutex_t c_mutex_; pthread_mutex_t p_mutex_; };
3.一个使用例子
Task.hpp
#pragma once #include <iostream> #include <string> std::string opers="+-*/%"; enum{ DivZero=1, ModZero, Unknown }; class Task { public: Task() {} Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0) { } void run() { switch (oper_) { case '+': result_ = data1_ + data2_; break; case '-': result_ = data1_ - data2_; break; case '*': result_ = data1_ * data2_; break; case '/': { if(data2_ == 0) exitcode_ = DivZero; else result_ = data1_ / data2_; } break; case '%': { if(data2_ == 0) exitcode_ = ModZero; else result_ = data1_ % data2_; } break; default: exitcode_ = Unknown; break; } } void operator ()() { run(); } std::string GetResult() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "="; r += std::to_string(result_); r += "[code: "; r += std::to_string(exitcode_); r += "]"; return r; } std::string GetTask() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "=?"; return r; } ~Task() { } private: int data1_; int data2_; char oper_; int result_; int exitcode_; };
Main.cc
#include <iostream> #include <pthread.h> #include <unistd.h> #include <ctime> #include "RingQueue.hpp" #include "Task.hpp" using namespace std; struct ThreadData { RingQueue<Task> *rq; std::string threadname; }; void *Productor(void *args) { // sleep(3); ThreadData *td = static_cast<ThreadData*>(args); RingQueue<Task> *rq = td->rq; std::string name = td->threadname; int len = opers.size(); while (true) { // 1. 获取数据 int data1 = rand() % 10 + 1; usleep(10); int data2 = rand() % 10; char op = opers[rand() % len]; Task t(data1, data2, op); // 2. 生产数据 rq->Push(t); cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl; sleep(1); } return nullptr; } void *Consumer(void *args) { ThreadData *td = static_cast<ThreadData*>(args); RingQueue<Task> *rq = td->rq; std::string name = td->threadname; while (true) { // 1. 消费数据 Task t; rq->Pop(&t); // 2. 处理数据 t(); cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl; // sleep(1); } return nullptr; } int main() { srand(time(nullptr) ^ getpid()); RingQueue<Task> *rq = new RingQueue<Task>(50); pthread_t c[5], p[3]; for (int i = 0; i < 1; i++) { ThreadData *td = new ThreadData(); td->rq = rq; td->threadname = "Productor-" + std::to_string(i); pthread_create(p + i, nullptr, Productor, td); } for (int i = 0; i < 1; i++) { ThreadData *td = new ThreadData(); td->rq = rq; td->threadname = "Consumer-" + std::to_string(i); pthread_create(c + i, nullptr, Consumer, td); } for (int i = 0; i < 1; i++) { pthread_join(p[i], nullptr); } for (int i = 0; i < 1; i++) { pthread_join(c[i], nullptr); } return 0; }