鍍金池/ 教程/ 大數(shù)據(jù)/ Redis 是如何提供服務的
Redis 數(shù)據(jù)淘汰機制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務器存儲 session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機制
Redis 是如何提供服務的
Redis 事務機制
Redis 集群(下)
主從復制
Redis 應用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機制

Redis 是如何提供服務的

在剛剛接觸 Redis 的時候,最想要知道的是一個’set name Jhon’ 命令到達 Redis 服務器的時候,它是如何返回’OK’ 的?里面命令處理的流程如何,具體細節(jié)怎么樣?你一定有問過自己。閱讀別人的代碼是很枯燥的,但帶著好奇心閱讀代碼,是一件很興奮的事情,接著翻到了 Redis 源碼的 main() 函數(shù)。

http://wiki.jikexueyuan.com/project/redis/images/redis6.png" alt="" />

Redis 在啟動做了一些初始化邏輯,比如配置文件讀取,數(shù)據(jù)中心初始化,網(wǎng)絡通信模塊初始化等,待所有初始化任務完畢后,便開始等待請求。

當請求到來時,Redis 進程會被喚醒,原理是 epoll. select, kqueue 等一些 I/O 多路復用的系統(tǒng)調(diào)用。如果有閱讀上一章節(jié),應該理解這一句話。接著讀取來來自客戶端的數(shù)據(jù),解析命令,查找命令,并執(zhí)行命令。

執(zhí)行命令’set name Jhon’ 的時候,Redis 會在預先初始化好的哈希表里頭, 查找 key=’name’ 對應的位置,并存入。

最后,把回復的內(nèi)容準備好回送給客戶端,客戶端于是收到了’OK’. 接下來,我們看看詳細的過程是怎么樣的。

詳細的過程

了解了 Redis 的事件驅(qū)動模型后,帶著命令是如何被處理的這個問題去讀代碼。剛開始的時候,會有一堆的變量和函數(shù)等著讀者,但只要抓住主干就好了,下面就是 Redis 的主干部分。

int main(int argc, char **argv) {
   ......
   // 初始化服務器配置,主要是填充 redisServer 結(jié)構(gòu)體中的各種參數(shù)
   initServerConfig();
   ......
   // 初始化服務器
   initServer();
   ......
   // 進入事件循環(huán)
   aeMain(server.el);
}

分別來看看它們主要做了什么?

initServerConfig()

initServerConfig() 主要是填充 struct redisServer 這個結(jié)構(gòu)體,Redis 所有相關的配置都 在里面。

void initServer() {
   // 創(chuàng)建事件循環(huán)結(jié)構(gòu)體
   server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
   // 分配數(shù)據(jù)集空間
   server.db = zmalloc(sizeof(redisDb)*server.dbnum);
   /* Open the TCP listening socket for the user commands. */
   // listenToPort() 中有調(diào)用listen()
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
        exit(1);
......
// 初始化redis 數(shù)據(jù)集
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.REDIS_DEFAULT_DBNUM; j++) { // 初始化多個數(shù)據(jù)庫
    // 哈希表,用于存儲鍵值對
    server.db[j].dict = dictCreate(&dbDictType,NULL);
    // 哈希表,用于存儲每個鍵的過期時間
    server.db[j].expires = dictCreate(&keyptrDictType,NULL);
    server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
    server.db[j].ready_keys = dictCreate(&setDictType,NULL);
    server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
    server.db[j].id = j;
    server.db[j].avg_ttl = 0;
  }
......
// 創(chuàng)建接收TCP 或者UNIX 域套接字的事件處理
// TCP
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
    // acceptTcpHandler() tcp 連接接受處理函數(shù)
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
   }
......
}

在這里,創(chuàng)建了事件中心,是 Redis 的網(wǎng)絡模塊,如果你有學過 linux 下的網(wǎng)絡編程,那么知道這里一定和 select/epoll/kqueue 相關。

接著,是初始化數(shù)據(jù)中心,我們平時使用 Redis 設置的鍵值對,就是存儲在里面。這里不急著深入它是怎么做到存儲我們的鍵值對的,接著往下看好了,因為我們主要是想把大致的脈絡弄清楚。

在最后一段的代碼中,Redis 給 listen fd 注冊了回調(diào)函數(shù) acceptTcpHandler,也就是說當新的客戶端連接的時候,這個函數(shù)會被調(diào)用,詳情接下來再展開。

aeMain()

接著就開始等待請求的到來。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
    // 進入事件循環(huán)可能會進入睡眠狀態(tài)。在睡眠之前,執(zhí)行預設置
    // 的函數(shù)aeSetBeforeSleepProc()。
    if (eventLoop->beforesleep != NULL)
        eventLoop->beforesleep(eventLoop);
    // AE_ALL_EVENTS 表示處理所有的事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  }
}

前面的兩個函數(shù)都屬于是初始化的工作,到這里的時候,Redis 正式進入等待接收請求的狀態(tài)。具體的實現(xiàn),和 select/epoll/kqueue 這些 IO 多路復用的系統(tǒng)調(diào)用相關,而這也是網(wǎng)絡編程的基礎部分了。繼續(xù)跟蹤調(diào)用鏈:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ......
    // 調(diào)用IO 多路復用函數(shù)阻塞監(jiān)聽
    numevents = aeApiPoll(eventLoop, tvp);
    // 處理已經(jīng)觸發(fā)的事件
for (j = 0; j < numevents; j++) {
    // 找到I/O 事件表中存儲的數(shù)據(jù)
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
    // 讀事件
    if (fe->mask & mask & AE_READABLE) {
        rfired = 1;
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    }
    // 寫事件
    if (fe->mask & mask & AE_WRITABLE) {
    if (!rfired || fe->wfileProc != fe->rfileProc)
        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    }
    processed++;
  }
}
// 處理定時事件
/* Check time events */
if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}

可以看到,aeApiPoll 即是 IO 多路復用調(diào)用的地方,當有請求到來的時候,進程會覺醒以處理到來的請求。如果你有留意到上面的定時事件處理,也就明白相應的定時處理函數(shù)是在哪里觸發(fā)的了。

新連接的處理流程

在 initServer() 的講解中,Redis 注冊了回調(diào)函數(shù) acceptTcpHandler(),當有新的連接到來時,這個函數(shù)會被回調(diào),上面的函數(shù)指針 rfileProc() 實際上就是指向了 acceptTcpHandler()。

下面是 acceptTcpHandler() 的核心代碼:

// 用于TCP 接收請求的處理函數(shù)
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    // 接收客戶端請求
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    // 出錯
    if (cfd == AE_ERR) {
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
        return;
  }
    // 記錄
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
    // 真正有意思的地方
    acceptCommonHandler(cfd,0);
}

anetTcpAccept 是接收一個請求cfd,真正有意思的地方是acceptCommonHandler,而 acceptCommonHandler 最核心的調(diào)用是 createClient。Redis 對于每一個客戶端的連接,都會對應一個結(jié)構(gòu)體 struct redisClient。下面是 createClient 的核心代碼:

redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));
    /* passing -1 as fd it is possible to create a non connected client.
    * This is useful since all the Redis commands needs to be executed
    * in the context of a client. When commands are executed in other
    * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
    if (server.tcpkeepalive)
        anetKeepAlive(NULL,fd,server.tcpkeepalive);
    // 為接收到的套接字注冊監(jiān)聽事件
    // readQueryFromClient() 應該為處理客戶端請求的函數(shù)
    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
        readQueryFromClient, c) == AE_ERR)
    {
        close(fd);
        zfree(c);
        return NULL;
    }
  }
  ......
  return c;
}

可以看到,createClient 在事件中心為與客戶端連接的套接字注冊了 readQueryFrom-Client() 回調(diào)函數(shù),而這也就是說當客戶端有請求數(shù)據(jù)過來的時候,acceptTcpHandler() 會被調(diào)用。于是,我們找到了’set name Jhon’ 開始處理的地方。

請求的處理流程

readQueryFromClient() 則是獲取來自客戶端的數(shù)據(jù),接下來它會調(diào)用processInput-Buffer() 解析命令和執(zhí)行命令,對于命令的執(zhí)行,調(diào)用的是函數(shù) processCommand()。下面是 processCommand() 核心代碼:

int processCommand(redisClient *c) {
    ......
    // 查找命令,redisClient.cmd 在此時賦值
    /* Now lookup the command and check ASAP about trivial error conditions
    * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    // 沒有找到命令
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return REDIS_OK;
        // 參數(shù)個數(shù)不符合
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
}
.....
    // 加入命令隊列的情況
    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
    // 命令入隊
    queueMultiCommand(c);
    addReply(c,shared.queued);
    // 真正執(zhí)行命令。
    // 注意,如果是設置了多命令模式,那么不是直接執(zhí)行命令,而是讓命令入隊
    } else {
        call(c,REDIS_CALL_FULL);
    if (listLength(server.ready_keys))
        handleClientsBlockedOnLists();
    }
return REDIS_OK;
}

如上可以看到,Redis 首先根據(jù)客戶端給出的命令字在命令表中查找對應的 c->cmd, 即 struct redisCommand()。

c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

Redis 在初始化的時候準備了一個大數(shù)組,初始化了所有的命令,即初始化多個 structredisCommand,在 struct redisCommand 中就有該命令對應的回調(diào)函數(shù)指針。找到命令結(jié)構(gòu)體后,則開始執(zhí)行命令,核心調(diào)用是call()。

執(zhí)行命令

call() 做的事情有很多,但這里只關注這一句話:call() 調(diào)用了命令的回調(diào)函數(shù)。

    // call() 函數(shù)是執(zhí)行命令的核心函數(shù),真正執(zhí)行命令的地方
    /* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    ......
    // 執(zhí)行命令對應的處理函數(shù)
    c->cmd->proc(c);
    ......
}

http://wiki.jikexueyuan.com/project/redis/images/redis7.png" alt="" />

對于’set name Jhon’ 命令,對應的回調(diào)函數(shù)是 setCommand() 函數(shù)。setCommand 對 set 命令的參數(shù)做了檢測,因為還提供設置一個鍵值對的過期時間等功能,這里只關注最簡單的情況。

void setCommand(redisClient *c) {
    ......
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
void setGenericCommand(redisClient *c, int flags, robj *key,
        robj *val, robj *expire, int unit, robj *ok_reply,
        robj *abort_reply) {
    ......
    setKey(c->db,key,val);
    ......
    addReply(c, ok_reply ? ok_reply : shared.ok);
}
void setKey(redisDb *db, robj *key, robj *val) {
    if (lookupKeyWrite(db,key) == NULL) {
        dbAdd(db,key,val);
    } else {
    dbOverwrite(db,key,val);
  }
  ......
}

setKey() 首先查看 key 是否存在于數(shù)據(jù)集中,如果存在則覆蓋寫;如果不存在則添加到數(shù)據(jù)集中。這里關注 key 不存在的情況:

void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr);
    int retval = dictAdd(db->dict, copy, val);
    redisAssertWithInfo(NULL,key,retval == REDIS_OK);
}

dictAdd() 就是把 key 存到字典中,實際上即是存到一個哈希表。

在哪里回復客戶端

最后,回到 setGenericCommand(), 會調(diào)用 addReply()。addReply() 會為與客戶端連接的套接字注冊可寫事件,把’ok’ 添加到客戶端的回復緩存中。待再一次回到事件循環(huán)的時候,如果這個套接字可寫,相應的回調(diào)函數(shù)就可以被回調(diào)了?;貜途彺嬷械臄?shù)據(jù)會被發(fā)送到客戶端。

由此’set name Jhon’ 命令執(zhí)行完畢。在把這個流程捋順的過程,我省去了很多的細節(jié),只關注場景最簡單情況最單一的時候,其他的代碼都沒有去看,譬如主從復制的,持久化的相關邏輯。這對我們快速了解一個系統(tǒng)的原理是很關鍵的。同樣,在面對其他系統(tǒng)代碼的時候,也可以帶著這三個最簡單的問題去閱讀:它是誰,它從哪里來,又到哪里去。