Post

多线程-生产者消费者模型

cpp 多线程入门,生产者消费者模型

多线程-生产者消费者模型

Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <iostream>
#include <thread>
#include <queue>
#include <condition_variable>
#include <mutex>

class Entity
{
public:
    void Producer()
    {
        for (int i = 0; i < 10; ++i) {
            std::unique_lock<std::mutex> lock(mtx);
            queue.push(i);
            std::cout << "Producer :" << i << std::endl;
            lock.unlock();
            // 生产者添加任务后通知消费者来取任务
            cv.notify_one();
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
    void Consumer()
    {
        while (true)
        {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, [&]()->bool {
                return !queue.empty();
            });
            int value = queue.front();
            queue.pop();
            std::cout << "Consumer: " << value << std::endl;
        }
    }
private:
    std::mutex mtx;
    std::condition_variable cv;
    std::queue<int> queue;
};


int main()
{
    Entity entity;
    std::thread thread1(&Entity::Producer, &entity);
    std::thread thread2(&Entity::Consumer, &entity);
    thread1.join();
    thread2.join();
    std::cout << "running done..." << std::endl;
    return 0;
}

解析

这是一个“生产者-消费者”模型。 生产者不断的向任务队列中添加任务,消费者不停的从队列中取出任务执行。生产者和消费者在不同的线程中异步执行。

生产者

生产者不停的向任务队列中添加任务,在添加任务后,还需要召唤消费者从任务队列中取任务。

  • 加锁
  • 生产
  • 解锁
  • 通知

我们的任务队列使用了queue数据结构,虽然其push和pop操作的是头尾,并不在一起,但是在操作的时候,会涉及到修改其内部维护的size,所以对任务队列的push与pop必须要用锁。 另一方面,可能存在多个消费者pop,也也要用锁来保护。

我们在生产者中,先使用std::unique_lock<std::mutex> lock(mtx);来上锁,然后添加任务。需要注意的是,在通过条件变量来唤醒(通知)消费者取任务之前,要先手动释放锁。因为cv.notify_one()不会释放锁。如果生产者没有释放锁,则消费者试图取任务时将会得不到锁,取不到任务。

也可以通过scope来自动释放锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Producer()
{
    for (int i = 0; i < 10; ++i) {
        {
            std::unique_lock<std::mutex> lock(mtx);
            queue.push(i);
            std::cout << "Producer :" << i << std::endl;
        }
        //lock.unlock();
        // 生产者添加任务后通知消费者来取任务
        cv.notify_one();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

这是使用了unique_lock的析构函数来释放锁。

消费者

消费者线程首先会试图获取锁,然后通过wait()函数来检查当前的任务队列是否有任务。如果当前的任务队列没有任务,wait()函数会释放锁,并且阻塞当前线程,直到被notify_one/notify_all唤醒,并重新获取锁,然后检查当前的任务队列是否为空(假唤醒行为)。

关于cv.wait() 首先,它会获取一个互斥锁 (由调用 cv.wait 时传入的 std::unique_lock<std::mutex> 对象管理)。这个互斥锁用于在等待期间保护共享资源,以避免多个线程同时访问和修改。 然后,它会检查条件,即在等待期间判断是否满足特定条件的函数。条件是通过一个可调用的谓词 (函数、函数对象或 Lambda 表达式) 指定的,它返回一个 bool 值,表示条件是否满足。如果条件不满足,线程将被阻塞。 如果条件不满足,cv.wait 会释放之前获取的互斥锁,并将当前线程置于等待状态,直到收到通知。 当收到通知时,cv.wait 会再次获取互斥锁,并重新检查条件。如果条件满足,线程将继续执行;否则,它会继续等待通知。 在等待期间,cv.wait 会暂时释放互斥锁,允许其他线程访问共享资源并修改其状态。这种释放和重新获取锁的机制可以确保线程安全,并允许多个线程在合适的时机进行并发操作。

This post is licensed under CC BY 4.0 by the author.