线程池设计---C++

什么是线程池:

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.

线程池示例:

  1. 创建固定数量线程池,循环从任务队列中获取任务对象
  2. 获取到任务对象后,执行任务对象中的任务接口

锁设计

采用RAII风格的加锁方式,用于保护在出了临界区后,忘记解锁的情况。

Mutex 类:

  • Mutex 类封装了 pthread_mutex_t 互斥锁。它包含 lock 和 unlock 方法,分别用于加锁和解锁。
  • 构造函数接收一个指向 pthread_mutex_t 的指针,并将其存储在私有成员 _pmtx 中。

lockGuard 类:

  • lockGuard 类是 RAII 风格的锁保护类。在构造函数中,它接收一个 pthread_mutex_t 指针并使用 Mutex 类将锁住。
  • 在析构函数中,它解锁互斥锁。这样,当 lockGuard 对象超出范围(超出了作用域),它的析构函数将确保互斥锁被正确解锁,即使在发生异常的情况下也是如此。
#include <iostream>
#include <pthread.h>

class Mutex{
public:
    Mutex(pthread_mutex_t *mtx)
    :_pmtx(mtx)
    {}
    void lock()
    {
        pthread_mutex_lock(_pmtx);
    }
    void unlock()
    {
        pthread_mutex_unlock(_pmtx);
    }
    ~Mutex()
    {}

private:
    pthread_mutex_t *_pmtx;
};

// RAII风格的加锁方式
class lockGuard
{
public:
    lockGuard(pthread_mutex_t *mtx)
    :_mtx(mtx)
    {
        _mtx.lock();
    }
    ~lockGuard()
    {
        _mtx.unlock();
    }

private:
    Mutex _mtx;
};

线程封装

把线程进行封装,用于创建和管理线程。

主要的类和相关功能:

ThreadData 类:

  • 存储线程的参数和名称,包含 _args 和 _name 成员变量。

Thread 类:

  • 构造函数接受线程编号、回调函数指针和线程参数,生成一个线程名称,并初始化了 _func 和 _tdata 。
  • start 方法用于创建线程,调用 pthread_create。
  • join 方法等待线程的结束,调用 pthread_join。name 方法返回线程的名称。
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>

typedef void*(*fun_t)(void*);

class ThreadData
{
public:
    void* _args;
    std::string _name;
};

class Thread
{
public:
    Thread(int num, fun_t callback, void* args)
    :_func(callback)
    {
        char nameBuffer[64];
        snprintf(nameBuffer, sizeof nameBuffer, "thread-%d", num);
        _name = nameBuffer;
        _tdata._name = _name;
        _tdata._args = args;
    }

    void start()
    {
        pthread_create(&_tid, nullptr, _func, (void*)&_tdata);
    }
    void join()
    {
        pthread_join(_tid, nullptr);
    }
    std::string name()
    {
        return _name;
    }
    ~Thread()
    {}

private:
    std::string _name;  // 线程号
    fun_t _func;  // 仿函数
    ThreadData _tdata;  
    pthread_t _tid;   // 线程标识符
};

线程池

  1. getThreadPool 函数是获取线程池单例的静态方法。使用了双检锁机制(Double-Check Locking)确保在多线程环境下只创建一个实例。
  2. routine 是线程的执行函数,其中使用了 lockGuard 类来实现 RAII 风格的加锁和解锁。在 routine 中,线程不断从任务队列中取出任务执行。
  3. PushTask 用于向任务队列中添加任务,并通过条件变量 pthread_cond_signal 通知等待中的线程有新任务。
  4. run 方法启动所有线程。
  5. 在构造函数中初始化了互斥锁 pthread_mutex_init 和条件变量 pthread_cond_init。
  6. 析构函数负责销毁线程池中的线程、互斥锁和条件变量。

条件变量:
void waitCond() 方法使用了条件变量 pthread_cond_wait 函数,该函数的作用是使当前线程阻塞,等待条件变量被其他线程通过 pthread_cond_signal 或 pthread_cond_broadcast 激活。这个函数会释放 lock 互斥锁,允许其他线程在执行 pthread_cond_signal 或 pthread_cond_broadcast 时获得锁。

具体步骤如下:

  1. 当前线程调用 pthread_cond_wait 时,它会释放 lock,使得其他线程可以进入临界区。
  2. 当其他线程执行 pthread_cond_signal 或 pthread_cond_broadcast 时,被阻塞的线程会重新获得 lock。
  3. 被重新唤醒的线程会重新检查条件。如果条件满足,它将继续执行;否则,它将再次进入等待状态。

这种机制通常用于线程之间的同步,其中一个线程在满足某个条件时通知其他线程继续执行。在这个线程池的实现中,waitCond 用于在任务队列为空时阻塞线程,直到有新的任务到来。

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "lockGuard.hpp"
#include "thread.hpp"
#include "log.hpp"

const int g_thread_num = 10;

// 本质:生产者消费者模型
template<class T>
class ThreadPool
{
public:
	// 加锁
    pthread_mutex_t* getMutex()
    {
        return &lock;
    }
	// 判断任务队列是否为空
    bool isEmpty()
    {
        return _task_queue.empty();
    }
	// 调用条件变量
    void waitCond()
    {
        pthread_cond_wait(&cond, &lock);
    }
	// 获取一个任务
    T getTask()
    {
        T t = _task_queue.front();
        _task_queue.pop();
        return t;
    }

private:
    ThreadPool(int thread_num = g_thread_num)
    :_num(thread_num)
    {

        for(int i=1; i <= _num; i++)
        {
            _threads.push_back(new Thread(i, routine, this));
        }
        pthread_mutex_init(&lock, nullptr);
        pthread_cond_init(&cond, nullptr);
    }
	// 禁用拷贝构造和拷贝赋值
    ThreadPool(const ThreadPool<T> &other) = delete;
    const ThreadPool<T> operator=(const ThreadPool<T> &other) = delete;

public: 
    static ThreadPool<T> *getThreadPool(int num = g_thread_num)
    {
        // 可以有效减少未来必定要进行枷锁检测的问题
        // 拦截大量的再已经创建好单例的时候,剩余线程请求单例的而直接访问所的行为
        if(nullptr == thread_ptr)
        {
            lockGuard lockguard(&mutex);
            if(thread_ptr == nullptr)
            {
                thread_ptr = new ThreadPool<T>(num);
            }
        }
        return thread_ptr;
    }
    void run()
    {
         // 启动所有线程
        for(auto &iter : _threads)
        {
            iter->start();
            logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
        }
    }
 // 从任务队列中获取任务
    static void* routine(void* args)
    {
        ThreadData *td = (ThreadData*)args;
        ThreadPool<T> *tp = (ThreadPool<T>*)td->_args;
        while(true)
        {
            T task;
            {
                lockGuard lockGuard(tp->getMutex());
                while(tp->isEmpty())
                {
                    tp->waitCond();
                }
                // 获取任务
                task = tp->getTask();
            }
            task(td->_name);
        }
    }
    // 添加任务到任务队列
    void PushTask(const T &task)
    {
        lockGuard lockguard(&lock);
        _task_queue.push(task);
        pthread_cond_signal(&cond);
    }
    ~ThreadPool()
    {
        for(auto &iter : _threads)
        {
            delete iter;
        }
        pthread_mutex_destroy(&lock);
        pthread_cond_destroy(&cond);
    }

private:
    std::vector<Thread*> _threads;
    int _num;
    std::queue<T> _task_queue;

    static ThreadPool<T> *thread_ptr;
    static pthread_mutex_t mutex;

    pthread_mutex_t lock;
    pthread_cond_t cond;
};

template <typename T>
ThreadPool<T> *ThreadPool<T>::thread_ptr = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
// 这样的定义确保了每个模板实例都有自己的静态成员,而不是共享同一个。
// PTHREAD_MUTEX_INITIALIZER,这是一个宏,用于初始化一个互斥锁。