5521 字
28 分钟
Tiny Webserver 半同步半反应堆线程池
2023-09-13

服务器编程基本框架#

主要由(),()和()组成,其中每个单元之间通过()进行通信,从而()。

其中I/O单元用于处理();逻辑单元用于处理();网络存储单元指()。

1

1

1

1

1

主要由==I/O单元==,==逻辑单元==和==网络存储单元==组成,其中每个单元之间通过==请求队列==进行通信,从而==协同完成任务==。

其中I/O单元用于处理==客户端连接,读写网络数据==;逻辑单元用于处理==业务逻辑的线程==;网络存储单元指==本地数据库和文件等==。

五种I/O模型#

阻塞IO:调用者调用了(),等待这个函数返回,期间(),不停的去检查这个函数(),必须等这个函数返回才能()

非阻塞IO:非阻塞等待,()就去检测()。没有就绪就可以做其他事。非阻塞I/O执行系统调用总是(),不管()是否已经发生,若时间没有发生,则返回(),此时可以根据()区分这两种情况,对于(),()和(),事件未发生时,errno通常被设置成()

信号驱动IO用()进行信号驱动IO,安装一个(),进程继续()并不(),当IO时间(),进程收到()信号。然后处理IO事件。

IO复用用()函数实现IO复用模型,这两个函数也会使进程(),但是和阻塞IO所不同的是这两个函数可以同时阻塞()。而且可以同时对多个()、()的IO函数进行检测。知道有数据可读或可写时,才真正调用()

异步IO中,可以调用()函数(使用aio_read函数时,需要指定要读取的()、()、()、()),向内核发送这个操作指令后立即返回,当内核将数据拷贝到缓冲区后,再通知应用程序。

注意:(),(),()和()都是同步I/O。同步I/O指内核向应用程序通知的是(),比如只通知(),要求()执行I/O操作,异步I/O是指内核向应用程序通知的是(),比如读取客户端的数据后才通知应用程序,由()执行I/O操作。

1

1

1

1

1

阻塞IO:调用者调用了==某个函数==,等待这个函数返回,期间==什么也不做==,不停的去检查这个函数==有没有返回==,必须等这个函数返回才能==进行下一步动作==

非阻塞IO:非阻塞等待,==每隔一段时间==就去检测==IO事件是否就绪==。没有就绪就可以做其他事。非阻塞I/O执行系统调用总是==立即返回==,不管==时间==是否已经发生,若时间没有发生,则返回==-1==,此时可以根据==errno==区分这两种情况,对于==accept==,==recv==和==send==,事件未发生时,errno通常被设置成==eagain==

信号驱动IO用==套接口==进行信号驱动IO,安装一个==信号处理函数==,进程继续==运行==并不==阻塞==,当IO时间==就绪==,进程收到==SIGIO==信号。然后处理IO事件。

IO复用用==select/poll==函数实现IO复用模型,这两个函数也会使进程==阻塞==,但是和阻塞IO所不同的是这两个函数可以同时阻塞==多个IO操作==。而且可以同时对多个==读操作==、==写操作==的IO函数进行检测。知道有数据可读或可写时,才真正调用==IO操作函数==

异步IO中,可以调用==aio_read==函数(使用aio_read函数时,需要指定要读取的==文件描述符==、==缓冲区指针==、==读取的字节数==、==偏移量==),向内核发送这个操作指令后立即返回,当内核将数据拷贝到缓冲区后,再通知应用程序。

注意:==阻塞I/O==,==非阻塞I/O==,==信号驱动I/O==和==I/O复用==都是同步I/O。同步I/O指内核向应用程序通知的是==就绪事件==,比如只通知==有客户端连接==,要求==用户代码==执行I/O操作,异步I/O是指内核向应用程序通知的是==完成事件==,比如读取客户端的数据后才通知应用程序,由==内核==执行I/O操作。

事件处理模式#

reactor模式中,主线程(I/O处理单元)只负责(),有的话立即通知工作线程(逻辑单元 ),()、()以及()都在工作线程中完成。通常由()实现。

proactor模式中,()和()负责处理()、()等I/O操作,工作线程仅负责(),如()。通常由()实现。

1

1

1

1

1

reactor模式中,主线程(I/O处理单元)只负责==监听文件描述符上是否有事件发生==,有的话立即通知工作线程(逻辑单元 ),==读写数据==、==接受新连接==以及==处理客户请求==都在工作线程中完成。通常由==同步I/O==实现。

proactor模式中,==主线程==和==内核==负责处理==读写数据==、==接受新连接==等I/O操作,工作线程仅负责==业务逻辑==,如==处理客户请求==。通常由==异步I/O==实现。

同步I/O模拟proactor模式#

由于异步I/O并不成熟,实际中使用较少,这里将使用()模拟实现()模式。

同步I/O模型的工作流程如下(()为例):

1.主线程往()注册()上的()事件。

2.主线程调用()等待()上()

3.当socket上有数据可读,epoll_wait通知(),主线程从socket()读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个()并插入()。

4.睡眠在请求队列上某个()被唤醒,它获得请求对象并处理(),然后往()中()该()上的()事件

5.主线程调用epoll_wait等待()。

6.当socket上有数据可写,epoll_wait通知()。主线程往socket上写入()处理()的结果。

1

1

1

1

1

由于异步I/O并不成熟,实际中使用较少,这里将使用==同步I/O==模拟实现==proactor==模式。

同步I/O模型的工作流程如下(==epoll_wait==为例):

1.主线程往==epoll内核事件表==注册==socket==上的==读就绪==事件。

2.主线程调用==epoll_wait==等待==socket==上==有数据可读==

3.当socket上有数据可读,epoll_wait通知==主线程==,主线程从socket==循环==读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个==请求对象==并插入==请求队列==。

4.睡眠在请求队列上某个==工作线程==被唤醒,它获得请求对象并处理==客户请求==,然后往==epoll内核事件表==中==注册==该==socket==上的==写就绪==事件

5.主线程调用epoll_wait等待==socket可写==。

6.当socket上有数据可写,epoll_wait通知==主线程==。主线程往socket上写入==服务器==处理==客户请求==的结果。

并发编程模式#

并发编程方法的实现有()和()两种,但这里涉及的并发模式指()与()的协同完成任务的方法。

()模式

()模式

1

1

1

1

1

并发编程方法的实现有==多线程==和==多进程==两种,但这里涉及的并发模式指==I/O处理单元==与==逻辑单元==的协同完成任务的方法。

==半同步/半异步==模式

==领导者/追随者==模式

半同步/半反应堆#

半同步/半反应堆并发模式是()的变体,将半异步具体化为().

并发模式中的同步和异步

同步指的是()完全按照()的()执行

异步指的是程序的执行需要由()驱动

半同步/半异步模式工作流程

同步线程用于处理()

异步线程用于处理()

异步线程监听到()后,就将其封装成()并插入()中

请求队列将通知某个工作在()来读取并处理该请求对象

半同步/半反应堆工作流程(以Proactor模式为例)

主线程充当(),负责监听所有()上的事件

若有()到来,()接收这个新请求,得到新的(),然后往()中注册该socket上的()

如果连接socket上有读写事件发生,主线程从socket上接收(),并将数据封装成()插入到()中

所有()睡眠在请求队列上,当有任务到来时,通过()获得任务的()

1

1

1

1

1

半同步/半反应堆并发模式是==半同步/半异步==的变体,将半异步具体化为==某种事件处理模式==.

并发模式中的同步和异步

同步指的是==程序==完全按照==代码序列==的==顺序==执行

异步指的是程序的执行需要由==系统事件==驱动

半同步/半异步模式工作流程

同步线程用于处理==客户逻辑==

异步线程用于处理==I/O事件==

异步线程监听到==客户请求==后,就将其封装成==请求对象==并插入==请求队列==中

请求队列将通知某个工作在==同步模式的工作线程==来读取并处理该请求对象

半同步/半反应堆工作流程(以Proactor模式为例)

主线程充当==异步线程==,负责监听所有==socket==上的事件

若有==新请求==到来,==主线程==接收这个新请求,得到新的==连接socket==,然后往==epoll内核事件表==中注册该socket上的==读写事件==

如果连接socket上有读写事件发生,主线程从socket上接收==数据==,并将数据封装成==请求对象==插入到==请求队列==中

所有==工作线程==睡眠在请求队列上,当有任务到来时,通过==竞争(如互斥锁)==获得任务的==接管权==

线程池#

()换(),浪费服务器的(),换取()

池是一组()的集合,这组资源在服务器启动之初就被(),这称为()

当服务器进入正式运行阶段,开始处理()的时候,如果它需要相关的资源,可以直接从()中获取,无需()

当服务器处理完一个()后,可以把相关的资源()池中,无需执行系统调用()

1

1

1

1

1

==空间==换==时间==,浪费服务器的==硬件资源==,换取==运行效率==

池是一组==资源==的集合,这组资源在服务器启动之初就被==完全创建好并初始化==,这称为==静态资源==

当服务器进入正式运行阶段,开始处理==客户请求==的时候,如果它需要相关的资源,可以直接从==池==中获取,无需==动态分配==

当服务器处理完一个==客户连接==后,可以把相关的资源==放回==池中,无需执行系统调用==释放资源==

基础知识#

1.静态成员变量#

将类成员变量声明为(),则为(),与一般的成员变量不同,无论建立多少对象,都只有()静态成员变量的拷贝,静态成员变量属于()不属于(),所有对象(),例如父类中定义了静态成员,则整个继承体系中只有一个这样的成员,无论派生出多少个子类,静态成员是存储在()的

静态变量在()阶段就分配了空间,对象还没创建时就已经分配了空间,放到全局静态区。

静态成员变量

最好是类内(),类外()(以免类名访问静态成员访问不到)。

无论公有,私有,静态成员都可以在()定义,但()仍有访问权限。

非静态成员类外不能()

静态成员数据是()的

1

1

1

1

1

将类成员变量声明为==static==,则为==静态成员变量==,与一般的成员变量不同,无论建立多少对象,都只有==一个==静态成员变量的拷贝,静态成员变量属于==类==不属于==对象==,所有对象==共享==,例如父类中定义了静态成员,则整个继承体系中只有一个这样的成员,无论派生出多少个子类,静态成员是存储在==全局静态区==的

静态变量在==编译==阶段就分配了空间,对象还没创建时就已经分配了空间,放到全局静态区。

静态成员变量

最好是类内==声明==,类外==初始化==(以免类名访问静态成员访问不到)。

无论公有,私有,静态成员都可以在==类外==定义,但==私有成员==仍有访问权限。

非静态成员类外不能==初始化==

静态成员数据是==共享==的

2.静态成员函数#

将类成员函数声明为static,则为静态成员函数。

静态成员函数

  • 静态成员函数可以直接访问(),不能直接访问(),但可以通过()的方式访问。

  • 普通成员函数可以访问普通成员变量,()访问静态成员变量。

  • 静态成员函数没有()。非静态数据成员为()单独维护,但静态成员函数为共享函数,无法区分是哪个对象,因此不能直接访问(),也没有this指针

1

1

1

1

1

将类成员函数声明为static,则为静态成员函数。

静态成员函数

  • 静态成员函数可以直接访问==静态成员变量==,不能直接访问==普通成员变量==,但可以通过==参数传递==的方式访问。

  • 普通成员函数可以访问普通成员变量,==也可以==访问静态成员变量。

  • 静态成员函数没有==this指针==。非静态数据成员为==对象==单独维护,但静态成员函数为共享函数,无法区分是哪个对象,因此不能直接访问==普通变量成员==,也没有this指针

3.pthread_create陷阱#

函数原型中的第三个参数,为(),指向处理线程函数的地址。该函数,要求为()。如果处理线程函数为()时,需要将其设置为()

1

1

1

1

1

首先看一下该函数的函数原型。

#include <pthread.h>
int pthread_create (pthread_t *thread_tid, //返回新生成的线程的id
const pthread_attr_t *attr, //指向线程属性的指针,通常设置为NULL
void * (*start_routine) (void *),//处理线程函数的地址
void *arg); //start_routine()中的参数

函数原型中的第三个参数,为==函数指针==,指向处理线程函数的地址。该函数,要求为==静态函数==。如果处理线程函数为==类成员函数==时,需要将其设置为==静态成员函数==

4.this指针的锅#

pthread_create的函数原型中第三个参数的类型为(),指向的线程处理函数参数类型为(),若线程函数为()函数,则=()会作为默认的参数被传进pthread_create函数中,从而和线程函数参数(void*)不能匹配,不能通过编译。

()函数就没有这个问题,里面没有this指针。

1

1

1

1

1

pthread_create的函数原型中第三个参数的类型为==函数指针==,指向的线程处理函数参数类型为==(void *)==,若线程函数为==类成员==函数,则==this指针==会作为默认的参数被传进pthread_create函数中,从而和线程函数参数(void*)不能匹配,不能通过编译。

==静态成员==函数就没有这个问题,里面没有this指针。

5.pthread_detach线程分离函数#

线程分离状态:指定该状态,线程()与()断开关系。使用()或者线程()后,其()状态不由其他线程获取,而直接自己()。()、()服务器常用。

进程若有该机制,将不会产生()进程。僵尸进程的产生主要由于进程死后,大部分资源被释放,一点残留资源仍存于系统中,导致()认为该进程仍存在。

也可使用()函数参2(线程属性)来设置线程分离。()函数是在()之后调用的。

函数描述:实现线程分离 函数原型:int pthread_detach(pthread_t thread); 函数返回值:成功:();失败:() 一般情况下,线程()后,其终止状态一直保留到其它线程调用()获取它的状态为止。但是线程也可以被置为()状态,这样的线程一旦()就立刻回收它占用的所有资源,而不保留终止状态。不能对一个已经处于detach状态的线程调用pthread_join,这样的调用将返回()错误。也就是说,如果已经对一个线程调用了pthread_detach就不能再调用pthread_join了。

以一个例子引入:编写程序,在创建线程之后设置线程的分离状态。

说明:如果线程已经设置了分离状态,则再调用pthread_join就会失败,可用这个方法验证()

1

1

1

1

1

线程分离状态:指定该状态,线程==主动==与==主控线程==断开关系。使用==pthread_exit==或者线程==自动结束==后,其==退出==状态不由其他线程获取,而直接自己==自动释放==。==网络==、==多线程==服务器常用。

进程若有该机制,将不会产生==僵尸==进程。僵尸进程的产生主要由于进程死后,大部分资源被释放,一点残留资源仍存于系统中,导致==内核==认为该进程仍存在。

也可使用 ==pthread_create==函数参2(线程属性)来设置线程分离。==pthread_detach==函数是在==创建线程==之后调用的。

函数描述:实现线程分离 函数原型:int pthread_detach(pthread_t thread); 函数返回值:成功:==0==;失败:==错误号== 一般情况下,线程==终止==后,其终止状态一直保留到其它线程调用==pthread_join==获取它的状态为止。但是线程也可以被置为==detach==状态,这样的线程一旦==终止==就立刻回收它占用的所有资源,而不保留终止状态。不能对一个已经处于detach状态的线程调用pthread_join,这样的调用将返回==EINVAL==错误。也就是说,如果已经对一个线程调用了pthread_detach就不能再调用pthread_join了。

以一个例子引入:编写程序,在创建线程之后设置线程的分离状态。

说明:如果线程已经设置了分离状态,则再调用pthread_join就会失败,可用这个方法验证==是否已成功设置分离状态==

线程池分析#

线程池的设计模式为(),其中反应堆具体为()事件处理模式。

具体的,主线程为(),负责监听(),接收()新连接,若当前监听的socket发生了()事件,然后将任务插入到()。工作线程从请求队列中取出任务,完成()的处理。

1

1

1

1

1

线程池的设计模式为==半同步/半反应堆==,其中反应堆具体为==Proactor==事件处理模式。

具体的,主线程为==异步线程==,负责监听==文件描述符==,接收==socket==新连接,若当前监听的socket发生了==读写==事件,然后将任务插入到==请求队列==。工作线程从请求队列中取出任务,完成==读写数据==的处理。

1.线程池类定义#

具体定义可以看代码。需要注意,==线程处理函数==和==运行函数==设置为==私有属性==

template <typename T>
class threadpool {
public:
//thread_number是线程池中线程的数量
//max_requests是请求队列中最多允许的、等待处理的请求的数量
//connPool是数据库连接池指针
threadpool(connection_pool *connPool, int thread_number = 8, int max_request = 10000);
~threadpool();
//像请求队列中插入任务请求
bool append(T* request);
private:
//工作线程运行的函数
//它不断从工作队列中取出任务并执行之
static void *worker(void *arg);
void run();
private:
//线程池中的线程数
int m_thread_number;
//请求队列中允许的最大请求数
int m_max_requests;
//描述线程池的数组,其大小为m_thread_number
pthread_t *m_threads;
//请求队列
std::list<T *>m_workqueue;
//保护请求队列的互斥锁
locker m_queuelocker;
//是否有任务需要处理
sem m_queuestat;
//是否结束线程
bool m_stop;
//数据库连接池
connection_pool *m_connPool;
};

2.线程池创建与回收#

==构造函数==中创建线程池,pthread_create函数中将==类==的对象作为参数传递给==静态函数(worker)==,在静态函数中引用这个对象,并调用其==动态方法(run)==。

具体的,类对象传递时用==this指针==,传递给静态函数后,将其转换为==线程池类==,并调用==私有成员函数run==

template < typename T>
threadpool<T>::threadpool(connection_pool *connPool, int thread_number, int max_requests) : m_thread_number(thread_number), m_max_requests(max_requests), m_stop(false), m_threads(NULL), m_connPool(connPool) {
if (thread_number <= 0 || max_requests <= 0)
throw std::exception();
//线程id初始化
m_threads = new pthread_t[m_thread_number];
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; ++i)
{
//循环创建线程,并将工作线程按要求进行运行
if (pthread_create(m_threads + i, NULL, worker, this) != 0) {
delete[] m_threads;
throw std::exception();
}
//将线程进行分离后,不用单独对工作线程进行回收
if (pthread_detach(m_threads[i])) {
delete[] m_threads;
throw std::exception();
}
}
}

3.向请求队列中添加任务#

通过==list容器==创建请求队列,向队列中添加时,通过==互斥锁==保证==线程安全==,添加完成后通过==信号量==提醒有任务要处理,最后注意==线程同步==

template < typename T>
bool threadpool<T>::append(T* request)
{
m_queuelocker.lock();
//根据硬件,预先设置请求队列的最大值
if (m_workqueue.size() > m_max_requests)
{
m_queuelocker.unlock();
return false;
}
//添加任务
m_workqueue.push_back(request);
m_queuelocker.unlock();
//信号量提醒有任务要处理
m_queuestat.post();
return true;
}

4.线程处理函数#

==内部==访问私有成员函数run,完成线程处理要求。

template < typename T>
void* threadpool<T>::worker(void* arg) {
//将参数强转为线程池类,调用成员方法
threadpool* pool = (threadpool*)arg;
pool->run();
return pool;
}

5.run执行任务#

主要实现,==工作线程==从==请求队列==中取出某个任务进行处理,注意==线程同步==

template < typename T>
void threadpool<T>::run()
{
while (!m_stop)
{
//信号量等待
m_queuestat.wait();
//被唤醒后先加互斥锁
m_queuelocker.lock();
if (m_workqueue.empty())
{
m_queuelocker.unlock();
continue;
}
//从请求队列中取出第一个任务
//将任务从请求队列删除
T* request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if (!request)
continue;
//从连接池中取出一个数据库连接
request->mysql = m_connPool->GetConnection();
//process(模板类中的方法,这里是http类)进行处理
request->process();
//将数据库连接放回连接池
m_connPool->ReleaseConnection(request->mysql);
}
}
Tiny Webserver 半同步半反应堆线程池
https://fuwari.cbba.top/posts/tiny-webserver-半同步半反应堆线程池/
作者
Chen_Feng
发布于
2023-09-13
许可协议
CC BY-NC-SA 4.0