简单从redis的实现了解redis是多线程还是单线程

2022-03-171079

1,服务器启动:

  • int main(int argc, char **argv)
//这里省略一堆初始化代码

//死循环的方法
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
  • void aeMain(aeEventLoop *eventLoop)
eventLoop->stop = 0;
    while (!eventLoop->stop) {
        //开始循环处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }

2,事件循环处理

  • int aeProcessEvents(aeEventLoop *eventLoop, int flags)
if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {

    //.........省略部分代码

    //这里会回写上一次循环中客户端的读写事件,如:本次循环接受到一个get命令,则在下一次循环中回写;(注意:这里是6.0的处理,与版本实现有关)
    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
        eventLoop->beforesleep(eventLoop);

    /* Call the multiplexing API, will return only on timeout or when
     * some event fires. */
     //这里获取本次事件循环中所有客户端请求事件,例如当前有两个客户端同时发送了两条命令,则有两个事件
    numevents = aeApiPoll(eventLoop, tvp);

    /* After sleep callback. */
    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        eventLoop->aftersleep(eventLoop);

      //循环处理客户端读写事件
    for (j = 0; j < numevents; j++) {

        /* Fire the writable event. */
        if (fe->mask & mask & AE_WRITABLE) {
            if (!fired || fe->wfileProc != fe->rfileProc) {
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
        }

        /* If we have to invert the call, fire the readable event now
         * after the writable one. */
        if (invert) {
            fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            if ((fe->mask & mask & AE_READABLE) &&
                (!fired || fe->wfileProc != fe->rfileProc))
            {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
        }

        processed++;
    }
}
/* Check time events */
//处理完客户端事件后,开始处理时间时间
if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */

到这里已经可以知道redis处理客户端事件时是单线程还是多线程的了,下面了解一下bgsave, bgrewriteaof,rdb与aof创建子进程的过程相似,直接看rdb的代码,如下

  • void bgsaveCommand(client *c)
//不为-1表示有子进程在进行rdb
if (server.rdb_child_pid != -1) {
    addReplyError(c,"Background save already in progress");
} else if (hasActiveChildProcess()) {
    if (schedule) {
        server.rdb_bgsave_scheduled = 1;
        addReplyStatus(c,"Background saving scheduled");
    } else {
        addReplyError(c,
        "Another child process is active (AOF?): can't BGSAVE right now. "
        "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
        "possible.");
    }
} else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {
    addReplyStatus(c,"Background saving started");
} else {
    addReply(c,shared.err);
}
  • int hasActiveChildProcess() ,方法注释如下:Return true if there are no active children processes doing RDB saving, AOF rewriting, or some side process spawned by a loaded module.代码:

    return server.rdb_child_pid != -1 ||  //没有子进程在进行rdb
            server.aof_child_pid != -1 || // 没有子进程进程aof
            server.module_child_pid != -1;//这个没有看到触发
    
    • 设置module_child_pid方法的注释:
    /* Create a background child process with the current frozen snaphost of the
     * main process where you can do some processing in the background without
     * affecting / freezing the traffic and no need for threads and GIL locking.
     * Note that Redis allows for only one concurrent fork.
     * When the child wants to exit, it should call RedisModule_ExitFromChild.
     * If the parent wants to kill the child it should call RedisModule_KillForkChild
     * The done handler callback will be executed on the parent process when the
     * child existed (but not when killed)
     * Return: -1 on failure, on success the parent process will get a positive PID
     * of the child, and the child process will get 0.
     */
    • 从上述注释可以知道,redis最多只会启动两个进程,在fork子进程时都会先去检测是否已经有子进程。
  • 执行bgsave命令

    int rdbSaveBackground(char *filename, rdbSaveInfo *rsi)

pid_t childpid;

//再次检查是否已经有子进程
if (hasActiveChildProcess()) return C_ERR;

server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();

//fork子进程处理bgsave
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
    int retval;

    /* Child */
    redisSetProcTitle("redis-rdb-bgsave");
    redisSetCpuAffinity(server.bgsave_cpulist);
    retval = rdbSave(filename,rsi);
    if (retval == C_OK) {
        sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
    }
    exitFromChild((retval == C_OK) ? 0 : 1);
} else {
    /* Parent */
    if (childpid == -1) {
        closeChildInfoPipe();
        server.lastbgsave_status = C_ERR;
        serverLog(LL_WARNING,"Can't save in background: fork: %s",
            strerror(errno));
        return C_ERR;
    }
    serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
    server.rdb_save_time_start = time(NULL);
    server.rdb_child_pid = childpid;
    server.rdb_child_type = RDB_CHILD_TYPE_DISK;
    updateDictResizePolicy();
    return C_OK;
}
return C_OK; /* unreached */

3,总结

  • redis是单线程的,进入main方法初始化server后,调用一个死循环的方法进行事件处理,在每次事件循环中获取当前所有客户端的事件,循环处理,处理完后响应消息在下一次循环回写客户端。在处理完文件事件后,进行时间事件处理
  • redis只允许有一个子进程,在fork子进程时都要先检查是否已经有子进程
  • redis 6.0后支持IOThreads多线程,后续补充相关文章
分享
点赞1
打赏
上一篇:Docker常用命令笔记(一)
下一篇:在生产环境中调试多线程