braft is Baidu's open-source implementation of the Raft consensus protocol.

Initialization

Services registered by braft::add_service

    // file transfer service
    if (0 != server->AddService(file_service(), brpc::SERVER_DOESNT_OWN_SERVICE)) {

    // the raft RPC implementation: pre_vote/request_vote/append_entries/install_snapshot/timeout_now
    if (0 != server->AddService(
                new RaftServiceImpl(listen_address), 
                brpc::SERVER_OWNS_SERVICE)) {
    // statistics
    if (0 != server->AddService(new RaftStatImpl, brpc::SERVER_OWNS_SERVICE)) {
    // command-line operations
    if (0 != server->AddService(new CliServiceImpl, brpc::SERVER_OWNS_SERVICE)) {
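
For context, below is a minimal sketch of how an application typically wires this up before any of the node internals run. It is an illustration only: the paths, addresses and option values are placeholders, and it assumes the usual public entry points (braft::add_service, braft::NodeOptions, braft::Node::init).

#include <brpc/server.h>
#include <braft/raft.h>

int start_raft_node(const std::string& group, const butil::EndPoint& addr,
                    braft::StateMachine* fsm) {
    brpc::Server server;
    // register braft's internal services (the four shown above) on the
    // same brpc::Server before starting it
    if (braft::add_service(&server, addr) != 0) {
        return -1;
    }
    if (server.Start(addr, NULL) != 0) {
        return -1;
    }

    braft::NodeOptions options;                    // placeholder values
    options.election_timeout_ms = 5000;
    options.snapshot_interval_s = 3600;
    options.fsm = fsm;
    options.node_owns_fsm = false;
    options.log_uri = "local://./data/log";
    options.raft_meta_uri = "local://./data/raft_meta";
    options.snapshot_uri = "local://./data/snapshot";
    options.initial_conf.parse_from(
            "127.0.0.1:8100:0,127.0.0.1:8101:0,127.0.0.1:8102:0");

    braft::Node* node = new braft::Node(group, braft::PeerId(addr));
    return node->init(options);                    // ends up in NodeImpl::init below
}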

NodeImpl::init

    CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms));
    CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms));
    CHECK_EQ(0, _stepdown_timer.init(this, options.election_timeout_ms));
    CHECK_EQ(0, _snapshot_timer.init(this, options.snapshot_interval_s * 1000));
    // start the apply execution queue
    if (bthread::execution_queue_start(&_apply_queue_id, NULL,
                                       execute_applying_tasks, this) != 0) {
    // log storage and log manager init
    if (init_log_storage() != 0) {

    // init_fsm_caller creates another execution queue
    if (init_fsm_caller(LogId(0, 0)) != 0) {

    // commitment manager init
    _ballot_box = new BallotBox();
    if (_ballot_box->init(ballot_box_options) != 0) {

    // snapshot storage init and load
    if (init_snapshot_storage() != 0) {

    // if have log using conf in log, else using conf in options
    if (_log_manager->last_log_index() > 0) {
        _log_manager->check_and_set_configuration(&_conf);
    } else {
        _conf.conf = _options.initial_conf;
    }

    // init meta and check term
    if (init_meta_storage() != 0) {

    // Now the raft node is started , have to acquire the lock to avoid race
    // conditions
    std::unique_lock<raft_mutex_t> lck(_mutex);
    // with only one node in the group, elect itself immediately
    if (_conf.stable() && _conf.conf.size() == 1u
            && _conf.conf.contains(_server_id)) {
        // The group contains only this server which must be the LEADER, trigger
        // the timer immediately.
        elect_self(&lck);
    }

Election phase

Election timeout handling

void NodeImpl::handle_election_timeout() {
    std::unique_lock<raft_mutex_t> lck(_mutex);

    // check state
    if (_state != STATE_FOLLOWER) {
        return;
    }

    // Trigger vote manually, or wait until follower lease expire.
    if (!_vote_triggered && !_follower_lease.expired()) {

        return;
    }
    bool triggered = _vote_triggered;
    _vote_triggered = false;

    // Reset leader as the leader is uncerntain on election timeout.
    PeerId empty_id;
    butil::Status status;
    status.set_error(ERAFTTIMEDOUT, "Lost connection from leader %s",
                                    _leader_id.to_string().c_str());
    // may invoke the on_start_following/on_stop_following callbacks depending on the situation
    reset_leader_id(empty_id, status);

    // start pre-vote
    return pre_vote(&lck, triggered);
    // Don't touch any thing of *this ever after
}

pre_vote

The pre-vote algorithm was proposed by the Raft author in his PhD thesis. Before starting an election, a node first issues a prevote round to check whether it could win; the winning condition is the same as in a normal election. Only if it can win does it increment its term and start the real election.

void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {
    // fetch the term and index of the last log entry
    int64_t old_term = _current_term;
    // get last_log_id outof node mutex
    lck->unlock();
    const LogId last_log_id = _log_manager->last_log_id(true);
    lck->lock();
    // pre_vote need defense ABA after unlock&lock
    if (old_term != _current_term) {
        LOG(WARNING) << "node " << _group_id << ":" << _server_id
                     << " raise term " << _current_term << " when get last_log_id";
        return;
    }

    _pre_vote_ctx.init(this, triggered);
    std::set<PeerId> peers;
    _conf.list_peers(&peers);

    // send pre_vote requests to every other peer
    for (std::set<PeerId>::const_iterator
            iter = peers.begin(); iter != peers.end(); ++iter) {
        if (*iter == _server_id) {
            continue;
        }

        OnPreVoteRPCDone* done = new OnPreVoteRPCDone(
                *iter, _current_term, _pre_vote_ctx.version(), this);
        done->cntl.set_timeout_ms(_options.election_timeout_ms);
        done->request.set_term(_current_term + 1); // next term
        done->request.set_last_log_index(last_log_id.index);
        done->request.set_last_log_term(last_log_id.term);

        RaftService_Stub stub(&channel);
        stub.pre_vote(&done->cntl, &done->request, &done->response, done);
    }
    // grant the vote to itself
    grant_self(&_pre_vote_ctx, lck);
}

pre_vote request handling

void RaftServiceImpl::pre_vote(google::protobuf::RpcController* cntl_base,
                          const RequestVoteRequest* request,
                          RequestVoteResponse* response,
                          google::protobuf::Closure* done) {
    // look up the node object
    scoped_refptr<NodeImpl> node_ptr = 
                        global_node_manager->get(request->group_id(), peer_id);
    NodeImpl* node = node_ptr.get();
    int rc = node->handle_pre_vote_request(request, response);
    if (rc != 0) {
        cntl->SetFailed(rc, "%s", berror(rc));
        return;
    }
}

int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request,
                                      RequestVoteResponse* response) {
    std::unique_lock<raft_mutex_t> lck(_mutex);

    bool granted = false;
    bool rejected_by_lease = false;
    do {
        if (request->term() < _current_term) {
            // ignore older term
            break;
        }

        // get last_log_id outof node mutex
        lck.unlock();
        LogId last_log_id = _log_manager->last_log_id(true);
        lck.lock();
        // pre_vote not need ABA check after unlock&lock

        int64_t votable_time = _follower_lease.votable_time_from_now();
        bool grantable = (LogId(request->last_log_index(), request->last_log_term())
                        >= last_log_id);
        if (grantable) {
            granted = (votable_time == 0);
            rejected_by_lease = (votable_time > 0);
        }
    } while (0);

    response->set_term(_current_term);
    response->set_granted(granted);
    response->set_rejected_by_lease(rejected_by_lease);
    response->set_disrupted(_state == STATE_LEADER);
    response->set_previous_term(_current_term);

    return 0;
}

pre_vote response handling

void NodeImpl::handle_pre_vote_response(const PeerId& peer_id, const int64_t term,
                                        const int64_t ctx_version,
                                        const RequestVoteResponse& response) {
    // check response term; a higher term means giving up the election
    if (response.term() > _current_term) {
        status.set_error(EHIGHERTERMRESPONSE, "Raft node receives higher term "
                "pre_vote_response.");
        step_down(response.term(), false, status);
        return;
    }
    // (the more involved grant-checking logic, involving lease/disrupted_leader etc., is omitted)

    // pre-vote passed, start the formal election
    if (_pre_vote_ctx.granted()) {
        elect_self(&lck);
    }
}

elect

Once pre-vote passes, the formal election begins.

// in lock
void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck, 
                          bool old_leader_stepped_down) {
    // cancel follower election timer
    if (_state == STATE_FOLLOWER) {
        _election_timer.stop();
    }
    // reset leader_id before vote
    const PeerId old_leader = _leader_id;
    const int64_t leader_term = _current_term;
    PeerId empty_id;
    butil::Status status;
    status.set_error(ERAFTTIMEDOUT, "A follower's leader_id is reset to NULL "
                                    "as it begins to request_vote.");
    reset_leader_id(empty_id, status);

    // change state to candidate
    _state = STATE_CANDIDATE;
    // increment the term
    _current_term++;
    _voted_id = _server_id;

    BRAFT_VLOG << "node " << _group_id << ":" << _server_id
               << " term " << _current_term << " start vote_timer";
    // start the vote timer
    _vote_timer.start();
    _pre_vote_ctx.reset(this);
    _vote_ctx.init(this, false);
    if (old_leader_stepped_down) {
        _vote_ctx.set_disrupted_leader(DisruptedLeader(old_leader, leader_term));
        _follower_lease.expire();
    }

    int64_t old_term = _current_term;
    // get last_log_id outof node mutex
    lck->unlock();
    const LogId last_log_id = _log_manager->last_log_id(true);
    lck->lock();
    // vote need defense ABA after unlock&lock
    if (old_term != _current_term) {
        // term changed cause by step_down
        LOG(WARNING) << "node " << _group_id << ":" << _server_id
                     << " raise term " << _current_term << " when get last_log_id";
        return;
    }
    _vote_ctx.set_last_log_id(last_log_id);

    std::set<PeerId> peers;
    _conf.list_peers(&peers);
    // send request_vote to the peers
    request_peers_to_vote(peers, _vote_ctx.disrupted_leader());

    //TODO: outof lock
    status = _meta_storage->
                    set_term_and_votedfor(_current_term, _server_id, _v_group_id);
    if (!status.ok()) {
        LOG(ERROR) << "node " << _group_id << ":" << _server_id
                   << " fail to set_term_and_votedfor itself when elect_self,"
                      " error: " << status;
        // reset _voted_id to avoid inconsistent cases
        // return immediately without granting _vote_ctx
        _voted_id.reset();
        return;
    }
    // grant the vote to itself
    grant_self(&_vote_ctx, lck);
}

request_vote request handling

int NodeImpl::handle_request_vote_request(const RequestVoteRequest* request,
                                          RequestVoteResponse* response) {
    std::unique_lock<raft_mutex_t> lck(_mutex);

    // (some disrupt-related logic is omitted)
    bool disrupted = false;
    int64_t previous_term = _current_term;
    bool rejected_by_lease = false;
    do {
        // ignore older term
        if (request->term() < _current_term) {
            // ignore older term
            break;
        }

        // get last_log_id outof node mutex
        lck.unlock();
        LogId last_log_id = _log_manager->last_log_id(true);
        lck.lock();

        // vote need ABA check after unlock&lock
        if (previous_term != _current_term) {
            LOG(WARNING) << "node " << _group_id << ":" << _server_id
                         << " raise term " << _current_term << " when get last_log_id";
            break;
        }

        bool log_is_ok = (LogId(request->last_log_index(), request->last_log_term())
                          >= last_log_id);
        int64_t votable_time = _follower_lease.votable_time_from_now();

        // if the vote is rejected by lease, tell the candidate
        if (votable_time > 0) {
            rejected_by_lease = log_is_ok;
            break;
        }

        // increase current term, change state to follower
        if (request->term() > _current_term) {
            butil::Status status;
            status.set_error(EHIGHERTERMREQUEST, "Raft node receives higher term "
                    "request_vote_request.");
            disrupted = (_state <= STATE_TRANSFERRING);
            step_down(request->term(), false, status);
        }

        // save
        if (log_is_ok && _voted_id.is_empty()) {
            butil::Status status;
            status.set_error(EVOTEFORCANDIDATE, "Raft node votes for some candidate, "
                    "step down to restart election_timer.");
            step_down(request->term(), false, status);
            _voted_id = candidate_id;
            //TODO: outof lock
            // the vote must be persisted
            status = _meta_storage->
                    set_term_and_votedfor(_current_term, candidate_id, _v_group_id);
            if (!status.ok()) {
                // reset _voted_id to response set_granted(false)
                _voted_id.reset(); 
            }
        }
    } while (0);

    response->set_disrupted(disrupted);
    response->set_previous_term(previous_term);
    response->set_term(_current_term);
    response->set_granted(request->term() == _current_term && _voted_id == candidate_id);
    response->set_rejected_by_lease(rejected_by_lease);
    return 0;
}

request_vote response handling

void NodeImpl::handle_request_vote_response(const PeerId& peer_id, const int64_t term,
                                            const int64_t ctx_version,
                                            const RequestVoteResponse& response) {
    BAIDU_SCOPED_LOCK(_mutex);

    // check state; ignore the response if the state has changed, e.g. this node already became leader
    if (_state != STATE_CANDIDATE) {
        return;
    }
    // check response term; a higher term stops the election
    if (response.term() > _current_term) {
        butil::Status status;
        status.set_error(EHIGHERTERMRESPONSE, "Raft node receives higher term "
                "request_vote_response.");
        step_down(response.term(), false, status);
        return;
    }

    if (!response.granted() && !response.rejected_by_lease()) {
        return;
    }

    if (response.disrupted()) {
        _vote_ctx.set_disrupted_leader(DisruptedLeader(peer_id, response.previous_term()));
    }
    if (response.granted()) {
        _vote_ctx.grant(peer_id);
        if (peer_id == _follower_lease.last_leader()) {
            _vote_ctx.grant(_server_id);
            _vote_ctx.stop_grant_self_timer(this);
        }
        // become leader once a majority has granted the vote
        if (_vote_ctx.granted()) {
            return become_leader();
        }
    } else {
        // If the follower rejected the vote because of lease, reserve it, and
        // the candidate will try again after it disrupt the old leader.
        _vote_ctx.reserve(peer_id);
    }
    // ask the reserved peers to vote again
    retry_vote_on_reserved_peers();
}

void NodeImpl::retry_vote_on_reserved_peers() {
    std::set<PeerId> peers;
    _vote_ctx.pop_grantable_peers(&peers);
    if (peers.empty()) {
        return;
    }
    request_peers_to_vote(peers, _vote_ctx.disrupted_leader());
}

Leader logic

Becoming leader

void NodeImpl::become_leader() {
    // the state must be CANDIDATE
    CHECK(_state == STATE_CANDIDATE);
    // cancel candidate vote timer
    _vote_timer.stop();
    _vote_ctx.reset(this);

    // update state and leader_id
    _state = STATE_LEADER;
    _leader_id = _server_id;

    _replicator_group.reset_term(_current_term);
    _follower_lease.reset();
    _leader_lease.on_leader_start(_current_term);

    std::set<PeerId> peers;
    _conf.list_peers(&peers);
    // start one replicator per follower
    for (std::set<PeerId>::const_iterator
            iter = peers.begin(); iter != peers.end(); ++iter) {
        if (*iter == _server_id) {
            continue;
        }

        _replicator_group.add_replicator(*iter);
    }

    // init commit manager
    _ballot_box->reset_pending_index(_log_manager->last_log_index() + 1);

    // Register _conf_ctx to reject configuration changing before the first log
    // is committed.
    CHECK(!_conf_ctx.is_busy());
    _conf_ctx.flush(_conf.conf, _conf.old_conf);
    _stepdown_timer.start();
}

According to the Raft paper, a new leader must commit a no-op entry before it can safely commit entries from previous terms. In braft, _conf_ctx.flush(_conf.conf, _conf.old_conf) starts a configuration change, and once the resulting configuration log entry is committed, the user state machine's on_leader_start is called. In other words, braft uses a configuration-change log entry in place of the paper's no-op entry.
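
For reference, the user-side state machine hooks into these events roughly as in the sketch below. This is a minimal illustration assuming the standard braft::StateMachine interface (on_apply/on_leader_start/on_leader_stop); MyStateMachine is hypothetical, not braft code.

#include <braft/raft.h>

class MyStateMachine : public braft::StateMachine {
public:
    // called with a batch of committed entries
    void on_apply(braft::Iterator& iter) override {
        for (; iter.valid(); iter.next()) {
            // iter.data() is the payload the user passed in via Task.data
            const butil::IOBuf& data = iter.data();
            (void)data;                     // ... apply it to local state ...
            if (iter.done()) {
                iter.done()->Run();         // notify the original caller
            }
        }
    }
    // invoked after the configuration-change entry of this term is committed
    void on_leader_start(int64_t term) override {
        // safe to serve writes for this term from now on
    }
    void on_leader_stop(const butil::Status& status) override {
        // stop acting as leader
    }
};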

Replicator startup

int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) {
    Replicator* r = new Replicator();
    brpc::ChannelOptions channel_opt;
    //channel_opt.connect_timeout_ms = *options.heartbeat_timeout_ms;
    channel_opt.timeout_ms = -1; // We don't need RPC timeout
    // create the sending channel
    if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) {

    r->_options = options;
    r->_next_index = r->_options.log_manager->last_log_index() + 1;
    // register the replicator in bthread id storage
    if (bthread_id_create(&r->_id, r, _on_error) != 0) {

    bthread_id_lock(r->_id, NULL);
    r->_catchup_closure = NULL;
    r->_update_last_rpc_send_timestamp(butil::monotonic_time_ms());
    r->_start_heartbeat_timer(butil::gettimeofday_us());
    // Note: r->_id is unlock in _send_empty_entries, don't touch r ever after
    // send an empty AppendEntries (no-op) to assert leadership
    r->_send_empty_entries(false);
    return 0;
}

Maintaining leadership: heartbeats

Replicator::start arms the heartbeat_timer, a bthread_timer. When it fires it calls Replicator::_on_timedout, which marks the corresponding id with ETIMEDOUT.

void Replicator::_on_timedout(void* arg) {
    bthread_id_t id = { (uint64_t)arg };
    bthread_id_error(id, ETIMEDOUT);
}

bthread_id_error then invokes _on_error, which starts _send_heartbeat.

int Replicator::_on_error(bthread_id_t id, void* arg, int error_code) {
    } else if (error_code == ETIMEDOUT) {
        // This error is issued in the TimerThread, start a new bthread to avoid
        // blocking the caller.
        // Unlock id to remove the context-switch out of the critical section
        CHECK_EQ(0, bthread_id_unlock(id)) << "Fail to unlock" << id;
        bthread_t tid;
        // start a bthread to run _send_heartbeat
        if (bthread_start_urgent(&tid, NULL, _send_heartbeat,
                                 reinterpret_cast<void*>(id.value)) != 0) {
            PLOG(ERROR) << "Fail to start bthread";
            _send_heartbeat(reinterpret_cast<void*>(id.value));
        }
        return 0;
    } else {
}

Sending heartbeats

void* Replicator::_send_heartbeat(void* arg) {
    Replicator* r = NULL;
    bthread_id_t id = { (uint64_t)arg };
    // fetch the replicator from bthread id storage
    if (bthread_id_lock(id, (void**)&r) != 0) {
        // This replicator is stopped
        return NULL;
    }
    // id is unlock in _send_empty_entries;
    r->_send_empty_entries(true);
    return NULL;
}

void Replicator::_send_empty_entries(bool is_heartbeat) {
    std::unique_ptr<brpc::Controller> cntl(new brpc::Controller);
    std::unique_ptr<AppendEntriesRequest> request(new AppendEntriesRequest);
    std::unique_ptr<AppendEntriesResponse> response(new AppendEntriesResponse);
    if (_fill_common_fields(
                request.get(), _next_index - 1, is_heartbeat) != 0) {
        CHECK(!is_heartbeat);
        // _id is unlock in _install_snapshot
        return _install_snapshot();
    }
    if (is_heartbeat) {
        _heartbeat_in_fly = cntl->call_id();
        _heartbeat_counter++;
        // set RPC timeout for heartbeat, how long should timeout be is waiting to be optimized.
        cntl->set_timeout_ms(*_options.election_timeout_ms / 2);
    } else {
        _st.st = APPENDING_ENTRIES;
        _st.first_log_index = _next_index;
        _st.last_log_index = _next_index - 1;
        CHECK(_append_entries_in_fly.empty());
        CHECK_EQ(_flying_append_entries_size, 0);
        _append_entries_in_fly.push_back(FlyingAppendEntriesRpc(_next_index, 0, cntl->call_id()));
        _append_entries_counter++;
    }

    google::protobuf::Closure* done = brpc::NewCallback(
                is_heartbeat ? _on_heartbeat_returned : _on_rpc_returned, 
                _id.value, cntl.get(), request.get(), response.get(),
                butil::monotonic_time_ms());

    RaftService_Stub stub(&_sending_channel);
    stub.append_entries(cntl.release(), request.release(), 
                        response.release(), done);
    CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
}

append_entries request handling, which covers both heartbeats and regular log replication requests

void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
                                             const AppendEntriesRequest* request,
                                             AppendEntriesResponse* response,
                                             google::protobuf::Closure* done,
                                             bool from_append_entries_cache) {
    std::vector<LogEntry*> entries;
    entries.reserve(request->entries_size());
    brpc::ClosureGuard done_guard(done);
    std::unique_lock<raft_mutex_t> lck(_mutex);

    // pre set term, to avoid get term in lock
    response->set_term(_current_term);

    // check stale term
    // make a stale, superseded leader step down
    if (request->term() < _current_term) {
        const int64_t saved_current_term = _current_term;
        lck.unlock();
        response->set_success(false);
        response->set_term(saved_current_term);
        return;
    }

    // check term and state to step down
    check_step_down(request->term(), server_id);   

    // if two leaders exist in the same term, bump the term by one so that both step down
    if (server_id != _leader_id) {
        // Increase the term by 1 and make both leaders step down to minimize the
        // loss of split brain
        butil::Status status;
        status.set_error(ELEADERCONFLICT, "More than one leader in the same term."); 
        step_down(request->term() + 1, false, status);
        response->set_success(false);
        response->set_term(request->term() + 1);
        return;
    }

    const int64_t prev_log_index = request->prev_log_index();
    const int64_t prev_log_term = request->prev_log_term();
    const int64_t local_prev_log_term = _log_manager->get_term(prev_log_index);
    // if the prev-log term doesn't match, either report an error or handle it, depending on whether out-of-order entries are allowed
    if (local_prev_log_term != prev_log_term) {
    }

    // heartbeat request handling
    if (request->entries_size() == 0) {
        response->set_success(true);
        response->set_term(_current_term);
        response->set_last_log_index(_log_manager->last_log_index());
        response->set_readonly(_node_readonly);
        lck.unlock();
        // see the comments at FollowerStableClosure::run()
        _ballot_box->set_last_committed_index(
                std::min(request->committed_index(),
                         prev_log_index));
        return;
    }

    // Parse request
    butil::IOBuf data_buf;
    data_buf.swap(cntl->request_attachment());
    int64_t index = prev_log_index;
    for (int i = 0; i < request->entries_size(); i++) {
        index++;
        const EntryMeta& entry = request->entries(i);
        if (entry.type() != ENTRY_TYPE_UNKNOWN) {
            LogEntry* log_entry = new LogEntry();
            log_entry->AddRef();
            log_entry->id.term = entry.term();
            log_entry->id.index = index;
            log_entry->type = (EntryType)entry.type();
            // if the peers in the request are non-empty, also fill the log_entry's peers/old_peers fields
            } else {
                CHECK_NE(entry.type(), ENTRY_TYPE_CONFIGURATION);
            }
            if (entry.has_data_len()) {
                int len = entry.data_len();
                data_buf.cutn(&log_entry->data, len);
            }
            entries.push_back(log_entry);
        }
    }

    // check out-of-order cache
    check_append_entries_cache(index);

    FollowerStableClosure* c = new FollowerStableClosure(
            cntl, request, response, done_guard.release(),
            this, _current_term);
    // append to the log; the response is sent in the callback
    _log_manager->append_entries(&entries, c);

    // update configuration after _log_manager updated its memory status
    _log_manager->check_and_set_configuration(&_conf);
}

Heartbeat response handling

void Replicator::_on_heartbeat_returned(
        ReplicatorId id, brpc::Controller* cntl,
        AppendEntriesRequest* request, 
        AppendEntriesResponse* response,
        int64_t rpc_send_time) {
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    std::unique_ptr<AppendEntriesRequest>  req_guard(request);
    std::unique_ptr<AppendEntriesResponse> res_guard(response);
    Replicator *r = NULL;
    bthread_id_t dummy_id = { id };
    const long start_time_us = butil::gettimeofday_us();
    // XXX bthread_id is roughly analogous to a pthread key
    if (bthread_id_lock(dummy_id, (void**)&r) != 0) {
        return;
    }

    if (cntl->Failed()) {
        // on failure, restart the heartbeat timer
        r->_start_heartbeat_timer(start_time_us);
        CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id;
        return;
    }
    r->_consecutive_error_times = 0;
    // step down if the heartbeat response carries a higher term
    if (response->term() > r->_options.term) {
        NodeImpl *node_impl = r->_options.node;
        // Acquire a reference of Node here in case that Node is detroyed
        // after _notify_on_caught_up.
        node_impl->AddRef();
        r->_notify_on_caught_up(EPERM, true);
        LOG(INFO) << "Replicator=" << dummy_id << " is going to quit"
                  << ", group " << r->_options.group_id;
        butil::Status status;
        status.set_error(EHIGHERTERMRESPONSE, "Leader receives higher term "
                "heartbeat_response from peer:%s", r->_options.peer_id.to_string().c_str());
        r->_destroy();
        node_impl->increase_term_to(response->term(), status);
        node_impl->Release();
        return;
    }

    bool readonly = response->has_readonly() && response->readonly();
    BRAFT_VLOG << ss.str() << " readonly " << readonly;
    r->_update_last_rpc_send_timestamp(rpc_send_time);
    r->_start_heartbeat_timer(start_time_us);
    NodeImpl* node_impl = NULL;
    // Check if readonly config changed
    if ((readonly && r->_readonly_index == 0) ||
        (!readonly && r->_readonly_index != 0)) {
        node_impl = r->_options.node;
        node_impl->AddRef();
    }
    if (!node_impl) {
        CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id;
        return;
    }
    const PeerId peer_id = r->_options.peer_id;
    int64_t term = r->_options.term;
    CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id;
    node_impl->change_readonly_config(term, peer_id, readonly);
    node_impl->Release();
    return;
}

State machine task execution

User code calls NodeImpl::apply(const Task& task), whose implementation pushes the task onto the _apply_queue execution queue; draining that queue eventually invokes the batched function shown after the sketch below.
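
As a rough sketch of that user side (an illustration only: MyApplyDone is hypothetical, and only the Task fields data/expected_term/done plus Node::apply are assumed from the public API):

#include <memory>
#include <braft/raft.h>

// hypothetical closure; braft runs it once the entry is committed or fails
class MyApplyDone : public braft::Closure {
public:
    void Run() override {
        std::unique_ptr<MyApplyDone> self_guard(this);
        if (!status().ok()) {
            // e.g. not leader any more; the client should retry elsewhere
        }
    }
};

void submit(braft::Node* node, const butil::IOBuf& payload) {
    butil::IOBuf data = payload;
    braft::Task task;
    task.data = &data;            // consumed inside apply(), so a stack IOBuf is fine
    task.expected_term = -1;      // or the term observed earlier, for ABA protection
    task.done = new MyApplyDone();
    node->apply(task);            // enqueues the task into _apply_queue
}

Draining the queue then ends up in the batched apply below.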

void NodeImpl::apply(LogEntryAndClosure tasks[], size_t size) {
    g_apply_tasks_batch_counter << size;

    std::vector<LogEntry*> entries;
    entries.reserve(size);
    std::unique_lock<raft_mutex_t> lck(_mutex);
    bool reject_new_user_logs = (_node_readonly || _majority_nodes_readonly);
    if (_state != STATE_LEADER || reject_new_user_logs) {
        // when not leader, or read-only, set an error and invoke the user's done
        st.set_error(EPERM, "is not leader");
        for (size_t i = 0; i < size; ++i) {
            tasks[i].entry->Release();
            if (tasks[i].done) {
                tasks[i].done->status() = st;
                run_closure_in_bthread(tasks[i].done);
            }
        }
        return;
    }
    for (size_t i = 0; i < size; ++i) {
        // ABA check on the term; on mismatch an error is set and done is invoked
        if (tasks[i].expected_term != -1 && tasks[i].expected_term != _current_term) {
            continue;
        }
        entries.push_back(tasks[i].entry);
        entries.back()->id.term = _current_term;
        entries.back()->type = ENTRY_TYPE_DATA;
        // add to the ballot box
        _ballot_box->append_pending_task(_conf.conf,
                                         _conf.stable() ? NULL : &_conf.old_conf,
                                         tasks[i].done);
    }
    // LeaderStableClosure casts the leader's own vote when it runs
    _log_manager->append_entries(&entries,
                               new LeaderStableClosure(
                                        NodeId(_group_id, _server_id),
                                        entries.size(),
                                        _ballot_box));
    // update _conf.first
    _log_manager->check_and_set_configuration(&_conf);
}

Log persistence

Both the leader and followers persist logs through the LogManager. For the leader, completing persistence casts its vote in the ballot box; for a follower, completing persistence sends the response back.

void LogManager::append_entries(
            std::vector<LogEntry*> *entries, StableClosure* done) {
    std::unique_lock<raft_mutex_t> lck(_mutex);
    // check for conflicts with the local log
    if (!entries->empty() && check_and_resolve_conflict(entries, done) != 0) {
        lck.unlock();
        // release entries
        for (size_t i = 0; i < entries->size(); ++i) {
            (*entries)[i]->Release();
        }
        entries->clear();
        return;
    }

    for (size_t i = 0; i < entries->size(); ++i) {
        // Add ref for disk_thread
        (*entries)[i]->AddRef();
        if ((*entries)[i]->type == ENTRY_TYPE_CONFIGURATION) {
            ConfigurationEntry conf_entry(*((*entries)[i]));
            // configuration-change entries are also added to the config manager, which affects subsequent voting
            _config_manager->add(conf_entry);
        }
    }

    if (!entries->empty()) {
        done->_first_log_index = entries->front()->id.index;
        // insert into the in-memory log
        _logs_in_memory.insert(_logs_in_memory.end(), entries->begin(), entries->end());
    }

    done->_entries.swap(*entries);
    // hand off to the disk queue for flushing
    int ret = bthread::execution_queue_execute(_disk_queue, done);
    CHECK_EQ(0, ret) << "execq execute failed, ret: " << ret << " err: " << berror();
    wakeup_all_waiter(lck);
}

int LogManager::check_and_resolve_conflict(
            std::vector<LogEntry*> *entries, StableClosure* done) {
    AsyncClosureGuard done_guard(done);
    if (entries->front()->id.index == 0) {
        // leader path: the entries from the user carry index 0, so indexes are assigned here
        // Node is currently the leader and |entries| are from the user who
        // don't know the correct indexes the logs should assign to. So we have
        // to assign indexes to the appending entries
        for (size_t i = 0; i < entries->size(); ++i) {
            (*entries)[i]->id.index = ++_last_log_index;
        }
        done_guard.release();
        return 0;
    } else {
        // follower path

        // Node is currently a follower and |entries| are from the leader. We 
        // should check and resolve the confliction between the local logs and
        // |entries|
        // a gap in the log is an error
        if (entries->front()->id.index > _last_log_index + 1) {
            done->status().set_error(EINVAL, "There's gap between first_index=%" PRId64
                                     " and last_log_index=%" PRId64,
                                     entries->front()->id.index, _last_log_index);
            return -1;
        }
        const int64_t applied_index = _applied_id.index;
        // if the last entry has already been applied, return directly
        if (entries->back()->id.index <= applied_index) {
            return 1;
        }

        if (entries->front()->id.index == _last_log_index + 1) {
            // the first entry lines up exactly; just update _last_log_index
            // Fast path
            _last_log_index = entries->back()->id.index;
        } else {
            // Appending entries overlap the local ones. We should find if there
            // is a conflicting index from which we should truncate the local
            // ones.
            // find the first conflicting log index
            size_t conflicting_index = 0;
            for (; conflicting_index < entries->size(); ++conflicting_index) {
                if (unsafe_get_term((*entries)[conflicting_index]->id.index)
                        != (*entries)[conflicting_index]->id.term) {
                    break;
                }
            }
            // a conflict exists
            if (conflicting_index != entries->size()) {
                if ((*entries)[conflicting_index]->id.index <= _last_log_index) {
                    // Truncate all the conflicting entries to make local logs
                    // consensus with the leader.
                    // truncate the local log
                    unsafe_truncate_suffix(
                            (*entries)[conflicting_index]->id.index - 1);
                }
                _last_log_index = entries->back()->id.index;
            }  // else this is a duplicated AppendEntriesRequest, we have 
               // nothing to do besides releasing all the entries
            
            // Release all the entries before the conflicting_index and the rest
            // would be append to _logs_in_memory and _log_storage after this
            // function returns
            for (size_t i = 0; i < conflicting_index; ++i) {
                (*entries)[i]->Release();
            }
            // the non-conflicting entries are dropped here; they have already been persisted
            entries->erase(entries->begin(), 
                           entries->begin() + conflicting_index);
        }
        done_guard.release();
        return 0;
    }
    CHECK(false) << "Can't reach here";
    done->status().set_error(EIO, "Impossible");
    return -1;
}

Flushing logs to disk

An execution queue is created when the LogManager is constructed.

int LogManager::start_disk_thread() {
    bthread::ExecutionQueueOptions queue_options;
    queue_options.bthread_attr = BTHREAD_ATTR_NORMAL;
    return bthread::execution_queue_start(&_disk_queue,
                                   &queue_options,
                                   disk_thread,
                                   this);
}

disk_thread is the function responsible for flushing to disk.

int LogManager::disk_thread(void* meta,
                            bthread::TaskIterator<StableClosure*>& iter) {
    if (iter.is_queue_stopped()) {
        return 0;
    }

    LogManager* log_manager = static_cast<LogManager*>(meta);
    // FIXME(chenzhangyi01): it's buggy
    LogId last_id = log_manager->_disk_id;
    StableClosure* storage[256];
    AppendBatcher ab(storage, ARRAY_SIZE(storage), &last_id, log_manager);
    
    for (; iter; ++iter) {
                // ^^^ Must iterate to the end to release to corresponding
                //     even if some error has occurred
        StableClosure* done = *iter;
        if (!done->_entries.empty()) {
            // batch it up
            ab.append(done);
        } else {
            // flush to disk
            ab.flush();
            int ret = 0;
            // tasks with no log entries are management tasks, e.g. LastLogIdClosure, TruncatePrefixClosure, TruncateSuffixClosure, ResetClosure
            do {
                TruncatePrefixClosure* tpc = 
                        dynamic_cast<TruncatePrefixClosure*>(done);
                if (tpc) {
                    ret = log_manager->_log_storage->truncate_prefix(
                                    tpc->first_index_kept());
                    // break out of the loop once one type matches
                    break;
                }
            } while (0);

            if (ret != 0) {
                log_manager->report_error(ret, "Failed operation on LogStorage");
            }
            done->Run();
        }
    }
    CHECK(!iter) << "Must iterate to the end";
    ab.flush();
    log_manager->set_disk_id(last_id);
    return 0;
}

AppendBatcher::append above queues up the entries, and AppendBatcher::flush calls LogManager::append_to_storage to write them to disk.

void LogManager::append_to_storage(std::vector<LogEntry*>* to_append, 
                                   LogId* last_id, IOMetric* metric) {
    if (!_has_error.load(butil::memory_order_relaxed)) {
        // call the underlying storage implementation, SegmentLogStorage by default
        int nappent = _log_storage->append_entries(*to_append, metric);
        if (nappent != (int)to_append->size()) {
            report_error(EIO, "Fail to append entries");
        }
    }
    for (size_t j = 0; j < to_append->size(); ++j) {
        (*to_append)[j]->Release();
    }
    to_append->clear();
}

Replicating to followers

_send_entries is responsible for replicating tasks to followers. apply_task writes tasks into the log_manager, and the _send_entries implementation calls _prepare_entry to fetch entries back out of the log_manager.

void Replicator::_send_entries() {
    std::unique_ptr<brpc::Controller> cntl(new brpc::Controller);
    std::unique_ptr<AppendEntriesRequest> request(new AppendEntriesRequest);
    std::unique_ptr<AppendEntriesResponse> response(new AppendEntriesResponse);
    if (_fill_common_fields(request.get(), _next_index - 1, false) != 0) {
        _reset_next_index();
        return _install_snapshot();
    }
    EntryMeta em;
    const int max_entries_size = FLAGS_raft_max_entries_size - _flying_append_entries_size;
    int prepare_entry_rc = 0;
    CHECK_GT(max_entries_size, 0);
    for (int i = 0; i < max_entries_size; ++i) {
        // prepare the entry from the log_manager
        prepare_entry_rc = _prepare_entry(i, &em, &cntl->request_attachment());
        if (prepare_entry_rc != 0) {
            break;
        }
        request->add_entries()->Swap(&em);
    }
    if (request->entries_size() == 0) {
        // _id is unlock in _wait_more
        if (_next_index < _options.log_manager->first_log_index()) {
            _reset_next_index();
            return _install_snapshot();
        }
        return _wait_more_entries();
    }

    _append_entries_in_fly.push_back(FlyingAppendEntriesRpc(_next_index,
                                     request->entries_size(), cntl->call_id()));
    _append_entries_counter++;
    _next_index += request->entries_size();
    _flying_append_entries_size += request->entries_size();
    
    g_send_entries_batch_counter << request->entries_size();

    _st.st = APPENDING_ENTRIES;
    _st.first_log_index = _min_flying_index();
    _st.last_log_index = _next_index - 1;
    google::protobuf::Closure* done = brpc::NewCallback(
                _on_rpc_returned, _id.value, cntl.get(), 
                request.get(), response.get(), butil::monotonic_time_ms());
    RaftService_Stub stub(&_sending_channel);
    stub.append_entries(cntl.release(), request.release(), 
                        response.release(), done);
    _wait_more_entries();
}

send_entries response handling

void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl,
                     AppendEntriesRequest* request, 
                     AppendEntriesResponse* response,
                     int64_t rpc_send_time) {
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    std::unique_ptr<AppendEntriesRequest>  req_guard(request);
    std::unique_ptr<AppendEntriesResponse> res_guard(response);
    Replicator *r = NULL;
    bthread_id_t dummy_id = { id };
    const long start_time_us = butil::gettimeofday_us();

    bool valid_rpc = false;
    int64_t rpc_first_index = request->prev_log_index() + 1;
    int64_t min_flying_index = r->_min_flying_index();
    CHECK_GT(min_flying_index, 0);

    // (the bthread_id_lock call that obtains r, and the call_id validation logic, are omitted here)
    if (cntl->Failed()) {
        ss << " fail, sleep.";
        BRAFT_VLOG << ss.str();

        // If the follower crashes, any RPC to the follower fails immediately,
        // so we need to block the follower for a while instead of looping until
        // it comes back or be removed
        // dummy_id is unlock in block
        r->_reset_next_index();
        // on failure, block for a while; the follower may have crashed
        return r->_block(start_time_us, cntl->ErrorCode());
    }
    r->_consecutive_error_times = 0;
    if (!response->success()) {
        // step down if the response carries a higher term
        if (response->term() > r->_options.term) {
            NodeImpl *node_impl = r->_options.node;
            butil::Status status;
            status.set_error(EHIGHERTERMRESPONSE, "Leader receives higher term "
                    "%s from peer:%s", response->GetTypeName().c_str(), r->_options.peer_id.to_string().c_str());
            node_impl->increase_term_to(response->term(), status);
            node_impl->Release();
            return;
        }
        // prev_log_index and prev_log_term doesn't match
        r->_reset_next_index();
        // on log index mismatch, set next_index directly
        if (response->last_log_index() + 1 < r->_next_index) {
            // The peer contains less logs than leader
            r->_next_index = response->last_log_index() + 1;
        } else {  
            // otherwise decrement by one until it matches (XXX judging from the implementation below this isn't caused by a term mismatch, so when does this branch get hit?)
            // The peer contains logs from old term which should be truncated,
            // decrease _last_log_at_peer by one to test the right index to keep
            if (BAIDU_LIKELY(r->_next_index > 1)) {
                --r->_next_index;
            } else {
                LOG(ERROR) << "Group " << r->_options.group_id 
                           << " peer=" << r->_options.peer_id
                           << " declares that log at index=0 doesn't match,"
                              " which is not supposed to happen";
            }
        }
        // dummy_id is unlock in _send_heartbeat
        // used to refresh the log index information
        r->_send_empty_entries(false);
        return;
    }

    ss << " success";
    BRAFT_VLOG << ss.str();
    
    if (response->term() != r->_options.term) {
        r->_reset_next_index();
        CHECK_EQ(0, bthread_id_unlock(r->_id)) << "Fail to unlock " << r->_id;
        return;
    }
    r->_update_last_rpc_send_timestamp(rpc_send_time);
    const int entries_size = request->entries_size();
    const int64_t rpc_last_log_index = request->prev_log_index() + entries_size;
    BRAFT_VLOG_IF(entries_size > 0) << "Group " << r->_options.group_id
                                    << " replicated logs in [" 
                                    << min_flying_index << ", " 
                                    << rpc_last_log_index
                                    << "] to peer " << r->_options.peer_id;
    if (entries_size > 0) {
        // check whether the ballot has passed
        r->_options.ballot_box->commit_at(
                min_flying_index, rpc_last_log_index,
                r->_options.peer_id);
    }
    // A rpc is marked as success, means all request before it are success,
    // erase them sequentially.
    while (!r->_append_entries_in_fly.empty() &&
           r->_append_entries_in_fly.front().log_index <= rpc_first_index) {
        r->_flying_append_entries_size -= r->_append_entries_in_fly.front().entries_size;
        r->_append_entries_in_fly.pop_front();
    }
    r->_has_succeeded = true;
    r->_notify_on_caught_up(0, false);
    // dummy_id is unlock in _send_entries
    if (r->_timeout_now_index > 0 && r->_timeout_now_index < r->_min_flying_index()) {
        r->_send_timeout_now(false, false);
    }
    // send the next batch
    r->_send_entries();
    return;
}

apply_log

Once a log entry has been persisted on a majority of nodes, BallotBox::commit_at calls the function below to add a COMMITTED task to the queue.

int FSMCaller::on_committed(int64_t committed_index) {
    ApplyTask t;
    t.type = COMMITTED;
    t.committed_index = committed_index;
    return bthread::execution_queue_execute(_queue_id, t);
}

The queue execution function

int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
    FSMCaller* caller = (FSMCaller*)meta;
    if (iter.is_queue_stopped()) {
        caller->do_shutdown();
        return 0;
    }
    int64_t max_committed_index = -1;
    int64_t counter = 0;
    size_t  batch_size = FLAGS_raft_fsm_caller_commit_batch;
    for (; iter; ++iter) {
        if (iter->type == COMMITTED && counter < batch_size) {
            if (iter->committed_index > max_committed_index) {
                max_committed_index = iter->committed_index;
                counter++;
            }
        } else {
            if (max_committed_index >= 0) {
                caller->_cur_task = COMMITTED;
                // do_committed is called once consecutive COMMITTED tasks exceed commit_batch, or when a task of another type shows up
                caller->do_committed(max_committed_index);
                max_committed_index = -1;
                g_commit_tasks_batch_counter << counter;
                counter = 0;
                batch_size = FLAGS_raft_fsm_caller_commit_batch;
            }
            switch (iter->type) {
            case COMMITTED:
                if (iter->committed_index > max_committed_index) {
                    max_committed_index = iter->committed_index;
                    counter++;
                }
                break;
            case SNAPSHOT_SAVE:
                // ...
}

do_committed implementation

void FSMCaller::do_committed(int64_t committed_index) {
    if (!_error.status().ok()) {
        return;
    }
    int64_t last_applied_index = _last_applied_index.load(
                                        butil::memory_order_relaxed);

    // We can tolerate the disorder of committed_index
    if (last_applied_index >= committed_index) {
        return;
    }
    std::vector<Closure*> closure;
    int64_t first_closure_index = 0;
    CHECK_EQ(0, _closure_queue->pop_closure_until(committed_index, &closure,
                                                  &first_closure_index));

    IteratorImpl iter_impl(_fsm, _log_manager, &closure, first_closure_index,
                 last_applied_index, committed_index, &_applying_index);
    for (; iter_impl.is_good();) {
        if (iter_impl.entry()->type != ENTRY_TYPE_DATA) {
            // configuration change
            if (iter_impl.entry()->type == ENTRY_TYPE_CONFIGURATION) {
                if (iter_impl.entry()->old_peers == NULL) {
                    // Joint stage is not supposed to be noticeable by end users.
                    _fsm->on_configuration_committed(
                            Configuration(*iter_impl.entry()->peers),
                            iter_impl.entry()->id.index);
                }
            }
            // for other entries, just run the callback
            // For other entries, we have nothing to do besides flush the
            // pending tasks and run this closure to notify the caller that the
            // entries before this one were successfully committed and applied.
            if (iter_impl.done()) {
                iter_impl.done()->Run();
            }
            iter_impl.next();
            continue;
        }
        Iterator iter(&iter_impl);
        // run the user's apply logic
        _fsm->on_apply(iter);
        // Try move to next in case that we pass the same log twice.
        iter.next();
    }
    if (iter_impl.has_error()) {
        set_error(iter_impl.error());
        iter_impl.run_the_rest_closure_with_error();
    }
    const int64_t last_index = iter_impl.index() - 1;
    const int64_t last_term = _log_manager->get_term(last_index);
    LogId last_applied_id(last_index, last_term);
    // record the applied index
    _last_applied_index.store(committed_index, butil::memory_order_release);
    _last_applied_term = last_term;
    // note: the applied index is only kept in memory; it is not persisted
    _log_manager->set_applied_id(last_applied_id);
}

Membership change

void NodeImpl::unsafe_register_conf_change(const Configuration& old_conf,
                                           const Configuration& new_conf,
                                           Closure* done) {
    // error out when not leader
    if (_state != STATE_LEADER) {
            if (_state == STATE_TRANSFERRING) {
                done->status().set_error(EBUSY, "Is transferring leadership");
            } else {
                done->status().set_error(EPERM, "Not leader");
            }
            run_closure_in_bthread(done);
        return;
    }

    // check concurrent conf change
    // error out if another configuration change is already in progress
    if (_conf_ctx.is_busy()) {
        if (done) {
            done->status().set_error(EBUSY, "Doing another configuration change");
            run_closure_in_bthread(done);
        }
        return;
    }

    // Return immediately when the new peers equals to current configuration
    // return directly if the configuration is unchanged
    if (_conf.conf.equals(new_conf)) {
        run_closure_in_bthread(done);
        return;
    }

    return _conf_ctx.start(old_conf, new_conf, done);
}

Catch-up stage

If the new configuration adds one or more nodes compared with the current one, the leader creates a Replicator for each of them, installs the latest snapshot on those nodes, and then replicates the subsequent logs. Once all of the new nodes have nearly caught up, the process moves on to the next stage.

void NodeImpl::ConfigurationCtx::start(const Configuration& old_conf,
                                       const Configuration& new_conf,
                                       Closure* done) {
    _done = done;
    // set the stage to catching-up
    _stage = STAGE_CATCHING_UP;
    // diff to get the added and removed peers
    new_conf.diffs(old_conf, &adding, &removing);
    // no peers added, move to the next stage
    if (adding.empty()) {
        return next_stage();
    }
    adding.list_peers(&_adding_peers);
    for (std::set<PeerId>::const_iterator iter
            = _adding_peers.begin(); iter != _adding_peers.end(); ++iter) {
        // create a replicator
        if (_node->_replicator_group.add_replicator(*iter) != 0) {
        OnCaughtUp* caught_up = new OnCaughtUp(
                _node, _node->_current_term, *iter, _version);
        timespec due_time = butil::milliseconds_from_now(
                _node->_options.get_catchup_timeout_ms());
        // wait until the new peer has nearly caught up; the default margin is 1000
        if (_node->_replicator_group.wait_caughtup(
            *iter, _node->_options.catchup_margin, &due_time, caught_up) != 0) {
            delete caught_up;
            return on_caughtup(_version, *iter, false);
        }
    }
}

int ReplicatorGroup::wait_caughtup(const PeerId& peer, 
                                   int64_t max_margin, const timespec* due_time,
                                   CatchupClosure* done) {
    std::map<PeerId, ReplicatorIdAndStatus>::iterator iter = _rmap.find(peer);
    ReplicatorId rid = iter->second.id;
    // find the replicator
    Replicator::wait_for_caught_up(rid, max_margin, due_time, done);
    return 0;
}

void Replicator::wait_for_caught_up(ReplicatorId id, 
                                    int64_t max_margin,
                                    const timespec* due_time,
                                    CatchupClosure* done) {
    bthread_id_t dummy_id = { id };
    Replicator* r = NULL;
    if (bthread_id_lock(dummy_id, (void**)&r) != 0) {
    done->_max_margin = max_margin;
    // already caught up
    if (r->_has_succeeded && r->_is_catchup(max_margin)) {
        run_closure_in_bthread(done);
        CHECK_EQ(0, bthread_id_unlock(dummy_id))
                << "Fail to unlock" << dummy_id;
        return;
    }
    if (due_time != NULL) {
        done->_has_timer = true;
        // create a timer
        if (bthread_timer_add(&done->_timer,
                              *due_time,
                              _on_catch_up_timedout,
                              (void*)id) != 0) {
            done->status().set_error(EINVAL, "Duplicated call");
            run_closure_in_bthread(done);
            return;
        }
    }
    r->_catchup_closure = done;
    // success
    CHECK_EQ(0, bthread_id_unlock(dummy_id)) 
            << "Fail to unlock " << dummy_id;
    return;
}

Caught-up callback

void NodeImpl::on_caughtup(const PeerId& peer, int64_t term,
                           int64_t version, const butil::Status& st) {
    BAIDU_SCOPED_LOCK(_mutex);
    // CHECK _state and _current_term to avoid ABA problem
    // handle the leader having stepped down
    if (_state != STATE_LEADER || term != _current_term) {
        // if leader stepped down, reset() has already been called in step_down(),
        // so nothing needs to be done here
        return;
    }

    // caught up successfully
    if (st.ok()) {  // Caught up successfully
        _conf_ctx.on_caughtup(version, peer, true);
        return;
    }

    // Retry if this peer is still alive
    // retry on timeout
    if (st.error_code() == ETIMEDOUT 
            && (butil::monotonic_time_ms()
                - _replicator_group.last_rpc_send_timestamp(peer))
                    <= _options.election_timeout_ms) {

        OnCaughtUp* caught_up = new OnCaughtUp(this, _current_term, peer, version);
        if (0 == _replicator_group.wait_caughtup(
                    peer, _options.catchup_margin, &due_time, caught_up)) {
            return;
        } else {
            LOG(WARNING) << "node " << _group_id << ":" << _server_id
                << " wait_caughtup failed, peer " << peer;
            delete caught_up;
        }
    }

    // if the retry cannot be started, fail for good
    _conf_ctx.on_caughtup(version, peer, false);
}

void NodeImpl::ConfigurationCtx::on_caughtup(
        int64_t version, const PeerId& peer_id, bool succ) {
    // must be in the catching-up stage
    CHECK_EQ(STAGE_CATCHING_UP, _stage);
    if (succ) {
        _adding_peers.erase(peer_id);
        // once no peers are left to add, move to the next stage
        if (_adding_peers.empty()) {
            return next_stage();
        }
        return;
    }

    // Fail
    butil::Status err(ECATCHUP, "Peer %s failed to catch up",
                      peer_id.to_string().c_str());
    // reset on failure
    reset(&err);
}

Joint consensus stage

The leader writes both the old and the new configuration into the log. From this stage until the next one, every election and every log commit must reach a majority in both the old and the new node sets.

If more than one node changes, the process enters the joint consensus stage; if only a single node changes, it goes straight to the next stage. For example, going from {A,B,C} to {A,B,C,D} skips the joint stage, while going from {A,B,C} to {A,B,D,E} does not.

void NodeImpl::ConfigurationCtx::next_stage() {
    CHECK(is_busy());
    switch (_stage) {
    case STAGE_CATCHING_UP:
        if (_nchanges > 1) {
            _stage = STAGE_JOINT;
            Configuration old_conf(_old_peers);
            return _node->unsafe_apply_configuration(
                    Configuration(_new_peers), &old_conf, false);
        }
        // Skip joint consensus since only one peer has been changed here. Make
        // it a one-stage change to be compitible with the legacy
        // implementation.
    case STAGE_JOINT:
        _stage = STAGE_STABLE;
        return _node->unsafe_apply_configuration(
                    Configuration(_new_peers), NULL, false);
    // ...
}

This generates a LogEntry of type ENTRY_TYPE_CONFIGURATION, with the entry's peers set to the new configuration and old_peers set to the old one. The task is then added to the ballot box, and LogManager::append_entries appends the entry to memory and persists it.

void NodeImpl::unsafe_apply_configuration(const Configuration& new_conf,
                                          const Configuration* old_conf,
                                          bool leader_start) {
    CHECK(_conf_ctx.is_busy());
    LogEntry* entry = new LogEntry();
    entry->AddRef();
    entry->id.term = _current_term;
    // produce a configuration-change log entry
    entry->type = ENTRY_TYPE_CONFIGURATION;
    entry->peers = new std::vector<PeerId>;
    new_conf.list_peers(entry->peers);
    if (old_conf) {
        entry->old_peers = new std::vector<PeerId>;
        old_conf->list_peers(entry->old_peers);
    }
    ConfigurationChangeDone* configuration_change_done =
            new ConfigurationChangeDone(this, _current_term, leader_start, _leader_lease.lease_epoch());
    // Use the new_conf to deal the quorum of this very log
    _ballot_box->append_pending_task(new_conf, old_conf, configuration_change_done);

    std::vector<LogEntry*> entries;
    entries.push_back(entry);
    // append the log entry
    _log_manager->append_entries(&entries,
                                 new LeaderStableClosure(
                                        NodeId(_group_id, _server_id),
                                        1u, _ballot_box));
    _log_manager->check_and_set_configuration(&_conf);
}

If the entry type is ENTRY_TYPE_CONFIGURATION, append_entries also adds the configuration to _config_manager. LogManager::check_and_set_configuration is then called to update _conf to the configuration that was just added (with old_conf set to the previous configuration).

void LogManager::append_entries(
            std::vector<LogEntry*> *entries, StableClosure* done) {
    // ...
    for (size_t i = 0; i < entries->size(); ++i) {
        // Add ref for disk_thread
        (*entries)[i]->AddRef();
        if ((*entries)[i]->type == ENTRY_TYPE_CONFIGURATION) {
            ConfigurationEntry conf_entry(*((*entries)[i]));
            // Configuration-change entries are added to the config manager,
            // which affects how subsequent ballots are counted.
            _config_manager->add(conf_entry);
        }
    }
    // ...
}
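
The post does not quote check_and_set_configuration itself; based on my reading of log_manager.cpp it is roughly the following (a trimmed sketch, logging omitted): it simply replaces the caller's copy when the ConfigurationManager holds a newer configuration.

bool LogManager::check_and_set_configuration(ConfigurationEntry* current) {
    BAIDU_SCOPED_LOCK(_mutex);
    // The configuration manager always keeps the configuration with the largest
    // log id; if it is newer than what the caller holds, update the caller's copy.
    const ConfigurationEntry& last_conf = _config_manager->last_configuration();
    if (current->id != last_conf.id) {
        *current = last_conf;
        return true;
    }
    return false;
}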

For any task produced after this point, _conf.stable() returns false when the task is placed into the ballot box, so the second argument becomes &_conf.old_conf. Such tasks therefore need both the new and the old configuration to decide whether they are committed; this is the JOINT state.

_ballot_box->append_pending_task(_conf.conf,
                                 _conf.stable() ? NULL : &_conf.old_conf,
                                 tasks[i].done);
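
Here _conf is a ConfigurationEntry. As far as I can tell from configuration_manager.h, stable() just reports whether an old configuration is still attached:

// Sketch of ConfigurationEntry; stable() is assumed to simply check that no old
// configuration is attached, which is exactly the JOINT-vs-normal distinction above.
struct ConfigurationEntry {
    LogId id;
    Configuration conf;
    Configuration old_conf;
    bool stable() const { return old_conf.empty(); }
};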

During voting, an entry is granted only when it reaches a majority in both the new and the old configuration.

int Ballot::init(const Configuration& conf, const Configuration* old_conf) {
    _peers.reserve(conf.size());
    for (Configuration::const_iterator
            iter = conf.begin(); iter != conf.end(); ++iter) {
        _peers.push_back(*iter);
    }
    // Majority of the new configuration
    _quorum = _peers.size() / 2 + 1;
    if (!old_conf) {
        return 0;
    }
    _old_peers.reserve(old_conf->size());
    for (Configuration::const_iterator
            iter = old_conf->begin(); iter != old_conf->end(); ++iter) {
        _old_peers.push_back(*iter);
    }
    // Majority of the old configuration
    _old_quorum = _old_peers.size() / 2 + 1;
    return 0;
}

Ballot::PosHint Ballot::grant(const PeerId& peer, PosHint hint) {
    std::vector<UnfoundPeerId>::iterator iter;
    iter = find_peer(peer, _peers, hint.pos0);
    if (iter != _peers.end()) {
        iter->found = true;
        // Count toward the new-configuration quorum
        --_quorum;
    } else {
        hint.pos0 = -1;
    }

    if (_old_peers.empty()) {
        return hint;
    }

    iter = find_peer(peer, _old_peers, hint.pos1);

    if (iter != _old_peers.end()) {
        iter->found = true;
        // Count toward the old-configuration quorum
        --_old_quorum;
    } else {
        hint.pos1 = -1;
    }

    return hint;
}

bool granted() const { return _quorum <= 0 && _old_quorum <= 0; }

When the ConfigurationChangeDone callback fires, the node enters the next stage.
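
The post does not quote that callback, so here is a simplified sketch of it based on my reading of node.cpp (logging, lease handling and some checks are trimmed), showing how committing the configuration entry drives the stage machine forward:

void ConfigurationChangeDone::Run() {
    if (status().ok()) {
        // The configuration entry committed: advance CATCHING_UP -> JOINT -> STABLE.
        _node->on_configuration_change_done(_term);
        if (_leader_start) {
            _node->_options.fsm->on_leader_start(_term);
        }
    }
    delete this;
}

void NodeImpl::on_configuration_change_done(int64_t term) {
    std::unique_lock<raft_mutex_t> lck(_mutex);
    if (_state > STATE_TRANSFERRING || term != _current_term) {
        // The node already stepped down; _conf_ctx has been reset elsewhere.
        return;
    }
    _conf_ctx.next_stage();
}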

New-configuration synchronization stage

Once the joint-configuration log entry has been accepted by both the old and the new cluster, the leader writes a log entry containing only the new configuration; from then on, all log replication and elections only need to reach agreement within the new cluster. After this entry has been committed by a majority of the new cluster, the membership change is formally complete.

    case STAGE_JOINT:
        _stage = STAGE_STABLE;
        return _node->unsafe_apply_configuration(
                    Configuration(_new_peers), NULL, false);

This is the same code path as the joint stage, except that old_conf is NULL.

Cleanup stage

The leader shuts down any Replicators that are no longer needed. In particular, if the leader itself has been removed from the new configuration, it steps down and wakes up a suitable node to trigger an election.

case STAGE_STABLE:
        {
            bool should_step_down = 
                _new_peers.find(_node->_server_id) == _new_peers.end();
            butil::Status st = butil::Status::OK();
            reset(&st);
            if (should_step_down) {
                // Step down: this node has been removed from the configuration
                _node->step_down(_node->_current_term, true,
                        butil::Status(ELEADERREMOVED, "This node was removed"));
            }
            return;
        }
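
For completeness, the whole CATCHING_UP -> JOINT -> STABLE pipeline described above is normally started from the public API, e.g. braft::Node::add_peer/remove_peer/change_peers. A hedged usage sketch (the addresses are made up; check the braft headers of your version for the exact signatures):

#include <braft/raft.h>

// Shrink a 3-node group to 2 nodes by proposing the new configuration.
// braft then runs the catching-up/joint/stable stages internally.
void shrink_group(braft::Node* node) {
    braft::Configuration new_conf;
    braft::PeerId p1, p2;
    p1.parse("127.0.0.1:8100:0");   // hypothetical peer addresses
    p2.parse("127.0.0.1:8101:0");
    new_conf.add_peer(p1);
    new_conf.add_peer(p2);
    node->change_peers(new_conf, NULL /* optional done closure */);
}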

Snapshots

Taking a snapshot

void NodeImpl::do_snapshot(Closure* done) {
        _snapshot_executor->do_snapshot(done);
}

void SnapshotExecutor::do_snapshot(Closure* done) {
    // Create a writer (LocalSnapshotWriter by default), initialized with a temporary path
    SnapshotWriter* writer = _snapshot_storage->create();
    _saving_snapshot = true;
    SaveSnapshotDone* snapshot_save_done = new SaveSnapshotDone(this, writer, done);
    if (_fsm_caller->on_snapshot_save(snapshot_save_done) != 0) {
        // error handling elided
    }
}

int FSMCaller::on_snapshot_save(SaveSnapshotClosure* done) {
    ApplyTask task;
    task.type = SNAPSHOT_SAVE;
    task.done = done;
    // Enqueue the task on the FSM execution queue
    return bthread::execution_queue_execute(_queue_id, task);
}
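
As far as I can tell, do_snapshot is reached from two places: the periodic snapshot timer (snapshot_interval_s) and an explicit user call to braft::Node::snapshot(). A hedged sketch of the explicit path:

#include <braft/raft.h>

// Trigger a snapshot manually and get notified when it finishes.
// MySnapshotDone is a made-up name; braft::Closure carries the resulting status.
class MySnapshotDone : public braft::Closure {
public:
    void Run() override {
        if (!status().ok()) {
            // snapshot failed; inspect status().error_cstr()
        }
        delete this;
    }
};

void trigger_snapshot(braft::Node* node) {
    node->snapshot(new MySnapshotDone());   // ends up in NodeImpl::do_snapshot above
}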

When the FSMCaller dequeues the snapshot-save task, it executes the following function.

void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) {
    CHECK(done);

    int64_t last_applied_index = _last_applied_index.load(butil::memory_order_relaxed);

    // Fill in the snapshot meta: last included index/term and the configuration at that index
    SnapshotMeta meta;
    meta.set_last_included_index(last_applied_index);
    meta.set_last_included_term(_last_applied_term);
    ConfigurationEntry conf_entry;
    _log_manager->get_configuration(last_applied_index, &conf_entry);
    for (Configuration::const_iterator
            iter = conf_entry.conf.begin();
            iter != conf_entry.conf.end(); ++iter) { 
        *meta.add_peers() = iter->to_string();
    }
    for (Configuration::const_iterator
            iter = conf_entry.old_conf.begin();
            iter != conf_entry.old_conf.end(); ++iter) { 
        *meta.add_old_peers() = iter->to_string();
    }

    // Hand the meta to the writer
    SnapshotWriter* writer = done->start(meta);
    if (!writer) {
        done->status().set_error(EINVAL, "snapshot_storage create SnapshotWriter failed");
        done->Run();
        return;
    }

    // Invoke the user's on_snapshot_save implementation
    _fsm->on_snapshot_save(writer, done);
    return;
}

The user's snapshot-save implementation eventually invokes SaveSnapshotClosure::Run (SaveSnapshotDone::Run here), which completes saving the snapshot meta.
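
For context, a user state machine's on_snapshot_save typically writes its data under writer->get_path(), registers the files with add_file(), and lets done (the SaveSnapshotDone below) run. A hedged sketch; the "data" file name and the save_to_file() helper are hypothetical:

#include <braft/raft.h>
#include <brpc/closure_guard.h>
#include <cerrno>
#include <string>

class MyStateMachine : public braft::StateMachine {
public:
    void on_snapshot_save(braft::SnapshotWriter* writer,
                          braft::Closure* done) override {
        // Runs done (SaveSnapshotDone) when this scope ends.
        brpc::ClosureGuard done_guard(done);
        const std::string path = writer->get_path() + "/data";
        if (!save_to_file(path)) {            // hypothetical serialization helper
            done->status().set_error(EIO, "Fail to save state to %s", path.c_str());
            return;
        }
        if (writer->add_file("data") != 0) {  // record the file in the snapshot meta
            done->status().set_error(EIO, "Fail to add file to snapshot writer");
        }
    }
    // ... on_apply, on_snapshot_load, etc. omitted
private:
    bool save_to_file(const std::string& path);  // hypothetical
};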

void SaveSnapshotDone::Run() {
    // Start a bthread to avoid blocking FSMCaller
    // This continuation of snapshot saving is likely running inplace where the
    // on_snapshot_save is called (in the FSMCaller thread) and blocks all the
    // following on_apply. As blocking is not necessary and the continuation is
    // not important, so we start a bthread to do this.
    bthread_t tid;
    if (bthread_start_urgent(&tid, NULL, continue_run, this) != 0) {
        PLOG(ERROR) << "Fail to start bthread";
        continue_run(this);
    }
}

void* SaveSnapshotDone::continue_run(void* arg) {
    SaveSnapshotDone* self = (SaveSnapshotDone*)arg;
    std::unique_ptr<SaveSnapshotDone> self_guard(self);
    // Must call on_snapshot_save_done to clear _saving_snapshot
    int ret = self->_se->on_snapshot_save_done(
        self->status(), self->_meta, self->_writer);
    if (ret != 0 && self->status().ok()) {
        self->status().set_error(ret, "node call on_snapshot_save_done failed");
    }
    // Propagate the final status to the user's done closure, if any
    if (self->_done) {
        self->_done->status() = self->status();
    }
    if (self->_done) {
        run_closure_in_bthread(self->_done, true);
    }
    return NULL;
}

continue_run above then calls the following implementation.

int SnapshotExecutor::on_snapshot_save_done(
    const butil::Status& st, const SnapshotMeta& meta, SnapshotWriter* writer) {
    std::unique_lock<raft_mutex_t> lck(_mutex);
    int ret = st.error_code();
    // InstallSnapshot may race with SaveSnapshot: the snapshot installed from the
    // leader can be newer than the one just saved locally, so check before accepting this save.
    if (st.ok()) {
        // A newer snapshot was already installed from the leader: discard this local one
        if (meta.last_included_index() <= _last_snapshot_index) {
            ret = ESTALE;
            writer->set_error(ESTALE, "Installing snapshot is older than local snapshot");
        }
    }
    lck.unlock();
    
    if (ret == 0) {
        if (writer->save_meta(meta)) {
            // error handling elided
        }
    } else {
        if (writer->ok()) {
            writer->set_error(ret, "Fail to do snapshot");
        }
    }

    // By default this is LocalSnapshotStorage::close: it writes the snapshot meta to a
    // file and renames the snapshot directory from the temporary path to its final path
    if (_snapshot_storage->close(writer) != 0) {
        ret = EIO;
        LOG(WARNING) << "node " << _node->node_id() << " fail to close writer";
    }

    lck.lock();
    if (ret == 0) {
        _last_snapshot_index = meta.last_included_index();
        _last_snapshot_term = meta.last_included_term();
        lck.unlock();
        _log_manager->set_snapshot(&meta);
        lck.lock();
    }
    if (ret == EIO) {
        report_error(EIO, "Fail to save snapshot");
    }
    _saving_snapshot = false;
    lck.unlock();
    _running_jobs.signal();
    return ret;
}

set_snapshot updates the LogManager's state.

void LogManager::set_snapshot(const SnapshotMeta* meta) {
    std::unique_lock<raft_mutex_t> lck(_mutex);
    if (meta->last_included_index() <= _last_snapshot_id.index) {
        return;
    }
    Configuration conf;
    for (int i = 0; i < meta->peers_size(); ++i) {
        conf.add_peer(meta->peers(i));
    }
    Configuration old_conf;
    for (int i = 0; i < meta->old_peers_size(); ++i) {
        old_conf.add_peer(meta->old_peers(i));
    }
    ConfigurationEntry entry;
    entry.id = LogId(meta->last_included_index(), meta->last_included_term());
    entry.conf = conf;
    entry.old_conf = old_conf;
    _config_manager->set_snapshot(entry);
    int64_t term = unsafe_get_term(meta->last_included_index());

    const LogId last_but_one_snapshot_id = _last_snapshot_id;
    _last_snapshot_id.index = meta->last_included_index();
    _last_snapshot_id.term = meta->last_included_term();
    if (_last_snapshot_id > _applied_id) {
        _applied_id = _last_snapshot_id;
    }
    // NOTICE: not to update disk_id here as we are not sure if this node really
    // has these logs on disk storage. Just leave disk_id as it was, which can keep
    // these logs in memory all the time until they are flushed to disk. By this 
    // way we can avoid some corner cases which failed to get logs.

    if (term == 0) {
        // term == 0 means last_included_index is larger than last_index (this
        // usually happens on a follower), so truncate the in-memory and on-disk
        // log entries up to last_included_index.
        // FIXME: what if last_included_index is less than first_index?
        _virtual_first_log_id = _last_snapshot_id;
        truncate_prefix(meta->last_included_index() + 1, lck);
        return;
    } else if (term == meta->last_included_term()) {
        // term == meta->last_included_term() means the entry still exists in the
        // log; don't truncate up to it yet, only truncate up to the previous
        // snapshot (if there is one).
        // Truncating log to the index of the last snapshot.
        // We don't truncate log before the latest snapshot immediately since
        // some log around last_snapshot_index is probably needed by some
        // followers
        if (last_but_one_snapshot_id.index > 0) {
            // We have last snapshot index
            _virtual_first_log_id = last_but_one_snapshot_id;
            truncate_prefix(last_but_one_snapshot_id.index + 1, lck);
        }
        return;
    } else {
        // Otherwise the term at that index differs from meta->last_included_term(),
        // which typically means this follower is installing a snapshot. In that case
        // simply reset: _first_log_index becomes last_included_index + 1,
        // _last_log_index becomes last_included_index, and the in-memory entries are cleared.
        // TODO: check the result of reset.
        _virtual_first_log_id = _last_snapshot_id;
        reset(meta->last_included_index() + 1, lck);
        return;
    }
    CHECK(false) << "Cannot reach here";
}

InstallSnapshot

void Replicator::_send_entries() {
    // ...
   if (_fill_common_fields(request.get(), _next_index - 1, false) != 0) {
        _reset_next_index();
        return _install_snapshot();
    }
    // ...
}

In _send_entries, if the entries to send no longer exist in the log_manager (they have been compacted into a snapshot and the local log has been truncated), _fill_common_fields returns a non-zero value, and _install_snapshot is called to install a snapshot instead.
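
The relevant check is roughly the following (a trimmed sketch of Replicator::_fill_common_fields as I read it; the real function also fills term, group_id, server_id, peer_id and committed_index):

int Replicator::_fill_common_fields(AppendEntriesRequest* request,
                                    int64_t prev_log_index,
                                    bool is_heartbeat) {
    const int64_t prev_log_term = _options.log_manager->get_term(prev_log_index);
    if (prev_log_term == 0 && prev_log_index != 0) {
        // prev_log_index was already compacted into a snapshot and truncated from
        // the local log, so AppendEntries cannot describe it; a non-heartbeat
        // caller falls back to InstallSnapshot.
        if (!is_heartbeat) {
            return -1;
        }
        prev_log_index = 0;
    }
    request->set_prev_log_index(prev_log_index);
    request->set_prev_log_term(prev_log_term);
    // ... remaining fields omitted
    return 0;
}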

void Replicator::_install_snapshot() {
    // pre-set replicator state to INSTALLING_SNAPSHOT, so replicator could be
    // blocked if something is wrong, such as throttled for a period of time 
    _st.st = INSTALLING_SNAPSHOT;

    // Open a reader on the latest snapshot (LocalSnapshotReader by default)
    _reader = _options.snapshot_storage->open();
    if (!_reader) {
        e.status().set_error(EIO, "Fail to open snapshot");
        return;
    } 
    // Build the copy URI, of the form remote://host:port/reader_id
    // The reader id distinguishes different readers
    std::string uri = _reader->generate_uri_for_copy();
    SnapshotMeta meta;
    // report error on failure
    if (_reader->load_meta(&meta) != 0) {
        e.status().set_error(EIO, "Fail to load meta from " + snapshot_path);
        return;
    } 
    brpc::Controller* cntl = new brpc::Controller;
    cntl->set_max_retry(0);
    cntl->set_timeout_ms(-1);
    InstallSnapshotRequest* request = new InstallSnapshotRequest();
    InstallSnapshotResponse* response = new InstallSnapshotResponse();
    request->set_term(_options.term);
    request->set_group_id(_options.group_id);
    request->set_server_id(_options.server_id.to_string());
    request->set_peer_id(_options.peer_id.to_string());
    // Attach the snapshot meta and the copy URI
    request->mutable_meta()->CopyFrom(meta);
    request->set_uri(uri);

    _install_snapshot_in_fly = cntl->call_id();
    _install_snapshot_counter++;
    _st.last_log_included = meta.last_included_index();
    _st.last_term_included = meta.last_included_term();
    google::protobuf::Closure* done = brpc::NewCallback<
                ReplicatorId, brpc::Controller*,
                InstallSnapshotRequest*, InstallSnapshotResponse*>(
                    _on_install_snapshot_returned, _id.value,
                    cntl, request, response);
    RaftService_Stub stub(&_sending_channel);
    stub.install_snapshot(cntl, request, response, done);
    CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
}

Follower-side handling

void NodeImpl::handle_install_snapshot_request(brpc::Controller* cntl,
                                    const InstallSnapshotRequest* request,
                                    InstallSnapshotResponse* response,
                                    google::protobuf::Closure* done) {
    // check stale term
    // A stale leader will step down when it sees the higher term in the response
    if (request->term() < _current_term) {
        response->set_term(_current_term);
        response->set_success(false);
        return;
    }
    
    check_step_down(request->term(), server_id);

    // An InstallSnapshot request arrived from a different peer in the same term:
    // bump the term by 1 so that both leaders step down
    if (server_id != _leader_id) {
        // Increase the term by 1 and make both leaders step down to minimize the
        // loss of split brain
        butil::Status status;
        status.set_error(ELEADERCONFLICT, "More than one leader in the same term."); 
        step_down(request->term() + 1, false, status);
        response->set_success(false);
        response->set_term(request->term() + 1);
        return;
    }
    clear_append_entries_cache();
    lck.unlock();
    return _snapshot_executor->install_snapshot(
            cntl, request, response, done_guard.release());
}

The install_snapshot implementation

void SnapshotExecutor::install_snapshot(brpc::Controller* cntl,
                                        const InstallSnapshotRequest* request,
                                        InstallSnapshotResponse* response,
                                        google::protobuf::Closure* done) {
    int ret = 0;
    brpc::ClosureGuard done_guard(done);
    SnapshotMeta meta = request->meta();

    std::unique_ptr<DownloadingSnapshot> ds(new DownloadingSnapshot);
    ds->cntl = cntl;
    ds->done = done;
    ds->response = response;
    ds->request = request;
    // Start a bthread that begins copying the snapshot.
    // If a downloading task already exists, either cancel the old one or ignore
    // the new request, depending on which snapshot index is newer.
    ret = register_downloading_snapshot(ds.get());
    //    ^^^ DON'T access request, response, done and cntl after this point
    //        as the retry snapshot will replace this one.
    if (ret != 0) {
        return;
    }
    // Release done first as this RPC might be replaced by the retry one
    done_guard.release();
    CHECK(_cur_copier);
    // Wait for the copying bthread to finish
    _cur_copier->join();
    // Load the downloaded snapshot
    return load_downloading_snapshot(ds.release(), meta);
}

Creating and initializing the LocalSnapshotCopier

SnapshotCopier* LocalSnapshotStorage::start_to_copy_from(const std::string& uri) {
    LocalSnapshotCopier* copier = new LocalSnapshotCopier();
    copier->_storage = this;
    copier->_filter_before_copy_remote = _filter_before_copy_remote;
    copier->_fs = _fs.get();
    copier->_throttle = _snapshot_throttle.get();
    // Parse the leader's ip:port and the reader_id from the URI
    if (copier->init(uri) != 0) {
        LOG(ERROR) << "Fail to init copier from " << uri
                   << " path: " << _path;
        delete copier;
        return NULL;
    }
    copier->start();
    return copier;
}

The copy implementation

void LocalSnapshotCopier::copy() {
    do {
        // Load the remote meta table: read the remote meta file into an in-memory IOBuf and parse it
        load_meta_table();
        if (!ok()) {
            break;
        }
        // If enabled, skip downloading files whose name and checksum match a file in the local snapshot
        filter();
        if (!ok()) {
            break;
        }
        std::vector<std::string> files;
        _remote_snapshot.list_files(&files);
        // Copy the files one by one
        for (size_t i = 0; i < files.size() && ok(); ++i) {
            // Each file is fetched in chunks through the file-service RPC.
            copy_file(files[i]);
        }
    } while (0);
    if (!ok() && _writer && _writer->ok()) {
        _writer->set_error(error_code(), error_cstr());
    }
    if (_writer) {
        // set_error for copier only when failed to close writer and copier was 
        // ok before this moment 
        if (_storage->close(_writer, _filter_before_copy_remote) != 0 && ok()) {
            set_error(EIO, "Fail to close writer");
        }
        _writer = NULL;
    }
    if (ok()) {
        _reader = _storage->open();
    }
}

After the snapshot download completes, SnapshotExecutor::load_downloading_snapshot is called to load it.

void SnapshotExecutor::load_downloading_snapshot(DownloadingSnapshot* ds,
                                                 const SnapshotMeta& meta) {
    brpc::ClosureGuard done_guard(ds->done);
    CHECK(_cur_copier);
    SnapshotReader* reader = _cur_copier->get_reader();
    if (!_cur_copier->ok()) {
        ds->cntl->SetFailed(_cur_copier->error_code(), "%s",
                            _cur_copier->error_cstr());
        return;
    }
    _snapshot_storage->close(_cur_copier);
    _cur_copier = NULL;
    if (reader == NULL || !reader->ok()) {
        ds->cntl->SetFailed(brpc::EINTERNAL, 
                           "Fail to copy snapshot from %s",
                            ds->request->uri().c_str());
        return;
    }
    // The owner of ds is on_snapshot_load_done
    ds_guard.release();
    done_guard.release();
    _loading_snapshot = true;
    //                ^ After this point, this installing cannot be interrupted
    _loading_snapshot_meta = meta;
    lck.unlock();
    InstallSnapshotDone* install_snapshot_done =
            new InstallSnapshotDone(this, reader);
    // Invoke the user's on_snapshot_load implementation
    int ret = _fsm_caller->on_snapshot_load(install_snapshot_done);
    if (ret != 0) {
        install_snapshot_done->status().set_error(EHOSTDOWN, "This raft node is down");
        return install_snapshot_done->Run();
    }
}
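
On the user side, on_snapshot_load receives a SnapshotReader pointing at the downloaded snapshot directory and must rebuild the state machine from it before returning. A hedged sketch, continuing the hypothetical MyStateMachine from the snapshot-save example above (load_from_file() is another made-up helper):

#include <braft/raft.h>
#include <butil/logging.h>
#include <string>

int MyStateMachine::on_snapshot_load(braft::SnapshotReader* reader) {
    const std::string path = reader->get_path() + "/data";
    if (!load_from_file(path)) {   // hypothetical deserialization helper
        LOG(ERROR) << "Fail to load snapshot from " << path;
        return -1;                 // non-zero is treated as a failed install
    }
    return 0;
}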

How the leader handles the InstallSnapshot response

void Replicator::_on_install_snapshot_returned(
            ReplicatorId id, brpc::Controller* cntl,
            InstallSnapshotRequest* request, 
            InstallSnapshotResponse* response) {
    Replicator *r = NULL;
    bthread_id_t dummy_id = { id };
    bool succ = true;  // the checks that set succ = false on RPC/response failure are elided here
    if (bthread_id_lock(dummy_id, (void**)&r) != 0) {
        return;
    }
    if (r->_reader) {
        // Close the snapshot reader
        r->_options.snapshot_storage->close(r->_reader);
        r->_reader = NULL;
        if (r->_options.snapshot_throttle) {
            r->_options.snapshot_throttle->finish_one_task(true);
        }
    }
    // We don't retry installing the snapshot explicitly; on failure, a later
    // _send_entries will fall back to InstallSnapshot again.
    // dummy_id is unlock in _send_entries
    if (!succ) {
        return r->_block(butil::gettimeofday_us(), cntl->ErrorCode());
    }
    r->_has_succeeded = true;
    r->_notify_on_caught_up(0, false);
    if (r->_timeout_now_index > 0 && r->_timeout_now_index < r->_min_flying_index()) {
        r->_send_timeout_now(false, false);
    }
    // dummy_id is unlock in _send_entries
    return r->_send_entries();
}

References