IO多路复用

IO多路复用:由内核检测多个文件描述符的状态,来解决传统的阻塞式的IO,可以提高CPU的利用率。另外,相比于多线程模型,每个连接都要一个独立的线程,所需的系统开销过大,而IO多路复用只有在连接准备好进行读写操作时,才进行处理。

select

函数原型

1
2
3
4
5
6
7
#include <sys/select.h>
struct timeval {
time_t tv_sec; /* seconds */
suseconds_t tv_usec; /* microseconds */
};
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval * timeout);
  • nfds:委托内核检测的这三个集合中最大的文件描述符 + 1
  • readfds:需要检测缓冲区的文件描述符的集合
  • writefds:需要检测缓冲区的文件描述符的集合
  • exceptfds: 需要检测异常缓冲区的文件描述符的集合
  • timeout:超时时长

常用操作函数:

1
2
3
4
5
6
7
8
// 将文件描述符fd从set集合中删除 == 将fd对应的标志位设置为0        
void FD_CLR(int fd, fd_set *set);
// 判断文件描述符fd是否在set集合中 == 读一下fd对应的标志位到底是0还是1
int FD_ISSET(int fd, fd_set *set);
// 将文件描述符fd添加到set集合中 == 将fd对应的标志位设置为1
void FD_SET(int fd, fd_set *set);
// 将set集合中, 所有文件文件描述符对应的标志位设置为0, 集合中没有添加任何文件描述符
void FD_ZERO(fd_set *set);

select的局限性:

  • 频繁的数据复制:因为用户态和内核态之间存在严格的隔离,调用 select 函数会进行数据的拷贝(将文件描述符集合拷贝到内核空间,再将修改后的集合复制回用户空间),这种频繁的数据复制操作会带来额外的系统开销
  • 线性扫描:待检测文件描述符集合采取的是线性扫描的方式进行检查。
  • 默认最大文件描述符数限制select 可以检测的最大文件描述符数量通常是有限制的,默认情况下这个值是 1024(由宏 FD_SETSIZE 定义)。

通信代码

  • 服务器端代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <arpa/inet.h>

    int main()
    {
    // 1. 创建监听的fd
    int lfd = socket(AF_INET, SOCK_STREAM, 0);

    // 2. 绑定
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9999);
    addr.sin_addr.s_addr = INADDR_ANY;
    bind(lfd, (struct sockaddr*)&addr, sizeof(addr));

    // 3. 设置监听
    listen(lfd, 128);

    // 将监听的fd的状态检测委托给内核检测
    int maxfd = lfd;
    // 初始化检测的读集合
    fd_set rdset;
    fd_set rdtemp;
    // 清零
    FD_ZERO(&rdset);
    // 将监听的lfd设置到检测的读集合中
    FD_SET(lfd, &rdset);
    // 通过select委托内核检测读集合中的文件描述符状态, 检测read缓冲区有没有数据
    // 如果有数据, select解除阻塞返回
    // 应该让内核持续检测
    while(1)
    {
    // 默认阻塞
    // rdset 中是委托内核检测的所有的文件描述符
    rdtemp = rdset;
    int num = select(maxfd+1, &rdtemp, NULL, NULL, NULL);
    // rdset中的数据被内核改写了, 只保留了发生变化的文件描述的标志位上的1, 没变化的改为0
    // 只要rdset中的fd对应的标志位为1 -> 缓冲区有数据了
    // 判断
    // 有没有新连接
    if(FD_ISSET(lfd, &rdtemp))
    {
    // 接受连接请求, 这个调用不阻塞
    struct sockaddr_in cliaddr;
    int cliLen = sizeof(cliaddr);
    int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &cliLen);
    //accept会返回用于通信的文件描述符
    // 得到了有效的文件描述符
    // 通信的文件描述符添加到读集合
    // 在下一轮select检测的时候, 就能得到缓冲区的状态
    FD_SET(cfd, &rdset);
    // 重置最大的文件描述符
    maxfd = cfd > maxfd ? cfd : maxfd;
    }

    // 没有新连接, 通信
    for(int i=0; i<maxfd+1; ++i)
    {
    // 判断从监听的文件描述符之后到maxfd这个范围内的文件描述符是否读缓冲区有数据
    if(i != lfd && FD_ISSET(i, &rdtemp))
    {
    // 接收数据
    char buf[10] = {0};
    // 一次只能接收10个字节, 客户端一次发送100个字节
    // 一次是接收不完的, 文件描述符对应的读缓冲区中还有数据
    // 下一轮select检测的时候, 内核还会标记这个文件描述符缓冲区有数据 -> 再读一次
    // 循环会一直持续, 知道缓冲区数据被读完位置
    int len = read(i, buf, sizeof(buf));
    if(len == 0)
    {
    printf("客户端关闭了连接...\n");
    // 将检测的文件描述符从读集合中删除
    FD_CLR(i, &rdset);
    close(i);
    }
    else if(len > 0)
    {
    // 收到了数据
    // 发送数据
    write(i, buf, strlen(buf)+1);
    }
    else
    {
    // 异常
    perror("read");
    }
    }
    }
    }

    return 0;
    }
  • 客户端代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <arpa/inet.h>

    int main()
    {
    // 1. 创建用于通信的套接字
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if(fd == -1)
    {
    perror("socket");
    exit(0);
    }
    // 2. 连接服务器
    struct sockaddr_in addr;
    addr.sin_family = AF_INET; // ipv4
    addr.sin_port = htons(9999); // 服务器监听的端口, 字节序应该是网络字节序
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr);
    int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr));
    if(ret == -1)
    {
    perror("connect");
    exit(0);
    }
    // 通信
    while(1)
    {
    // 读数据
    char recvBuf[1024];
    // 写数据
    // sprintf(recvBuf, "data: %d\n", i++);
    fgets(recvBuf, sizeof(recvBuf), stdin);
    write(fd, recvBuf, strlen(recvBuf)+1);
    // 如果客户端没有发送数据, 默认阻塞
    read(fd, recvBuf, sizeof(recvBuf));
    printf("recv buf: %s\n", recvBuf);
    sleep(1);
    }
    // 释放资源
    close(fd);
    return 0;
    }

epoll

对于待检测集合selectpoll是基于线性方式处理的,epoll是基于红黑树来管理待检测集合的。

epoll实例就是一棵树,每个节点是epoll_event的结构体,结构体里有events 字段描述文件描述符的事件类型,还有data字段是一个联合体类型,通常存储对应的文件描述符

操作函数

操作函数:

1
2
3
4
5
6
7
#include <sys/epoll.h>
// 创建epoll实例,通过一棵红黑树管理待检测集合
int epoll_create(int size);
// 管理红黑树上的文件描述符(添加、修改、删除)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 检测epoll树中是否有就绪的文件描述符
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

struct epoll_event:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 联合体, 多个变量共用同一块内存        
typedef union epoll_data {
void *ptr;
int fd; // 通常情况下使用这个成员, 和epoll_ctl的第三个参数相同即可
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */
//通常使用data.fd,将其设置为当前event的文件描述符
epoll_data_t data; /* User data variable */
};

通信代码(服务端)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>

// server
int main(int argc, const char* argv[])
{
// 创建监听的套接字
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd == -1)
{
perror("socket error");
exit(1);
}

// 绑定
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(9999);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 本地多有的IP

// 设置端口复用
int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 绑定端口
int ret = bind(lfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
if(ret == -1)
{
perror("bind error");
exit(1);
}
// 监听
ret = listen(lfd, 64);
if(ret == -1)
{
perror("listen error");
exit(1);
}
// 现在只有监听的文件描述符
// 所有的文件描述符对应读写缓冲区状态都是委托内核进行检测的epoll
// 创建一个epoll模型
int epfd = epoll_create(100);
if(epfd == -1)
{
perror("epoll_create");
exit(0);
}
// 往epoll实例中添加需要检测的节点, 现在只有监听的文件描述符
struct epoll_event ev;
ev.events = EPOLLIN; // 检测lfd读读缓冲区是否有数据
ev.data.fd = lfd;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
if(ret == -1)
{
perror("epoll_ctl");
exit(0);
}
struct epoll_event evs[1024];
int size = sizeof(evs) / sizeof(struct epoll_event);
// 持续检测
while(1)
{
// 调用一次, 检测一次
int num = epoll_wait(epfd, evs, size, -1);
for(int i=0; i<num; ++i)
{
// 取出当前的文件描述符
int curfd = evs[i].data.fd;
// 判断这个文件描述符是不是用于监听的
if(curfd == lfd)
{
// 建立新的连接
int cfd = accept(curfd, NULL, NULL);
// 新得到的文件描述符添加到epoll模型中, 下一轮循环的时候就可以被检测了
ev.events = EPOLLIN; // 读缓冲区是否有数据
ev.data.fd = cfd;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
if(ret == -1)
{
perror("epoll_ctl-accept");
exit(0);
}
}
else
{
// 处理通信的文件描述符
// 接收数据
char buf[1024];
memset(buf, 0, sizeof(buf));
int len = recv(curfd, buf, sizeof(buf), 0);
if(len == 0)
{
printf("客户端已经断开了连接\n");
// 将这个文件描述符从epoll模型中删除
epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
close(curfd);
}
else if(len > 0)
{
printf("客户端say: %s\n", buf);
send(curfd, buf, len, 0);
}
else
{
perror("recv");
exit(0);
}
}
}
}
return 0;
}

epoll的工作模式

水平触发模式(LT)

  • 持续性通知:只要文件描述符处于就绪状态(例如,有数据可以读取),就会持续通知应用。这意味着即使你不处理完所有可用的数据,下一次调用epoll_wait时,如果该文件描述符仍然准备好进行操作,它将继续被包含在返回的事件列表中。
  • 易于使用:因为其持续通知特性,编程相对简单,不容易错过事件。
  • 效率:由于可能会多次通知同一个就绪状态,这在高并发场景下可能导致一定的性能损耗。

边沿触发模式(ET)

  • 一次性通知仅当文件描述符的状态发生变化(例如,从不可读变为可读)时发出通知。这意味着如果你没有完全读取或写入所有数据,在下一次调用epoll_wait时可能不会再收到关于这个文件描述符的通知,除非它的状态再次改变。
  • 高效性:减少重复通知的数量,提高效率,特别适用于高负载的网络服务器等场景。
  • 复杂性增加:需要更细致地管理I/O操作,通常要求使用非阻塞I/O,并且需要确保每次触发时尽可能多地处理数据,以避免丢失事件。

ET模式的设置: 水平触发模式(LT)是默认的模式

1
2
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 设置边沿模式

边沿触发模式需要设置为阻塞

边沿触发模式需要一次性操作缓冲区的全部数据,所以需要使用循环的结构操作数据,那么要将文件描述符设置为非阻塞,过程使用fcntl()进行处理:

1
2
3
4
// 设置完成之后, 读写都变成了非阻塞模式
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);

注:当缓冲区数据被读完了,调用的read()/recv()函数还会继续从缓冲区中读数据,此时函数调用就失败了,返回-1,对应的全局变量 errno 值为EAGAIN或者 EWOULDBLOCK

边沿触发模式的通信代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>

// server
int main(int argc, const char* argv[])
{
// 创建监听的套接字
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd == -1)
{
perror("socket error");
exit(1);
}

// 绑定
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(9999);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 本地多有的IP
// 127.0.0.1
// inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr.s_addr);

// 设置端口复用
int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 绑定端口
int ret = bind(lfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
if(ret == -1)
{
perror("bind error");
exit(1);
}

// 监听
ret = listen(lfd, 64);//将 Socket 设置为监听状态
if(ret == -1)
{
perror("listen error");
exit(1);
}

// 现在只有监听的文件描述符
// 所有的文件描述符对应读写缓冲区状态都是委托内核进行检测的epoll
// 创建一个epoll模型
int epfd = epoll_create(100);
if(epfd == -1)
{
perror("epoll_create");
exit(0);
}

// 往epoll实例中添加需要检测的节点, 现在只有监听的文件描述符
struct epoll_event ev;
ev.events = EPOLLIN; // 检测lfd读读缓冲区是否有数据
ev.data.fd = lfd;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
if(ret == -1)
{
perror("epoll_ctl");
exit(0);
}
struct epoll_event evs[1024];
int size = sizeof(evs) / sizeof(struct epoll_event);
// 持续检测
while(1)
{
// 调用一次, 检测一次
int num = epoll_wait(epfd, evs, size, -1);
printf("==== num: %d\n", num);

for(int i=0; i<num; ++i)
{
// 取出当前的文件描述符
int curfd = evs[i].data.fd;
// 判断这个文件描述符是不是用于监听的
if(curfd == lfd)
{
// 建立新的连接
int cfd = accept(curfd, NULL, NULL);
// 将文件描述符设置为非阻塞
// 得到文件描述符的属性
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);
// 新得到的文件描述符添加到epoll模型中, 下一轮循环的时候就可以被检测了
// 通信的文件描述符检测读缓冲区数据的时候设置为边沿模式
ev.events = EPOLLIN | EPOLLET; // 读缓冲区是否有数据
ev.data.fd = cfd;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
if(ret == -1)
{
perror("epoll_ctl-accept");
exit(0);
}
}
else
{
// 处理通信的文件描述符
// 接收数据
char buf[5];
memset(buf, 0, sizeof(buf));
// 循环读数据
while(1)
{
int len = recv(curfd, buf, sizeof(buf), 0);
if(len == 0)
{
// 非阻塞模式下和阻塞模式是一样的 => 判断对方是否断开连接
printf("客户端断开了连接...\n");
// 将这个文件描述符从epoll模型中删除
epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
close(curfd);
break;
}
else if(len > 0)
{
// 通信
// 接收的数据打印到终端
write(STDOUT_FILENO, buf, len);
// 发送数据
send(curfd, buf, len, 0);
}
else
{
// len == -1
if(errno == EAGAIN)
{
printf("数据读完了...\n");
break;
}
else
{
perror("recv");
exit(0);
}
}
}
}
}
}

return 0;
}

问答

1.为什么epoll的边沿模式要使用fcntl将文件描述符设置为非阻塞?