psync is mainly used for synchronization between Redis master and replica instances. The command was introduced in version 2.8.

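Versions before 2.8 only had the sync command: every time it was triggered, a full synchronization was performed first, followed by incremental synchronization. The psync command introduced in 2.8 makes incremental synchronization possible even in scenarios such as reconnecting after a dropped connection. It works by using the runid and offset to resume from the point of interruption, which prevents network jitter from triggering a full synchronization that would disturb the whole cluster (the master triggers an RDB dump and also consumes network bandwidth).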
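To make the "ID plus offset" idea concrete, here is a minimal sketch of how a replica might format its resume request. The helper name format_psync is hypothetical and not part of Redis. The sketch assumes, as the Redis source comments quoted later describe, that the replica asks for the first byte it has not yet received (its last offset plus one), and that "?" with offset -1 asks for a full resync.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical helper (not a Redis function): write the PSYNC request line
 * a replica would send. `replid` is the ID of the master it last replicated
 * from, or NULL when nothing is known. `last_offset` is the last replication
 * offset the replica has received. Returns the value of snprintf(). */
int format_psync(char *buf, size_t len, const char *replid, long long last_offset) {
    assert(buf != NULL && len > 0);
    if (replid == NULL) {
        /* No saved state: "?" and -1 request a full resync. */
        return snprintf(buf, len, "PSYNC ? -1\r\n");
    }
    /* Ask for the first byte that has not been received yet. */
    return snprintf(buf, len, "PSYNC %s %lld\r\n", replid, last_offset + 1);
}
```

Calling format_psync(buf, sizeof buf, "abc", 50) produces a request for offset 51, while passing NULL as the ID produces the full-resync form.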

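However, this resume information is kept only in memory, so a full synchronization is still triggered when a node is upgraded or restarted.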

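To optimize this scenario, the psync command was improved in 4.0; some people call it psync2. The main improvement is that incremental synchronization remains possible even after the instance restarts. To achieve this, Redis saves the resume information in the RDB file on shutdown. After restarting, it reads that information back from the RDB file and uses it to resume replication.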
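The save-and-restore idea can be sketched as follows. This is only an illustration: real RDB files use a binary encoding, whereas this sketch uses a plain-text layout, and the struct and function names are hypothetical. The field labels "repl-id" and "repl-offset" are assumed here to mirror the names Redis uses for this metadata.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

#define REPLID_LEN 40  /* length of a replication ID in characters */

/* Hypothetical container for the state that must survive a restart. */
typedef struct {
    char replid[REPLID_LEN + 1];
    long long offset;
} saved_repl_state;

/* Serialize the state as text. The layout is an assumption made for this
 * sketch and is not the RDB format. Returns the value of snprintf(). */
int save_repl_state(char *buf, size_t len, const saved_repl_state *st) {
    assert(buf != NULL && st != NULL);
    return snprintf(buf, len, "repl-id %s\nrepl-offset %lld\n",
                    st->replid, st->offset);
}

/* Parse the text back. Returns 1 on success, 0 if a field is missing,
 * in which case the caller would have to fall back to a full resync. */
int load_repl_state(const char *buf, saved_repl_state *st) {
    assert(buf != NULL && st != NULL);
    return sscanf(buf, "repl-id %40s repl-offset %lld",
                  st->replid, &st->offset) == 2;
}
```

If load_repl_state fails, the instance has nothing to resume from, which corresponds to the pre-4.0 behavior after a restart.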

The implementation mainly involves the following key variables:

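  • master_replid. The current master's replication ID. When this node is a master, the value is randomly generated by the node itself; when this node is a replica, the value is that of the master it is currently replicating from.
  • master_repl_offset. The replication offset.
  • master_replid2. This variable is new compared with the original psync command and stores the ID of the previously replicated master. After a replica is promoted to master, its master_replid is newly generated, while master_replid2 holds the replication ID of the previous master.
  • second_repl_offset. The offset of the previous master, that is, the offset up to which master_replid2 remains valid.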
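As a rough picture of how these four values sit together, here is a minimal sketch. The struct and function names are hypothetical; in Redis the fields live in the global server struct as server.replid, server.replid2, server.master_repl_offset and server.second_replid_offset. The "cleared" state for the secondary ID (all '0' characters and an offset of -1) follows what Redis's clearReplicationId2 does, to the best of my knowledge.

```c
#include <assert.h>
#include <string.h>

#define REPLID_LEN 40

/* Hypothetical grouping of the replication identity of one node. */
typedef struct {
    char replid[REPLID_LEN + 1];     /* master_replid */
    long long master_repl_offset;    /* master_repl_offset */
    char replid2[REPLID_LEN + 1];    /* master_replid2 */
    long long second_replid_offset;  /* second_repl_offset */
} repl_ids;

/* Mark the secondary ID as unused: all zeros, valid up to offset -1. */
void clear_replid2(repl_ids *r) {
    assert(r != NULL);
    memset(r->replid2, '0', REPLID_LEN);
    r->replid2[REPLID_LEN] = '\0';
    r->second_replid_offset = -1;
}

/* Start a fresh history under `replid` with no secondary ID. */
void init_repl_ids(repl_ids *r, const char *replid) {
    assert(r != NULL && replid != NULL && strlen(replid) == REPLID_LEN);
    memcpy(r->replid, replid, REPLID_LEN + 1);
    r->master_repl_offset = 0;
    clear_replid2(r);
}
```

With second_replid_offset at -1, no PSYNC request can match the secondary ID, so only the primary ID is usable until a switchover fills it in.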

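Why introduce master_replid2 and second_repl_offset? Mainly to reduce the chance of a full synchronization after a master/replica switchover, when the old master comes back or when another replica starts replicating from the new master.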

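When a replica issues the psync command, the master uses the resume information passed in to decide whether incremental synchronization is possible. A full synchronization is triggered when the replid and psync_offset passed in satisfy both of the following conditions:

  • replid is not equal to the current node's master_replid
  • replid is not equal to the current node's master_replid2, or psync_offset is greater than the current node's second_repl_offset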
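The two conditions above can be condensed into a single predicate. This is a sketch with hypothetical names (master_ids, replid_forces_full_resync); it uses strcmp for portability, whereas the Redis source compares the IDs with strcasecmp.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical view of the IDs a master knows about. */
typedef struct {
    const char *replid;              /* primary replication ID */
    const char *replid2;             /* secondary (previous) replication ID */
    long long second_replid_offset;  /* replid2 is valid up to this offset */
} master_ids;

/* Returns 1 when the requested ID/offset pair does not belong to a history
 * this master can continue, so a full resync is required. Returns 0 when
 * the ID check passes (the backlog range still has to be checked). */
int replid_forces_full_resync(const master_ids *m, const char *req_replid,
                              long long psync_offset) {
    assert(m != NULL && req_replid != NULL);
    int matches_primary = strcmp(req_replid, m->replid) == 0;
    int matches_secondary = strcmp(req_replid, m->replid2) == 0 &&
                            psync_offset <= m->second_replid_offset;
    return !(matches_primary || matches_secondary);
}
```

For example, with a secondary ID valid up to offset 51, a request that names the secondary ID passes with offset 51 but is rejected with offset 52, because byte 52 was written under the new history.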

The relevant code is mainly in masterTryPartialResynchronization. The implementation is as follows:

    /* Is the replication ID of this master the same advertised by the wannabe
     * slave via PSYNC? If the replication ID changed this master has a
     * different replication history, and there is no way to continue.
     *
     * Note that there are two potentially valid replication IDs: the ID1
     * and the ID2. The ID2 however is only valid up to a specific offset. */
    if (strcasecmp(master_replid, server.replid) &&
        (strcasecmp(master_replid, server.replid2) ||
         psync_offset > server.second_replid_offset))
    {
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_replid[0] != '?') {
            if (strcasecmp(master_replid, server.replid) &&
                strcasecmp(master_replid, server.replid2))
            {
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Replication ID mismatch (Replica asked for '%s', my "
                    "replication IDs are '%s' and '%s')",
                    master_replid, server.replid, server.replid2);
            } else {
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Requested offset for second ID was %lld, but I can reply "
                    "up to %lld", psync_offset, server.second_replid_offset);
            }
        } else {
            serverLog(LL_NOTICE,"Full resync requested by replica %s",
                replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }

After checking the replid, the code goes on to check whether the offset falls within the command buffer (the replication backlog):

    /* We still have the data our slave is asking for? */
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        serverLog(LL_NOTICE,
            "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
        if (psync_offset > server.master_repl_offset) {
            serverLog(LL_WARNING,
                "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }
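The range test in the excerpt above can be isolated into a small function. The name backlog_can_serve and its parameters are hypothetical; first_off plays the role of server.repl_backlog_off and histlen the role of server.repl_backlog_histlen. The sketch assumes a backlog exists.

```c
#include <assert.h>

/* Returns 1 when `psync_offset` can be served from a backlog whose oldest
 * byte has replication offset `first_off` and which holds `histlen` valid
 * bytes. The upper bound first_off + histlen is accepted too: it means the
 * replica already has everything and only needs the bytes written next. */
int backlog_can_serve(long long first_off, long long histlen,
                      long long psync_offset) {
    assert(histlen >= 0);
    return psync_offset >= first_off &&
           psync_offset <= first_off + histlen;
}
```

With first_off = 1001 and histlen = 500, the backlog covers offsets 1001 through 1500, so requests from 1001 to 1501 are accepted and anything outside that range falls back to a full resync.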

After that, the incremental synchronization logic begins.

Next, let's look at when master_replid2 gets a value. The function that modifies it is shiftReplicationId, which copies the current master_replid into master_replid2 and updates second_repl_offset:

/* Use the current replication ID / offset as secondary replication
 * ID, and change the current one in order to start a new history.
 * This should be used when an instance is switched from slave to master
 * so that it can serve PSYNC requests performed using the master
 * replication ID. */
void shiftReplicationId(void) {
    memcpy(server.replid2,server.replid,sizeof(server.replid));
    /* We set the second replid offset to the master offset + 1, since
     * the slave will ask for the first byte it has not yet received, so
     * we need to add one to the offset: for example if, as a slave, we are
     * sure we have the same history as the master for 50 bytes, after we
     * are turned into a master, we can accept a PSYNC request with offset
     * 51, since the slave asking has the same history up to the 50th
     * byte, and is asking for the new bytes starting at offset 51. */
    server.second_replid_offset = server.master_repl_offset+1;
    changeReplicationId();
    serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
}

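From the comment we can see that this function is called when a replica is promoted to master. It generates a new master_replid and copies the old master_replid into master_replid2. Executing slaveof no one triggers this operation.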
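The effect of a promotion on the four values can be traced with a small stand-alone model. The names repl_ids and shift_replid are hypothetical, and the new ID is passed in as a parameter instead of being generated randomly as Redis does in changeReplicationId.

```c
#include <assert.h>
#include <string.h>

#define REPLID_LEN 40

/* Hypothetical grouping of a node's replication identity. */
typedef struct {
    char replid[REPLID_LEN + 1];
    char replid2[REPLID_LEN + 1];
    long long master_repl_offset;
    long long second_replid_offset;
} repl_ids;

/* Model of a promotion: the current ID becomes the secondary ID, valid up
 * to the next byte the old history would have produced, and `new_replid`
 * starts a new history. The replication offset itself does not change. */
void shift_replid(repl_ids *r, const char *new_replid) {
    assert(r != NULL && new_replid != NULL);
    assert(strlen(new_replid) == REPLID_LEN);
    memcpy(r->replid2, r->replid, sizeof(r->replid2));
    r->second_replid_offset = r->master_repl_offset + 1;
    memcpy(r->replid, new_replid, REPLID_LEN + 1);
}
```

If the node had replicated 50 bytes under the old ID, then after the call the old ID is accepted for PSYNC requests up to offset 51, which is exactly what a sibling replica that had also received 50 bytes would ask for.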

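Besides promotion from replica to master, switching a replica to a different master also updates master_replid2. This is implemented mainly in slaveTryPartialResynchronization: when the new master's ID differs from the previously recorded ID, an update is performed: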

        /* Check the new replication ID advertised by the master. If it
         * changed, we need to set the new ID as primary ID, and set or
         * secondary ID as the old master ID up to the current offset, so
         * that our sub-slaves will be able to PSYNC with us after a
         * disconnection. */
        char *start = reply+10;
        char *end = reply+9;
        while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
        if (end-start == CONFIG_RUN_ID_SIZE) {
            char new[CONFIG_RUN_ID_SIZE+1];
            memcpy(new,start,CONFIG_RUN_ID_SIZE);
            new[CONFIG_RUN_ID_SIZE] = '\0';

            if (strcmp(new,server.cached_master->replid)) {
                /* Master ID changed. */
                serverLog(LL_WARNING,"Master replication ID changed to %s",new);

                /* Set the old ID as our ID2, up to the current offset+1. */
                memcpy(server.replid2,server.cached_master->replid,
                    sizeof(server.replid2));
                server.second_replid_offset = server.master_repl_offset+1;

                /* Update the cached master ID and our own primary ID to the
                 * new one. */
                memcpy(server.replid,new,sizeof(server.replid));
                memcpy(server.cached_master->replid,new,sizeof(server.replid));

                /* Disconnect all the sub-slaves: they need to be notified. */
                disconnectSlaves();
            }
        }
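The pointer arithmetic in the excerpt above (reply+10) skips the ten characters of "+CONTINUE " to reach the new ID. A stand-alone sketch of that parsing step, with a hypothetical function name and a prefix check added for safety, might look like this:

```c
#include <assert.h>
#include <string.h>

#define REPLID_LEN 40

/* Hypothetical parser: if `reply` has the form "+CONTINUE <40-char id>",
 * copy the ID into `out` (which must hold REPLID_LEN + 1 bytes) and return
 * 1. Return 0 for any other reply, including a bare "+CONTINUE", which
 * means the master's replication ID did not change. */
int parse_continue_replid(const char *reply, char *out) {
    static const char prefix[] = "+CONTINUE ";
    const size_t plen = sizeof(prefix) - 1;  /* 10 characters */
    assert(reply != NULL && out != NULL);
    if (strncmp(reply, prefix, plen) != 0) return 0;
    const char *start = reply + plen;
    /* The ID ends at the first CR, LF, or the end of the string. */
    size_t n = strcspn(start, "\r\n");
    if (n != REPLID_LEN) return 0;
    memcpy(out, start, REPLID_LEN);
    out[REPLID_LEN] = '\0';
    return 1;
}
```

When the function returns 1 and the ID differs from the cached one, the replica would then perform the update shown in the excerpt: move the old ID into the secondary slot and adopt the new one.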

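So if this node is a replica, when does it update its own master_replid? During a full synchronization the master transmits its replication ID and offset together with the RDB (in the Redis source they arrive in the +FULLRESYNC reply that precedes the RDB payload), and the replica adopts them once the RDB has been received and loaded. This is implemented mainly in readSyncBulkPayload. The relevant fragment is as follows: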

        /* After a full resynchroniziation we use the replication ID and
         * offset of the master. The secondary ID / offset are cleared since
         * we are starting a new history. */
        memcpy(server.replid,server.master->replid,sizeof(server.replid));
        server.master_repl_offset = server.master->reploff;
        clearReplicationId2();
        /* Let's create the replication backlog if needed. Slaves need to
         * accumulate the backlog regardless of the fact they have sub-slaves
         * or not, in order to behave correctly if they are promoted to
         * masters after a failover. */
        if (server.repl_backlog == NULL) createReplicationBacklog();

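A few details are worth noting:

  • When a full synchronization occurs, master_replid2 is cleared.
  • Replicas also create a replication backlog, which keeps a bounded amount of recent commands, so that after the replica is promoted to master, other nodes can fetch incremental data from it.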
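To illustrate what "a bounded amount of recent commands" means, here is a minimal sketch of a circular backlog, loosely modeled on how Redis feeds its replication backlog. The names are hypothetical and the buffer is fixed at 8 bytes so the wrap-around is easy to follow; the real backlog size is configurable.

```c
#include <assert.h>
#include <string.h>

#define BACKLOG_SIZE 8  /* tiny on purpose, for illustration only */

/* Hypothetical circular backlog. */
typedef struct {
    char buf[BACKLOG_SIZE];
    size_t idx;                    /* next write position in buf */
    long long histlen;             /* number of valid bytes in buf */
    long long master_repl_offset;  /* offset of the last byte written */
    long long first_off;           /* offset of the oldest byte kept */
} backlog;

void backlog_init(backlog *b) {
    assert(b != NULL);
    memset(b, 0, sizeof(*b));
    b->first_off = 1;
}

/* Append `len` bytes, overwriting the oldest data once the buffer is full. */
void backlog_feed(backlog *b, const char *data, size_t len) {
    assert(b != NULL && data != NULL);
    b->master_repl_offset += (long long)len;
    while (len > 0) {
        size_t chunk = BACKLOG_SIZE - b->idx;  /* room before wrapping */
        if (chunk > len) chunk = len;
        memcpy(b->buf + b->idx, data, chunk);
        b->idx = (b->idx + chunk) % BACKLOG_SIZE;
        data += chunk;
        len -= chunk;
        b->histlen += (long long)chunk;
    }
    if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
    /* The oldest byte still available moves forward as data is dropped. */
    b->first_off = b->master_repl_offset - b->histlen + 1;
}
```

After feeding 5 bytes and then 6 more, the replication offset is 11 but only the last 8 bytes remain, so the oldest offset that can still be served is 4. A replica asking for an earlier offset would need a full synchronization.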

Author: sryan