进程池和文件下载

结合学过的文件操作、网络通信、以及进程和线程的知识, 实现一个基本的文件下载服务器模型。尽管线程池才是最常用的,但先从进程池谈起,实际上实现一个池,其余也没太大区别。

进程池_模型.png

第一版

当前第一版还只是解决进程间共享文件对象的问题, 并且实现了服务器的主进程接收客户端连接请求, 并把客户端连接交给进程池中进程具体和客户端进行交互的功能。

代码地址:进程池第一版

主进程用以监听连接请求,建立连接得到的后续用于和客户端通信的 net_fd ,从进程池中选择一个空闲的进程来给 net_fd 发送一个消息,那就需要把 net_fd 给到子进程。

你以为轻松就能传递过去了?进程与进程之间是相互隔离,各自的文件描述符数组是不能被其他进程访问。你传递过去也不过是一个下标,但是你把这个下标拿给别的进程的文件描述符数组访问,得到的又不是一个东西。

两个进程共享一个文件对象

一般我们使用 socketpair 的目的:为了在两个进程之间, 传输一个文件对象的描述信息(可以让两个进程共享一个文件对象), 而不是单单只传输一个文件描述符数组的下标。

1
2
3
4
5
6
7
8
9
10
11
#include <sys/types.h> 
#include <sys/socket.h>
// create a pair of connected sockets
int socketpair(
int domain, // 指定socket使用的协议族, 本地通信我们使用: AF_LOCAL
int type, // 指定socket的类型: SOCK_STREAM(TCP), SOCK_DGRAM(UDP)
int protocol, // 指定协议, 默认设置0即可
int sv[2] // 用于返回两个连接的socket描述符(等价与socket的fd), 父子进程可以通过这个文件描述符进行通信(任何发送到sv[0]的数据都可以从sv[1]读取,反之任何发送到sv[1]的数据都可以从sv[0]读取。)
);

// 返回值: 成功返回0, 失败返回-1

socketpair 函数用于创建一对互相连接的全双工通信 socket。相比较普通的用于网络间不同主机通信的 socket; socketpair 函数创建的socket主要用于在同一台机器上进程间通信(我们可以称其为本地 socket )。

如果你想通过 socketpair 函数实现两个本地进程间的文件对象描述符的传输,除了需要 socketpair 函数创建通信的端点, 还需要借助 sendmsg 函数和 recvmsg 函数来实现具体的数据传输。但是具体的实现代码不用去记,更不用去写,直接从提供的代码地址中的 LocalSocekt.c 文件中获取封装好的函数即可。

1
2
3
4
// 工作线程用来读取main线程accept的客户端连接对象
// 参数一: 工进程程用来和main进程通信的特制本地socket
// 参数二: 用来存储从本地socket中读到的客户端连接对象的文件描述符
int read_net_fd(int socket_local_fd, int *net_fd)
1
2
3
4
// main进程accept获得的客户端连接对象发送给工作进程
// 参数一: main进程用来和工作进程通信的本地socket
// 参数二: main进程accept获得到的客户端连接文件对象的文件描述符
int write_net_fd(int socket_local_fd, int* net_fd)

write_net_fd 传递 套接字 net_fd,read_net_fd 接受 套接字 net_fd。

核心代码解读

socketpair 第四个参数是一个数组,即两个本地 socket_fd 组成的数组。我们也知道 socketpair 是全双公通信,那在代码中只使用一条通道就可以,故而进程把用不到的关闭即可。父进程传输,子进程接受的这样一条单通道。核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int initProcessPool(Worker_Status* subProcess,int size){
for (int i = 0; i < size; ++i) {
int socket_fd[2];
socketpair(AF_LOCAL,SOCK_STREAM,0,socket_fd);

int pid = fork();

if (pid == 0){
close(socket_fd[1]); // 关闭用不到的
startWorker(socket_fd[0]);
}else{
close(socket_fd[0]); // 关闭用不到的
subProcess[i].pid = pid;
subProcess[i].status = FREE;
subProcess[i].socket_local_fd = socket_fd[1];
}
}

return 0;
}

下面这份代码把 子进程存储的 本地套接字加入监听,是为了让 子进程完成任务之后,让主进程收到消息(消息内容随便写,主要是唤醒作用)之后把子进程的状态改为 FREE。

1
2
3
for (int i = 0; i < PROCESS_SIZE; ++i) {
epoll_add(epfd, subProcess[i].socket_local_fd);
}
1
2
pid_t pid = getpid();
send(socket_local_fd,&pid,sizeof(pid),0);
1
2
3
4
5
for (int j = 0; j < PROCESS_SIZE; ++j) {
if (subProcess[j].socket_local_fd == tmp_fd) {
subProcess[i].status = FREE;
}
}

第二版

在上一版本的基础上, 如果客户端的请求是想获得一份在服务端的文件,我们该如何做到呢?

我们需要多次和客户端交流,交换文件的相关信息,但由于 TCP 是流协议,没有边界,就不得不考虑粘包和半包问题。解决方案如下:

粘包:自定义协议

半包:等待满足指定文件长度才停止接受

自定义协议已在其他文章讨论过,而半包问题此前的 Asio 提供这样一种接口,就等待指定长度的消息才解除阻塞。Linux 网络编程一样提供。

1
2
3
4
5
6
7
8
#include <sys/types.h>
#include <sys/socket.h>
ssize_t recv(
int sockfd,
void *buf,
size_t len,
int flags // 定接收行为的标志位:MSG_WAITALL(等待所有请求的数据才返回),大多数情况下,flags设置为0。
);

还有关于 send 接口要注意的一个地方,即发送大文件的时候, 客户端有可能在发送的时候提前终止, 这会导致发送端/写端(send)因为抛出SIGPIPE导致进程终止。我们肯定不喜欢我们的进程受到影响。

1
2
3
4
5
6
7
8
9
#include <sys/types.h>
#include <sys/socket.h>
ssize_t send(
int sockfd,
const void *buf,
size_t len,
int flags// 额外选项:MSG_NOSIGNAL防止发送时由于连接断开而引发的SIGPIPE信号,大多数情况下,flags参数设置为0。
);

代码地址:简单的文件传输

小文件下载.png

此份代码却有不足,下面会继续迭代,但是对于基本的文件传输已无大碍。由于服务端这边只是读取一次文件且不是重复读取,再者读取的缓冲区大小设置 4096,超过此范围就会丢失。因此,核心的代码解读将放在下面来谈。

下面我们要迭代的将是支持大文件的传输,将存在如下三种版本。

底层原理推荐链接:零拷贝

send/read 通过循环读取并跟踪偏移量实现大文件传输

代码地址:[send/read 通过循环读取并跟踪偏移量实现大文件传输](send/read 通过循环读取并跟踪偏移量实现大文件传输)

代码解读和优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int sendFile(int net_fd,int open_fd,off_t file_size){
while (file_size > 0){
char buf[BUFFER_SIZE] = {0};
ssize_t rn = read(open_fd,buf,sizeof(buf));
if (rn < 0) {
error(1, errno, "read");
close(open_fd);
return -1;
}
ssize_t sn = send(net_fd,buf,rn,MSG_NOSIGNAL);
if (sn < 0) {
error(1, errno, "send");
close(open_fd);
return -1;
}
file_size -= sn;
}
return 1;
}

每次发送的字节大小为 BUFFER_SIZE,循环发送,直到总的需要传送到文件大小 file_size 不大于 0。

但是这份代码还有待完善的地方,即 rn 是实际读取的长度,sn 是实际发送的长度,可能存在 sn < rn 的情况(套接字的发送缓冲区可能已满,网络的带宽或状态等因素可能限制一次发送的字节数),导致有部分文件丢失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int sendFile(int net_fd, int open_fd, off_t file_size) {
while (file_size > 0) {
char buf[BUFFER_SIZE] = {0};
ssize_t rn = read(open_fd, buf, sizeof(buf));
if (rn < 0) {
error(1, errno, "read");
close(open_fd);
return -1;
}
if (rn == 0) {
break;
}

ssize_t total_sent = 0;
while (total_sent < rn) {
ssize_t sn = send(net_fd, buf + total_sent, rn - total_sent, MSG_NOSIGNAL);
if (sn < 0) {
error(1, errno, "send");
close(open_fd);
return -1;
}
total_sent += sn;
}

file_size -= total_sent;
}
return 1;
}

还有一个地方需要提及,那就是告知客户端文件大小函数的地方,核心代码如下:

1
2
3
char send_buf[MESSAGE_WAIT] = {0};
memcpy(send_buf,&file_size, sizeof(send_buf));
ssize_t send_len = send(net_fd,send_buf,sizeof(send_buf),MSG_NOSIGNAL);

MESSAGE_WAIT 为 4字节,而客户端那边是 有符号整型存储这个数据,就算你是 无符号整型来存储,你也顶多告知对方文件大小为4GB,意味着我们只能传输 4GB,这是一种限制。

可以考虑更改为 8 字节,这样传输的文件大小可以在 0~16EB范围,完全不用担心受到限制。源代码不用动,修改 MESSAGE_WAIT 大小即可。

1
#define MESSAGE_WAIT sizeof(off_t)

经过测试,成功传输 3GB 大文件。

read和send大文件下载.png

如何生成大文件?

1
2
dd if=/dev/urandom of=large_random_file bs=1G count=3 status=progress

重点关注如下命令,根据需要更改:

**bs=1G**:

  • bs 表示块大小(block size),设置为 1GB。dd 每次将读取和写入 1GB 的数据块。

**count=3**:

  • count 指定要复制的块数。在这个命令中,设置为 3,这意味着总共会生成 3GB 的随机数据(1GB * 3)。

mmap通过内存映射传输文件

代码地址:mmap 传输小文件

服务器和客户端都映射,但是有几个注意点:

  1. open 文件的时候需要 O_RDWR,避免 mmap 权限不足
  2. mmap 需要用 (char *) 强转
  3. 客户端拿到 文件具体大小之后,需要用 ftruncate 方法提前把文件大小 设置为相应的大小,以便接受文件
  4. 最后,记得 munmap 取消映射

当我们强调客户端和服务端都要映射的时候,并不是说服务器这边采用 mmap ,客户端也要用 mmap,反之亦然(包括下面要讲的 sendfile)。mmap 只是一种相较于 read 少一次拷贝,这是一种策略的选择。

真的要传输大文件,你还是需要循环的发送分割的文件,而不是发送整个大文件,否则超过 2G 就会卡住。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int sendFile(int net_fd, int open_fd, off_t file_size) {
// 映射
char* mapped = mmap(NULL, file_size, PROT_READ, MAP_SHARED, open_fd, 0);
if (mapped == MAP_FAILED) {
error(1, errno, "sendFile mmap");
close(open_fd);
return -1;
}

// 发送
ssize_t total_sent = 0;
while (total_sent < file_size) {
ssize_t sent_bytes = send(net_fd, mapped + total_sent, file_size - total_sent, MSG_NOSIGNAL);
if (sent_bytes < 0) {
error(1, errno, "send");
munmap(mapped, file_size);
close(open_fd);
return -1;
}
total_sent += sent_bytes;
}

// 解除映射
if (munmap(mapped, file_size) == -1) {
error(1, errno, "sendFile munmap");
return -1;
}
close(open_fd);

return 1;
}

sendfile通过零拷贝传输文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int sendFile(int net_fd, int open_fd, off_t file_size) {
off_t offset = 0; // 用于跟踪已经发送的字节
ssize_t total_sent = 0; // 用于跟踪总共发送的字节数

while (total_sent < file_size) {
ssize_t sent_bytes = sendfile(net_fd, open_fd, &offset, file_size - total_sent);
if (sent_bytes < 0) {
error(1, errno, "sendFile sendfile");
close(open_fd);
return -1;
}
total_sent += sent_bytes;
offset += sent_bytes;
}

close(open_fd);
return 1;
}

进度条

如果我们想模仿日常下载文件的时候,进度条显示的效果。 我们可以在文件传输之前,先传输文件大小给客户端,在客户端不断接收文件的时候,根据已经接收的文件的大小/总文件的大小,显示进度条。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ssize_t recv_len = 0;
ssize_t total_len = 0;
off_t last_update_size = 0; // 每更新一次百分比增加
while (total_len < file_size) { // 循环接收
recv_len = recv(sock_fd, mapped + total_len, BUFFER_SIZE, 0);
if (recv_len <= 0) {
error(1, errno, "recv failed");
break;
}
total_len += recv_len;
// 计算相比上一次打印, 增加的百分比
double num = (double) total_len * 100 / file_size - (double) last_update_size * 100 / file_size;
if (num > 1) {
// 进度增加了百分之一
last_update_size = total_len;
// 打印进度条
printf("\rnow: %.2f%%", (double) last_update_size * 100 / file_size);
}
}

进度条对性能有损耗,若非必要可以选择不实现。

总结

  1. 不管是哪种发送文件的方式,如果要传输大文件,那就得分段传输,如果直接调用一次 接口是不可能发送大文件的,顶多就是 2G。这是历史遗留问题,那个时候只有 32位,没有像现在这样是 64位。
  2. 对比三种发送文件的方式,sendfile是最简单,效率也是最高(不是指网络传输)。mmap 麻烦在于要记住它需要注意的地方,很可能会因为记忆遗忘掉,导致看接口也不能写出正确的代码,实在是“不伦不类”。send 是最朴素的方式,先 read 读取文件到 缓冲区,再通过 send 把缓冲区的内容发出去,步骤有多余的行为,但是不存在 mmap那样看接口会写错。对此,我心中的排行是 sendfile > send > mmap。

第三版

有序退出

代码地址:有序退出

在谈这件事情之前,还是先把此前的两个函数谈一谈,即 传输文件描述符本质的函数。但它只提供两个参数,那将不适用于此情此景,因为我们需要区分到底是退出的含义,还是传递文件描述符的含义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
int local_send(int local_socket, int net_fd, int flag) {
struct msghdr msg;
memset(&msg, 0, sizeof(msg));

// 正文信息
struct iovec iov[1];
iov[0].iov_base = &flag;
iov[0].iov_len = sizeof(flag);
msg.msg_iov = iov;
msg.msg_iovlen = 1;

// 控制信息
char cmsgbuf[CMSG_SPACE(sizeof(net_fd))]; // 使用栈内存而不是动态分配
memset(cmsgbuf, 0, sizeof(cmsgbuf));

struct cmsghdr *cms = (struct cmsghdr *)cmsgbuf;
cms->cmsg_len = CMSG_LEN(sizeof(net_fd));
cms->cmsg_level = SOL_SOCKET;
cms->cmsg_type = SCM_RIGHTS;

*((int *) CMSG_DATA(cms)) = net_fd; // 直接将net_fd写入控制数据

msg.msg_control = cms;
msg.msg_controllen = CMSG_LEN(sizeof(net_fd));

if (sendmsg(local_socket, &msg, 0) == -1) {
perror("sendmsg failed");
return -1;
}

return 0;
}

int local_recv(int local_socket, int *net_fd, int *status) {
struct msghdr msg;
memset(&msg, 0, sizeof(msg));

// 正文信息
int flag;
struct iovec iov[1];
iov[0].iov_base = &flag;
iov[0].iov_len = sizeof(flag);
msg.msg_iov = iov;
msg.msg_iovlen = 1;

// 控制信息
char cmsgbuf[CMSG_SPACE(sizeof(int))]; // 使用栈内存而不是动态分配
memset(cmsgbuf, 0, sizeof(cmsgbuf));

msg.msg_control = cmsgbuf;
msg.msg_controllen = sizeof(cmsgbuf);

// 接收数据: 从本地socket
if (recvmsg(local_socket, &msg, 0) == -1) {
perror("recvmsg failed");
return -1;
}

*status = flag;

struct cmsghdr *cms = CMSG_FIRSTHDR(&msg);
if (cms && cms->cmsg_len == CMSG_LEN(sizeof(int)) && cms->cmsg_level == SOL_SOCKET && cms->cmsg_type == SCM_RIGHTS) {
*net_fd = *((int *) CMSG_DATA(cms));
} else {
*net_fd = -1;
}

return 0;
}

这样,既可以选择传递 文件描述符,也可以选择传递应该标准位 flag。

传递和接收文件描述符。即如果 local_recv 调用之后,flag 为 0 就表明是传递文件描述符。

1
2
3
local_send(subProcess[i].socket_local_fd,net_fd,0);

local_recv(socket_local_fd,&net_fd,&flag);

设置标志位,表明退出。即如果 local_recv 调用之后,flag 为 -1 就表明是退出子进程。

1
2
3
4
local_send(subProcess[j].socket_local_fd,0,-1);

int flag;
local_recv(socket_local_fd,&net_fd,&flag);

至此,我们可以讲如何让父子进程有序退出。

主进程注册 SIGINT 信号,对应的回调函数如下:

1
2
3
4
void noticeSonProcess(){
char* buf = "q";
write(pipe_fd[1],buf,1); // 通知 关闭子进程
}

而读管道已经监听到 epoll 树上,随时待命。执行指令如下:

1
2
3
4
5
6
7
8
9
10
if (tmp_fd == pipe_fd[0]) {
// 有序关闭子进程
for (int j = 0; j < PROCESS_SIZE; ++j) {
local_send(subProcess[j].socket_local_fd, 0, -1);
}
for (int j = 0; j < PROCESS_SIZE; ++j) {
wait(NULL);
}
exit(0); // 主进程也可以退出了
}

local_send 发出,即子进程退出的含义,那我们的 子进程执行的 startWorker 本质上就是 一个循环执行任务的函数,其内部核心实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int startWorker(int socket_local_fd){
while(true) {
int net_fd; // 存储 分配的 socket
int flag;
local_recv(socket_local_fd,&net_fd,&flag); // 如果收到关闭子进程的通知,必然是 runTask 没有启动 或 启动完成

if (flag == -1){
printf("已收到退出通知\n");
break;
}

runTask(net_fd);
}
return 0;
}

所以判断出 flag 为 -1,退出 while 循环,子进程也不会执行任何东西,只是还存在,当我们已经在最初的时候调用 wait 等待子进程退出,彻底回收子进程。

等到子进程回收完成,主进程也正常退出。

1
2
3
4
5
6
7
8
9
10
if (tmp_fd == pipe_fd[0]) {
// 有序关闭子进程
for (int j = 0; j < PROCESS_SIZE; ++j) {
local_send(subProcess[j].socket_local_fd, 0, -1);
}
for (int j = 0; j < PROCESS_SIZE; ++j) {
wait(NULL);
}
exit(0); // 主进程也可以退出了
}