redis主从复制
本文介绍一下redis的主从复制机制,首先是大体原理,然后是4.0的复制入口代码分析,接着是master上的处理逻辑,再是slave的,最后有几个Q&A记录。
1. 大体原理
基本上大部分数据库主从原理都是先全量复制,传播当前master的全量数据,然后是增量复制,传播从全量复制开始积累的增量数据。
redis在2.8以前的版本,slave节点向master发送sync
请求全量+增量数据,sync
命令的缺点是一旦中间因为网络抖动等原因断开,重新连接需要重新同步全量,那么这个代价就很大。所以从2.8开始引入了psync
,支持断点续传,从4.0开始,又引入了psync2(其redis命令还是psync
),支持主从切换情况下的断点续传。下面介绍以4.0的psync2为例。
当在slave上执行slaveof xxx
的时候,当前所连接的结点将会发送psync
命令给master,请求连接,其命令格式是:psync repl-id offset
,repl-id表示master的全局标志id,offset表示上次同步的位点,默认第一次发送psync ? -1
的命令。master收到以后会拿repl-id跟自身的id对比,如果不一致那么断点续传失败,进入全量同步;同理也会拿offset在内存的backlog队列里面寻找,查看这个offset对应的位点是否还存在,backlog本身是个循环队列,所以如果断开的时间过久,或者写入量太大,之前的数据就被清掉了,这个时候断点续传也失败了,将会进行全量同步。
- 如果master可以进行断点续传,回复+CONTINUE,后面跟上master的replid+offset。
- 如果master不能断点续传,回复+FULLRESYNC,后面跟上master的replid+offset。
4.0里面存了2个replid,除了上面介绍的replid外,还有一个replid2用于存储上一次master的id,这样如果发生切换了,收到slave的psync命令,也会用收到的这个replid跟内存里面的2个replid相比,只要有1个相等,那就可以做断点续传,这样就可以解决主从切换的时候继续做断点续传。
slave在接受master的增量过程中,会定期1s回复replconf ack offset
命令给master,以告知目前slave获取的offset的位点,这个信息只是用来展示给用户(在info server
),并没有用来判断。
2. 源码入口分析
replicationCron
是replication.c里面定时调度的函数,负责维持和master以及和slave的连接状态(如果有master或者slave的话)。注意本文贴的代码为了保证不太多,忽略了部分日志及代码。
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
// 1. 判断当前结点是否有一个上级master,并且连接中的状态超时了,则取消本次连接。
/* Non blocking connection timeout? */
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
cancelReplicationHandshake();
}
// 2. 是否接受master的RDB超时了?是的话,同样取消与master连接
/* Bulk transfer I/O timeout? */
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
cancelReplicationHandshake();
}
// 3. 如果自身已经是个连接状态的slave,但是跟master连接超时了,那么取消与这个master的连接状态。
/* Timed out master when we are an already connected slave? */
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
freeClient(server.master);
}
// 4. 自身是需要连接的状态,则发起与master的连接
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
}
}
// 5. 如果支持psync,则定期1s向master发送slave收到的offset的ack给master
/* Send ACK to master from time to time.
* Note that we do not send periodic acks to masters that don't
* support PSYNC and replication offsets. */
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();
// 6. 如果本身下面有挂slave节点,根据ping的间隔设置挨个ping之。
/* If we have attached slaves, PING them from time to time.
* So slaves can implement an explicit timeout to masters, and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down. */
...
// 6.1 挨个ping slave
/* First, send PING according to ping_slave_period. */
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
listLength(server.slaves))
{
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb,
ping_argv, 1);
decrRefCount(ping_argv[0]);
}
// 6.2 如果本身是在bgsave阶段,发送\n给下级slave以进行应用层面的连接保活。注意rdb是异步save的,所以这个\n可能在rdb发送之前已经发送了。redis-shake踩过这个坑。
/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
*
* Also send the a newline to all the chained slaves we have, if we lost
* connection from our master, to keep the slaves aware that their
* master is online. This is needed since sub-slaves only receive proxied
* data from top-level masters, so there is no explicit pinging in order
* to avoid altering the replication offsets. This special out of band
* pings (newlines) can be sent, they will have no effect in the offset.
*
* The newline will be ignored by the slave but will refresh the
* last interaction timer preventing a timeout. In this case we ignore the
* ping period and refresh the connection once per second since certain
* timeouts are set at a few seconds (example: PSYNC response). */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
int is_presync =
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
if (is_presync) {
if (write(slave->fd, "\n", 1) == -1) {
/* Don't worry about socket errors, it's just a ping. */
}
}
}
// 6.3 断开与超时slave的连接
/* Disconnect timedout slaves. */
if (listLength(server.slaves)) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate != SLAVE_STATE_ONLINE) continue;
if (slave->flags & CLIENT_PRE_PSYNC) continue;
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
{
serverLog(LL_WARNING, "Disconnecting timedout slave: %s",
replicationGetSlaveName(slave));
freeClient(slave);
}
}
}
// 7. 如果当前是master且没有slave,并且自己也没有上一级master,那么可以释放backlog队列用于减少内存。但如果自身是个slave,无论有没有下一级slave,都不能释放backlog队列内存,这是为了应对主备切换的情况。
// Q: 1. 那释放后如果新来1个slave,会重新建立这个内存?
// A: Yes.
/* If this is a master without attached slaves and there is a replication
* backlog active, in order to reclaim memory we can free it after some
* (configured) time. Note that this cannot be done for slaves: slaves
* without sub-slaves attached should still accumulate data into the
* backlog, in order to reply to PSYNC queries if they are turned into
* masters after a failover. */
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
server.repl_backlog && server.masterhost == NULL)
{
// 代码省略
}
// 7. 如果AOF关闭了,并且没有slave节点,那么这个lua脚本用的script cache就没必要保存了。
/* If AOF is disabled and we no longer have attached slaves, we can
* free our Replication Script Cache as there is no need to propagate
* EVALSHA at all. */
if (listLength(server.slaves) == 0 &&
server.aof_state == AOF_OFF &&
listLength(server.repl_scriptcache_fifo) != 0)
{
replicationScriptCacheFlush();
}
// 8. 如果有slave是在WAIT_BGSAVE_START,并且当前没有启动bgsave子线程和aof rewrite子线程,则启动bgsave。
// 如果开启了流式传输,那么会等待一段时间再开始,以防止有多个slave差不多同一时间区间过来bgsave。
/* Start a BGSAVE good for replication if we have slaves in
* WAIT_BGSAVE_START state.
*
* In case of diskless replication, we make sure to wait the specified
* number of seconds (according to configuration) so that other slaves
* have the time to arrive before we start streaming. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { // 当前没有rdb bgsave子线程和aof rewrite子线程
time_t idle, max_idle = 0;
int slaves_waiting = 0;
int mincapa = -1;
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
idle = server.unixtime - slave->lastinteraction;
if (idle > max_idle) max_idle = idle;
slaves_waiting++;
mincapa = (mincapa == -1) ? slave->slave_capa :
(mincapa & slave->slave_capa);
}
}
if (slaves_waiting &&
(!server.repl_diskless_sync ||
max_idle > server.repl_diskless_sync_delay))
{
/* Start the BGSAVE. The called function may start a
* BGSAVE with socket target or disk target depending on the
* configuration and slaves capabilities. */
startBgsaveForReplication(mincapa);
}
}
// 9. 刷新good slave计数
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount();
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
3. Master接受psync的处理流程
syncCommand
是收到sync
和psync
命令的处理流程。代码不贴了,我就根据代码来分析吧。
- 如果发送的结点已经是一个slave了,则没必要再处理这个psync情况,当前函数直接退出。
- 对于级联的情况A->B->C,如果当前结点B还没有和master A完全建立连接,则拒绝收到C的psync情况。
- 如果发送节点对应的client已经有未发送的回复,那么拒绝掉此次请求。
- 如果收到的命令是psync,则尝试调
masterTryPartialResynchronization
进行断点续传。 - 如果断点续传不满足条件,则尝试进行全量复制,先状态标记
SLAVE_STATE_WAIT_BGSAVE_START
,表示准备搞一个rdb。 - 判断当前结点是否有backlog(不是刚创建就有的,因为这个空间可能会被释放),创建backlog之前会先修改当前结点的replid,并释放replid2,然后再进行创建。
- 如果目前已经有bgsave线程在工作了,并且选项是落盘(bgsave还有流式传输的选项),那么进入下面操作。首先判断当前结点是否已经有slave结点的状态是
SLAVE_STATE_WAIT_BGSAVE_END
(表示等待bgsave完成),那么这个slave对应的客户端buffer复制给当前的slave对应的客户端,这样当这个bgsave结束的时候,这个rdb能被多次使用。另外,增量开始的offset也置为一样,这个offset将用来在全量结束的时候,从backlog队列里拉取增量的数据的位点。如果没有slave状态是SLAVE_STATE_WAIT_BGSAVE_END
,那么需要等待下次bgsave(由client重新发送psync)。 - 如果目前已经有bgsave线程在工作了,并且目的端是socket传输,那么同样需要等待下次bgsave。
- 如果当前没有bgsave线程工作,启动一个bgsave的线程。
此外,再大概介绍一下replicationFeedSlaves
和replicationFeedSlavesFromMaster
,这两个是主从复制的增量传播的函数。
replicationFeedSlaves
。目前只用于master向slave发ping。挨个执行下面逻辑。
- 对于级联的情况,如果当前结点不是最顶级那个master,该函数直接返回。
- 如果当前结点没有开启backlog队列并且没有下一级slave,也会立刻返回。
- 如果slave的dbid不等于当前增量的id,那么会先发一个select命令。这也就是说,如果用户有2个客户端写入,一个在db1上写入,另一个在db2上写入,增量将会不断发送
select 1
,然后select 2
,再可能是select 1
这样来回切换。 - 如果当前有backlog,则这个数据会被写入到backlog里面。
replicationFeedSlavesFromMaster
。先判断如果当前开了backlog,则调用feedReplicationBacklog
把数据写入backlog。再判断下一级状态如果ok的话把命令推入下一级的slave。
feedReplicationBacklog
是用于把增量写入的函数,本身backlog实现就是个循环队列,具体细节感兴趣可以撸下代码。
4. Slave向master建立连接
connectWithMaster
是slave向master发送请求的入口函数(在replicationCron
中被调用),其主要调用syncWithMaster
进行连接建立。
- 如果当前slave状态是非连接状态(可能刚进行
slaveof no one
的修改),则直接返回。 - 如果socket存在错误,则返回上层错误。
- 如果状态是连接中,则置状态为
REPL_STATE_RECEIVE_PONG
, 并向master同步发送ping。 - 如果状态是
REPL_STATE_RECEIVE_PONG
,则读取上次的回复。如果不是+xxxx
,-NOAUTH
或者-ERR operation not permitted
则报错返回,反之,状态置为REPL_STATE_SEND_AUTH
。 - 如果状态是
REPL_STATE_SEND_AUTH
,并且master需要认证的情况,发送认证命令,状态置为REPL_STATE_RECEIVE_AUTH
,否则,状态置为REPL_STATE_SEND_PORT
。 - 如果状态是
REPL_STATE_RECEIVE_AUTH
,则读取是否认证成功,并置状态是REPL_STATE_SEND_PORT
。 - 如果状态是
REPL_STATE_SEND_PORT
,则发送REPLCONF listening-port xx
告知slave当前监听的port,然后置状态为REPL_STATE_RECEIVE_PORT
。这个port用于master上展示slave是来自哪个端口,其他并没有什么作用。 - 如果状态是
REPL_STATE_RECEIVE_PORT
,读取master的返回,成功的话置状态REPL_STATE_SEND_IP
。 - 如果状态是
REPL_STATE_SEND_IP
,slave向master发送REPLCONF ip-address xxx
以告知真实的ip。正常情况下,其实不发送也无所谓,因为刚开始建立的时候master已经知道了slave的ip,这里告知是为了考虑在做NAT或者流量转发的情况。状态改为REPL_STATE_RECEIVE_IP
。 - 如果状态是
REPL_STATE_RECEIVE_IP
,读取回复是否正常,然后状态进入REPL_STATE_SEND_CAPA
。 - 如果状态是
REPL_STATE_SEND_CAPA
,则发送REPLCONF capa eof capa psync2
,表示slave具有流式传输以及psync2的能力。不过最后具体是否rdb流式发送,还是落盘再发送,取决于master,此处只是slave告知master有这个能力。状态进入REPL_STATE_RECEIVE_CAPA
。 - 如果状态是
REPL_STATE_RECEIVE_CAPA
,读取回复状态,并进入REPL_STATE_SEND_PSYNC
。 - 如果状态是
REPL_STATE_SEND_PSYNC
,则调用slaveTryPartialResynchronization
发送psync命令,并进入REPL_STATE_RECEIVE_PSYNC
状态。 - 读取
slaveTryPartialResynchronization
的回复。其回复有几种状态:
a)PSYNC_WAIT_REPLY
。等待回复。
b)PSYNC_CONTINUE
。断点续传。
c)PSYNC_FULLRESYNC
。全量传输。
d)PSYNC_NOT_SUPPORTED
。不支持psync
命令。
e)PSYNC_TRY_LATER
。稍后重试。 - 如果回复的状态是
PSYNC_WAIT_REPLY
,则返回,等会再看看。 - 如果回复的状态是
PSYNC_TRY_LATER
,那么表示主上现在不能接受psync,比如自己是rdb loading状态等,此时报错返回。 - 如果回复的状态是
PSYNC_CONTINUE
,表示master接受断点续传,那么回调函数直接读增量就可以了,当前函数返回。 - 到这一步的话,表示可能进入了全量或者出错,那么释放当前结点的所有下一级slave结点的连接,也就是说对于链式连接的情况,a->b->c,如果b和a发生了全量同步,那么b->c的连接会被断开。
- 如果回复的状态是
PSYNC_NOT_SUPPORTED
,那么用sync
命令发送连接。 - 到这一步,需要接受全量数据。首先会在本地创建文件,然后注册回调函数,调用
readSyncBulkPayload
接受到对应fd收到的rdb消息,然后把rdb同步写到这个本地的临时文件。
接下来,分析一下readSyncBulkPayload
这个函数,slave是如何接受rdb的,并且收到了rdb后到底是如何操作的?
- 首先解析读取整个rdb的大小,源端rdb一旦save完成,会先发送整个rdb的大小给slave,比如
xx\r\n
,表示整个rdb是xx个字节,否则会不断发送\n
。 - 接着是接受整个rdb的数据,不断把收到的数据写入本地的临时文件。
- 读取过程不断计算收到的字节数,如果等于刚开始的rdb大小,则表示整个rdb接受完毕了,那么进入后面处理。
- 判断当前是否有bgsave的线程,有的话干掉,因为本身在loading rdb,没必要做bgsave。
- 将收到的rdb重命名为dump.rdb。
- 将appendonly的状态由yes改为false,表示不需要增量写aof文件,因为后面要重新加载整个rdb。
- 调用
emptyDb
清空所有内存数据。 - 调用
aeDeleteFileEvent
将对应fd的回调函数取消,这是因为当前连接在全量传输完毕之后,还会被增量进行复用,所以如果不把回调函数断开,之后当前函数还会收到增量数据,这并不是我们所期望的。增量函数的处理逻辑在下面会介绍。 - 调用
rdbLoad
加载重命名后的rdb文件。 - 调用
replicationCreateMasterClient
创建slave上这个master对应的client结构. - 接下来就是重置一些状态,比如清空当前的replid2,置master的offset,新建backlog内存,置slave复制状态为
REPL_STATE_CONNECTED
,如果开关开了的话则重新启用aof。
到这里slave的处理逻辑大概都梳理了一遍,还遗留一条主线,就是slave对增量是如何处理的?其实没特殊处理,上面我们讲过,replicationCreateMasterClient
函数是注册了一个client,所以这里master传播的命令就相当于一个普通客户端的写命令了,类似用户自己的客户端进行数据写入。
5. Q&A
本小节记录部分Q&A。
- Q: 主从复制期间,如果从在收到rdb的过程中,那么将会读本地原来的快照?还是说边同步就已经边读了?
A: 对于redis来说,无论是master还是slave,在rdb loading阶段不对外提供服务,将会返回-LOADING
的错误。也没有快照的概念。 - Q: 级联复制,对于增量,中间节点是直接透传的?还是数据会写入到中间节点的数据,然后slave节点写入output buffer,再同步到下一个结点?
A: 4.0之前,是先写中间节点,然后中间节点再产生增量推给目的端,相当于中间节点本身就是master,不同slave拿到的应用是可能不一样的。4.0之后,所有级联的slave拿到的复制数据完全一样,拿到以后先应用再传给下一级slave,但是是直接转发的,而不是应用完产生的增量再传,offset来说一定是a>=b>=c。全量同步跟级联无关,只有增量有关。 - Q: slave连接master的时候,有个发送
replconf ip-address xx
的命令,告知slave自己的ip,这个不发会不会有问题?
A: 正常情况没有问题,但是slave如果是NAT透传的,master上显示的slave就不是真实的slave了,这时候这个命令就有用了。 - Q: slave向master发送replconf capa eof命令的话(表示可以不落盘流式发送RDB),master收到是直接会通过不落盘的方式发送RDB给slave吗?还是说master是可以自己选择的?
A: 此处只是slave的能力,是否落盘是master决定。 - Q: 级联情况中间节点b,也是有Backlog队列的吧?
A: yes,只要有下一级slave就有backlog。 - Q: 级联场景,a->b->c,对于c来说,replid是b还是a的?
A: 只有1个replid,就是a的,无论对于a,b,c谁来说,这样的话即使发生切换,级联也是可以搞定的。 - Q: backlog队列一共有几个,是一个slave对应一个吗?
A: 1个redis只有1个,无论有几个下级slave。1个slave和client对应一个output buffer。 - Q: slave收到master的rdb,是先存本地的?然后再load?
A: yes,参考上面slave收到rdb的分析流程。 - Q: slave重启以后进行断点续传,如何知道上一次的逻辑dbid是多少,master发现slave重连以后,会重新发一遍当前的dbid吗?比如master上面select 2;然后进行增量传递,然后这个时候slave重启了。master从中间开始断点续传,这个select 2会不会重新发送?
A: 不会。这个dbid是slave自己持久化的,重启会重新加载,这样就知道当前的dbid是多少了。master也没办法重新发送当前的dbid,因为select语句本身也是进入到backlog队列的,也会增加位点的。关于这段代码可以参考replicationCreateMasterClient
。