多线程-生产者消费者模型
Code
#include <iostream>
#include <thread>
#include <queue>
#include <condition_variable>
#include <mutex>
class Entity
{
public:
void Producer()
{
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
queue.push(i);
std::cout << "Producer :" << i << std::endl;
lock.unlock();
// 生产者添加任务后通知消费者来取任务
cv.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void Consumer()
{
while (true)
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&]()->bool {
return !queue.empty();
});
int value = queue.front();
queue.pop();
std::cout << "Consumer: " << value << std::endl;
}
}
private:
std::mutex mtx;
std::condition_variable cv;
std::queue<int> queue;
};
int main()
{
Entity entity;
std::thread thread1(&Entity::Producer, &entity);
std::thread thread2(&Entity::Consumer, &entity);
thread1.join();
thread2.join();
std::cout << "running done..." << std::endl;
return 0;
}
解析
这是一个“生产者-消费者”模型。 生产者不断的向任务队列中添加任务,消费者不停的从队列中取出任务执行。生产者和消费者在不同的线程中异步执行。
生产者
生产者不停的向任务队列中添加任务,在添加任务后,还需要召唤消费者从任务队列中取任务。
- 加锁
- 生产
- 解锁
- 通知
我们的任务队列使用了queue数据结构,虽然其push和pop操作的是头尾,并不在一起,但是在操作的时候,会涉及到修改其内部维护的size,所以对任务队列的push与pop必须要用锁。 另一方面,可能存在多个消费者pop,也也要用锁来保护。
我们在生产者中,先使用std::unique_lock<std::mutex> lock(mtx);来上锁,然后添加任务。需要注意的是,在通过条件变量来唤醒(通知)消费者取任务之前,要先手动释放锁。因为cv.notify_one()不会释放锁。如果生产者没有释放锁,则消费者试图取任务时将会得不到锁,取不到任务。
也可以通过scope来自动释放锁:
void Producer()
{
for (int i = 0; i < 10; ++i) {
{
std::unique_lock<std::mutex> lock(mtx);
queue.push(i);
std::cout << "Producer :" << i << std::endl;
}
//lock.unlock();
// 生产者添加任务后通知消费者来取任务
cv.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
这是使用了unique_lock的析构函数来释放锁。
消费者
消费者线程首先会试图获取锁,然后通过wait()函数来检查当前的任务队列是否有任务。如果当前的任务队列没有任务,wait()函数会释放锁,并且阻塞当前线程,直到被notify_one/notify_all唤醒,并重新获取锁,然后检查当前的任务队列是否为空(假唤醒行为)。
关于cv.wait() 首先,它会获取一个互斥锁 (由调用 cv.wait 时传入的 std::unique_lock<std::mutex> 对象管理)。这个互斥锁用于在等待期间保护共享资源,以避免多个线程同时访问和修改。 然后,它会检查条件,即在等待期间判断是否满足特定条件的函数。条件是通过一个可调用的谓词 (函数、函数对象或 Lambda 表达式) 指定的,它返回一个 bool 值,表示条件是否满足。如果条件不满足,线程将被阻塞。 如果条件不满足,cv.wait 会释放之前获取的互斥锁,并将当前线程置于等待状态,直到收到通知。 当收到通知时,cv.wait 会再次获取互斥锁,并重新检查条件。如果条件满足,线程将继续执行;否则,它会继续等待通知。 在等待期间,cv.wait 会暂时释放互斥锁,允许其他线程访问共享资源并修改其状态。这种释放和重新获取锁的机制可以确保线程安全,并允许多个线程在合适的时机进行并发操作。