Post

实现基于C++11的线程池

实现C++11的线程池,并介绍了单例设计模式

实现基于C++11的线程池

单例模式

单例模式是一种设计模式,指类只有一个实例化的对象,例如线程池、LOG等。

通常有两种设计模式来实现单例,即饿汉模式懒汉模式

以一个LOG类来说明情况。不管什么设计模式,单例模式中,class的拷贝和移动构造函数都需要删除,并且需要隐藏默认构造函数。

1
2
3
4
5
6
7
8
9
10
11
12
class LOG
{
public:
    LOG(const LOG& log) = delete;
    LOG& operator=(LOG& log) = delete;
    static void printLog(const std::string& msg)
    {
        std::cout << msg << std::endl;
    }
private:
    LOG() = default; // 隐藏默认构造函数
};

饿汉模式

饿汉模式是一种单例模式的实现方式,它的特点是在程序启动时就进行对象的实例化。在这种实现方式中,单例对象会在类加载时就被创建出来,因此也被称作“饱汉模式”或者“静态常量方式”。饿汉模式的优点是实现简单、线程安全,无需考虑多线程环境下的同步问题。其缺点是无法进行懒加载,带来了一定的系统资源开销。在某些场景下,如果单例对象很大或者初始化耗时较长,饿汉模式的开销可能会比较明显。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class LOG
{
public:
    LOG(const LOG& log) = delete;
    LOG& operator=(LOG& log) = delete;
    static void printLog(const std::string& msg)
    {
        std::cout << msg << std::endl;
    }
    static LOG& GetInstance()
    {
        static LOG log;  // 懒汉模式
        return log;
    }
private:
    LOG() = default; // 隐藏默认构造函数
};

饿汉模式在类加载的过程中便已经在代码的静态变量存储区实现了类的实例化,在任何地方只要调用LOG::GetInstance()即可获取该对象,因此是线程安全的。

懒汉模式

懒汉模式是一种单例模式的实现方式,它的特点是在首次访问单例对象时才进行对象的实例化。懒汉模式的优点是可以避免在程序启动时就进行对象的实例化,节省了系统的资源。懒汉模式的缺点是在多线程环境下,可能会出现线程安全问题,需要额外的同步措施来保证线程安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class LOG
{
public:
    LOG(const LOG& log) = delete;
    LOG& operator=(LOG& log) = delete;
    static void printLog(const std::string& msg)
    {
        std::cout << msg << std::endl;
    }
    static LOG& GetInstance()
    {
        static LOG *log = nullptr;  // 饿汉模式
        if (!log)
            log = new LOG;
        return *log;
    }
private:
    LOG() = default; // 隐藏默认构造函数
};

饿汉模式即首次访问单例对象时才进行对象的实例化。但是其是线程不安全的,如果有两个线程同时调用GetInstance,可能会造成错误。

call_once

std::call_once 是 C++11 标准库中的一个函数,用于确保某个函数只会被调用一次。其函数原型如下:

1
2
template<class Callable, class... Args>
void call_once(std::once_flag& flag, Callable&& func, Args&&... args);

其中,flag 是一个 std::once_flag 类型的对象,用于标记函数是否已经被调用;func 是需要被调用的函数或可调用对象;args 是函数或可调用对象的参数。

std::call_once 的作用是,确保在多个线程中同时调用 call_once 时,只有一个线程能够成功执行 func 函数,而其他线程则会等待该函数执行完成。

使用 std::call_once 的过程中,需要注意以下几点:

  1. flag 参数必须是一个 std::once_flag 类型的对象,并且在多次调用 call_once 函数时需要使用同一个 flag 对象。

  2. func 参数是需要被调用的函数或可调用对象。该函数只会被调用一次,因此应该确保该函数是幂等的。

  3. args 参数是 func 函数或可调用对象的参数。如果 func 函数没有参数,则该参数可以省略。

  4. std::call_once 函数会抛出 std::system_error 异常,如果在调用 func 函数时发生了异常,则该异常会被传递给调用者。

使用 std::call_once 可以在多线程环境中实现一次性初始化,避免了多个线程同时初始化的问题。例如,在单例模式中,可以使用 std::call_once 来保证单例实例只会被创建一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class LOG
{
public:
    LOG(const LOG& log) = delete;
    LOG& operator=(LOG& log) = delete;
    static void printLog(const std::string& msg)
    {
        std::cout << msg << std::endl;
    }

    static LOG& GetInstance()
    {
        std::call_once(onceFlag, []()
        {
            log = new LOG;
        });
        return *log;
    }
private:
    LOG() = default; // 隐藏默认构造函数
    static std::once_flag onceFlag;
    static LOG *log;
};
LOG* LOG::log = nullptr;
std::once_flag LOG::onceFlag;

线程池

在c++中开启线程是一个开销较大的场景,为了提高性能,可以预先开启多线程,然后在需要的时候往其中添加任务来执行,可以避免运行时开辟线程的开销,这就是线程池。

线程数组

我们使用一个数组来维护线程,使用队列来维护需要执行的任务。

1
2
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;

需求说明:

  1. 在程序开始运行时,初始化全部线程
  2. 可以在线程池中注册不同的任务(函数指针的形式),在线程池中有任务需要执行时,会自动执行

我们为我们的线程池封装成一个classThreadsPool,并在其构造函数中初始化线程池,该class会提供一个为线程池添加任务的函数template<typename Func, typename... Args>void pushTasks(Func&& func, Args&&... args);

  1. 我们的class没必要也不应该有多个实例化的对象,因此我们对线城池class使用单例模式,并使用上文提到的std::call_once来实现一个饿汉模式的单例class。

ThreadsPool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#ifndef CPPTUTORIAL_THREADSPOOL_H
#define CPPTUTORIAL_THREADSPOOL_H

#include <iostream>
#include <vector>
#include <queue>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>

class ThreadsPool {
private:
    ThreadsPool();
    explicit ThreadsPool(int threadNums);
public:
    ThreadsPool(const ThreadsPool& val) = delete;
    ThreadsPool& operator=(ThreadsPool& val) = delete;

    template<typename Func, typename... Args>
    void pushTasks(Func&& func, Args&&... args)
    {
        // 函数模板是无法将声明和定义放在两个文件的,最直接的理由是函数模板因为没有被实例化,编译器不会编译函数模板。
        // 函数模版中,&&右值引用表示万能引用,根据输入的不同,自动推导左右值引用
        // Args是一个可变参数模版
        // std::bind是函数模板(是一个函数,使用std::bind可以将可调用对象和参数一起绑定,绑定后的结果使用std::function进行保存,并延迟调用到任何我们需要的时候。
        // 在模版中,使用了&&来进行万能引用,在bind绑定的时候使用std::forward进行完美转发
        // std::forward不会修改参数左右值的属性
        std::function<void()> task(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
        {
            std::unique_lock<std::mutex>lock(this->mutex);
            this->tasks.emplace(std::move(task));
        }
        this->conditionVariable.notify_one();
    }

    static ThreadsPool& getInstance(int threadNums = -1);
    ~ThreadsPool();
private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex mutex;
    std::condition_variable conditionVariable;
    bool isStop;
private:
    static std::once_flag onceFlag;
    static std::unique_ptr<ThreadsPool>threadsPool;
};

#endif

类中变量

  1. 我们使用数组来维护线程,使用deque来维护任务。

  2. 由于涉及到对临界区的读写,因此还需要锁来保护临界区。

  3. 线程池中任务的执行也符合生产者消费者模型,为了能自动唤醒消费者从任务队列中取任务执行,我们还需要一个条件变量。
  4. 由于使用了std::call_once来实现单例类,所以我们还需要一个flag来保证类只可以实例化一个对象,该flag同样是static修饰的
  5. 由于使用了单例类,我们还需要一个static修饰的指针来指向这个单例对象
1
2
3
4
5
6
7
8
9
private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex mutex;
    std::condition_variable conditionVariable;
    bool isStop;
private:
    static std::once_flag onceFlag;
    static std::unique_ptr<ThreadsPool>threadsPool;

关于类中static的补充

c++中变量/方法的声明与定义是分离的。声明可以声明多份,但是定义/实现只有一份,否则编译器会给出重定义的错误。

在c++中,我们通常把c++的声明放在class.h文件中,实现放在class.cpp文件中,在需要使用这个class时,需要做三件事情:

  1. 编译class.cpp文件生成class.o文件,代码文件只有经过编译才能使用
  2. include这个头文件,这是为了获取类/方法的声明
  3. 在link的过程中,连接class.o文件
  • 类中的变量

类中的普通变量,在编译的过程中,并不会实际分配内存,而是随着类实例化对象时随着分配在栈或者堆上。

类中static修饰的变量在所有类实现的对象中,只有一个副本。其不存放在堆栈上,而是存放在静态存储区。因此static修饰的变量,在头文件中只能声明不能定义。原因在于通常最少会有两个(class.cpp main.cpp)cpp文件include头文件,这会触发重定义符号的错误。因此我们只在头文件中声明class中的static的静态变量,其定义放在了class.cpp中

  • 类中的方法

    在头文件的class中定义方法,然后在多个cpp文件中include该头文件不会给出重定义的错误,而头文件中非class中的方法则会报错重定义。这是因为c++中class里所有的方法默认都是内联的,编译器针对内联做了特殊处理。

    https://stackoverflow.com/questions/28183729/why-i-can-implement-the-classs-member-function-in-header-file

    https://stackoverflow.com/questions/5373107/how-to-implement-static-class-member-functions-in-cpp-file

ThreadsPool.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include "ThreadsPool.h"

std::once_flag ThreadsPool::onceFlag;
std::unique_ptr<ThreadsPool>ThreadsPool::threadsPool(nullptr);

ThreadsPool::ThreadsPool(int threadNums) : isStop(false)
{
    std::cout << "create " << threadNums << " threads." << std::endl;
    for (int i = 0; i < threadNums; ++i)
    {
        // 为线程池添加线程
        threads.emplace_back([this]()->void {
            while (true)
            {
                std::unique_lock<std::mutex> lock(this->mutex);
                this->conditionVariable.wait(lock, [this]()->bool {
                    // 当前任务队列非空就继续执行(取任务并执行)
                    // 如果当前任务队列为空就阻塞当前线程并等待
                    // 为了解决所有任务队列执行完毕(为空)后继续阻塞等待,无法退出的问题
                    // 使用isStop变量。只有当前任务队列执行完毕,并且没有停止的时候才阻塞
                    return !this->tasks.empty() || this->isStop;  // 如果当前任务队列是空的且线程池不需要需要停止才阻塞当前线程
                });
                // 条件变量.wait()中,true就继续执行,false就堵塞当前线程
                if (this->isStop && this->tasks.empty())
                    return;
                // 从任务队列取出一个任务后执行任务
                std::function<void()>task(std::move(this->tasks.front()));
                this->tasks.pop();
                lock.unlock();
                task();
            }
        });
    }
}

ThreadsPool::~ThreadsPool() {
    std::cout << "running here..." << std::endl;
    {
        std::unique_lock<std::mutex> lock(this->mutex);
        this->isStop = true;
    }
    this->conditionVariable.notify_all(); // 通知所有线程完成任务
    for (auto& val : this->threads)
        val.join();
    std::cout << "compelete all tasks." << std::endl;
}

//用cpu核心数目-1作为线程池数量
ThreadsPool::ThreadsPool() : ThreadsPool(std::max(std::thread::hardware_concurrency(),(unsigned int)1) - 1) {
}

ThreadsPool &ThreadsPool::getInstance(int threadNums)
{
    std::call_once(onceFlag, [&threadNums](){
        if (threadNums < 0)
            threadsPool.reset(new ThreadsPool());
        else
            threadsPool.reset(new ThreadsPool(threadNums));
    });
    return *threadsPool;
}

饿汉模式的单例类

为了实现单例类,我们使用private来修饰构造函数,并删除拷贝和移动构造函数。

1
2
3
4
5
6
7
private:
    ThreadsPool();
    explicit ThreadsPool(int threadNums);
public:
    ThreadsPool(const ThreadsPool& val) = delete;
    ThreadsPool& operator=(ThreadsPool& val) = delete;
	  static ThreadsPool& getInstance(int threadNums = -1);

为了获取单例类对象,我们实现了一个static修饰的static ThreadsPool& getInstance(int threadNums = -1); 该方法会返回ThreadsPool的引用。

threadNums如使用缺省值-1表示会根据当前cpu的线程数来自动选择线程池中线程的数量,如果该值为大于1表示根据输入的数据来确定线程池中线程的数量

由于我们使用了饿汉模式的单例类,因此单例对象只有在第一次调用getInstance才会实例化对象。为了实现线程安全,我们使用了call_once。

1
2
3
4
5
6
7
8
9
10
ThreadsPool &ThreadsPool::getInstance(int threadNums)
{
    std::call_once(onceFlag, [&threadNums](){
        if (threadNums < 0)
            threadsPool.reset(new ThreadsPool());
        else
            threadsPool.reset(new ThreadsPool(threadNums));
    });
    return *threadsPool;
}

call_once会保证这个函数只执行一次。

在此处我们使用lambada表达式。我们在class内部定义了一个static修饰的智能指针,由于是单例类,所以我们使用了unique_ptr智能指针static std::unique_ptr<ThreadsPool>threadsPool;,并根据输入的threadNum数字的不同,调用不同的构造函数。

我们程序启动的时候把threadsPool指向了nullptr,但是第一次调用getInstance之后,会调用不同的构造函数,并把threadsPool指向实例化出来的单例类中。再后续需要访问单例对象时,仍然调用getInstance即可以直接返回单例类对象了。

构造函数

我们为线程池实现了两个构造函数,其会在call_once中执行。两个构造函数其实本质上是一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//用cpu核心数目-1作为线程池数量
ThreadsPool::ThreadsPool() : ThreadsPool(std::max(std::thread::hardware_concurrency(),(unsigned int)1) - 1) {
}
ThreadsPool::ThreadsPool(int threadNums) : isStop(false)
{
    std::cout << "create " << threadNums << " threads." << std::endl;
    for (int i = 0; i < threadNums; ++i)
    {
        // 为线程池添加线程
        threads.emplace_back([this]()->void {
            while (true)
            {
                std::unique_lock<std::mutex> lock(this->mutex);
                this->conditionVariable.wait(lock, [this]()->bool {
                    // 当前任务队列非空就继续执行(取任务并执行)
                    // 如果当前任务队列为空就阻塞当前线程并等待
                    // 为了解决所有任务队列执行完毕(为空)后继续阻塞等待,无法退出的问题
                    // 使用isStop变量。只有当前任务队列执行完毕,并且没有停止的时候才阻塞
                    return !this->tasks.empty() || this->isStop;  // 如果当前任务队列是空的且线程池不需要需要停止才阻塞当前线程
                });
                // 条件变量.wait()中,true就继续执行,false就堵塞当前线程
                if (this->isStop && this->tasks.empty())
                    return;
                // 从任务队列取出一个任务后执行任务
                std::function<void()>task(std::move(this->tasks.front()));
                this->tasks.pop();
                lock.unlock();
                task();
            }
        });
    }
}

在c++中,可以通过列表初始化的方式,手动的通过一个构造函数调用另一个构造函数。

1
2
3
ThreadsPool::ThreadsPool() : ThreadsPool(std::max(std::thread::hardware_concurrency(),(unsigned int)1) - 1) {
}
// ThreadsPool::ThreadsPool()构造函数通过列表初始化的方式手动的调用ThreadsPool::ThreadsPool(int threadNums) : isStop(false);构造函数

ThreadsPool::ThreadsPool(int ) : isStop(false);构造函数中,我们初始化线程池,向线程数组std::vector<std::thread> threads;中添加threadNums个线程。

threads的类型是vector的std::thread,std::thread的构造函数支持模版/lambda/s t d::function/函数指针。我们此处使用的是lambda表达式。

线程池中初始化执行的程序可以表示为:

  1. 如果当前的任务队列中有任务,取出一个任务执行
  2. 如果当前任务队列中无任务,阻塞当前线程;
  3. 阻塞当前线程后,线程会在往任务队列中添加任务时被唤醒

任务队列我们使用的是deque,为了保证临界区的线程安全,所有涉及临界区的代码都要通过锁来保护。

std::function

std::function是一个函数包装模板,可以包装下列这几种可调用元素类型:函数、函数指针、类成员函数指针或任意类型的函数对象(例如定义了operator()操作并拥有函数闭包)。std::function对象可被拷贝和转移,并且可以使用指定的调用特征来直接调用目标元素。当std::function对象未包裹任何实际的可调用元素,调用该std::function对象将抛出std::bad_function_call异常。

https://blog.csdn.net/weixin_44378800/article/details/115210731

向线程池中添加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template<typename Func, typename... Args>
void pushTasks(Func&& func, Args&&... args)
{
    // 函数模板是无法将声明和定义放在两个文件的,最直接的理由是函数模板因为没有被实例化,编译器不会编译函数模板。
    // 函数模版中,&&右值引用表示万能引用,根据输入的不同,自动推导左右值引用
    // Args是一个可变参数模版
    // std::bind是函数模板,使用std::bind可以将可调用对象和参数一起绑定,绑定后的结果使用std::function进行保存,并延迟调用到任何我们需要的时候。
    // 在模版中,使用了&&来进行万能引用,在bind绑定的时候使用std::forward进行完美转发
    // std::forward不会修改参数左右值的属性
    std::function<void()> task(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
    {
        std::unique_lock<std::mutex>lock(this->mutex);
        this->tasks.emplace(std::move(task));
    }
    this->conditionVariable.notify_one();
}

由于线程池中可能会注册各种各样的函数,所以在此处我们使用了模版。需要注意的是,模版的完成实现必须在头文件中(模版的实现不允许跨文件),这是因为模版本身并不生成代码,只有才编译的时候使用的模版才会生成对应的代码。

析构函数

1
2
3
4
5
6
7
8
9
10
11
ThreadsPool::~ThreadsPool() {
    std::cout << "running here..." << std::endl;
    {
        std::unique_lock<std::mutex> lock(this->mutex);
        this->isStop = true;
    }
    this->conditionVariable.notify_all(); // 通知所有线程完成任务
    for (auto& val : this->threads)
        val.join();
    std::cout << "compelete all tasks." << std::endl;
}

析构函数的逻辑相对较简单。我们把结束标志位置为false,这样可以避免任务队列中所有的任务已经执行完毕,且当前程序需要退出了,但是由于任务队列是空,线程默认的操作是阻塞当前队列等待有任务添加时被唤醒。如果发现这种情况,线程就直接return退出了。

同时还通知所有线程,从当前任务队列中取出剩余任务执行。

为什么要用智能指针?

我们在main线程中为了保护cout使用了锁std::mutex coutMutex,但是我们的main线程可能先于线程池中线程先结束。当main线程先结束时,main线程的堆栈会被释放,coutMutex对象也会被释放,这会导致线程池中线程在执行任务队列中任务时,访问了一个不存在的对象引发错误。

合理的逻辑时,主线程要结束时,先等待线程池中所有的任务执行完毕再退出。

我们单例类的对象是一个static的指针,该对象是通过call_once在堆上new出来的,堆上的变量在销毁时不会调用析构函数,但是静态存储区上的对象在销毁时,会调用析构函数。

我们把static修饰的单例对象指针static std::unique_ptr<ThreadsPool>threadsPool;使用智能指针修饰,在主线程结束时,释放静态存储的对象,释放到智能指针static std::unique_ptr<ThreadsPool>threadsPool;会自动调用析构函数,等待线程池对象结束后再结束主线程。

main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <thread>
#include <queue>
#include <condition_variable>
#include <mutex>
#include "ThreadsPool.h"

void threadFunc(int i, std::mutex& mtx) {
    // std::mutex禁用了拷贝和移动构造函数,不允许值传递
    {
        // 在超出作用域后会自动调用析构函数释放锁,无需手动释放了
        std::lock_guard<std::mutex>lockGuard(mtx);
        // cout不是线程安全的,用锁保护临界区
        std::cout << "task: " << i << " running..." <<std::endl;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds (200));
    {
        std::lock_guard<std::mutex>lockGuard(mtx);
        std::cout << "task: " << i << " done..." <<std::endl;
    }
}

std::mutex coutMutex;

int main() {
    for (int i = 0; i < 10; ++i)
        ThreadsPool::getInstance(5).pushTasks(threadFunc, i, std::ref(coutMutex));
    std::cout << "running done..." << std::endl;
}

std::ref与引用退化

我们在main.cpp中定义了一个函数threadFunc并添加到线程池中,由于threadFunc函数涉及到cout,但是cout不是线程安全的,因此在输出的时候可能会出现错误,因此我们需要使用锁来进行保护。

但是std::mutex禁用了拷贝和移动构造函数,这意味着锁作为参数传递时只能通过传引用的方式。但是我们往线程池中任务队列添加任务时,使用了模版类来实现了可变参数。在 std::bindstd::thread 中可能会遇到引用失效的问题,解决办法是通过 std::ref 传递引用。

因此,我们在添加任务时,使用了std::ref来实现强制引用传递。

1

This post is licensed under CC BY 4.0 by the author.