鍍金池/ 教程/ 大數(shù)據(jù)/ Redis 數(shù)據(jù)遷移
Redis 數(shù)據(jù)淘汰機(jī)制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開(kāi)始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺(tái)
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務(wù)器存儲(chǔ) session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機(jī)制
Redis 是如何提供服務(wù)的
Redis 事務(wù)機(jī)制
Redis 集群(下)
主從復(fù)制
Redis 應(yīng)用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動(dòng)詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡(jiǎn)介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機(jī)制

Redis 數(shù)據(jù)遷移

Redis 提供在線數(shù)據(jù)遷移的能力,把自身的數(shù)據(jù)往其他 Redis 服務(wù)器上遷移。如果需要將部分?jǐn)?shù)據(jù)遷移到另一臺(tái)Redis 服務(wù)器上,這個(gè)命令會(huì)非常有用。

redis migraiton 的實(shí)現(xiàn)比較簡(jiǎn)單。首先將需要遷移的命令打包好,發(fā)送到指定的 Redis 服務(wù)器上,回復(fù)ok 后則刪除本地的鍵值對(duì)。

這里面用了前面講到的 rio:讀寫(xiě)對(duì)象既可以是文件也可以是內(nèi)存,只需要安裝相應(yīng)的讀寫(xiě)函數(shù)即可。這里不難理解。

網(wǎng)絡(luò)傳輸部分,用到了 Redis 內(nèi)部的 syncio 模塊,syncio 即同步 io,每讀/寫(xiě)入一部分?jǐn)?shù)據(jù)會(huì)用 IO 多路復(fù)用的技術(shù)等待下一次可讀寫(xiě)/的機(jī)會(huì)。在 migrateCommand() 的實(shí)現(xiàn)中,先用非阻塞的方式建立一個(gè)連接,接著將打包好的遷移數(shù)據(jù)發(fā)送到目標(biāo) Redis 服務(wù)器上,并等待目標(biāo) Redis 服務(wù)器的相應(yīng)。

下面通過(guò) migrateCommand() 來(lái)了解數(shù)據(jù)遷移是如何實(shí)現(xiàn)的:

/* MIGRATE host port key dbid timeout */
void migrateCommand(redisClient *c) {
    int fd;
    long timeout;
    long dbid;
    long long ttl = 0, expireat;
    robj *o;
    rio cmd, payload;
    // 準(zhǔn)備需要遷移的數(shù)據(jù),這個(gè)數(shù)據(jù)可以由客戶端來(lái)指定
    ......
    // 建立一個(gè)非阻塞連接
    /* Connect */
    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
    atoi(c->argv[2]->ptr));
    if (fd == -1) {
        addReplyErrorFormat(c,"Can't connect to target node: %s",
        server.neterr);
        return;
    }
    // 等待建立成功
    if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
        addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
        return;
    }
    // rio 的讀寫(xiě)可以對(duì)應(yīng)的是文件讀寫(xiě),也可以是內(nèi)存的讀寫(xiě),只需要安裝相應(yīng)的讀寫(xiě)
    // 函數(shù)
    // 初始化一塊空的sds buffer
    /* Create RESTORE payload and generate the protocol to call the command. */
    rioInitWithBuffer(&cmd,sdsempty());
    // 把需要遷移的數(shù)據(jù)打包追加到
    ......
    // 可以指定過(guò)期時(shí)間
    expireat = getExpire(c->db,c->argv[3]);
    if (expireat != -1) {
        ttl = expireat-mstime();
    if (ttl < 1) ttl = 1;
    }
    // 生成restore 命令
    ......
    // 生成包含 Redis 版本和校驗(yàn)字段的 payload
    /* Finally the last argument that is the serailized object payload
    * in the DUMP format. */
    createDumpPayload(&payload,o);
    // 寫(xiě)入到遷移內(nèi)容中
    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
    sdslen(payload.io.buffer.ptr)));
    sdsfree(payload.io.buffer.ptr);
    // 將遷移數(shù)據(jù)送往目標(biāo) Redis 服務(wù)器
    /* Tranfer the query to the other node in 64K chunks. */
    {
    sds buf = cmd.io.buffer.ptr;
    size_t pos = 0, towrite;
    int nwritten = 0;
    // 最多只傳送64K
    while ((towrite = sdslen(buf)-pos) > 0) {
        towrite = (towrite > (64*1024) ? (64*1024) : towrite);
        // 同步寫(xiě)
        nwritten = syncWrite(fd,buf+pos,towrite,timeout);
    if (nwritten != (signed)towrite) goto socket_wr_err;
        pos += nwritten;
        }
    }
    // 讀取目標(biāo) Redis 服務(wù)器的回復(fù)
    /* Read back the reply. */
    {
    char buf1[1024];
    char buf2[1024];
    /* Read the two replies */
    if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
        goto socket_rd_err;
    if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
        goto socket_rd_err;
    if (buf1[0] == '-' || buf2[0] == '-') {
        addReplyErrorFormat(c,"Target instance replied with error: %s",
        (buf1[0] == '-') ? buf1+1 : buf2+1);
    } else {
        // 回復(fù)內(nèi)容正常,說(shuō)明數(shù)據(jù)已經(jīng)遷移成功,刪除原始 Redis 服務(wù)器的key-value
        robj *aux;
        dbDelete(c->db,c->argv[3]);
        signalModifiedKey(c->db,c->argv[3]);
        addReply(c,shared.ok);
        server.dirty++;
        // 將變更更新到從機(jī),并寫(xiě)入AOF 文件
        /* Translate MIGRATE as DEL for replication/AOF. */
        aux = createStringObject("DEL",3);
        rewriteClientCommandVector(c,2,aux,c->argv[3]);
        decrRefCount(aux);
        }
    }
    // 一些清理工作和錯(cuò)誤處理
    ......
}

migrateCommand() 只是數(shù)據(jù)遷移的一部分代碼,目標(biāo)機(jī)器還要負(fù)責(zé)將數(shù)據(jù)存儲(chǔ)到目標(biāo)機(jī)器上,有興趣可以參考 restoreCommand() 的實(shí)現(xiàn),基本上和 migrateCommand() 是逆過(guò)來(lái)的。