Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

Vinllen Chen


但行好事,莫问前程

redis主从复制

  本文介绍一下redis的主从复制机制,首先是大体原理,然后是4.0的复制入口代码分析,接着是master上的处理逻辑,再是slave的,最后有几个Q&A记录。

1. 大体原理

  基本上大部分数据库主从原理都是先全量复制,传播当前master的全量数据,然后是增量复制,传播从全量复制开始积累的增量数据。
  redis在2.8以前的版本,slave节点向master发送sync请求全量+增量数据,sync命令的缺点是一旦中间因为网络抖动等原因断开,重新连接需要重新同步全量,那么这个代价就很大。所以从2.8开始引入了psync,支持断点续传,从4.0开始,又引入了psync2(其redis命令还是psync),支持主从切换情况下的断点续传。下面介绍以4.0的psync2为例。
  当在slave上执行slaveof xxx的时候,当前所连接的结点将会发送psync命令给master,请求连接,其命令格式是:psync repl-id offset,repl-id表示master的全局标志id,offset表示上次同步的位点,默认第一次发送psync ? -1的命令。master收到以后会拿repl-id跟自身的id对比,如果不一致那么断点续传失败,进入全量同步;同理也会拿offset在内存的backlog队列里面寻找,查看这个offset对应的位点是否还存在,backlog本身是个循环队列,所以如果断开的时间过久,或者写入量太大,之前的数据就被清掉了,这个时候断点续传也失败了,将会进行全量同步。

  • 如果master可以进行断点续传,回复+CONTINUE,后面跟上master的replid+offset。
  • 如果master不能断点续传,回复+FULLRESYNC,后面跟上master的replid+offset。

  4.0里面存了2个replid,除了上面介绍的replid外,还有一个replid2用于存储上一次master的id,这样如果发生切换了,收到slave的psync命令,也会用收到的这个replid跟内存里面的2个replid相比,只要有1个相等,那就可以做断点续传,这样就可以解决主从切换的时候继续做断点续传。
  slave在接受master的增量过程中,会定期1s回复replconf ack offset命令给master,以告知目前slave获取的offset的位点,这个信息只是用来展示给用户(在info server),并没有用来判断。

2. 源码入口分析

  replicationCron是replication.c里面定时调度的函数,负责维持和master以及和slave的连接状态(如果有master或者slave的话)。注意本文贴的代码为了保证不太多,忽略了部分日志及代码。

/* Replication cron function, called 1 time per second. */
void replicationCron(void) {  
   // 1.  判断当前结点是否有一个上级master,并且连接中的状态超时了,则取消本次连接。
    /* Non blocking connection timeout? */
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()) &&
         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        cancelReplicationHandshake();
    }

    // 2. 是否接受master的RDB超时了?是的话,同样取消与master连接
    /* Bulk transfer I/O timeout? */
    if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        cancelReplicationHandshake();
    }

    // 3. 如果自身已经是个连接状态的slave,但是跟master连接超时了,那么取消与这个master的连接状态。
    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        freeClient(server.master);
    }

    // 4. 自身是需要连接的状态,则发起与master的连接
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REPL_STATE_CONNECT) {
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }

    // 5. 如果支持psync,则定期1s向master发送slave收到的offset的ack给master
    /* Send ACK to master from time to time.
     * Note that we do not send periodic acks to masters that don't
     * support PSYNC and replication offsets. */
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();

    // 6. 如果本身下面有挂slave节点,根据ping的间隔设置挨个ping之。
    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */
    ...

    // 6.1 挨个ping slave
    /* First, send PING according to ping_slave_period. */
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
        listLength(server.slaves))
    {
        ping_argv[0] = createStringObject("PING",4);
        replicationFeedSlaves(server.slaves, server.slaveseldb,
            ping_argv, 1);
        decrRefCount(ping_argv[0]);
    }

    // 6.2 如果本身是在bgsave阶段,发送\n给下级slave以进行应用层面的连接保活。注意rdb是异步save的,所以这个\n可能在rdb发送之前已经发送了。redis-shake踩过这个坑。
    /* Second, send a newline to all the slaves in pre-synchronization
     * stage, that is, slaves waiting for the master to create the RDB file.
     *
     * Also send the a newline to all the chained slaves we have, if we lost
     * connection from our master, to keep the slaves aware that their
     * master is online. This is needed since sub-slaves only receive proxied
     * data from top-level masters, so there is no explicit pinging in order
     * to avoid altering the replication offsets. This special out of band
     * pings (newlines) can be sent, they will have no effect in the offset.
     *
     * The newline will be ignored by the slave but will refresh the
     * last interaction timer preventing a timeout. In this case we ignore the
     * ping period and refresh the connection once per second since certain
     * timeouts are set at a few seconds (example: PSYNC response). */
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        int is_presync =
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
             server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));

        if (is_presync) {
            if (write(slave->fd, "\n", 1) == -1) {
                /* Don't worry about socket errors, it's just a ping. */
            }
        }
    }

    // 6.3 断开与超时slave的连接
    /* Disconnect timedout slaves. */
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate != SLAVE_STATE_ONLINE) continue;
            if (slave->flags & CLIENT_PRE_PSYNC) continue;
            if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
            {
                serverLog(LL_WARNING, "Disconnecting timedout slave: %s",
                    replicationGetSlaveName(slave));
                freeClient(slave);
            }
        }
    }

    // 7. 如果当前是master且没有slave,并且自己也没有上一级master,那么可以释放backlog队列用于减少内存。但如果自身是个slave,无论有没有下一级slave,都不能释放backlog队列内存,这是为了应对主备切换的情况。
    // Q: 1. 那释放后如果新来1个slave,会重新建立这个内存? 
    // A: Yes.
    /* If this is a master without attached slaves and there is a replication
     * backlog active, in order to reclaim memory we can free it after some
     * (configured) time. Note that this cannot be done for slaves: slaves
     * without sub-slaves attached should still accumulate data into the
     * backlog, in order to reply to PSYNC queries if they are turned into
     * masters after a failover. */
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog && server.masterhost == NULL)
    {
        // 代码省略
    }

    // 7. 如果AOF关闭了,并且没有slave节点,那么这个lua脚本用的script cache就没必要保存了。
    /* If AOF is disabled and we no longer have attached slaves, we can
     * free our Replication Script Cache as there is no need to propagate
     * EVALSHA at all. */
    if (listLength(server.slaves) == 0 &&
        server.aof_state == AOF_OFF &&
        listLength(server.repl_scriptcache_fifo) != 0)
    {
        replicationScriptCacheFlush();
    }

    // 8. 如果有slave是在WAIT_BGSAVE_START,并且当前没有启动bgsave子线程和aof rewrite子线程,则启动bgsave。
    // 如果开启了流式传输,那么会等待一段时间再开始,以防止有多个slave差不多同一时间区间过来bgsave。
    /* Start a BGSAVE good for replication if we have slaves in
     * WAIT_BGSAVE_START state.
     *
     * In case of diskless replication, we make sure to wait the specified
     * number of seconds (according to configuration) so that other slaves
     * have the time to arrive before we start streaming. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {  // 当前没有rdb bgsave子线程和aof rewrite子线程
        time_t idle, max_idle = 0;
        int slaves_waiting = 0;
        int mincapa = -1;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                idle = server.unixtime - slave->lastinteraction;
                if (idle > max_idle) max_idle = idle;
                slaves_waiting++;
                mincapa = (mincapa == -1) ? slave->slave_capa :
                                            (mincapa & slave->slave_capa);
            }
       }

        if (slaves_waiting &&
            (!server.repl_diskless_sync ||
             max_idle > server.repl_diskless_sync_delay))
        {
            /* Start the BGSAVE. The called function may start a
             * BGSAVE with socket target or disk target depending on the
             * configuration and slaves capabilities. */
            startBgsaveForReplication(mincapa);
        }
    }

    // 9. 刷新good slave计数
    /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
    refreshGoodSlavesCount();
    replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}

3. Master接受psync的处理流程

  syncCommand是收到syncpsync命令的处理流程。代码不贴了,我就根据代码来分析吧。

  1. 如果发送的结点已经是一个slave了,则没必要再处理这个psync情况,当前函数直接退出。
  2. 对于级联的情况A->B->C,如果当前结点B还没有和master A完全建立连接,则拒绝收到C的psync情况。
  3. 如果发送节点对应的client已经有未发送的回复,那么拒绝掉此次请求。
  4. 如果收到的命令是psync,则尝试调masterTryPartialResynchronization进行断点续传。
  5. 如果断点续传不满足条件,则尝试进行全量复制,先状态标记SLAVE_STATE_WAIT_BGSAVE_START,表示准备搞一个rdb。
  6. 判断当前结点是否有backlog(不是刚创建就有的,因为这个空间可能会被释放),创建backlog之前会先修改当前结点的replid,并释放replid2,然后再进行创建。
  7. 如果目前已经有bgsave线程在工作了,并且选项是落盘(bgsave还有流式传输的选项),那么进入下面操作。首先判断当前结点是否已经有slave结点的状态是SLAVE_STATE_WAIT_BGSAVE_END(表示等待bgsave完成),那么这个slave对应的客户端buffer复制给当前的slave对应的客户端,这样当这个bgsave结束的时候,这个rdb能被多次使用。另外,增量开始的offset也置为一样,这个offset将用来在全量结束的时候,从backlog队列里拉取增量的数据的位点。如果没有slave状态是SLAVE_STATE_WAIT_BGSAVE_END,那么需要等待下次bgsave(由client重新发送psync)。
  8. 如果目前已经有bgsave线程在工作了,并且目的端是socket传输,那么同样需要等待下次bgsave。
  9. 如果当前没有bgsave线程工作,启动一个bgsave的线程。

  此外,再大概介绍一下replicationFeedSlavesreplicationFeedSlavesFromMaster,这两个是主从复制的增量传播的函数。
  replicationFeedSlaves。目前只用于master向slave发ping。挨个执行下面逻辑。

  • 对于级联的情况,如果当前结点不是最顶级那个master,该函数直接返回。
  • 如果当前结点没有开启backlog队列并且没有下一级slave,也会立刻返回。
  • 如果slave的dbid不等于当前增量的id,那么会先发一个select命令。这也就是说,如果用户有2个客户端写入,一个在db1上写入,另一个在db2上写入,增量将会不断发送select 1,然后select 2,再可能是select 1这样来回切换。
  • 如果当前有backlog,则这个数据会被写入到backlog里面。

  replicationFeedSlavesFromMaster。先判断如果当前开了backlog,则调用feedReplicationBacklog把数据写入backlog。再判断下一级状态如果ok的话把命令推入下一级的slave。
  feedReplicationBacklog是用于把增量写入的函数,本身backlog实现就是个循环队列,具体细节感兴趣可以撸下代码。

4. Slave向master建立连接

  connectWithMaster是slave向master发送请求的入口函数(在replicationCron中被调用),其主要调用syncWithMaster进行连接建立。

  1. 如果当前slave状态是非连接状态(可能刚进行slaveof no one的修改),则直接返回。
  2. 如果socket存在错误,则返回上层错误。
  3. 如果状态是连接中,则置状态为REPL_STATE_RECEIVE_PONG, 并向master同步发送ping。
  4. 如果状态是REPL_STATE_RECEIVE_PONG,则读取上次的回复。如果不是+xxxx, -NOAUTH或者-ERR operation not permitted则报错返回,反之,状态置为REPL_STATE_SEND_AUTH
  5. 如果状态是REPL_STATE_SEND_AUTH,并且master需要认证的情况,发送认证命令,状态置为REPL_STATE_RECEIVE_AUTH,否则,状态置为REPL_STATE_SEND_PORT
  6. 如果状态是REPL_STATE_RECEIVE_AUTH,则读取是否认证成功,并置状态是REPL_STATE_SEND_PORT
  7. 如果状态是REPL_STATE_SEND_PORT,则发送REPLCONF listening-port xx告知slave当前监听的port,然后置状态为REPL_STATE_RECEIVE_PORT。这个port用于master上展示slave是来自哪个端口,其他并没有什么作用。
  8. 如果状态是REPL_STATE_RECEIVE_PORT,读取master的返回,成功的话置状态REPL_STATE_SEND_IP
  9. 如果状态是REPL_STATE_SEND_IP,slave向master发送REPLCONF ip-address xxx以告知真实的ip。正常情况下,其实不发送也无所谓,因为刚开始建立的时候master已经知道了slave的ip,这里告知是为了考虑在做NAT或者流量转发的情况。状态改为REPL_STATE_RECEIVE_IP
  10. 如果状态是REPL_STATE_RECEIVE_IP,读取回复是否正常,然后状态进入REPL_STATE_SEND_CAPA
  11. 如果状态是REPL_STATE_SEND_CAPA,则发送REPLCONF capa eof capa psync2,表示slave具有流式传输以及psync2的能力。不过最后具体是否rdb流式发送,还是落盘再发送,取决于master,此处只是slave告知master有这个能力。状态进入REPL_STATE_RECEIVE_CAPA
  12. 如果状态是REPL_STATE_RECEIVE_CAPA,读取回复状态,并进入REPL_STATE_SEND_PSYNC
  13. 如果状态是REPL_STATE_SEND_PSYNC,则调用slaveTryPartialResynchronization发送psync命令,并进入REPL_STATE_RECEIVE_PSYNC状态。
  14. 读取slaveTryPartialResynchronization的回复。其回复有几种状态:
    a) PSYNC_WAIT_REPLY。等待回复。
    b) PSYNC_CONTINUE。断点续传。
    c) PSYNC_FULLRESYNC。全量传输。
    d) PSYNC_NOT_SUPPORTED。不支持psync命令。
    e) PSYNC_TRY_LATER。稍后重试。
  15. 如果回复的状态是PSYNC_WAIT_REPLY,则返回,等会再看看。
  16. 如果回复的状态是PSYNC_TRY_LATER,那么表示主上现在不能接受psync,比如自己是rdb loading状态等,此时报错返回。
  17. 如果回复的状态是PSYNC_CONTINUE,表示master接受断点续传,那么回调函数直接读增量就可以了,当前函数返回。
  18. 到这一步的话,表示可能进入了全量或者出错,那么释放当前结点的所有下一级slave结点的连接,也就是说对于链式连接的情况,a->b->c,如果b和a发生了全量同步,那么b->c的连接会被断开。
  19. 如果回复的状态是PSYNC_NOT_SUPPORTED,那么用sync命令发送连接。
  20. 到这一步,需要接受全量数据。首先会在本地创建文件,然后注册回调函数,调用readSyncBulkPayload接受到对应fd收到的rdb消息,然后把rdb同步写到这个本地的临时文件。

  接下来,分析一下readSyncBulkPayload这个函数,slave是如何接受rdb的,并且收到了rdb后到底是如何操作的?

  1. 首先解析读取整个rdb的大小,源端rdb一旦save完成,会先发送整个rdb的大小给slave,比如xx\r\n,表示整个rdb是xx个字节,否则会不断发送\n
  2. 接着是接受整个rdb的数据,不断把收到的数据写入本地的临时文件。
  3. 读取过程不断计算收到的字节数,如果等于刚开始的rdb大小,则表示整个rdb接受完毕了,那么进入后面处理。
  4. 判断当前是否有bgsave的线程,有的话干掉,因为本身在loading rdb,没必要做bgsave。
  5. 将收到的rdb重命名为dump.rdb。
  6. 将appendonly的状态由yes改为false,表示不需要增量写aof文件,因为后面要重新加载整个rdb。
  7. 调用emptyDb清空所有内存数据。
  8. 调用aeDeleteFileEvent将对应fd的回调函数取消,这是因为当前连接在全量传输完毕之后,还会被增量进行复用,所以如果不把回调函数断开,之后当前函数还会收到增量数据,这并不是我们所期望的。增量函数的处理逻辑在下面会介绍。
  9. 调用rdbLoad加载重命名后的rdb文件。
  10. 调用replicationCreateMasterClient创建slave上这个master对应的client结构.
  11. 接下来就是重置一些状态,比如清空当前的replid2,置master的offset,新建backlog内存,置slave复制状态为REPL_STATE_CONNECTED,如果开关开了的话则重新启用aof。

  到这里slave的处理逻辑大概都梳理了一遍,还遗留一条主线,就是slave对增量是如何处理的?其实没特殊处理,上面我们讲过,replicationCreateMasterClient函数是注册了一个client,所以这里master传播的命令就相当于一个普通客户端的写命令了,类似用户自己的客户端进行数据写入。

5. Q&A

  本小节记录部分Q&A。

  • Q: 主从复制期间,如果从在收到rdb的过程中,那么将会读本地原来的快照?还是说边同步就已经边读了?
    A: 对于redis来说,无论是master还是slave,在rdb loading阶段不对外提供服务,将会返回-LOADING的错误。也没有快照的概念。
  • Q: 级联复制,对于增量,中间节点是直接透传的?还是数据会写入到中间节点的数据,然后slave节点写入output buffer,再同步到下一个结点?
    A: 4.0之前,是先写中间节点,然后中间节点再产生增量推给目的端,相当于中间节点本身就是master,不同slave拿到的应用是可能不一样的。4.0之后,所有级联的slave拿到的复制数据完全一样,拿到以后先应用再传给下一级slave,但是是直接转发的,而不是应用完产生的增量再传,offset来说一定是a>=b>=c。全量同步跟级联无关,只有增量有关。
  • Q: slave连接master的时候,有个发送replconf ip-address xx的命令,告知slave自己的ip,这个不发会不会有问题?
    A: 正常情况没有问题,但是slave如果是NAT透传的,master上显示的slave就不是真实的slave了,这时候这个命令就有用了。
  • Q: slave向master发送replconf capa eof命令的话(表示可以不落盘流式发送RDB),master收到是直接会通过不落盘的方式发送RDB给slave吗?还是说master是可以自己选择的?
    A: 此处只是slave的能力,是否落盘是master决定。
  • Q: 级联情况中间节点b,也是有Backlog队列的吧?
    A: yes,只要有下一级slave就有backlog。
  • Q: 级联场景,a->b->c,对于c来说,replid是b还是a的?
    A: 只有1个replid,就是a的,无论对于a,b,c谁来说,这样的话即使发生切换,级联也是可以搞定的。
  • Q: backlog队列一共有几个,是一个slave对应一个吗?
    A: 1个redis只有1个,无论有几个下级slave。1个slave和client对应一个output buffer。
  • Q: slave收到master的rdb,是先存本地的?然后再load?
    A: yes,参考上面slave收到rdb的分析流程。
  • Q: slave重启以后进行断点续传,如何知道上一次的逻辑dbid是多少,master发现slave重连以后,会重新发一遍当前的dbid吗?比如master上面select 2;然后进行增量传递,然后这个时候slave重启了。master从中间开始断点续传,这个select 2会不会重新发送?
    A: 不会。这个dbid是slave自己持久化的,重启会重新加载,这样就知道当前的dbid是多少了。master也没办法重新发送当前的dbid,因为select语句本身也是进入到backlog队列的,也会增加位点的。关于这段代码可以参考replicationCreateMasterClient

说明

转载请注明出处:http://vinllen.com/rediszhu-cong-fu-zhi/


About the author

vinllen chen

Beijing, China

格物致知


Discussions

comments powered by Disqus