PSYNC is the command Redis uses to synchronize master and replica instances. It was introduced in version 2.8.
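Versions before 2.8 only had the SYNC command, and every time it was triggered it performed a full synchronization first and an incremental synchronization afterwards. The PSYNC command introduced in 2.8 can go straight to incremental synchronization in scenarios such as reconnecting after a dropped connection. It resumes from a breakpoint using runid and offset, so that network jitter does not trigger a full synchronization that would disturb the whole cluster (the master triggers an RDB dump and also consumes network bandwidth).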
However, this resume information lives only in memory, so upgrading or restarting a node still triggers a full synchronization.
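To optimize this case, Redis 4.0 improved the PSYNC command; some people call the result psync2. The main improvement is that an instance can still synchronize incrementally after it has been restarted. To make this possible, Redis saves the resume information in the RDB file at shutdown. After the restart it reads the information back from the RDB file and uses it to resume replication.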
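The save-and-restore idea can be illustrated with a minimal sketch. This is not the RDB encoding Redis uses: the ResumeInfo struct, the two function names, and the plain-text "key value" layout are all assumptions made for illustration.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define REPLID_LEN 40 /* replication IDs are 40 characters long */

/* Hypothetical container for the resume information. */
typedef struct {
    char replid[REPLID_LEN + 1];
    long long offset;
} ResumeInfo;

/* Serialize the resume info into buf as two "key value" lines.
 * Returns the number of bytes written, or -1 if buf is too small. */
int resume_info_save(const ResumeInfo *info, char *buf, size_t buflen) {
    int n = snprintf(buf, buflen, "repl-id %s\nrepl-offset %lld\n",
                     info->replid, info->offset);
    return (n < 0 || (size_t)n >= buflen) ? -1 : n;
}

/* Parse text produced by resume_info_save(). Returns 1 on success and
 * 0 if the text is malformed, in which case a node would have to fall
 * back to a full synchronization. */
int resume_info_load(const char *buf, ResumeInfo *out) {
    char id[REPLID_LEN + 1];
    long long off;
    if (sscanf(buf, "repl-id %40s repl-offset %lld", id, &off) != 2) return 0;
    if (strlen(id) != REPLID_LEN) return 0;
    memcpy(out->replid, id, sizeof(id));
    out->offset = off;
    return 1;
}
```

A node would call resume_info_save at shutdown and resume_info_load at startup, then send the loaded ID and offset in its next PSYNC request.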
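The implementation involves the following key variables (the names are the ones shown by INFO replication; in the source they are fields of the server struct such as server.replid, server.replid2 and server.second_replid_offset):
master_replid: the replication ID of the current master. If this node is a master, the value is generated randomly by the node itself; if this node is a replica, it is the ID of the master it currently replicates from.
master_repl_offset: the replication offset.
master_replid2: new compared with the original PSYNC. It stores the ID of the master this node synchronized with previously. When a replica is promoted to master, it generates a new master_replid, and master_replid2 keeps the replication ID of the previous master.
second_repl_offset: the offset reached under the previous master. master_replid2 is only valid up to this offset.
Why introduce master_replid2 and second_repl_offset? Mainly so that after a master/replica switch, the old master coming back, or another replica starting to replicate from the new master, is less likely to need a full synchronization.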
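When a replica issues PSYNC, the master uses the resume information it receives to decide whether an incremental synchronization is possible. A full synchronization is triggered when the replid and psync_offset passed in satisfy both of the following conditions:
replid is not equal to the current node's master_replid
replid is not equal to the current node's master_replid2, or psync_offset is greater than the current node's second_repl_offset (server.second_replid_offset in the code)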
The relevant code is mainly in masterTryPartialResynchronization. The implementation is as follows:
/* Is the replication ID of this master the same advertised by the wannabe
 * slave via PSYNC? If the replication ID changed this master has a
 * different replication history, and there is no way to continue.
 *
 * Note that there are two potentially valid replication IDs: the ID1
 * and the ID2. The ID2 however is only valid up to a specific offset. */
if (strcasecmp(master_replid, server.replid) &&
    (strcasecmp(master_replid, server.replid2) ||
     psync_offset > server.second_replid_offset))
{
    /* Run id "?" is used by slaves that want to force a full resync. */
    if (master_replid[0] != '?') {
        if (strcasecmp(master_replid, server.replid) &&
            strcasecmp(master_replid, server.replid2))
        {
            serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                "Replication ID mismatch (Replica asked for '%s', my "
                "replication IDs are '%s' and '%s')",
                master_replid, server.replid, server.replid2);
        } else {
            serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                "Requested offset for second ID was %lld, but I can reply "
                "up to %lld", psync_offset, server.second_replid_offset);
        }
    } else {
        serverLog(LL_NOTICE,"Full resync requested by replica %s",
            replicationGetSlaveName(c));
    }
    goto need_full_resync;
}
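The ID check above can be condensed into a small standalone predicate, which makes the edge cases easier to try out. This is a minimal sketch rather than the Redis implementation: the MasterIds struct and the function name id_check_needs_full_resync are hypothetical, and plain strcmp stands in for the strcasecmp used by Redis.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical view of the IDs a master keeps. */
typedef struct {
    const char *replid;             /* current replication ID (ID1) */
    const char *replid2;            /* previous replication ID (ID2) */
    long long second_replid_offset; /* ID2 is valid up to this offset */
} MasterIds;

/* Returns 1 if the request must fall back to a full resync because of
 * the replication ID, 0 if the ID check passes. */
int id_check_needs_full_resync(const MasterIds *m, const char *req_replid,
                               long long psync_offset) {
    /* Same history as the current ID: always acceptable. */
    if (strcmp(req_replid, m->replid) == 0) return 0;
    /* Previous history: acceptable only up to the recorded offset. */
    if (strcmp(req_replid, m->replid2) == 0 &&
        psync_offset <= m->second_replid_offset) return 0;
    /* Unknown ID (including "?"), or ID2 beyond its valid range. */
    return 1;
}
```

For example, with second_replid_offset set to 51, a request that names the previous ID passes with offset 51 but is rejected with offset 52.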
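After the replid check, the master goes on to check whether the requested offset still lies within the command buffer (the replication backlog):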
/* We still have the data our slave is asking for? */
if (!server.repl_backlog ||
    psync_offset < server.repl_backlog_off ||
    psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
    serverLog(LL_NOTICE,
        "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
    if (psync_offset > server.master_repl_offset) {
        serverLog(LL_WARNING,
            "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
    }
    goto need_full_resync;
}
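The window test above can be tried in isolation with a minimal sketch. The Backlog struct and both function names are hypothetical; the fields only mirror the roles of repl_backlog, repl_backlog_off and repl_backlog_histlen.

```c
#include <assert.h>

/* Hypothetical, simplified view of the replication backlog. */
typedef struct {
    int has_backlog;       /* 0 when no backlog has been allocated */
    long long backlog_off; /* replication offset of the first byte kept */
    long long histlen;     /* number of bytes currently kept */
} Backlog;

/* Returns 1 if psync_offset can be served from the backlog, else 0. */
int backlog_can_serve(const Backlog *b, long long psync_offset) {
    if (!b->has_backlog) return 0;
    if (psync_offset < b->backlog_off) return 0;              /* too old */
    if (psync_offset > b->backlog_off + b->histlen) return 0; /* ahead */
    return 1;
}

/* Number of bytes the master would send to catch the replica up. */
long long backlog_bytes_to_send(const Backlog *b, long long psync_offset) {
    assert(backlog_can_serve(b, psync_offset));
    return b->backlog_off + b->histlen - psync_offset;
}
```

With backlog_off = 101 and histlen = 50 the backlog holds offsets 101 to 150, so requests from 101 to 151 are accepted; a request for 151 means the replica is already up to date and receives zero bytes.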
If both checks pass, the incremental synchronization logic begins.
Next, let's look at when master_replid2 gets a value. The function that modifies it is shiftReplicationId, which copies the current master_replid into master_replid2 and updates second_repl_offset.
/* Use the current replication ID / offset as secondary replication
 * ID, and change the current one in order to start a new history.
 * This should be used when an instance is switched from slave to master
 * so that it can serve PSYNC requests performed using the master
 * replication ID. */
void shiftReplicationId(void) {
    memcpy(server.replid2,server.replid,sizeof(server.replid));
    /* We set the second replid offset to the master offset + 1, since
     * the slave will ask for the first byte it has not yet received, so
     * we need to add one to the offset: for example if, as a slave, we are
     * sure we have the same history as the master for 50 bytes, after we
     * are turned into a master, we can accept a PSYNC request with offset
     * 51, since the slave asking has the same history up to the 50th
     * byte, and is asking for the new bytes starting at offset 51. */
    server.second_replid_offset = server.master_repl_offset+1;
    changeReplicationId();
    serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
}
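From the comment we can see that this function is called after a replica has been promoted to master. It generates a new master_replid and copies the old master_replid into master_replid2. Executing slaveof no one triggers this operation.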
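The effect of a promotion on the replication state can be reproduced with a minimal sketch. The ReplState struct, random_replid and promote_to_master are hypothetical names, and rand() merely stands in for whatever randomness source a real server would use.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define REPLID_LEN 40

/* Hypothetical subset of the replication state of one node. */
typedef struct {
    char replid[REPLID_LEN + 1];    /* current replication ID */
    char replid2[REPLID_LEN + 1];   /* previous replication ID */
    long long master_repl_offset;   /* current replication offset */
    long long second_replid_offset; /* replid2 is valid up to here */
} ReplState;

/* Fill id with REPLID_LEN random hex characters plus a terminator. */
static void random_replid(char *id) {
    static const char hex[] = "0123456789abcdef";
    for (int i = 0; i < REPLID_LEN; i++) id[i] = hex[rand() % 16];
    id[REPLID_LEN] = '\0';
}

/* Keep the old history reachable through replid2 and start a new one. */
void promote_to_master(ReplState *s) {
    memcpy(s->replid2, s->replid, sizeof(s->replid));
    /* A replica that shares the history up to byte N asks for N+1. */
    s->second_replid_offset = s->master_repl_offset + 1;
    random_replid(s->replid);
}
```

A node promoted at offset 50 ends up with second_replid_offset equal to 51, so a former sibling replica that also holds 50 bytes of the old history can continue with a partial resync.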
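Besides the promotion of a replica to master, switching a replica to a different master also updates master_replid2. This is implemented mainly in slaveTryPartialResynchronization: when the ID of the new master differs from the ID recorded earlier, an update is performed: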
/* Check the new replication ID advertised by the master. If it
 * changed, we need to set the new ID as primary ID, and set our
 * secondary ID as the old master ID up to the current offset, so
 * that our sub-slaves will be able to PSYNC with us after a
 * disconnection. */
char *start = reply+10;
char *end = reply+9;
while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
if (end-start == CONFIG_RUN_ID_SIZE) {
    char new[CONFIG_RUN_ID_SIZE+1];
    memcpy(new,start,CONFIG_RUN_ID_SIZE);
    new[CONFIG_RUN_ID_SIZE] = '\0';
    if (strcmp(new,server.cached_master->replid)) {
        /* Master ID changed. */
        serverLog(LL_WARNING,"Master replication ID changed to %s",new);
        /* Set the old ID as our ID2, up to the current offset+1. */
        memcpy(server.replid2,server.cached_master->replid,
            sizeof(server.replid2));
        server.second_replid_offset = server.master_repl_offset+1;
        /* Update the cached master ID and our own primary ID to the
         * new one. */
        memcpy(server.replid,new,sizeof(server.replid));
        memcpy(server.cached_master->replid,new,sizeof(server.replid));
        /* Disconnect all the sub-slaves: they need to be notified. */
        disconnectSlaves();
    }
}
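The pointer arithmetic above (reply+9 and reply+10) skips the 9 characters of "+CONTINUE" and the space that follows it. A minimal standalone sketch of the same parsing step might look like this. The function name parse_continue_replid is hypothetical, and unlike the original it checks the prefix and the space explicitly.

```c
#include <assert.h>
#include <string.h>

#define REPLID_LEN 40

/* Extract the replication ID from a "+CONTINUE <replid>" reply line.
 * Returns 1 and fills out (REPLID_LEN + 1 bytes) when an ID of the
 * expected length is present; returns 0 otherwise, for example for a
 * plain "+CONTINUE" without an ID. */
int parse_continue_replid(const char *reply, char *out) {
    static const char prefix[] = "+CONTINUE";
    size_t plen = sizeof(prefix) - 1; /* 9 characters */
    if (strncmp(reply, prefix, plen) != 0) return 0;
    if (reply[plen] != ' ') return 0;
    const char *start = reply + plen + 1; /* first character of the ID */
    const char *end = start;
    while (*end != '\r' && *end != '\n' && *end != '\0') end++;
    if ((size_t)(end - start) != REPLID_LEN) return 0;
    memcpy(out, start, REPLID_LEN);
    out[REPLID_LEN] = '\0';
    return 1;
}
```

A replica would compare the extracted ID with the one it has cached for its master and, if they differ, shift its own IDs as in the fragment above.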
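So if the node is a replica, when does it update its own master_replid? It does so during a full synchronization: once the RDB sent by the master has been received, the replica adopts the replication ID and offset of the master. This is implemented mainly in readSyncBulkPayload, and the relevant fragment is: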
/* After a full resynchronization we use the replication ID and
 * offset of the master. The secondary ID / offset are cleared since
 * we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to
 * accumulate the backlog regardless of the fact they have sub-slaves
 * or not, in order to behave correctly if they are promoted to
 * masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
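Two details are worth noting:
master_replid2 is cleared (clearReplicationId2), because a new history starts here.
The replication backlog is created if it does not exist yet. It keeps a bounded amount of recent commands, so that if this replica is later promoted to master, other nodes can fetch incremental data from the replication backlog.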
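The state change after a full synchronization can be summarized in a minimal sketch. ReplicaState and adopt_master_history are hypothetical names. Resetting the secondary ID to a string of '0' characters with offset -1 is how I understand clearReplicationId2 to behave, and should be read as an assumption here.

```c
#include <assert.h>
#include <string.h>

#define REPLID_LEN 40

/* Hypothetical subset of a replica's replication state. */
typedef struct {
    char replid[REPLID_LEN + 1];
    char replid2[REPLID_LEN + 1];
    long long master_repl_offset;
    long long second_replid_offset;
    int has_backlog; /* 1 once a replication backlog exists */
} ReplicaState;

/* Adopt the master's history after a full resync and drop the old one. */
void adopt_master_history(ReplicaState *s, const char *master_replid,
                          long long master_offset) {
    assert(strlen(master_replid) == REPLID_LEN);
    memcpy(s->replid, master_replid, REPLID_LEN + 1);
    s->master_repl_offset = master_offset;
    /* Assumed reset values for "no secondary ID". */
    memset(s->replid2, '0', REPLID_LEN);
    s->replid2[REPLID_LEN] = '\0';
    s->second_replid_offset = -1;
    /* Replicas keep a backlog too, in case they are promoted later. */
    if (!s->has_backlog) s->has_backlog = 1;
}
```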