结合学过的文件操作、网络通信、以及进程和线程的知识,
实现一个基本的文件下载服务器模型。尽管线程池才是最常用的,但先从进程池谈起,实际上实现一个池,其余也没太大区别。
进程池_模型.png
第一版
当前第一版还只是解决进程间共享文件对象的问题,
并且实现了服务器的主进程接收客户端连接请求,
并把客户端连接交给进程池中进程具体和客户端进行交互的功能。
代码地址:进程池第一版
主进程用以监听连接请求,建立连接得到的后续用于和客户端通信的 net_fd
,从进程池中选择一个空闲的进程来给 net_fd 发送一个消息,那就需要把
net_fd 给到子进程。
你以为轻松就能传递过去了?进程与进程之间是相互隔离,各自的文件描述符数组是不能被其他进程访问。你传递过去也不过是一个下标,但是你把这个下标拿给别的进程的文件描述符数组访问,得到的又不是一个东西。
两个进程共享一个文件对象
一般我们使用 socketpair 的目的:为了在两个进程之间,
传输一个文件对象的描述信息(可以让两个进程共享一个文件对象),
而不是单单只传输一个文件描述符数组的下标。
1 2 3 4 5 6 7 8 9 10 11 #include <sys/types.h> #include <sys/socket.h> int socketpair ( int domain, int type, int protocol, int sv[2 ] ) ;
socketpair
函数用于创建一对互相连接的全双工通信
socket。相比较普通的用于网络间不同主机通信的 socket; socketpair
函数创建的socket主要用于在同一台机器上 的进程间通信 (我们可以称其为本地
socket )。
如果你想通过 socketpair
函数实现两个本地进程间的文件对象描述符的传输,除了需要 socketpair
函数创建通信的端点, 还需要借助 sendmsg 函数和
recvmsg
函数来实现具体的数据传输。但是具体的实现代码不用去记,更不用去写,直接从提供的代码地址中的
LocalSocekt.c 文件中获取封装好的函数即可。
1 2 3 4 int read_net_fd (int socket_local_fd, int *net_fd)
1 2 3 4 int write_net_fd (int socket_local_fd, int * net_fd)
write_net_fd 传递 套接字 net_fd,read_net_fd 接受 套接字 net_fd。
核心代码解读
socketpair 第四个参数是一个数组,即两个本地 socket_fd
组成的数组。我们也知道 socketpair
是全双公通信,那在代码中只使用一条通道就可以,故而进程把用不到的关闭即可。父进程传输,子进程接受的这样一条单通道。核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 int initProcessPool (Worker_Status* subProcess,int size) { for (int i = 0 ; i < size; ++i) { int socket_fd[2 ]; socketpair (AF_LOCAL,SOCK_STREAM,0 ,socket_fd); int pid = fork(); if (pid == 0 ){ close (socket_fd[1 ]); startWorker (socket_fd[0 ]); }else { close (socket_fd[0 ]); subProcess[i].pid = pid; subProcess[i].status = FREE; subProcess[i].socket_local_fd = socket_fd[1 ]; } } return 0 ; }
下面这份代码把 子进程存储的 本地套接字加入监听,是为了让
子进程完成任务之后,让主进程收到消息(消息内容随便写,主要是唤醒作用)之后把子进程的状态改为
FREE。
1 2 3 for (int i = 0 ; i < PROCESS_SIZE; ++i) { epoll_add (epfd, subProcess[i].socket_local_fd); }
1 2 pid_t pid = getpid ();send (socket_local_fd,&pid,sizeof (pid),0 );
1 2 3 4 5 for (int j = 0 ; j < PROCESS_SIZE; ++j) { if (subProcess[j].socket_local_fd == tmp_fd) { subProcess[i].status = FREE; } }
第二版
在上一版本的基础上,
如果客户端的请求是想获得一份在服务端的文件,我们该如何做到呢?
我们需要多次和客户端交流,交换文件的相关信息,但由于 TCP
是流协议,没有边界,就不得不考虑粘包和半包问题。解决方案如下:
粘包:自定义协议
半包:等待满足指定文件长度才停止接受
自定义协议已在其他文章讨论过,而半包问题此前的 Asio
提供这样一种接口,就等待指定长度的消息才解除阻塞。Linux
网络编程一样提供。
1 2 3 4 5 6 7 8 #include <sys/types.h> #include <sys/socket.h> ssize_t recv ( int sockfd, void *buf, size_t len, int flags ) ;
还有关于 send 接口要注意的一个地方,即发送大文件的时候,
客户端有可能在发送的时候提前终止,
这会导致发送端/写端(send)因为抛出SIGPIPE导致进程终止。我们肯定不喜欢我们的进程受到影响。
1 2 3 4 5 6 7 8 9 #include <sys/types.h> #include <sys/socket.h> ssize_t send ( int sockfd, const void *buf, size_t len, int flags ) ;
代码地址:简单的文件传输
小文件下载.png
此份代码却有不足,下面会继续迭代,但是对于基本的文件传输已无大碍。由于服务端这边只是读取一次文件且不是重复读取,再者读取的缓冲区大小设置
4096,超过此范围就会丢失。因此,核心的代码解读将放在下面来谈。
下面我们要迭代的将是支持大文件的传输,将存在如下三种版本。
底层原理推荐链接:零拷贝
send/read
通过循环读取并跟踪偏移量实现大文件传输
代码地址:send/read
通过循环读取并跟踪偏移量实现大文件传输
代码解读和优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 int sendFile (int net_fd,int open_fd,off_t file_size) { while (file_size > 0 ){ char buf[BUFFER_SIZE] = {0 }; ssize_t rn = read (open_fd,buf,sizeof (buf)); if (rn < 0 ) { error (1 , errno, "read" ); close (open_fd); return -1 ; } ssize_t sn = send (net_fd,buf,rn,MSG_NOSIGNAL); if (sn < 0 ) { error (1 , errno, "send" ); close (open_fd); return -1 ; } file_size -= sn; } return 1 ; }
每次发送的字节大小为
BUFFER_SIZE,循环发送,直到总的需要传送到文件大小 file_size 不大于
0。
但是这份代码还有待完善的地方,即 rn 是实际读取的长度,sn
是实际发送的长度,可能存在 sn < rn
的情况(套接字的发送缓冲区可能已满,网络的带宽或状态等因素可能限制一次发送的字节数),导致有部分文件丢失。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 int sendFile (int net_fd, int open_fd, off_t file_size) { while (file_size > 0 ) { char buf[BUFFER_SIZE] = {0 }; ssize_t rn = read (open_fd, buf, sizeof (buf)); if (rn < 0 ) { error (1 , errno, "read" ); close (open_fd); return -1 ; } if (rn == 0 ) { break ; } ssize_t total_sent = 0 ; while (total_sent < rn) { ssize_t sn = send (net_fd, buf + total_sent, rn - total_sent, MSG_NOSIGNAL); if (sn < 0 ) { error (1 , errno, "send" ); close (open_fd); return -1 ; } total_sent += sn; } file_size -= total_sent; } return 1 ; }
还有一个地方需要提及,那就是告知客户端文件大小函数的地方,核心代码如下:
1 2 3 char send_buf[MESSAGE_WAIT] = {0 };memcpy (send_buf,&file_size, sizeof (send_buf));ssize_t send_len = send (net_fd,send_buf,sizeof (send_buf),MSG_NOSIGNAL);
MESSAGE_WAIT 为 4字节,而客户端那边是
有符号整型存储这个数据,就算你是
无符号整型来存储,你也顶多告知对方文件大小为4GB,意味着我们只能传输
4GB,这是一种限制。
可以考虑更改为 8 字节,这样传输的文件大小可以在
0~16EB范围,完全不用担心受到限制。源代码不用动,修改 MESSAGE_WAIT
大小即可。
1 #define MESSAGE_WAIT sizeof(off_t)
经过测试,成功传输 3GB 大文件。
read和send大文件下载.png
如何生成大文件?
1 2 dd if=/dev/urandom of=large_random_file bs=1G count=3 status=progress
重点关注如下命令,根据需要更改:
bs=1G
:
bs
表示块大小(block size),设置为
1GB。dd
每次将读取和写入 1GB 的数据块。
count=3
:
count
指定要复制的块数。在这个命令中,设置为
3,这意味着总共会生成 3GB 的随机数据(1GB * 3)。
mmap通过内存映射传输文件
代码地址:mmap
传输小文件
服务器和客户端都映射,但是有几个注意点:
open 文件的时候需要 O_RDWR ,避免 mmap 权限不足
mmap 需要用 (char *) 强转
客户端拿到 文件具体大小之后,需要用 ftruncate 方法提前把文件大小
设置为相应的大小,以便接受文件
最后,记得 munmap 取消映射
当我们强调客户端和服务端都要映射的时候,并不是说服务器这边采用 mmap
,客户端也要用 mmap,反之亦然(包括下面要讲的 sendfile)。mmap
只是一种相较于 read 少一次拷贝,这是一种策略的选择。
真的要传输大文件,你还是需要循环的发送分割的文件,而不是发送整个大文件,否则超过
2G 就会卡住。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 int sendFile (int net_fd, int open_fd, off_t file_size) { char * mapped = mmap (NULL , file_size, PROT_READ, MAP_SHARED, open_fd, 0 ); if (mapped == MAP_FAILED) { error (1 , errno, "sendFile mmap" ); close (open_fd); return -1 ; } ssize_t total_sent = 0 ; while (total_sent < file_size) { ssize_t sent_bytes = send (net_fd, mapped + total_sent, file_size - total_sent, MSG_NOSIGNAL); if (sent_bytes < 0 ) { error (1 , errno, "send" ); munmap (mapped, file_size); close (open_fd); return -1 ; } total_sent += sent_bytes; } if (munmap (mapped, file_size) == -1 ) { error (1 , errno, "sendFile munmap" ); return -1 ; } close (open_fd); return 1 ; }
sendfile通过零拷贝传输文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 int sendFile (int net_fd, int open_fd, off_t file_size) { off_t offset = 0 ; ssize_t total_sent = 0 ; while (total_sent < file_size) { ssize_t sent_bytes = sendfile (net_fd, open_fd, &offset, file_size - total_sent); if (sent_bytes < 0 ) { error (1 , errno, "sendFile sendfile" ); close (open_fd); return -1 ; } total_sent += sent_bytes; offset += sent_bytes; } close (open_fd); return 1 ; }
进度条
如果我们想模仿日常下载文件的时候,进度条显示的效果。
我们可以在文件传输之前,先传输文件大小给客户端,在客户端不断接收文件的时候,根据已经接收的文件的大小/总文件的大小,显示进度条。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ssize_t recv_len = 0 ;ssize_t total_len = 0 ;off_t last_update_size = 0 ; while (total_len < file_size) { recv_len = recv (sock_fd, mapped + total_len, BUFFER_SIZE, 0 ); if (recv_len <= 0 ) { error (1 , errno, "recv failed" ); break ; } total_len += recv_len; double num = (double ) total_len * 100 / file_size - (double ) last_update_size * 100 / file_size; if (num > 1 ) { last_update_size = total_len; printf ("\rnow: %.2f%%" , (double ) last_update_size * 100 / file_size); } }
进度条对性能有损耗,若非必要可以选择不实现。
总结
不管是哪种发送文件的方式,如果要传输大文件,那就得分段传输,如果直接调用一次
接口是不可能发送大文件的,顶多就是 2G。这是历史遗留问题,那个时候只有
32位,没有像现在这样是 64位。
对比三种发送文件的方式,sendfile是最简单,效率也是最高(不是指网络传输)。mmap
麻烦在于要记住它需要注意的地方,很可能会因为记忆遗忘掉,导致看接口也不能写出正确的代码,实在是“不伦不类”。send
是最朴素的方式,先 read 读取文件到 缓冲区,再通过 send
把缓冲区的内容发出去,步骤有多余的行为,但是不存在
mmap那样看接口会写错。对此,我心中的排行是 sendfile > send >
mmap。
第三版
有序退出
代码地址:有序退出
在谈这件事情之前,还是先把此前的两个函数谈一谈,即
传输文件描述符本质的函数。但它只提供两个参数,那将不适用于此情此景,因为我们需要区分到底是退出的含义,还是传递文件描述符的含义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 int local_send (int local_socket, int net_fd, int flag) { struct msghdr msg; memset (&msg, 0 , sizeof (msg)); struct iovec iov[1 ]; iov[0 ].iov_base = &flag; iov[0 ].iov_len = sizeof (flag); msg.msg_iov = iov; msg.msg_iovlen = 1 ; char cmsgbuf[CMSG_SPACE (sizeof (net_fd))]; memset (cmsgbuf, 0 , sizeof (cmsgbuf)); struct cmsghdr *cms = (struct cmsghdr *)cmsgbuf; cms->cmsg_len = CMSG_LEN (sizeof (net_fd)); cms->cmsg_level = SOL_SOCKET; cms->cmsg_type = SCM_RIGHTS; *((int *) CMSG_DATA (cms)) = net_fd; msg.msg_control = cms; msg.msg_controllen = CMSG_LEN (sizeof (net_fd)); if (sendmsg (local_socket, &msg, 0 ) == -1 ) { perror ("sendmsg failed" ); return -1 ; } return 0 ; }int local_recv (int local_socket, int *net_fd, int *status) { struct msghdr msg; memset (&msg, 0 , sizeof (msg)); int flag; struct iovec iov[1 ]; iov[0 ].iov_base = &flag; iov[0 ].iov_len = sizeof (flag); msg.msg_iov = iov; msg.msg_iovlen = 1 ; char cmsgbuf[CMSG_SPACE (sizeof (int ))]; memset (cmsgbuf, 0 , sizeof (cmsgbuf)); msg.msg_control = cmsgbuf; msg.msg_controllen = sizeof (cmsgbuf); if (recvmsg (local_socket, &msg, 0 ) == -1 ) { perror ("recvmsg failed" ); return -1 ; } *status = flag; struct cmsghdr *cms = CMSG_FIRSTHDR (&msg); if (cms && cms->cmsg_len == CMSG_LEN (sizeof (int )) && cms->cmsg_level == SOL_SOCKET && cms->cmsg_type == SCM_RIGHTS) { *net_fd = *((int *) CMSG_DATA (cms)); } else { *net_fd = -1 ; } return 0 ; }
这样,既可以选择传递 文件描述符,也可以选择传递应该标准位 flag。
传递和接收文件描述符。即如果 local_recv 调用之后,flag 为 0
就表明是传递文件描述符。
1 2 3 local_send (subProcess[i].socket_local_fd,net_fd,0 );local_recv (socket_local_fd,&net_fd,&flag);
设置标志位,表明退出。即如果 local_recv 调用之后,flag 为 -1
就表明是退出子进程。
1 2 3 4 local_send(subProcess[j].socket_local_fd,0,-1); int flag; local_recv(socket_local_fd,&net_fd,&flag);
至此,我们可以讲如何让父子进程有序退出。
主进程注册 SIGINT 信号,对应的回调函数如下:
1 2 3 4 void noticeSonProcess () { char * buf = "q" ; write (pipe_fd[1 ],buf,1 ); }
而读管道已经监听到 epoll 树上,随时待命。执行指令如下:
1 2 3 4 5 6 7 8 9 10 if (tmp_fd == pipe_fd[0 ]) { for (int j = 0 ; j < PROCESS_SIZE; ++j) { local_send (subProcess[j].socket_local_fd, 0 , -1 ); } for (int j = 0 ; j < PROCESS_SIZE; ++j) { wait (NULL ); } exit (0 ); }
local_send 发出,即子进程退出的含义,那我们的 子进程执行的
startWorker 本质上就是 一个循环执行任务的函数,其内部核心实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 int startWorker (int socket_local_fd) { while (true ) { int net_fd; int flag; local_recv (socket_local_fd,&net_fd,&flag); if (flag == -1 ){ printf ("已收到退出通知\n" ); break ; } runTask (net_fd); } return 0 ; }
所以判断出 flag 为 -1,退出 while
循环,子进程也不会执行任何东西,只是还存在,当我们已经在最初的时候调用
wait 等待子进程退出,彻底回收子进程。
等到子进程回收完成,主进程也正常退出。
1 2 3 4 5 6 7 8 9 10 if (tmp_fd == pipe_fd[0 ]) { for (int j = 0 ; j < PROCESS_SIZE; ++j) { local_send (subProcess[j].socket_local_fd, 0 , -1 ); } for (int j = 0 ; j < PROCESS_SIZE; ++j) { wait (NULL ); } exit (0 ); }