在剛剛接觸 Redis 的時候,最想要知道的是一個’set name Jhon’ 命令到達 Redis 服務器的時候,它是如何返回’OK’ 的?里面命令處理的流程如何,具體細節(jié)怎么樣?你一定有問過自己。閱讀別人的代碼是很枯燥的,但帶著好奇心閱讀代碼,是一件很興奮的事情,接著翻到了 Redis 源碼的 main() 函數(shù)。
http://wiki.jikexueyuan.com/project/redis/images/redis6.png" alt="" />
Redis 在啟動做了一些初始化邏輯,比如配置文件讀取,數(shù)據(jù)中心初始化,網(wǎng)絡通信模塊初始化等,待所有初始化任務完畢后,便開始等待請求。
當請求到來時,Redis 進程會被喚醒,原理是 epoll. select, kqueue 等一些 I/O 多路復用的系統(tǒng)調(diào)用。如果有閱讀上一章節(jié),應該理解這一句話。接著讀取來來自客戶端的數(shù)據(jù),解析命令,查找命令,并執(zhí)行命令。
執(zhí)行命令’set name Jhon’ 的時候,Redis 會在預先初始化好的哈希表里頭, 查找 key=’name’ 對應的位置,并存入。
最后,把回復的內(nèi)容準備好回送給客戶端,客戶端于是收到了’OK’. 接下來,我們看看詳細的過程是怎么樣的。
了解了 Redis 的事件驅(qū)動模型后,帶著命令是如何被處理的這個問題去讀代碼。剛開始的時候,會有一堆的變量和函數(shù)等著讀者,但只要抓住主干就好了,下面就是 Redis 的主干部分。
int main(int argc, char **argv) {
......
// 初始化服務器配置,主要是填充 redisServer 結(jié)構(gòu)體中的各種參數(shù)
initServerConfig();
......
// 初始化服務器
initServer();
......
// 進入事件循環(huán)
aeMain(server.el);
}
分別來看看它們主要做了什么?
initServerConfig() 主要是填充 struct redisServer 這個結(jié)構(gòu)體,Redis 所有相關的配置都 在里面。
void initServer() {
// 創(chuàng)建事件循環(huán)結(jié)構(gòu)體
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
// 分配數(shù)據(jù)集空間
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Open the TCP listening socket for the user commands. */
// listenToPort() 中有調(diào)用listen()
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
exit(1);
......
// 初始化redis 數(shù)據(jù)集
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.REDIS_DEFAULT_DBNUM; j++) { // 初始化多個數(shù)據(jù)庫
// 哈希表,用于存儲鍵值對
server.db[j].dict = dictCreate(&dbDictType,NULL);
// 哈希表,用于存儲每個鍵的過期時間
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
......
// 創(chuàng)建接收TCP 或者UNIX 域套接字的事件處理
// TCP
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
// acceptTcpHandler() tcp 連接接受處理函數(shù)
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
......
}
在這里,創(chuàng)建了事件中心,是 Redis 的網(wǎng)絡模塊,如果你有學過 linux 下的網(wǎng)絡編程,那么知道這里一定和 select/epoll/kqueue 相關。
接著,是初始化數(shù)據(jù)中心,我們平時使用 Redis 設置的鍵值對,就是存儲在里面。這里不急著深入它是怎么做到存儲我們的鍵值對的,接著往下看好了,因為我們主要是想把大致的脈絡弄清楚。
在最后一段的代碼中,Redis 給 listen fd 注冊了回調(diào)函數(shù) acceptTcpHandler,也就是說當新的客戶端連接的時候,這個函數(shù)會被調(diào)用,詳情接下來再展開。
接著就開始等待請求的到來。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 進入事件循環(huán)可能會進入睡眠狀態(tài)。在睡眠之前,執(zhí)行預設置
// 的函數(shù)aeSetBeforeSleepProc()。
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// AE_ALL_EVENTS 表示處理所有的事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
前面的兩個函數(shù)都屬于是初始化的工作,到這里的時候,Redis 正式進入等待接收請求的狀態(tài)。具體的實現(xiàn),和 select/epoll/kqueue 這些 IO 多路復用的系統(tǒng)調(diào)用相關,而這也是網(wǎng)絡編程的基礎部分了。繼續(xù)跟蹤調(diào)用鏈:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
......
// 調(diào)用IO 多路復用函數(shù)阻塞監(jiān)聽
numevents = aeApiPoll(eventLoop, tvp);
// 處理已經(jīng)觸發(fā)的事件
for (j = 0; j < numevents; j++) {
// 找到I/O 事件表中存儲的數(shù)據(jù)
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 讀事件
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 寫事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
// 處理定時事件
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
可以看到,aeApiPoll 即是 IO 多路復用調(diào)用的地方,當有請求到來的時候,進程會覺醒以處理到來的請求。如果你有留意到上面的定時事件處理,也就明白相應的定時處理函數(shù)是在哪里觸發(fā)的了。
在 initServer() 的講解中,Redis 注冊了回調(diào)函數(shù) acceptTcpHandler(),當有新的連接到來時,這個函數(shù)會被回調(diào),上面的函數(shù)指針 rfileProc() 實際上就是指向了 acceptTcpHandler()。
下面是 acceptTcpHandler() 的核心代碼:
// 用于TCP 接收請求的處理函數(shù)
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
// 接收客戶端請求
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// 出錯
if (cfd == AE_ERR) {
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
// 記錄
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
// 真正有意思的地方
acceptCommonHandler(cfd,0);
}
anetTcpAccept 是接收一個請求cfd,真正有意思的地方是acceptCommonHandler,而 acceptCommonHandler 最核心的調(diào)用是 createClient。Redis 對于每一個客戶端的連接,都會對應一個結(jié)構(gòu)體 struct redisClient。下面是 createClient 的核心代碼:
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the Redis commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 為接收到的套接字注冊監(jiān)聽事件
// readQueryFromClient() 應該為處理客戶端請求的函數(shù)
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
......
return c;
}
可以看到,createClient 在事件中心為與客戶端連接的套接字注冊了 readQueryFrom-Client() 回調(diào)函數(shù),而這也就是說當客戶端有請求數(shù)據(jù)過來的時候,acceptTcpHandler() 會被調(diào)用。于是,我們找到了’set name Jhon’ 開始處理的地方。
readQueryFromClient() 則是獲取來自客戶端的數(shù)據(jù),接下來它會調(diào)用processInput-Buffer() 解析命令和執(zhí)行命令,對于命令的執(zhí)行,調(diào)用的是函數(shù) processCommand()。下面是 processCommand() 核心代碼:
int processCommand(redisClient *c) {
......
// 查找命令,redisClient.cmd 在此時賦值
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
// 沒有找到命令
if (!c->cmd) {
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return REDIS_OK;
// 參數(shù)個數(shù)不符合
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
}
.....
// 加入命令隊列的情況
/* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 命令入隊
queueMultiCommand(c);
addReply(c,shared.queued);
// 真正執(zhí)行命令。
// 注意,如果是設置了多命令模式,那么不是直接執(zhí)行命令,而是讓命令入隊
} else {
call(c,REDIS_CALL_FULL);
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return REDIS_OK;
}
如上可以看到,Redis 首先根據(jù)客戶端給出的命令字在命令表中查找對應的 c->cmd, 即 struct redisCommand()。
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
Redis 在初始化的時候準備了一個大數(shù)組,初始化了所有的命令,即初始化多個 structredisCommand,在 struct redisCommand 中就有該命令對應的回調(diào)函數(shù)指針。找到命令結(jié)構(gòu)體后,則開始執(zhí)行命令,核心調(diào)用是call()。
call() 做的事情有很多,但這里只關注這一句話:call() 調(diào)用了命令的回調(diào)函數(shù)。
// call() 函數(shù)是執(zhí)行命令的核心函數(shù),真正執(zhí)行命令的地方
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
......
// 執(zhí)行命令對應的處理函數(shù)
c->cmd->proc(c);
......
}
http://wiki.jikexueyuan.com/project/redis/images/redis7.png" alt="" />
對于’set name Jhon’ 命令,對應的回調(diào)函數(shù)是 setCommand() 函數(shù)。setCommand 對 set 命令的參數(shù)做了檢測,因為還提供設置一個鍵值對的過期時間等功能,這里只關注最簡單的情況。
void setCommand(redisClient *c) {
......
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
void setGenericCommand(redisClient *c, int flags, robj *key,
robj *val, robj *expire, int unit, robj *ok_reply,
robj *abort_reply) {
......
setKey(c->db,key,val);
......
addReply(c, ok_reply ? ok_reply : shared.ok);
}
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
}
......
}
setKey() 首先查看 key 是否存在于數(shù)據(jù)集中,如果存在則覆蓋寫;如果不存在則添加到數(shù)據(jù)集中。這里關注 key 不存在的情況:
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
}
dictAdd() 就是把 key 存到字典中,實際上即是存到一個哈希表。
最后,回到 setGenericCommand(), 會調(diào)用 addReply()。addReply() 會為與客戶端連接的套接字注冊可寫事件,把’ok’ 添加到客戶端的回復緩存中。待再一次回到事件循環(huán)的時候,如果這個套接字可寫,相應的回調(diào)函數(shù)就可以被回調(diào)了?;貜途彺嬷械臄?shù)據(jù)會被發(fā)送到客戶端。
由此’set name Jhon’ 命令執(zhí)行完畢。在把這個流程捋順的過程,我省去了很多的細節(jié),只關注場景最簡單情況最單一的時候,其他的代碼都沒有去看,譬如主從復制的,持久化的相關邏輯。這對我們快速了解一個系統(tǒng)的原理是很關鍵的。同樣,在面對其他系統(tǒng)代碼的時候,也可以帶著這三個最簡單的問題去閱讀:它是誰,它從哪里來,又到哪里去。