Redis 提供在線數(shù)據(jù)遷移的能力,把自身的數(shù)據(jù)往其他 Redis 服務(wù)器上遷移。如果需要將部分?jǐn)?shù)據(jù)遷移到另一臺(tái)Redis 服務(wù)器上,這個(gè)命令會(huì)非常有用。
redis migraiton 的實(shí)現(xiàn)比較簡(jiǎn)單。首先將需要遷移的命令打包好,發(fā)送到指定的 Redis 服務(wù)器上,回復(fù)ok 后則刪除本地的鍵值對(duì)。
這里面用了前面講到的 rio:讀寫(xiě)對(duì)象既可以是文件也可以是內(nèi)存,只需要安裝相應(yīng)的讀寫(xiě)函數(shù)即可。這里不難理解。
網(wǎng)絡(luò)傳輸部分,用到了 Redis 內(nèi)部的 syncio 模塊,syncio 即同步 io,每讀/寫(xiě)入一部分?jǐn)?shù)據(jù)會(huì)用 IO 多路復(fù)用的技術(shù)等待下一次可讀寫(xiě)/的機(jī)會(huì)。在 migrateCommand() 的實(shí)現(xiàn)中,先用非阻塞的方式建立一個(gè)連接,接著將打包好的遷移數(shù)據(jù)發(fā)送到目標(biāo) Redis 服務(wù)器上,并等待目標(biāo) Redis 服務(wù)器的相應(yīng)。
下面通過(guò) migrateCommand() 來(lái)了解數(shù)據(jù)遷移是如何實(shí)現(xiàn)的:
/* MIGRATE host port key dbid timeout */
void migrateCommand(redisClient *c) {
int fd;
long timeout;
long dbid;
long long ttl = 0, expireat;
robj *o;
rio cmd, payload;
// 準(zhǔn)備需要遷移的數(shù)據(jù),這個(gè)數(shù)據(jù)可以由客戶端來(lái)指定
......
// 建立一個(gè)非阻塞連接
/* Connect */
fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
atoi(c->argv[2]->ptr));
if (fd == -1) {
addReplyErrorFormat(c,"Can't connect to target node: %s",
server.neterr);
return;
}
// 等待建立成功
if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
return;
}
// rio 的讀寫(xiě)可以對(duì)應(yīng)的是文件讀寫(xiě),也可以是內(nèi)存的讀寫(xiě),只需要安裝相應(yīng)的讀寫(xiě)
// 函數(shù)
// 初始化一塊空的sds buffer
/* Create RESTORE payload and generate the protocol to call the command. */
rioInitWithBuffer(&cmd,sdsempty());
// 把需要遷移的數(shù)據(jù)打包追加到
......
// 可以指定過(guò)期時(shí)間
expireat = getExpire(c->db,c->argv[3]);
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 1) ttl = 1;
}
// 生成restore 命令
......
// 生成包含 Redis 版本和校驗(yàn)字段的 payload
/* Finally the last argument that is the serailized object payload
* in the DUMP format. */
createDumpPayload(&payload,o);
// 寫(xiě)入到遷移內(nèi)容中
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr);
// 將遷移數(shù)據(jù)送往目標(biāo) Redis 服務(wù)器
/* Tranfer the query to the other node in 64K chunks. */
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;
// 最多只傳送64K
while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite);
// 同步寫(xiě)
nwritten = syncWrite(fd,buf+pos,towrite,timeout);
if (nwritten != (signed)towrite) goto socket_wr_err;
pos += nwritten;
}
}
// 讀取目標(biāo) Redis 服務(wù)器的回復(fù)
/* Read back the reply. */
{
char buf1[1024];
char buf2[1024];
/* Read the two replies */
if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
goto socket_rd_err;
if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
goto socket_rd_err;
if (buf1[0] == '-' || buf2[0] == '-') {
addReplyErrorFormat(c,"Target instance replied with error: %s",
(buf1[0] == '-') ? buf1+1 : buf2+1);
} else {
// 回復(fù)內(nèi)容正常,說(shuō)明數(shù)據(jù)已經(jīng)遷移成功,刪除原始 Redis 服務(wù)器的key-value
robj *aux;
dbDelete(c->db,c->argv[3]);
signalModifiedKey(c->db,c->argv[3]);
addReply(c,shared.ok);
server.dirty++;
// 將變更更新到從機(jī),并寫(xiě)入AOF 文件
/* Translate MIGRATE as DEL for replication/AOF. */
aux = createStringObject("DEL",3);
rewriteClientCommandVector(c,2,aux,c->argv[3]);
decrRefCount(aux);
}
}
// 一些清理工作和錯(cuò)誤處理
......
}
migrateCommand() 只是數(shù)據(jù)遷移的一部分代碼,目標(biāo)機(jī)器還要負(fù)責(zé)將數(shù)據(jù)存儲(chǔ)到目標(biāo)機(jī)器上,有興趣可以參考 restoreCommand() 的實(shí)現(xiàn),基本上和 migrateCommand() 是逆過(guò)來(lái)的。