核心源码解读
server.c 中是 redis 服务启动的入口,main
函数就在该文件中,主要完成下面几件事:
基本初始化
检查哨兵模式,并检查是否要执行 RDB 检测或 AOF 检测
运行参数解析
初始化 server
执行事件驱动框架
image20250123142642703.png
initServer
initServer 函数:创建 eventLoop 和 创建 Redis 数据库并初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void initServer (void ) { int j; server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); server.db = zmalloc(sizeof (redisDb)*server.dbnum); for (j = 0 ; j < server.dbnum; j++) { server.db[j].keys = kvstoreCreate(&dbDictType, slot_count_bits, flags); server.db[j].expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags); server.db[j].hexpires = ebCreate(); server.db[j].expires_cursor = 0 ; server.db[j].blocking_keys = dictCreate(&keylistDictType); server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType); server.db[j].watched_keys = dictCreate(&keylistDictType); server.db[j].id = j; server.db[j].avg_ttl = 0 ; server.db[j].defrag_later = listCreate(); listSetFreeMethod(server.db[j].defrag_later,(void (*)(void *))sdsfree); } aeSetBeforeSleepProc(server.el,beforeSleep); aeSetAfterSleepProc(server.el,afterSleep); }
initListeners
initListeners
函数:用于设置通信协议的监听器并确保系统能够正确接受客户端请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void initListeners (void ) { connListener *listener; int listen_fds = 0 ; for (int j = 0 ; j < CONN_TYPE_MAX; j++) { listener = &server.listeners[j]; connListen(listener); createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)); } }static int anetListen (char *err, int s, struct sockaddr *sa, socklen_t len, int backlog, mode_t perm) { if (bind(s,sa,len) == -1 ) { anetSetError(err, "bind: %s" , strerror(errno)); close(s); return ANET_ERR; } if (sa->sa_family == AF_LOCAL && perm) chmod(((struct sockaddr_un *) sa)->sun_path, perm); if (listen(s, backlog) == -1 ) { anetSetError(err, "listen: %s" , strerror(errno)); close(s); return ANET_ERR; } return ANET_OK; }
connAcceptHandler 函数:这个函数就是前面设置的回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 static inline aeFileProc *connAcceptHandler (ConnectionType *ct) { if (ct) return ct->accept_handler; return NULL ; }static void connSocketAcceptHandler (aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; int max = server.max_new_conns_per_cycle; char cip[NET_IP_STR_LEN]; while (max--) { anetTcpAccept(server.neterr, fd, cip, sizeof (cip), &cport); acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL ), 0 , cip); } }static int anetGenericAccept (char *err, int s, struct sockaddr *sa, socklen_t *len) { int fd; fd = accept(s,sa,len); return fd; }void acceptCommonHandler (connection *conn, int flags, char *ip) { client *c; c = createClient(conn); connAccept(conn, clientAcceptHandler); }
截止到目前,我们已经创建一个事件循环对象,尽管这个事件循环还没有正式启动。接着,开始监听客户端的连接,如果有连接过来就会触发
connAcceptHandler 回调,这个回调函数会 accept
得到一个用于后续通信的连接。
我们有看到 创建一个客户端对象,并设置回调
readQueryFromClient,这是一个读的回调函数,那写的回调函数呢?
readQueryFromClient 在读取到客户端命令后,进行解析,解析完成就会调用
processInputBuffer --> processCommandAndResetClient -->
processCommand -->
addReply,把要回复的数据写入到客户端的缓冲区(注意,这个时候还没有设置写回调)。
前面我们注册过一个回调函数 beforeSleep,也就是 Redis
事件驱动框架每次循环进入事件处理函数前,也就是在框架主函数 aeMain 中调用
aeProcessEvents,来处理监听到的已触发事件或是到时的时间事件之前都会调用它。这其中就包括了调用
handleClientsWithPendingWrites 函数,它会将Redis
sever客户端缓冲区中的数据写回客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 int handleClientsWithPendingWrites (void ) { listIter li; listNode *ln; int processed = listLength(server.clients_pending_write); listRewind(server.clients_pending_write,&li); while ((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; listUnlinkNode(server.clients_pending_write,ln); if (writeToClient(c,0 ) == C_ERR) continue ; if (clientHasPendingReplies(c)) { installClientWriteHandler(c); } } return processed; }
aeMain
1 2 3 4 5 6 7 8 void aeMain (aeEventLoop *eventLoop) { eventLoop->stop = 0 ; while (!eventLoop->stop) { aeProcessEvents(eventLoop, AE_ALL_EVENTS| AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 int aeProcessEvents (aeEventLoop *eventLoop, int flags) { int processed = 0 , numevents; if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; struct timeval tv , *tvp = NULL ; int64_t usUntilTimer; numevents = aeApiPoll(eventLoop, tvp); for (j = 0 ; j < numevents; j++) { } } if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; }
epoll_wait 解除阻塞,如果有事件发生就去处理即可。
aeCreateFileEvent
该接口的作用就是添加 fd 到 epoll 中监听,并设置事件回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 int aeCreateFileEvent (aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent (eventLoop, fd, mask) == -1 ) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }static int aeApiAddEvent (aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0 }; int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0 ; mask |= eventLoop->events[fd].mask; if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (epoll_ctl (state->epfd,op,fd,&ee) == -1 ) return -1 ; return 0 ; }
总结
04152516e73f473df754a5c7a8f6841d.png
多线程支持
InitServerLast --> initThreadedIO
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void initThreadedIO (void ) { server.io_threads_active = 0 ; io_threads_op = IO_THREADS_OP_IDLE; if (server.io_threads_num == 1 ) return ; if (server.io_threads_num > IO_THREADS_MAX_NUM) { serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " "The maximum number is %d." , IO_THREADS_MAX_NUM); exit (1 ); } for (int i = 0 ; i < server.io_threads_num; i++) { io_threads_list[i] = listCreate(); if (i == 0 ) continue ; pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL ); setIOPendingCount(i, 0 ); pthread_mutex_lock(&io_threads_mutex[i]); if (pthread_create(&tid,NULL ,IOThreadMain,(void *)(long )i) != 0 ) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread." ); exit (1 ); } io_threads[i] = tid; } }
创建的线程池中,每个线程都会调用 IOThreadMain:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 void *IOThreadMain (void *myid) { long id = (unsigned long )myid; char thdname[16 ]; snprintf (thdname, sizeof (thdname), "io_thd_%ld" , id); redis_set_thread_title(thdname); redisSetCpuAffinity(server.server_cpulist); makeThreadKillable(); while (1 ) { for (int j = 0 ; j < 1000000 ; j++) { if (getIOPendingCount(id) != 0 ) break ; } if (getIOPendingCount(id) == 0 ) { pthread_mutex_lock(&io_threads_mutex[id]); pthread_mutex_unlock(&io_threads_mutex[id]); continue ; } serverAssert(getIOPendingCount(id) != 0 ); listIter li; listNode *ln; listRewind(io_threads_list[id],&li); while ((ln = listNext(&li))) { client *c = listNodeValue(ln); if (io_threads_op == IO_THREADS_OP_WRITE) { writeToClient(c,0 ); } else if (io_threads_op == IO_THREADS_OP_READ) { readQueryFromClient(c->conn); } else { serverPanic("io_threads_op value is unknown" ); } } listEmpty(io_threads_list[id]); setIOPendingCount(id, 0 ); } }
每一个IO线程运行时,都会不断检查是否有等待它处理的客户端。如果有,就根据操作类型,从客户端读取数据或是将数据写回客户端。
那么等待它处理的客户端是谁搞进来的?
这就要说到 Redis server 对应的全局变量 server了。server
变量中有两个List类型的成员变量:clients_pending_write 和
clients_pending_read,它们分别记录了待写回数据的客户端和待读取数据的客户端。
Redis server
在接收到客户端请求和给客户端返回数据的过程中,会根据一定条件,推迟客户端的读写操作,并分别把待读写的客户端保存到这两个列表中。然后,Redis
server 在每次进入事件循环前,会再把列表中的客户端添加到 io_threads_list
数组中,交给 IO 线程进行处理。
image20250123215844369.png
多IO线程本身并不会执行命令,它们只是利用多核并行地读取数据和解析命令,或是将server数据写回。所以,Redis执行命令的线程还是主IO线程。 这一点对于你理解多IO线程机制很重要,可以避免你误解Redis有多线程同时执行命令。
9332b6c0cd56c9467111609cce1d4b92.png
明显看得出,Redis 并不是一个规范的多线程 Reactor
模型,明显看到只有主线程是
eventLoop,并且执行命令。而其他多线程只是解析命令和回复数据。这显然是
Redis 的历史原因,谁让他之前是单线程呢。
image20250123223752634.png
参考内容
Redis 6.0
新特性:带你 100% 掌握多线程模型
Redis源码剖析与实战