【网络编程】事件驱动 reactor 式的服务器(EPOLL机制)

文章目录

  • 业务拆解
    • 事件驱动的 reactor
    • 总流程图
  • C 代码实现
    • 准备工作
      • 编写头文件 reactor.h
      • 准备头文件
      • 准备宏定义
      • 声明三大模块函数和基础的内存变量长度
      • 定义全局变量
      • 定义 EPOLL 实例事件处理的函数与释放资源的函数
      • 注册服务器监听套接字的函数
    • accept_cb 模块
    • read_cb 模块
    • send_cb 模块
    • 服务器代码
  • 代码运行效果
  • 总结

推荐一个零声教育学习教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习: https://github.com/0voice 链接。

业务拆解

在上一篇 文章 里,我们使用 EPOLL 机制去搭建服务器,能够低成本高效率的执行 “多路复用网络 I/O 高并发”。在本篇文章里,我们将要把上一篇问文章的代码切分开来,使之模块化,更好地满足业务特色化需求。比如,最基础的网络 I 任务,就是读取信息,如果业务有特殊需求,我们要特殊处理所读取的信息,我们就要专门在源代码的基础上额外写多几个业务读函数;再比如,最基础的网络 O 任务,是发送特定资源信息,如果业务有特殊需求,我们要特殊处理将要发送的信息,比如以 HTTP 报文形式发送给用户,使得用户可以在浏览器上看到我们所发的内容。

总之,我们要实现一个服务器底座,一个可根据业务内容做简单扩展的服务器代码,降低开发难度。我们称之为 “事件驱动的 reactor”。

事件驱动的 reactor

reactor 顾名思义就是反应器的原理,不同的信号就会有不同的反应,回顾上一篇 文章 的 EPOLL 服务器,主要有三类网络 I/O 任务

  • 输入任务1:监听套接字 sockfd 监听到来访 IP,读取连接信息,分配套接字 clientfd,以负责对应网络 I/O。
  • 输入任务2:客户端发来信息,epoll 调用操作系统内核通知进程处理读事件。
  • 输出任务:沿着对应套接字 clientfd 向客户端发送信息。

也就是说我们要把代码分成三个模块,实现三种因事件信号而异的反应,综合起来就是一个反应器 reactor。我们在每一个模块中预留一个地方给各自的业务代码函数。这三个模块我们分别记成

  1. accept_cb 模块,
  2. read_cb 模块,
  3. send_cb 模块

总流程图

为了简化表达,避免像上一篇 文章 那样写的那么复杂。我们也分模块来写流程图,各个模块再给出它自己的流程图。我们先给总体的流程图。

Created with Raphaël 2.3.0开始init_server 占用若干个端口建立服务器套接字 sockfds,使这些 sockfds 处于监听状态创建 epoll 实例 epfd(被一个套接字所表示),作为总集创建长度固定的 struct epoll_event 事件数组,作为 epfd 内核中就绪链表的誊写白纸把若干个监听套接字 sockfds 的关注事件类型设置成 EPOLLIN,以 epoll_event 形式加入实例 epfd调用 epoll_wait 函数, epoll 实例同时监视服务器的监听套接字 sockfd 以及所注册所有的来访 IP 连接的套接字 clientfd,并且统计响应数量 nready对这 nready 个内核响应的套接字-事件,我们逐个来处理这 nready 个套接字-事件是否还没有处理完?当前所检查的套接字是否正常(没出现错误 EPOLLERR )?当前所检查的套接字不是监控套接字吗?当前所检查的 套接字-事件 不是 EPOLLIN 事件吗?当前所检查的 套接字-事件 不是 EPOLLOUT 事件吗?关闭套接字,释放资源,快进到下一个套接字-事件已处理的 套接字-事件 计数加 1send_cb 模块,发送内容给客户端recv_cb 模块,ET 边沿模式需循环读取 I/O 文件accept_cb 模块,注册新的套接字-事件入 EPOLL 实例关闭套接字,释放资源,快进到下一个套接字-事件yesnoyesnoyesnoyesnoyesno

C 代码实现

准备工作

编写头文件 reactor.h

此文件内定义了特殊的结构体,记录了若干个套接字每次 I/O 的内容

#ifndef __SERVER_H__
#define __SERVER_H__#define BUFFER_LENGTH		1024typedef int (*RCALLBACK)(int fd);struct conn {int fd;//	申请缓冲区的大小 1024 --> 2048 ...... 逐步递增 KB,在 accept_cb 函数中用 malloc 建立,在 recv_cb 和 send_cb 函数中用 realloc 函数调整其大小char *rbuffer;	//	边沿模式下,我们是不能够假设我们要读取总量多少的内容,只能是全部读取ssize_t rlength;ssize_t rcap;  // 读缓冲区容量char *wbuffer;	//	边沿模式下,我们是不能够假设我们要读取总量多少的内容,只能是全部写入ssize_t wlength;ssize_t wcap;  // 写缓冲区容量RCALLBACK send_callback;union {RCALLBACK recv_callback;RCALLBACK accept_callback;} r_action;};#endif

其中以下代码结构是头文件为了保护定义而专门设置的,避免同一个头文件重复定义

#ifndef __SERVER_H__
#define __SERVER_H__// 定义类型#endif

所定义的 struct conn 类型中有一个成员值得注意,那就是 r_action,它是一个联合体而非结构体,它有一词多义性,我们知道 RCALLBACK 是一个回调函数,

typedef int (*RCALLBACK)(int fd);

回调函数可以是任何 同 (返回类型、传入参数类型)的函数。而在类型 struct conn 中的成员 r_action 就好比 “面向对象编程中的多态”,成员都叫同一个名字,但是具体的定义是不一样的。下文的主函数(服务器代码)中会出现两行类似的代码,就是来自于回调函数的使用,它们分别是(读者可以通篇阅读完这篇文章后再回来)

1、conn_list[sockfd].r_action.accept_callback = accept_cb;
2、conn_list[fd].r_action.recv_callback = recv_cb;
3、conn_list[fd].send_callback = send_cb;
4for (int j = 0; j < MAX_PORTS; j++) {if (connfd == listen_fds[j]) {conn_list[connfd].r_action.accept_callback(connfd);is_listener = 1;break;}}
5if (conn_list[connfd].r_action.recv_callback(connfd) >= 0) { // 返回0表示成功printf("[%ld] RECV: %s\n",conn_list[connfd].rlength, conn_list[connfd].rbuffer);} else {//	连接在 recv 函数处早已释放continue; // 跳过后续处理}
6、conn_list[connfd].send_callback(connfd);

准备头文件

#include <errno.h>				// 这是全局变量 errno,用于健壮的读取功能
#include <stdio.h>	
#include <stdlib.h>				// 动态内存分配
#include <sys/socket.h>			// 创建和管理套接字。绑定地址、监听连接和接受连接。发送和接收数据。设置和获取套接字选项。 socket()、connect()、sendto()、recvfrom()、accept()
#include <netinet/in.h>			// 提供了结构体 sockaddr_in
#include <string.h>				// strerror 函数
#include <fcntl.h>				// 用于更改套接字的模式,比如非阻塞模式
#include <unistd.h>				// close 函数,关闭套接字
#include <sys/types.h>			// ssize_t 是一个有符号的整数类型
#include <sys/epoll.h>			// EPOLL 高并发机制
#include <sys/time.h>			// timeval 类型,用于表述等待时间#include "reactor.h"				// 底层数据结构,回调函数、业务端函数的统一声明

此处注意到我们是导入了刚刚所写的头文件 “reactor.h”。

准备宏定义

这个代码可建立一百多万个连接,下一篇文章里我将介绍百万连接的方法。为了释放这百万连接的代码潜力,我们要定义这三个宏。

#define CONNECTION_SIZE			1048576 	// 	1024 * 1024,即我们要测试 1 M 的连接数#define MAX_PORTS				20			//	该服务器占用本地 20 个端口,用以建立网络 I/O ,更好地实现百万并发//	这是一个宏操作,是两个时间戳的相减,表示 “计时”
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)	

MAX_PORTS 是用来支持百万连接的,CONNECTION_SIZE 是指定最大的连接数。TIME_SUB_MS(tv1, tv2) 宏操作是用来测试服务器的性能,用来计算运行的时长

声明三大模块函数和基础的内存变量长度

// 声明函数,先写主函数的代码,适用于底层链接的
int accept_cb(int fd);
int recv_cb(int fd );
int send_cb(int fd);#define BUFFER_LENGTH		1024

无论是网络输入还是输出,每次操作的字节数上限都是 BUFFER_LENGTH 个,慢慢的逐步地有条不紊地接收发送信息,而非一次过把所有内容都吞下。

定义全局变量

当我们定义了某个全局变量,我们所定义所有函数都可以在不传入该变量的前提下对其进行改变,无通过效仿一般函数的传入参数前外加取址符,以求在内存层次改变变量本身。

全局变量并不占用线程栈的内存空间,而是存储在 “静态数据区” 之中。

int epfd = 0;			//  epoll 事件文件套接字,它申请为一个全局变量
struct timeval begin;	//	声明一个全局时间戳变量//	本 EPOLL 实例一次最多建立 1 M = 1024*1024 个网络 I/O 事件(百万并发);这又被称之为总体事件集
struct conn conn_list[CONNECTION_SIZE] = {0};	
// 	这是一个全局变量
//	在函数中对全局变量赋值后,函数执行完毕后全局变量的值会保持修改后的值。全局变量的特性决定了它的值在程序运行期间会持久存在,不会因为函数执行结束而重置。

如果读者不知道静态数据区的大小是多少,可以通过以下方法查看(我是用 Linux 系统编程的)

qiming@qiming:~/share/CTASK/TCP_test$ ulimit -a
real-time non-blocking time  (microseconds, -R) unlimited
core file size              (blocks, -c) 0
data seg size               (kbytes, -d) unlimited
scheduling priority                 (-e) 0
file size                   (blocks, -f) unlimited
pending signals                     (-i) 15051
max locked memory           (kbytes, -l) 496096
max memory size             (kbytes, -m) unlimited
open files                          (-n) 1024
pipe size                (512 bytes, -p) 8
POSIX message queues         (bytes, -q) 819200
real-time priority                  (-r) 0
stack size                  (kbytes, -s) 8192
cpu time                   (seconds, -t) unlimited
max user processes                  (-u) 15051
virtual memory              (kbytes, -v) unlimited
file locks                          (-x) unlimited

我们注意到

data seg size               (kbytes, -d) unlimited
  • data seg size:表示程序数据段的最大大小,单位为 KB。数据段包括全局变量和静态变量,这些变量存储在全局数据区中。

这说明静态数据区没有特别的限制,仅由操作系统的内存所限制。故而我们不必担心 conn_list 这个一百万多元的超大数组会超规模占用空间。

另外,epfd 将会是下文的 “EPOLL 实例”,begin 是全局的开端时间戳。

定义 EPOLL 实例事件处理的函数与释放资源的函数

EPOLL 实例事件处理的函数如下。

//	该函数可以实现对文件描述符的 EPOLL 事件注册、修改或删除,flag 参数是用来区分情况的
int set_event(int fd, int event, int flag) {//	fd 是目标文件描述符或套接字;//	event 是事件类型,比如读事件 EPOLLIN;//	flag 是用来区分是注册还是修改的;1 表示是注册;0 表示修改;2 表示删除if (flag == 1) {  // add 1struct epoll_event ev;ev.events = event;ev.data.fd = fd;if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {// 老板感兴趣的事件是,前台小姐姐的工作情况;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注册入 EPOLL 之中// (即一个套接字 fd 对应一个就绪状态表 0/1)perror("epoll_ctl failed");close(fd);return -1;}} else if (flag == 2) {	// delete 2struct epoll_event ev;ev.events = event;ev.data.fd = fd;if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev) < 0) {// 老板感兴趣的事件是,前台小姐姐的工作情况;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注册入 EPOLL 之中// (即一个套接字 fd 对应一个就绪状态表 0/1)perror("epoll_ctl failed");return -1;}} else if (flag == 0) {  // modify 0struct epoll_event ev;ev.events = event;ev.data.fd = fd;//	EPOLL_CTL_MOD:修改已经注册到 epoll 实例中的文件描述符 fd 的监视事件。if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {// 老板感兴趣的事件是,前台小姐姐的工作情况;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注册入 EPOLL 之中// (即一个套接字 fd 对应一个就绪状态表 0/1)perror("epoll_ctl failed");close(fd);return -1;}} else {printf("Param Error: flag =0, 1, 2\n");}}

我们注意到,该函数有一个 int flag 参数,这其实是一个状态机,用来标明选择处理方式。用来区分是注册还是修改的,抑或删除;1 表示是注册;0 表示修改;2 表示删除。

释放资源的函数如下。

//	只在连接关闭时释放资源:事件 event 代号 0 通常表示没有事件发生
void close_connection(int fd) {set_event(fd, 0, 2);close(fd);if (conn_list[fd].rbuffer) {free(conn_list[fd].rbuffer);conn_list[fd].rbuffer = NULL; // 防止重复释放}if (conn_list[fd].wbuffer) {free(conn_list[fd].wbuffer);conn_list[fd].wbuffer = NULL;}memset(&conn_list[fd], 0, sizeof(struct conn));
}

缓冲区生命周期必须与连接生命周期一致,在连接关闭前不要释放缓冲区,在连接关闭后确保完全释放。

注册服务器监听套接字的函数

占用设备的端口,设置监听套接字。

//	创建监听套接字 sockfd,并且绑定本机 IP 和端口 port
// (远程IP, 远程PORT, 本地IP, 本地PORT, 协议) 与 网络 I/O 一对一成型一个 sockfd 
int init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0servaddr.sin_port = htons(port); // 0-1023, // 设置端口复用// 当服务器主动关闭 TCP 连接时,会进入 TIME_WAIT 状态(通常持续 2MSL,约 1-4 分钟)。在此期间,操作系统会保留该端口绑定记录,防止延迟到达的数据包干扰新连接。// 问题:服务器崩溃或重启后尝试重新绑定端口时,会因 TIME_WAIT 状态导致 bind() 失败(错误:Address already in use) // 以下处理措施:能避免再次启用服务器程序时,系统的宕机int reuse = 1;if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {//  如果未设置 SO_REUSEADDR,导致端口被占用后无法立即重用(TIME_WAIT 状态)printf("setsockopt failed: %s\n", strerror(errno));return -1;}// setsockopt 是一个用于设置套接字选项的系统调用函数。// 第二项参数 level:指定选项所在的协议级别。常见的值包括:SOL_SOCKET:表示套接字级别的选项。IPPROTO_TCP:表示 TCP 协议级别的选项。IPPROTO_IP:表示 IP 协议级别的选项。IPPROTO_IPV6:表示 IPv6 协议级别的选项。// 第三项参数 optname:指定要设置的选项名称。不同的协议级别有不同的选项名称。例如:在 SOL_SOCKET 级别,常见的选项包括 SO_REUSEADDR、SO_KEEPALIVE、SO_LINGER 等。在 IPPROTO_TCP 级别,常见的选项包括 TCP_NODELAY 等。// 第四项参数 optval:指向包含选项值的内存区域。选项值的类型和大小取决于 optname// 第五项参数 optlen:指定 optval 的长度(以字节为单位)。if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {printf("bind failed: %s\n", strerror(errno));	//	strerror 函数定义在 <string.h> 中,而 errno 是 <errno.h> 的全局变量return -1;}//	一次监听 10 个来访 IP:PORT//	printf("listen finshed: %d\n", sockfd); // 3 if (listen(sockfd, 10) < 0) {// 将套接字设置为被动模式:套接字从主动连接模式(用于客户端)转换为被动监听模式(用于服务器)。我们可以把这个 socket 想象成公司的前台小姐。// 5:是监听队列的最大长度,表示系统可以为该套接字排队的最大未完成连接数。当新的连接请求到达时,如果队列已满,新的连接请求将被拒绝。// 返回值:成功,返回 0。失败,返回 -1,并设置 errno 以指示错误原因。printf("listen finshed: %d\n", sockfd); return -1;}return sockfd;}

accept_cb 模块

首先给出的是,针对已经在 EPOLL 实例中注册的套接字,初始化其在全局变量 conn_list 对应位置上的内存配置。

//	针对已经注册的 clientfd	(当然也有可能注册不成功,要注意处理失败) 在事件总集的对应 fd 上的综合情况进行初始化
int event_register(int fd, int event) {if (fd < 0) return -1;								//	这里是用来应对 accept 函数调用失败的错误conn_list[fd].fd = fd;conn_list[fd].r_action.recv_callback = recv_cb;conn_list[fd].send_callback = send_cb;conn_list[fd].rbuffer = NULL;		//	重置读缓冲区conn_list[fd].rlength = 0;conn_list[fd].rcap = 0;conn_list[fd].wbuffer = NULL; // 确保初始化为NULLconn_list[fd].wcap = 0;conn_list[fd].wlength = 0;set_event(fd, event, 1);							//  标志 1 表示当前是对 fd 的事件注册而非修改
}

紧接着,当服务器的监听套接字监听到了来访 IP 时,分配出新的套接字以对接接下来对该 IP 的 I/O 任务。

// listenfd(sockfd) --> EPOLLIN --> accept_cb	根据情况使用回调函数————监控套接字 sockfd 的读事件是 accept 注册 clientfd
int accept_cb(int fd) {struct sockaddr_in  clientaddr;socklen_t len = sizeof(clientaddr);//	这两个变量是专门用来注册 clientfd 的int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);// accept 会因为无输入而阻塞(因为 sockfd 是默认的阻塞模式),本设计就是防止其阻塞(利用条件判断绕开阻塞)if (clientfd < 0) {printf("accept errno: %d --> %s\n", errno, strerror(errno));return -1;}// 动态分配读入内存的空间conn_list[clientfd].rbuffer = malloc(BUFFER_LENGTH); // 先行分配1024字节的内存,它是读取内容最终归宿if (conn_list[clientfd].rbuffer) {memset(conn_list[clientfd].rbuffer, 0, BUFFER_LENGTH); // 初始化为0}conn_list[clientfd].rlength = 0;conn_list[clientfd].rcap = BUFFER_LENGTH;  //  记录容量if (conn_list[clientfd].rbuffer == NULL) {perror("Malloc ReadBuffer Error");return -1;}//  使用 `EPOLLET` 必须配合非阻塞套接字,否则可能阻塞线程int flags = fcntl(clientfd, F_GETFL, 0);        // F_GETFL 是获取标志的命令fcntl(clientfd, F_SETFL, flags | O_NONBLOCK);   // 要注意 “|” 是按位或操作,能进行掩码叠加;F_SETFL 是设置文件状态标志,在原来的基础上增加非阻塞功能//  函数 fcntl() 作用:读取 clientfd 当前的所有文件状态标志//  返回值:包含位掩码的整数,表示当前所有设置的标志//          O_RDONLY:只读模式 (0);O_WRONLY:只写模式 (1);O_RDWR:读写模式 (2);//          O_NONBLOCK:非阻塞模式 (04000);O_APPEND:追加模式 (02000)event_register(clientfd, EPOLLIN | EPOLLET);  // | EPOLLET 是 clientfd 使用边沿触发模式//	这是用于说明情况的if ((clientfd % 1000) == 0) {		//	为了使得打印不过分密集,每一千个 I/O 就打印一次struct timeval current;gettimeofday(&current, NULL);	//	获取当前时间戳,定义在 <sys/time.h> 头文件中int time_used = TIME_SUB_MS(current, begin);	//	获取时间差memcpy(&begin, &current, sizeof(struct timeval));printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);}return 0;
}

read_cb 模块

这个函数是有相当多细节的。首先,我们用于计数的变量 count 所用的数字类型是 ssize_t 类型的(在头文件 <sys/types.h> 中定义的),是一个ssize_t 是一个有符号的整数类型,在 64 位操作系统中,范围是 [-263,263-1],是一个相当大的数。这说明这个服务器是有被用户上传视频这种大型数据的能力的。

//	根据情况使用回调函数————普通套接字 clientfd 的读事件是 recv_cb  读取内存
//	在边沿模式下,该函数要循环执行
int recv_cb(int fd) {ssize_t count=0;char buffer[BUFFER_LENGTH] = {0}; 		//struct conn* c = &conn_list[fd];  // 使用局部变量简化代码if (conn_list[fd].rbuffer != NULL) {	//	连接复用,二次收信息的时候free(conn_list[fd].rbuffer);conn_list[fd].rbuffer = NULL; // 防止重复释放}conn_list[fd].rlength =0;conn_list[fd].rcap = BUFFER_LENGTH; while (1) {count = recv(fd, buffer, BUFFER_LENGTH, 0);	//	读取固定长度的内容,0 代表以阻塞模式读取数据// 因为 clientfd 不是阻塞模式, recv 不会因无输入而阻断,而是会立即返回 -1// 函数 recv: 读取固定字节的内容。// 返回值 > 0:表示成功接收了数据,返回值表示实际接收到的字节数。// 返回值 == 0:表示对端已经关闭了连接(TCP连接的正常关闭)。这是TCP协议的对端关闭连接的标志。// 返回值 == -1:表示无信息可读。并设置错误码为 EAGAIN 或 EWOULDBLOCK。这表示当前没有数据可读,但连接仍然有效。// 当调用 recv 时,会触发用户态到内核态的切换,内核负责从套接字接收缓冲区复制数据到用户提供的缓冲区,内核会更新接收缓冲区的状态(如移除已读取数据)if (count == 0) { 	// disconnectclose_connection(fd); // 连接断开必须要清理套接字资源,前面已经清理过一次了,避免在次释放printf("client disconnect: %d\n", fd);//	当客户端主动发来断开连接的请求时,return -1;} else if (count < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) {// 无数据可读,退出循环return 0;}else {printf("recv error: %s\n", strerror(errno));close_connection(fd);return -2;}} else {// 使用结构体中的长度和容量字段ssize_t new_length = conn_list[fd].rlength + count;// 确保缓冲区存在,这是下文能成功使用 memcpy 的原因if (!conn_list[fd].rbuffer) {conn_list[fd].rbuffer = malloc(BUFFER_LENGTH);if (!conn_list[fd].rbuffer) {perror("malloc failed");close_connection(fd);return -3;}conn_list[fd].rcap = BUFFER_LENGTH;conn_list[fd].rlength = 0;}// 动态扩容if (new_length > conn_list[fd].rcap) {ssize_t new_cap = conn_list[fd].rcap * 2;char *new_buf = realloc(conn_list[fd].rbuffer, new_cap);if (!new_buf) {perror("realloc failed");close_connection(fd);return -3;}conn_list[fd].rbuffer = new_buf;conn_list[fd].rcap = new_cap;}// 复制数据memcpy(conn_list[fd].rbuffer + conn_list[fd].rlength, buffer, count);	//	我在这个地方多次出错,memcpy 用的不好,我在此处爆栈了,conn_list[fd].rlength = new_length;// 安全打印(指定长度)// printf("[%zd]RECV: %.*s\n", count, (int)count, buffer);	//	这是特殊的占位符用法}memset(buffer,0,BUFFER_LENGTH); 	// 重置读取的缓冲区,他是固定长度的,用于接收 fd 里面的字节,每次只读取一点点}return 0;
}

这个函数先是对初始化 conn_list 的读缓冲区和读缓冲区的长度和容量进行初始化,而后采用边沿读取的方法,无限循化地读取,直至读完。边沿触发模式下,事件只在状态发生变化时通知一次,不会因为缓冲区中持续有数据而反复触发。这大大减少了epoll_wait的调用次数,降低了内核与用户态之间的上下文切换开销,从而显著提高了性能。边沿触发模式通常与非阻塞I/O搭配使用。在非阻塞模式下,程序会尽可能多地读取或写入数据,直到遇到EAGAINEWOULDBLOCK错误为止。这种模式下,边沿触发能够更好地发挥其优势。

  • 当用户远程断开连接时,会自动给服务器发送信息,触发 EPOLL 的读事件 event,并且使用 recv 函数后,返回值是 0
  • 当文件读取完毕后,由于 clientfd 被设置成非阻塞模式,recv 不会因无输入而阻断,而是会立即返回 -1,对应的错误是 EAGAINEWOULDBLOCK 。但这是正常现象。换句话说,在阻塞模式下,是不存在这个错误的。
  • 我们在读取 I/O 文件内容时,是使用了动态内存扩充的方法,这使得我们的服务器可以接受超大型的数据。

send_cb 模块

在处理网络输出(即 EPOLLOUT 事件)时,套接字 clientfd 的模式被函数set_event 调整为阻塞模式,可保证发送到内核缓冲区(即套接字对应的 I/O 文件上),但不保证完整传输到对端(受网络状况影响),因而还需要循环发送。我们还需设置超时处理,保证可以全部发送。

//	挂起等待,写事件就绪,处理 send 函数的 Bug
void wait_for_socket_writable(int sockfd) {// 需提前创建epoll实例并注册事件int epoll_fd_1 = epoll_create1(0);struct epoll_event ev;ev.events = EPOLLOUT;  // 监听可写ev.data.fd = sockfd;epoll_ctl(epoll_fd_1, EPOLL_CTL_ADD, sockfd, &ev);// 等待事件触发,无限阻塞epoll_wait(epoll_fd_1, &ev, 1, -1);close(epoll_fd_1); // 添加关闭
}//	确保数据全部发送
//	即使使用阻塞 sockfd 也必须循环发送!因为阻塞模式只保证发送到内核缓冲区,不保证完整传输到对端(受网络状况影响)。
ssize_t send_all(int sockfd, const void *buf, size_t len, int flags) {ssize_t total_sent = 0;      // 已发送字节数const char *ptr = (const char *)buf;  // 移动指针指向未发送数据//	即使使用阻塞 sockfd 也必须循环发送!因为阻塞模式只保证发送到内核缓冲区,不保证完整传输到对端(受网络状况影响)。while (total_sent < len) {// 尝试发送剩余数据ssize_t n = send(sockfd, ptr, len - total_sent, flags);if (n < 0) {// 错误处理(重点!)if (errno == EINTR) continue;   // 信号中断:重试if (errno == EAGAIN || errno == EWOULDBLOCK) {// 阻塞模式:等待可写(需配合 select/poll/epoll)wait_for_socket_writable(sockfd);continue;}return -1;  // 其他错误(如连接断开)} else if (n == 0) {printf("client disconnect: %d\n", sockfd);close_connection(sockfd);return total_sent; // 连接关闭(部分发送)}// 更新状态total_sent += n;ptr += n;  // 移动指针到未发送数据位置}return total_sent;  // 返回实际发送的字节数(应等于 len)
}

综合以上两个函数,我们还指示了业务处理位置。

//	根据情况使用回调函数————普通套接字 clientfd 的输出事件是 send_cb  发送内容
//	在边沿模式下,该函数要循环执行
int send_cb(int fd) {if (conn_list[fd].wbuffer != NULL) {	//	连接复用,二次发送信息的时候,写缓冲区的初始化free(conn_list[fd].wbuffer);	conn_list[fd].wbuffer = NULL; // 防止重复释放}conn_list[fd].wlength =0;///////////////////////////////////////		响应操作的业务端(开始)	 ////////////////////////////////////////////////////////////////////////////////		响应操作的业务端(结束)	 /////////////////////////////////////////ssize_t count = 0;if (conn_list[fd].wlength != 0) {count = send_all(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);}printf("SEND: %zd\n", count);return count;
}

服务器代码

结合前面的流程图,我们可以给出这个服务器的代码

int main() {unsigned short port = 2000;	// 	端口 portepfd = epoll_create(1);		//	epfd 已经被声明为一个全局变量int listen_fds[MAX_PORTS] = {0}; // 存储所有监听socketint i = 0;for (i = 0; i < MAX_PORTS; i++) {int sockfd = init_server(port + i);listen_fds[i] = sockfd;conn_list[sockfd].fd = sockfd;  //  使用fd作为索引conn_list[sockfd].r_action.accept_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);}gettimeofday(&begin, NULL);		//	获取最初的时间戳,begin 是全局变量while (1) { // mainloop	服务器的根本struct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, 5);	//	5 指代等待 5 毫秒,如果 I/O 响应满员的话,立即返回//	该通知函数是需要调用系统内核的,是需要花费成本的,如果只是通知某些内容没有读完而调用,是极其得不偿失的,这是我们需要用到边沿触发的原因,只是编程难度更大了//	我们要重视操作系统内核的调动,系统 I/O 并不全体现在代码之上,而代码要考虑操作系统内核可能出现的情况;编写代码要看到代码之外的东西int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;int is_listener = 0;	//	状态机-重置// 当连接异常断开时未清理资源,添加错误处理:if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP) {	// 要注意 “|” 是按位或操作,能进行掩码叠加;“&” 是按位与操作//  EPOLLHUP 表示对应的文件描述符被挂断。EPOLLERR 表示对应的文件描述符发生错误。close_connection(connfd);continue;}// 检查是否为监听套接字for (int j = 0; j < MAX_PORTS; j++) {if (connfd == listen_fds[j]) {conn_list[connfd].r_action.accept_callback(connfd);is_listener = 1;break;}}if (is_listener) continue;	//	状态机-进入下一个环节// 处理读事件if (events[i].events & EPOLLIN) {// 这是监控到了除 sockfd 以外的套接字// ET 边沿模式需循环读取,原因是要保证网络 I/O 所有内容都被读取!if (conn_list[connfd].recv_callback(connfd) >= 0) { // 返回0表示成功printf("[%ld] RECV: %s\n",conn_list[connfd].rlength, conn_list[connfd].rbuffer);} else {//	连接在 recv 函数处早已释放continue; // 跳过后续处理}///////////////////////		读操作的业务端(start)     //////////////////////////	至此,客户端的请求报文全部写完了,写入了 rbuffer 之中。我们要利用这个内存去执行业务操作//	实现 HTTP 请求,在代码里面,该函数只是形式上存在,并不是重点//	http_request(&conn_list[fd]);//	WebSocket 协议的请求// 	ws_request(&conn_list[fd]);///////////////////////		读操作的业务端(start)     ////////////////////////set_event(connfd, EPOLLOUT, 0);} else if (events[i].events & EPOLLOUT) {//	这里必须注意 EPOLLOUT 不是边沿事件触发模式,我们通常只把响应报文的 header 写入 wbuffer 中,长度是固定的,因此水平出发即可。//	至于那大段大段的文件资源传输,则是通过文件描述符之间操作完成,跳过缓冲区读写// send 会因无输输出而阻断// 函数 send: 发送固定字节的内容。// 返回值 > 0:表示成功发送了数据,返回值表示实际发送的字节数。// 返回值 == -1:表示发送操作失败。错误原因可以通过 errno 获取conn_list[connfd].send_callback(connfd);//	如果是为了测试百万并发,则用这个,不要把 fd 关闭set_event(connfd, EPOLLIN | EPOLLET, 0);	// 这次输出发送事件结束了;改为 “边沿读写模式”,执行 epoll_ctl 函数,系统内核会直接把 fd 加载到 epoll 的就绪集之中}	else {printf("Unknown event on clientfd: %d, errno:%d\n", connfd, errno);close_connection(connfd);}}}}

我们注意到,对于模块函数 accept_cb 的使用,我们借助了 “状态机” 的思路,即代码中的 is_listener,让情况得以分类。

代码运行效果

代码编译

qiming@qiming:~/share/CTASK/TCP_test$ gcc -o reactor reactor.c

程序执行,一开始没链接的时候,程序并没有挂起,而是作无意义的 while 死循环,原因是代码中的 epoll_wait(epfd, events, 1024, 5) 并非阻塞运行,而是等待 5 毫秒后运行下一行代码。

qiming@qiming:~/share/CTASK/TCP_test$ ./reactor

NetAssist 远程连接
在这里插入图片描述
发送信息(我们并没发送什么东西,只是象征性的设置,读者可自行设置,输出)

qiming@qiming:~/share/CTASK/TCP_test$ ./reactor
[13244] RECV: The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows no intention of putting them off. In contrast, the man on the right exhibits a desire for delaying his task and holds that he doesn't go cracking until the deadline.
As the thought-provoking pictures intend to mirror, the idle boy as mentioned above is not likely to finish his assignments in the end, any more than a man can attain great achievement if he is accustomed to putting tasks off. For one thing, the habits shape the turns of our attitudes and behaviors of how to deal with our plans and tasks. In the course of pursuing our goals, we may face a great deal of temp-
tations and adversities, and a good habit of self-discipline can direct us to make right decisions and refrain from indulging us with entertainment. For another, if we fail to be self-disciplined, we will be deprived of the opportunities for forging the brilliant traits, like mental maturity, fortitude, creativity and so on, which will render us considerable autonomy and lead us to be successful.
Personally, we can't underscore the significance of the good habits too much. It is advisable that we should concentrate on our daily tasks through strict time management and enjoy the course of striving.
The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows no intention of putting them off. In contrast, the man on the right exhibits a desire for delaying his task and holds that he doesn't go cracking until the deadline.
As the thought-provoking pictures intend to mirror, the idle boy as mentioned above is not likely to finish his assignments in the end, any more than a man can attain great achievement if he is accustomed to putting tasks off. For one thing, the habits shape the turns of our attitudes and behaviors of how to deal with our plans and tasks. In the course of pursuing our goals, we may face a great deal of temp-
tations and adversities, and a good habit of self-discipline can direct us to make right decisions and refrain from indulging us with entertainment. For another, if we fail to be self-disciplined, we will be deprived of the opportunities for forging the brilliant traits, like mental maturity, fortitude, creativity and so on, which will render us considerable autonomy and lead us to be successful.
Personally, we can't underscore the significance of the good habits too much. It is advisable that we should concentrate on our daily tasks through strict time management and enjoy the course of striving.The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows n

我发送了若干段考研英语一的大作文(同一篇文章多次复制粘贴),模拟客户端发送的内容非常多,多到超过了一开始设置的读缓冲区容量 BUFFER_LENGTH,即 1 KB 的内容。我们注意到

[13244] RECV: The drawings ... 略

共发送了 1 万多个字节的内容,说明我们的动态内存扩充的代码是有效的。

而且接受完信息后,还可以发送信息,说明事件转化的设计也是有效的(在输出的最末端,我们可在命令行处注意到)。

SEND: 0

这样一来,我们这个服务器是可以完成一个准网络 I/O 事务的。完整的网络 I/O 事务是

客户端服务器第一步,网络请求第二步,网络响应第三步,客户端处理响应客户端服务器

总结

本篇文章,对上一篇 文章 的 EPOLL 服务器作出模块化的升级,让代码更有组织度的同时,还有以下的特性

  1. 定义了 conn 类型,它能记录对应网络 I/O 文件的输入输出内容,而且还记录了对应套接字的输入和输出行为。类似于 “多态” 的概念,同类型的两个变量内同一成员名字有不同含义。让命名变得简单了许多。
  2. 对任何套接字(无论是否为监听套接字)设置了根据事件信号而触发的网络 I/O 行为。这就是 “Reactor” 这个名字的由来。
  3. 对于 recv_cb 函数设计了可动态扩充读缓冲区的功能,能够接收超大型数据。
  4. 函数 set_event 能够有效的进行事件转化,使之同一个套接字的事件能够在 EPOLLIN | EPOLLETEPOLLOUT 之间来回转换,刚好适应了网络 I/O 的事务模式。

在下一篇文章里,我们针对这个事件驱动 reactor 式的服务器(还是这份代码)测试其百万并发的能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.tpcf.cn/web/87439.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何做好云服务器密码管理

一、设置强密码 强密码就像是给云服务器上了一把“超级锁”。专家建议&#xff0c;一个强密码应该包含大写字母、小写字母、数字和特殊字符&#xff0c;长度至少在 12 位以上。比如说&#xff0c;“Abc12345678”就比简单的“123456”要安全得多。有数据显示&#xff0c;简单密…

《新消费模式与消费者权益保护研讨会》课题研讨会在北京顺利召开

近期&#xff0c;《新消费模式与消费者权益保护研讨会》课题研讨会在北京召开。来自市场监管、政法、宏观管理等部门专家参会&#xff0c;聚焦《消费者权益保护法》《关于以新业态新模式引领新型消费加快发展的意见》等文件精神&#xff0c;探讨激发市场主体活力、促进新型消费…

Gradio全解13——MCP协议详解(6)——MCP服务器构建、测试与示例大全

Gradio全解13——MCP协议详解&#xff08;6&#xff09;——MCP服务器构建、测试与示例大全第13章 MCP协议详解13.6 MCP服务器构建、测试与示例大全13.6.1 开发MCP天气服务器1. 天气服务器概述2. 安装Node.js并设置环境3. 构建服务器13.6.2 安装Claude for Desktop1. 安装Claud…

Windows 11 24H2 专业版/家庭版安装教程(2025年6月更新版)- U盘启动盘制作+详细步骤

准备U盘启动盘​ 下载个叫「Rufus」的免费小工具&#xff08;百度搜就行&#xff09;。插入一个至少8GB的空U盘&#xff08;U盘会被清空&#xff0c;提前备份资料&#xff01;&#xff09;。打开Rufus&#xff0c;选你的U盘&#xff0c;ISO文件选你下载的那个 zh-cn_windows_1…

mac电脑wireshark快速实现http接口抓包

wireshark介绍 Wireshark 是一款功能强大的网络协议分析工具&#xff0c;可以用来抓取网络中的数据包&#xff0c;包括 HTTP 请求和响应。 wireshark安装 安装下载官网 https://www.wireshark.org/download.html&#xff0c;根据个人电脑环境下载安装wireshark使用 1配置网卡2选…

Softhub软件下载站实战开发(十二):软件管理编辑页面实现

文章目录 Softhub软件下载站实战开发&#xff08;十二&#xff09;&#xff1a;软件管理编辑页面实现✨功能概述 &#x1f4cb;编辑页面实现 &#x1f6e0;️1. 页面结构设计2. aieEditor集成 &#x1f31f;初始化配置编辑器功能 3. 大整数处理 &#x1f522;4. 封面图片上传 &…

微服务外联Feign调用:第三方API调用的负载均衡与容灾实战

01Feign 简介 Feign 是 Spring Cloud Netflix 中的 声明式 HTTP 客户端&#xff0c;它如同一位贴心的信使&#xff0c;帮我们化繁为简&#xff0c;让服务间的调用变得轻松又高效。 Feign 的核心优势在于&#xff1a;。 • 声明式调用&#xff1a;开发者只需定义接口和注解&a…

k8s pod调度基础

目录 一&#xff1a;replication controller和replicaset 1&#xff1a;replication controller replication controller的使用示例。 2&#xff1a;标签与标签选择器 &#xff08;1&#xff09;标签 &#xff08;2&#xff09;标签选择器 &#xff08;3&#xff09;标签…

学习者的Python项目灵感

一、实用工具类 - 文件批量重命名工具 用 os 模块实现按规则&#xff08;如添加日期、序号、替换关键词&#xff09;批量重命名文件&#xff0c;适合处理大量图片/文档。 - 简易待办事项管理器&#xff08;To-Do List&#xff09; 用 tkinter 或 PyQt 做GUI界面&#xff0c;…

gRPC服务发现

基于 etcd 实现的服务发现&#xff0c;按照非规范化的 etcd key 实现&#xff0c;详细见代码注释。 package discoveryimport ("context""encoding/json""fmt""go.etcd.io/etcd/api/v3/mvccpb"clientv3 "go.etcd.io/etcd/client/…

基于Linux的Spark本地模式环境搭建实验指南

一、实验目的 掌握Spark本地模式的安装与配置方法验证Spark本地环境是否搭建成功了解Spark基本操作和运行原理 二、实验环境准备 操作系统&#xff1a;Linux&#xff08;推荐ubuntu&#xff09;Java环境&#xff1a;JDK 1.8或以上版本内存&#xff1a;至少4GB&#xff08;推…

数学建模_时间序列

什么是时间序列时间序列预测方法/模型条件&#xff1a;非白噪音平稳平稳性评估不平稳变成平稳然后用ARIMA模型确定p,qAR模型(ARMA特例)MA模型(ARMA特例)ARMA模型(普适)灰色模型神经网络/LSTM组合预测模型向量数据预测结果和为1的情况什么是时间序列 省略具体图形例子 时间序列…

linux用rpm包升级sudo包为sudo-1.9.17-2版本

rpm下载地址&#xff1a; https://www.sudo.ws/dist/packages/1.9.17p1/ 备注&#xff1a;其他压缩包下载地址&#xff1a;https://www.sudo.ws/download.html sudo-1.9.17-2.el7.x86_64.rpm 检查一下&#xff0c;本地sudo版本&#xff0c;执行&#xff1a;sudo -V 或者sudo -…

【开源项目】一款真正可修改视频MD5工具视频质量不损失

文章目录 视频MD5修改工具 🎬📋 目录✨ 功能特点💻 系统要求🏗️ 设计架构🔬 技术原理💻 核心代码1. 视频MD5修改核心逻辑2. 前端异步处理代码3. 错误处理与日志记录📥 安装方法方法一:直接下载方法二:使用本地服务器📚 使用教程基本使用步骤高级使用技巧📁…

Day05: Python 中的并发和并行(1)

理解 Python 中的线程和进程 理解线程和进程是实现在 Python 中并发和并行的基础。这种知识使你能够编写能够看似同时执行多个任务的程序&#xff0c;从而提高性能和响应能力。本课程将深入探讨线程和进程的核心概念、它们的区别&#xff0c;以及它们如何为更高级的并发技术奠…

Spring Boot 集成 MinIO 实现分布式文件存储与管理

Spring Boot 集成 MinIO 实现分布式文件存储与管理 一、MinIO 简介 MinIO 是一个高性能的分布式对象存储服务器&#xff0c;兼容 Amazon S3 API。它具有以下特点&#xff1a; 轻量级且易于部署高性能&#xff08;读写速度可达每秒数GB&#xff09;支持数据加密和访问控制提供…

从小白入门,基于Cursor开发一个前端小程序之Cursor 编程实践与案例分析

Cursor 编程实践与案例分析 Cursor 编程实践与案例分析 1. 什么是 Cursor&#xff1f; Cursor 是一款面向开发者的 AI 编程助手&#xff0c;集成于本地 IDE&#xff0c;支持自然语言与代码的无缝协作。它不仅能自动补全、重构、查找代码&#xff0c;还能理解业务上下文&#…

一、如何用MATLAB画一个三角形 代码

一、如何用MATLAB画一个三角形 代码在MATLAB中绘制三角形可以通过指定三个顶点的坐标并使用 fill 或 patch 函数实现。以下是详细代码示例&#xff1a;方法1&#xff1a;使用 fill 函数&#xff08;简单填充&#xff09;% 定义三角形的三个顶点坐标 (x, y) x [0, 1, 0.5]; % …

Postman自动化测试提取相应body体中的参数

文章目录Postman自动化测试提取相应body体中的参数1. 示例响应 Body 参数2. 提取响应 Body 参数Postman自动化测试提取相应body体中的参数 上一篇的文中介绍了使用postman自动化测试时从响应的header中提取token参数&#xff0c;很多同学私信问如何从响应体body中提取参数。 有…

vue-39(为复杂 Vue 组件编写单元测试)

实际练习:为复杂 Vue 组件编写单元测试 单元测试对于确保复杂 Vue 组件的可靠性和可维护性至关重要。通过隔离和测试代码的各个单元,您可以在开发过程的早期发现并修复错误,从而构建更健壮和可预测的应用程序。本课程重点介绍为复杂 Vue 组件编写单元测试的实用方面,建立在…