Redis 事件驱动框架思路浅读

核心源码解读

server.c 中是 redis 服务启动的入口,main 函数就在该文件中,主要完成下面几件事:

  1. 基本初始化
  2. 检查哨兵模式,并检查是否要执行 RDB 检测或 AOF 检测
  3. 运行参数解析
  4. 初始化 server
  5. 执行事件驱动框架
image20250123142642703.png

initServer

initServer 函数:创建 eventLoop 和 创建 Redis 数据库并初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void initServer(void) {
int j;

server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); // 创建 eventLoop(事件循环框架)

server.db = zmalloc(sizeof(redisDb)*server.dbnum); // 创建 Redis 数据库

for (j = 0; j < server.dbnum; j++) { // 为每个 Redis 数据库进行初始化
server.db[j].keys = kvstoreCreate(&dbDictType, slot_count_bits, flags);
server.db[j].expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags);
server.db[j].hexpires = ebCreate();
server.db[j].expires_cursor = 0;
server.db[j].blocking_keys = dictCreate(&keylistDictType);
server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType);
server.db[j].watched_keys = dictCreate(&keylistDictType);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
}

aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
}

initListeners

initListeners 函数:用于设置通信协议的监听器并确保系统能够正确接受客户端请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void initListeners(void) {
connListener *listener;

int listen_fds = 0;
for (int j = 0; j < CONN_TYPE_MAX; j++) {
listener = &server.listeners[j];

connListen(listener); // --> listenToPort --> anetListen --> 监听端口

createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)); // 用于向事件循环注册 listen fd,并设置回调函数
}
}

// bind + listen
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog, mode_t perm) {
if (bind(s,sa,len) == -1) {
anetSetError(err, "bind: %s", strerror(errno));
close(s);
return ANET_ERR;
}

if (sa->sa_family == AF_LOCAL && perm)
chmod(((struct sockaddr_un *) sa)->sun_path, perm);

if (listen(s, backlog) == -1) {
anetSetError(err, "listen: %s", strerror(errno));
close(s);
return ANET_ERR;
}
return ANET_OK;
}

connAcceptHandler 函数:这个函数就是前面设置的回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static inline aeFileProc *connAcceptHandler(ConnectionType *ct) {
if (ct)
return ct->accept_handler; // connSocketAcceptHandler --> anetTcpAccept --> anetGenericAccept
return NULL;
}

// accept
static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
int max = server.max_new_conns_per_cycle;
char cip[NET_IP_STR_LEN];

// 循环处理新的客户端连接,直到达到最大限制或没有新的连接。
while (max--) {
// 本质上调用 accept,获取一个客户端连接后的 fd
anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

// 创建一个 conn,并初始化
acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL), 0, cip);
}
}

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
int fd;
fd = accept(s,sa,len);
return fd;
}

void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;

// 创建一个客户端对象,并设置回调 readQueryFromClient
c = createClient(conn); // connSocketSetReadHandler --> aeCreateFileEvent --> connSetReadHandler

// 执行 clientAcceptHandler 回调函数,即处理客户端连接的接收和验证
connAccept(conn, clientAcceptHandler);
}

截止到目前,我们已经创建一个事件循环对象,尽管这个事件循环还没有正式启动。接着,开始监听客户端的连接,如果有连接过来就会触发 connAcceptHandler 回调,这个回调函数会 accept 得到一个用于后续通信的连接。

我们有看到 创建一个客户端对象,并设置回调 readQueryFromClient,这是一个读的回调函数,那写的回调函数呢?

readQueryFromClient 在读取到客户端命令后,进行解析,解析完成就会调用 processInputBuffer --> processCommandAndResetClient --> processCommand --> addReply,把要回复的数据写入到客户端的缓冲区(注意,这个时候还没有设置写回调)。

前面我们注册过一个回调函数 beforeSleep,也就是 Redis 事件驱动框架每次循环进入事件处理函数前,也就是在框架主函数 aeMain 中调用 aeProcessEvents,来处理监听到的已触发事件或是到时的时间事件之前都会调用它。这其中就包括了调用 handleClientsWithPendingWrites 函数,它会将Redis sever客户端缓冲区中的数据写回客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);

listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listUnlinkNode(server.clients_pending_write,ln);

/* 写回客户端 */
if (writeToClient(c,0) == C_ERR) continue;

/* 还有没写完的,注册一个写回调,下次处理 */
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c); // 注册写回调 --> connSetWriteHandlerWithBarrier --> sendReplyToClient --> writeToClient(注册的写回调)
}
}
return processed;
}

aeMain

1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
struct timeval tv, *tvp = NULL; /* NULL means infinite wait. */
int64_t usUntilTimer;

numevents = aeApiPoll(eventLoop, tvp); // epoll_wait

for (j = 0; j < numevents; j++) { // 遍历发生事件的 fd
// 处理事件
}
}
/* 检查定时器事件 */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */
}

epoll_wait 解除阻塞,如果有事件发生就去处理即可。

aeCreateFileEvent

该接口的作用就是添加 fd 到 epoll 中监听,并设置事件回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];

if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc; // 读回调
if (mask & AE_WRITABLE) fe->wfileProc = proc; // 写回调
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}

// epoll_ctl
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
int op = eventLoop->events[fd].mask == AE_NONE ? // EPOLL_CTL_ADD 或 EPOLL_CTL_MOD
EPOLL_CTL_ADD : EPOLL_CTL_MOD;

ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}

总结

04152516e73f473df754a5c7a8f6841d.png

多线程支持

InitServerLast --> initThreadedIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void initThreadedIO(void) {
server.io_threads_active = 0;
io_threads_op = IO_THREADS_OP_IDLE;

if (server.io_threads_num == 1) return; // 单线程直接返回

if (server.io_threads_num > IO_THREADS_MAX_NUM) { // 超过最大线程数(128),直接退出
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}

/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */

/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { // 创建线程
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}

创建的线程池中,每个线程都会调用 IOThreadMain:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.io_threads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];

snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
makeThreadKillable();

while(1) { // 循环
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
}

/* Give the main thread a chance to stop this thread. */
if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}

serverAssert(getIOPendingCount(id) != 0);

listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) { // 读取客户端列表,并处理相应的事件回调
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
setIOPendingCount(id, 0);
}
}

每一个IO线程运行时,都会不断检查是否有等待它处理的客户端。如果有,就根据操作类型,从客户端读取数据或是将数据写回客户端。

那么等待它处理的客户端是谁搞进来的?

这就要说到 Redis server 对应的全局变量 server了。server 变量中有两个List类型的成员变量:clients_pending_write 和 clients_pending_read,它们分别记录了待写回数据的客户端和待读取数据的客户端。

Redis server 在接收到客户端请求和给客户端返回数据的过程中,会根据一定条件,推迟客户端的读写操作,并分别把待读写的客户端保存到这两个列表中。然后,Redis server 在每次进入事件循环前,会再把列表中的客户端添加到 io_threads_list 数组中,交给 IO 线程进行处理。

image20250123215844369.png

多IO线程本身并不会执行命令,它们只是利用多核并行地读取数据和解析命令,或是将server数据写回。所以,Redis执行命令的线程还是主IO线程。这一点对于你理解多IO线程机制很重要,可以避免你误解Redis有多线程同时执行命令。

9332b6c0cd56c9467111609cce1d4b92.png

明显看得出,Redis 并不是一个规范的多线程 Reactor 模型,明显看到只有主线程是 eventLoop,并且执行命令。而其他多线程只是解析命令和回复数据。这显然是 Redis 的历史原因,谁让他之前是单线程呢。

image20250123223752634.png

参考内容

Redis 6.0 新特性:带你 100% 掌握多线程模型

Redis源码剖析与实战