5414 字
27 分钟
Tiny Webserver拆解02 线程池头文件编写
2023-09-17

2.线程池头文件编写#

根据上面的思路,开始编写头文件,在头文件中定义出来我们要用到的变量和函数

1.引入头文件,并使用条件编译#

这个线程池头文件threadpool.h中,既有系统提供的头文件,也有我们自定义的头文件,我们需要使用==条件编译==语句对这个线程池头文件threadpool.h进行条件编译,防止后面的代码引用这个头文件时出现==头文件重复引用==的问题

有关条件编译的知识点在这篇文章里:宏定义与条件编译

头文件引用的格式如下:

  • 系统提供的头文件: #include <xxx.h>

头文件存放在一对尖括号< >内,表示引用==系统提供的头文件==,例如,stdio.h、stdlib.h头文件,是系统提供的头文件,所以,引用时,使用尖括号包含。

  • 用户自定义的头文件: #include “xxx.h”

第二种格式,头文件存放在一对双引号 “” 内,表示 引用==用户自定义的头文件==。例如我们自己定义了一个test.h头文件,那么,就使用双引号包含。

代码如下:

#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "../lock/locker.h"
#include "../CGImysql/sql_connection_pool.h"
// 代码主体
#endif

注意最后的#endif不要忘记写,不写会报错

2.定义线程池模板类#

然后就是编写代码主体的部分了,先定义一个线程池的模板类,具体的部分可以参考注释

代码如下:

// 线程池类定义
template <typename T>
class threadpool {
public:
// actor_model 用于切换的模型
// connPool 数据库连接池指针
// thread_number 是线程池中线程的数量
// max_request 请求队列中最大请求数量
threadpool(int actor_model,
connection_pool* connPool,
int thread_number = 8,
int max_request = 10000);
~threadpool();
// ?这干嘛的,参数 state 干嘛的?
bool append(T* request, int state);
// 向请求队列 m_workqueue 中插入请求对象
// 并通知 pthread_create() 创建的工作线程处理该任务
bool append_p(T* request);
private:
/*
* 线程池的工作函数,启动一个新线程并调用 threadpool 对象的
* run() 函数来处理任务
*/
static void* worker(void* arg);
// 线程池的主函数,负责不断检查任务队列
void run();
private:
int m_thread_number; // 线程池中的线程数
int m_max_requests; // 请求队列中最大待处理请求数
pthread_t* m_threads; // 线程池数组,大小为 m_thread_number
std::list<T*> m_workqueue; // 请求队列
locker m_queuelocker; // 保护请求队列的互斥锁
sem m_queuestat; // 是否有任务需要处理的信号量
connection_pool* m_connPool; // 数据库连接池
int m_actor_model; // 模型切换
};

3.模板外实现线程池的构造函数#

在线程池的构造函数中,创建线程函数,线程分离函数需要注意一下

1.创建线程函数#

pthread_create()函数是一个用于创建线程的函数,它属于POSIX线程库(pthread)中的一员。该函数具有以下原型:

int pthread_create(pthread_t* thread, const pthread_attr_t* attr,
void* (*start_routine)(void*), void* arg);

参数说明:

  • thread:指向pthread_t类型变量的指针,用于存储新创建线程的标识符。
  • attr:指向pthread_attr_t类型变量的指针,用于==指定新线程的属性==。可以通过该参数来设置线程的==分离状态==、==栈大小==等属性。如果传递==NULL==,则使用默认属性。
  • start_routine:指向一个函数(要求是==静态函数==)的指针,该函数作为新线程的==入口点==。新线程将==从该函数开始执行==。
  • arg:传递给start_routine函数的参数。可以通过该参数==向新线程传递数据==。

返回值说明:

  • 如果成功创建线程,函数返回0。
  • 如果发生错误,函数返回一个==非零错误码==,表示错误的==类型==。

创建新线程后,新线程将在start_routine函数中开始执行,并且可以通过arg参数传递数据。线程可以执行任意类型的任务,包括==并行处理==、==异步操作==等。

需要注意的是,创建线程的成功与否并==不保证线程的立即执行==。==线程的调度==是==由操作系统决定==的,可能会有一定的延迟。

使用pthread_create()函数创建线程时,通常还需要使用其他线程相关的函数来操作线程,例如pthread_join()函数用于等待线程的结束,pthread_exit()函数用于线程的退出等。

2.线程分离函数#

pthread_detach()函数是一个用于将线程设置为==分离状态==的函数,它属于POSIX线程库(pthread)中的一员。该函数具有以下原型:

int pthread_detach(pthread_t thread);

参数说明:

  • thread:要设置为分离状态的线程的标识符。

返回值说明:

  • 如果成功将线程设置为分离状态,函数返回0。
  • 如果发生错误,函数返回一个非零错误码,表示错误的类型。

将线程设置为分离状态后,==线程结束时系统会自动释放其占用的资源==,无需通过pthread_join()函数等待线程的结束。这样可以==避免线程资源的泄漏==,==提高程序的性能==。

需要注意的是,==一旦将线程设置为分离状态,就无法再通过pthread_join()函数等待该线程的结束,也无法获取线程的返回值==。因此,只有当==不再需要与线程进行交互或者不需要获取其返回值==时,才应将线程设置为分离状态。

使用pthread_detach()函数时需要注意以下几点:

  • 线程必须是==可连接(joinable)==状态才能被设置为分离状态。创建线程时可以通过设置线程属性(pthread_attr_t)的方式指定线程的可连接性。
  • 分离状态的线程==不会保持僵尸状态(zombie)==,即使线程结束后也不会被保留在系统中。因此,在调用pthread_detach()函数之前,需要确保线程已经执行完毕或不再需要等待其结束。
  • ==一旦线程被设置为分离状态,就无法再改变其状态==。如果需要重新连接一个已经分离的线程,只能通过创建新的线程来实现。

3.模板外参数列表初始化的方法#

在C++中,模板函数通常会==将实现与声明分离==。如果你想在模板外部实现模板函数,可以按照以下步骤进行:

  1. 在头文件中声明模板函数,但不进行具体的实现。例如:
template<typename T>
void foo(T arg);
  1. 在头文件之外的源文件中,实现模板函数的具体内容。这里需要注意的是,由于模板函数的具体实现需要知道模板参数的具体类型,因此需要在函数名后加上模板参数列表。例如:
template<typename T>
void foo(T arg) {
// 实现函数的具体内容
}
  1. 如果在==模板函数中使用了其他的自定义类型或函数==,需要确保==它们的声明在模板函数的实现之前==。可以通过在==头文件中提供这些类型或函数的声明==来实现。

需要注意的是,==模板函数的声明和实现都需要在编译阶段被解析==,因此最好将==模板函数的声明和实现都放在头文件==中。这样,在使用模板函数的地方,只需要包含对应的头文件即可。

下面是一个完整的示例,展示了在模板外部实现模板函数并初始化参数列表的过程:

// 头文件 template_example.h
template<typename T>
void foo(T arg);
// 源文件 template_example.cpp
#include "template_example.h"
template<typename T>
void foo(T arg) {
// 实现函数的具体内容
}
// 显式实例化模板函数,以初始化参数列表
template void foo<int>(int arg);
// 主文件 main.cpp
#include "template_example.h"
int main() {
foo(42);
return 0;
}

在源文件中的模板函数实现之后,通过显式实例化模板函数的方式,可以初始化参数列表。在上面的示例中,使用了template void foo<int>(int arg);来显式实例化了foo()模板函数,并将其参数列表初始化为int类型。

这样,在主文件中的main()函数中调用foo()时,参数为整数,编译器就会根据参数的类型选择正确的实例化版本进行调用。

代码如下:#

// 模板外实现线程池的构造函数
template <typename T>
threadpool<T>::threadpool(int actor_model, connection_pool* connPool, int thread_number, int max_requests) : m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL), m_connPool(connPool) { // 参数列表初始化
// 异常判断,线程数或最大请求数小于 0,报错
if (thread_number <= 0 || max_requests <= 0) {
throw std::exception();
}
// 创建线程池数组(线程 id 初始化?)
m_threads = new pthread_t[m_thread_number];
// 创建线程池数组失败,抛出异常(判断线程 id 是否初始化成功?)
if (!m_threads) {
throw std::exception();
}
// 创建 thread_number 个线程,并将它们设置为线程分离状态
for (int i = 0; i < thread_number; ++i) {
/*
* pthread_create() 创建线程函数的参数需要特别注意
* 该函数原型中的参数如下:
* pthread_t* thread_tid // 线程数组中线程的地址
* const pthread_attr_t* attr // 指向线程属性的指针,通常
* 设置为NULL
* void* (*start_routine)(void*) // 处理线程函数的地址,注意
* 这个参数的类型为函数指针,
* 要求该函数为静态函数;如
* 果该函数为类成员函数时,
* 需要设置为静态成员函数
* void* arg // start_routine() 中的参数
*/
// pthread_create() 函数成功时返回 0,失败时返回错误号 errno
if (pthread_create(m_threads + i, NULL, worker, this) != 0) {
// 创建失败,删除线程池数组,并抛出异常
delete[] m_threads;
throw std::exception();
}
/*
* pthread_detach() 函数
* 作用: 实现线程分离
* 参数: 线程
* 返回值:成功返回 0,失败返回错误号
*/
if (pthread_detach(m_threads[i])) {
// 线程设置分离状态失败,删除线程池数组,抛出异常
delete[] m_threads;
throw std::exception();
}
}
}

4.模板外实现线程池的析构函数#

线程池的析构函数还是比较简单的,代码如下:

// 模板外实现线程池的析构函数
template <typename T>
threadpool<T>::~threadpool() {
// 删除线程池数组
delete[] m_threads;
}

5.模板外实现向请求队列中插入请求的函数#

这个函数在之前的代码中只有一个,我这次是看的项目源码,发现源码里有两个,功能是差不多的,只是参数不太一样

1.两个参数(T* request, int state)的函数#

代码如下:

// 模板外实现向任务队列插入任务请求函数
bool threadpool<T>::append(T* request, int state) {
m_queuelocker.lock(); // 添加互斥锁
// 当任务队列的大小大于最大请求数量时,解锁并报错
if (m_workqueue.size() >= m_max_requests) {
m_queuelocker.unlock();
return false;
}
// ????
request->m_state = state;
// 向队列中添加一个请求
m_workqueue.push_back(request);
m_queuelocker.unlock(); // 解锁
/*
* 这里使用互斥锁的原因:将请求加入工作队列的操作需要保证原子性,
* 需要互斥锁保证多个线程不会争抢
*/
// 增加信号量,通知线程池里的线程,有新任务要处理
m_queuestat.post();
/*
* 当一个新的任务被添加到任务队列中时,会调用 m_queuepost() 增加
* 信号量;在线程初始化时,每个工作线程都被创建并阻塞在
* m_queuestat.wait() 上等待信号量的触发,一旦 m_queuestat 的值大
* 于 0,其中的一个线程就会从阻塞状态唤醒并开始处理任务队列中的请
* 求
*/
return true;
}

2.一个参数(T* request)的函数#

代码如下:

// 模板外实现向任务队列插入任务请求函数
bool threadpool<T>::append_p(T* request) {
m_queuelocker.lock(); // 添加互斥锁
// 当任务队列的大小大于最大请求数量时,解锁并报错
if (m_workqueue.size() >= m_max_requests) {
m_queuelocker.unlock();
return false;
}
// 向队列中添加一个请求
m_workqueue.push_back(request);
m_queuelocker.unlock(); // 解锁
/*
* 这里使用互斥锁的原因:将请求加入工作队列的操作需要保证原子性,
* 需要互斥锁保证多个线程不会争抢
*/
// 增加信号量,通知线程池里的线程,有新任务要处理
m_queuestat.post();
/*
* 当一个新的任务被添加到任务队列中时,会调用 m_queuepost() 增加
* 信号量;在线程初始化时,每个工作线程都被创建并阻塞在
* m_queuestat.wait() 上等待信号量的触发,一旦 m_queuestat 的值大
* 于 0,其中的一个线程就会从阻塞状态唤醒并开始处理任务队列中的请
* 求
*/
return true;
}

6.线程池的工作函数#

代码如下:

// 线程池的工作函数,内部访问私有成员函数 run(),完成线程处理要求
template <typename T>
void* threadpool<T>::worker(void* arg) {
/*
* 启动线程时,传入 void* 类型参数 arg;这个参数实际上是一个指向
* threadpool 结构体的指针,所以将它强制转换成 threadpool* 类型并
* 赋值给 pool
*/
threadpool* pool = (threadpool*)arg;
// 启动线程池中的一个或多个线程,将待处理任务提交给线程池处理
pool->run();
return pool;
}

7.线程池主函数#

代码如下:

// 工作线程从请求队列中取出某个任务进行处理,注意线程同步
template <typename T>
void threadpool<T>::run() {
// ???这里原来的参数是:!m_stop???
while (true) {
/*
* 信号量等待:
* 等待 append() 函数传过来的信号量,收到表示需要运行的线程池,
* 使用其中的线程处理来处理任务
*/
m_queuestat.wait();
m_queuelocker.lock(); // 被唤醒后先加互斥锁
/*
* 收到信号量时,任务队列 m_workqueue 可能为空,也可能不为空,
* 这取决于在等待信号量之前是否有新任务被添加到了队列中,
* 如果没有新任务被添加,那么 m_workqueue 仍然为空;
* 如果有新任务被添加,那么 m_workqueue 将不为空;
* 需要注意的是,在多线程中,一个线程在等待信号量时,另一个线
* 程可能会往队列中添加新任务,因此需要加锁来保证对任务队列的
* 访问是线程安全的,这样可以避免出现竞争态条件(race
* condition,也就是线程不同步)
*/
if (m_workqueue.empty()) {
m_queuelocker.unlock(); // 解锁
continue; // 跳过后面的代码,开始下一个循环
}
// 从请求队列中取出队首请求,并将其弹出队列
T* request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock(); // 取完请求后,解锁
if (!request) {
continue; // 没有取到就继续循环
}
// ??????
if (1 == m_actor_model) {
if (0 == request->m_state) {
if (request->read_once()) {
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
else {
request->improv = 1;
request->timer_flag = 1;
}
}
else {
if (request->write()) {
request->improv = 1;
}
else {
request->improv = 1;
request->timer_flag = 1;
}
}
}
else {
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
}
}

8.线程池头文件完整代码#

代码如下:

#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "../lock/locker.h"
#include "../CGImysql/sql_connection_pool.h"
// 线程池类定义
template <typename T>
class threadpool {
public:
// actor_model 用于切换的模型
// connPool 数据库连接池指针
// thread_number 是线程池中线程的数量
// max_request 请求队列中最大请求数量
threadpool(int actor_model,
connection_pool* connPool,
int thread_number = 8,
int max_request = 10000);
~threadpool();
// ?这干嘛的,参数 state 干嘛的?
bool append(T* request, int state);
// 向请求队列 m_workqueue 中插入任务请求
// 并通知 pthread_create() 创建的工作线程处理该任务
bool append_p(T* request);
private:
/*
* 线程池的工作函数,启动一个新线程并调用 threadpool 对象的
* run() 函数来处理任务
*/
static void* worker(void* arg);
// 线程池的主函数,负责不断检查任务队列
void run();
private:
int m_thread_number; // 线程池中的线程数
int m_max_requests; // 请求队列中最大待处理请求数
pthread_t* m_threads; // 线程池数组,大小为 m_thread_number
std::list<T*> m_workqueue; // 请求队列
locker m_queuelocker; // 保护请求队列的互斥锁
sem m_queuestat; // 是否有任务需要处理的信号量
connection_pool* m_connPool; // 数据库连接池
int m_actor_model; // 模型切换
};
// 模板外实现线程池的构造函数
template <typename T>
threadpool<T>::threadpool(int actor_model, connection_pool* connPool, int thread_number, int max_requests) : m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL), m_connPool(connPool) { // 参数列表初始化
// 异常判断,线程数或最大请求数小于 0,报错
if (thread_number <= 0 || max_requests <= 0) {
throw std::exception();
}
// 创建线程池数组(线程 id 初始化?)
m_threads = new pthread_t[m_thread_number];
// 创建线程池数组失败,抛出异常(判断线程 id 是否初始化成功?)
if (!m_threads) {
throw std::exception();
}
// 创建 thread_number 个线程,并将它们设置为线程分离状态
for (int i = 0; i < thread_number; ++i) {
/*
* pthread_create() 创建线程函数的参数需要特别注意
* 该函数原型中的参数如下:
* pthread_t* thread_tid // 线程数组中线程的地址
* const pthread_attr_t* attr // 指向线程属性的指针,通常
* 设置为NULL
* void* (*start_routine)(void*) // 处理线程函数的地址,注意
* 这个参数的类型为函数指针,
* 要求该函数为静态函数;如
* 果该函数为类成员函数时,
* 需要设置为静态成员函数
* void* arg // start_routine() 中的参数
*/
// pthread_create() 函数成功时返回 0,失败时返回错误号 errno
if (pthread_create(m_threads + i, NULL, worker, this) != 0) {
// 创建失败,删除线程池数组,并抛出异常
delete[] m_threads;
throw std::exception();
}
/*
* pthread_detach() 函数
* 作用: 实现线程分离
* 参数: 线程
* 返回值:成功返回 0,失败返回错误号
*/
if (pthread_detach(m_threads[i])) {
// 线程设置分离状态失败,删除线程池数组,抛出异常
delete[] m_threads;
throw std::exception();
}
}
}
// 模板外实现线程池的析构函数
template <typename T>
threadpool<T>::~threadpool() {
// 删除线程池数组
delete[] m_threads;
}
// 模板外实现向任务队列插入任务请求函数
bool threadpool<T>::append(T* request, int state) {
m_queuelocker.lock(); // 添加互斥锁
// 当任务队列的大小大于最大请求数量时,解锁并报错
if (m_workqueue.size() >= m_max_requests) {
m_queuelocker.unlock();
return false;
}
// ????
request->m_state = state;
// 向队列中添加一个请求
m_workqueue.push_back(request);
m_queuelocker.unlock(); // 解锁
/*
* 这里使用互斥锁的原因:将请求加入工作队列的操作需要保证原子性,
* 需要互斥锁保证多个线程不会争抢
*/
// 增加信号量,通知线程池里的线程,有新任务要处理
m_queuestat.post();
/*
* 当一个新的任务被添加到任务队列中时,会调用 m_queuepost() 增加
* 信号量;在线程初始化时,每个工作线程都被创建并阻塞在
* m_queuestat.wait() 上等待信号量的触发,一旦 m_queuestat 的值大
* 于 0,其中的一个线程就会从阻塞状态唤醒并开始处理任务队列中的请
* 求
*/
return true;
}
// 模板外实现向任务队列插入任务请求函数
bool threadpool<T>::append_p(T* request) {
m_queuelocker.lock(); // 添加互斥锁
// 当任务队列的大小大于最大请求数量时,解锁并报错
if (m_workqueue.size() >= m_max_requests) {
m_queuelocker.unlock();
return false;
}
// 向队列中添加一个请求
m_workqueue.push_back(request);
m_queuelocker.unlock(); // 解锁
/*
* 这里使用互斥锁的原因:将请求加入工作队列的操作需要保证原子性,
* 需要互斥锁保证多个线程不会争抢
*/
// 增加信号量,通知线程池里的线程,有新任务要处理
m_queuestat.post();
/*
* 当一个新的任务被添加到任务队列中时,会调用 m_queuepost() 增加
* 信号量;在线程初始化时,每个工作线程都被创建并阻塞在
* m_queuestat.wait() 上等待信号量的触发,一旦 m_queuestat 的值大
* 于 0,其中的一个线程就会从阻塞状态唤醒并开始处理任务队列中的请
* 求
*/
return true;
}
// 线程池的工作函数,内部访问私有成员函数 run(),完成线程处理要求
template <typename T>
void* threadpool<T>::worker(void* arg) {
/*
* 启动线程时,传入 void* 类型参数 arg;这个参数实际上是一个指向
* threadpool 结构体的指针,所以将它强制转换成 threadpool* 类型并
* 赋值给 pool
*/
threadpool* pool = (threadpool*)arg;
// 启动线程池中的一个或多个线程,将待处理任务提交给线程池处理
pool->run();
return pool;
}
// 工作线程从请求队列中取出某个任务进行处理,注意线程同步
template <typename T>
void threadpool<T>::run() {
// ???这里原来的参数是:!m_stop???
while (true) {
/*
* 信号量等待:
* 等待 append() 函数传过来的信号量,收到表示需要运行的线程池,
* 使用其中的线程处理来处理任务
*/
m_queuestat.wait();
m_queuelocker.lock(); // 被唤醒后先加互斥锁
/*
* 收到信号量时,任务队列 m_workqueue 可能为空,也可能不为空,
* 这取决于在等待信号量之前是否有新任务被添加到了队列中,
* 如果没有新任务被添加,那么 m_workqueue 仍然为空;
* 如果有新任务被添加,那么 m_workqueue 将不为空;
* 需要注意的是,在多线程中,一个线程在等待信号量时,另一个线
* 程可能会往队列中添加新任务,因此需要加锁来保证对任务队列的
* 访问是线程安全的,这样可以避免出现竞争态条件(race
* condition,也就是线程不同步)
*/
if (m_workqueue.empty()) {
m_queuelocker.unlock(); // 解锁
continue; // 跳过后面的代码,开始下一个循环
}
// 从请求队列中取出队首请求,并将其弹出队列
T* request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock(); // 取完请求后,解锁
if (!request) {
continue; // 没有取到就继续循环
}
// ??????
if (1 == m_actor_model) {
if (0 == request->m_state) {
if (request->read_once()) {
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
else {
request->improv = 1;
request->timer_flag = 1;
}
}
else {
if (request->write()) {
request->improv = 1;
}
else {
request->improv = 1;
request->timer_flag = 1;
}
}
}
else {
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
}
}
#endif
Tiny Webserver拆解02 线程池头文件编写
https://fuwari.cbba.top/posts/tiny-webserver拆解02-线程池头文件编写/
作者
Chen_Feng
发布于
2023-09-17
许可协议
CC BY-NC-SA 4.0