RAFT是一种新型易于理解的分布式一致性复制协议,由斯坦福大学的Diego Ongaro和John Ousterhout提出,作为RAMCloud项目中的中心协调组件。Raft是一种Leader-Based的Multi-Paxos变种,相比Paxos、Zab、View Stamped Replication等协议提供了更完整更清晰的协议描述,并提供了清晰的节点增删描述。RAFT是一种新型易于理解的分布式一致性复制协议,由斯坦福大学的Diego Ongaro和John Ousterhout提出,作为RAMCloud项目中的中心协调组件。Raft是一种Leader-Based的Multi-Paxos变种,相比Paxos、Zab、View Stamped Replication等协议提供了更完整更清晰的协议描述,并提供了清晰的节点增删描述。
一、使用braft
1.1注册并启动Server
braft需要运行在具体的brpc server里面,你可以让braft和你的业务共享同样的端口, 也可以将braft启动到不同的端口中。 brpc允许一个端口上注册多个逻辑Service, 如果你的Service同样运行在brpc Server里面,你可以管理brpc Server并且调用以下任意一个接口将braft相关的Service加入到你的Server中。这样能让braft和你的业务跑在同样的端口里面, 降低运维的复杂度。
int add_service(brpc::Server* server, const butil::EndPoint& listen_addr);
int add_service(brpc::Server* server, int port);
int add_service(brpc::Server* server, const char* const butil::EndPoint& listen_addr);
add_service的部分代码如下,负责把braft相关的service添加到brpc server里面。RaftServiceImpl主要和raft协议有关,有一些选举、append_entries和快照相关的接口。CliServiceImpl负责管理braft相关的工作,比如add_peer,get_leader,transfer_leader等操作。
int NodeManager::add_service(brpc::Server* server,
const butil::EndPoint& listen_address) {
...
if (0 != server->AddService(
new RaftServiceImpl(listen_address),
brpc::SERVER_OWNS_SERVICE)) {
LOG(ERROR) << "Fail to add RaftService";
return -1;
}
...
if (0 != server->AddService(new CliServiceImpl, brpc::SERVER_OWNS_SERVICE)) {
LOG(ERROR) << "Fail to add CliService";
return -1;
}
{
BAIDU_SCOPED_LOCK(_mutex);
_addr_set.insert(listen_address);
}
return 0;
}
1.2实现业务状态机
需要继承braft::StateMachine并且实现里面的接口
#include
class YourStateMachineImple : public braft::StateMachine {
protected:
// on_apply是*必须*实现的
// on_apply会在一条或者多条日志被多数节点持久化之后调用, 通知用户将这些日志所表示的操作应用到业务状态机中.
// 通过iter, 可以从遍历所有未处理但是已经提交的日志, 如果你的状态机支持批量更新,可以一次性获取多
// 条日志提高状态机的吞吐.
//
void on_apply(braft::Iterator& iter) {
for (; iter.valid(); iter.next()) {
// This guard helps invoke iter.done()->Run() asynchronously to
// avoid that callback blocks the StateMachine.
braft::AsyncClosureGuard closure_guard(iter.done());
// Parse operation from iter.data() and execute this operation
// op = parse(iter.data());
// result = process(op)
// The purpose of following logs is to help you understand the way
// this StateMachine works.
// Remove these logs in performance-sensitive servers.
LOG_IF(INFO, FLAGS_log_applied_task)
<< "Exeucted operation " << op
<< " and the result is " << result
<< " at log_index=" << iter.index();
}
}
// 当这个braft节点被shutdown之后, 当所有的操作都结束, 会调用on_shutdown, 来通知用户这个状态机不再被使用。
// 这时候你可以安全的释放一些资源了.
virtual void on_shutdown() {
// Cleanup resources you'd like
}
1.3构造braft::Node
一个Node代表了一个RAFT实例, Node的ID由两个部分组成:GroupId: 为一个string, 表示这个复制组的ID.
PeerId, 结构是一个EndPoint表示对外服务的端口, 外加一个index(默认为0). 其中index的作用是让不同的副本能运行在同一个进程内, 在下面几个场景中,这个值不能忽略:
Node(const GroupId& group_id, const PeerId& peer_id);
启动这个节点:
// Starts this node
int start() {
butil::EndPoint addr(butil::my_ip(), FLAGS_port);
braft::NodeOptions node_options;
if (node_options.initial_conf.parse_from(FLAGS_conf) != 0) {
LOG(ERROR) << "Fail to parse configuration `" << FLAGS_conf << ''';
return -1;
}
node_options.election_timeout_ms = FLAGS_election_timeout_ms;
node_options.fsm = this;
node_options.node_owns_fsm = false;
node_options.snapshot_interval_s = FLAGS_snapshot_interval;
std::string prefix = "local://" + FLAGS_data_path;
node_options.log_uri = prefix + "/log";
node_options.raft_meta_uri = prefix + "/raft_meta";
node_options.snapshot_uri = prefix + "/snapshot";
node_options.disable_cli = FLAGS_disable_cli;
braft::Node* node = new braft::Node(FLAGS_group, braft::PeerId(addr));
if (node->init(node_options) != 0) {
LOG(ERROR) << "Fail to init raft node";
delete node;
return -1;
}
_node = node;
return 0;
}initial_conf只有在这个复制组从空节点启动才会生效,当有snapshot和log里的数据不为空的时候的时候从其中恢复Configuration。initial_conf只用于创建复制组,第一个节点将自己设置进initial_conf,再调用add_peer添加其他节点,其他节点initial_conf设置为空;也可以多个节点同时设置相同的inital_conf(多个节点的ip:port)来同时启动空节点。
RAFT需要三种不同的持久存储, 分别是:RaftMetaStorage, 用来存放一些RAFT算法自身的状态数据, 比如term, vote_for等信息.
LogStorage, 用来存放用户提交的WAL
SnapshotStorage, 用来存放用户的Snapshot以及元信息. 用三个不同的uri来表示, 并且提供了基于本地文件系统的默认实现,type为local, 比如 local://data 就是存放到当前文件夹的data目录,local:///home/disk1/data 就是存放在 /home/disk1/data中。libraft中有默认的local://实现,用户可以根据需要继承实现相应的Storage。
1.4将操作提交到复制组
你需要将你的操作序列化成IOBuf, 这是一个非连续零拷贝的缓存结构。构造一个Task, 并且向braft::Node提交
#include
...
void function(op, callback) {
butil::IOBuf data;
serialize(op, &data);
braft::Task task;
task.data = &data;
task.done = make_closure(callback);
task.expected_term = expected_term;
return _node->apply(task);
}
二、选举阶段
2.1选举超时
在调用raft node的init初始化节点的时候,会初始化四个定时器。
CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms));
CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms));
CHECK_EQ(0, _stepdown_timer.init(this, options.election_timeout_ms));
CHECK_EQ(0, _snapshot_timer.init(this, options.snapshot_interval_s * 1000));
如果设置的conf不为空,就会调用step_down将自己_state初始化为follower并启动_election_timer。
如果conf里面只有自己一个server,就会立马调用elect_self成为leader。
选举超时的时候_election_timer会被触发,然后去调用NodeImpl::handle_election_timeout函数:
// Reset leader as the leader is uncerntain on election timeout.
PeerId empty_id;
butil::Status status;
status.set_error(ERAFTTIMEDOUT, "Lost connection from leader %s",
_leader_id.to_string().c_str());
reset_leader_id(empty_id, status);
return pre_vote(&lck);
该函数重置当前leader为空,并发起prevote.
2.2prevote
与论文不同,在 braft 代码中,选举之前会有一次预选举(prevote)的过程,来源于 raft 作者的博士论文。
在基础的 raft 算法中,当一个 follower 节点与其他节点发生网络分区时,由于心跳超时,会主动发起一次选举,每次选举时会把 term 加一。由于网络分区的存在,每次 RequestVote RPC 都会超时,结果是,一直不断地发起新的选举,term 会不断增大。
在网络分区恢复,重新加入集群后,其 term 值会被其他节点知晓,导致其他节点更新自己的 term,并变为 follower。然后触发重新选举,但被隔离的节点日志不是最新,并不会竞选成功,整个集群的状态被该节点扰乱。
Prevote 算法是 raft 作者在其博士论文中提出的,在节点发起一次选举时,会先发起一次 prevote 请求,判断是否能够赢得选举,赢得选举的条件与正常选举相同。如果可以,则增加 term 值,并发起正常的选举。
超时节点发起prevote
prevote的代码如下:
void NodeImpl::pre_vote(std::unique_lock* lck) {
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " term " << _current_term << " start pre_vote";
...
int64_t old_term = _current_term;
// get last_log_id outof node mutex
lck->unlock();
const LogId last_log_id = _log_manager->last_log_id(true);
lck->lock();
// pre_vote need defense ABA after unlock&lock
if (old_term != _current_term) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " raise term " << _current_term << " when get last_log_id";
return;
}
_pre_vote_ctx.init(_conf.conf, _conf.stable() ? NULL : &_conf.old_conf);
std::set peers;
_conf.list_peers(&peers);
for (std::set::const_iterator
iter = peers.begin(); iter != peers.end(); ++iter) {
if (*iter == _server_id) {
continue;
}
brpc::ChannelOptions options;
options.connection_type = brpc::CONNECTION_TYPE_SINGLE;
options.max_retry = 0;
brpc::Channel channel;
if (0 != channel.Init(iter->addr, &options)) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " channel init failed, addr " << iter->addr;
continue;
}
OnPreVoteRPCDone* done = new OnPreVoteRPCDone(*iter, _current_term, this);
done->cntl.set_timeout_ms(_options.election_timeout_ms);
done->request.set_group_id(_group_id);
done->request.set_server_id(_server_id.to_string());
done->request.set_peer_id(iter->to_string());
done->request.set_term(_current_term + 1); // next term
done->request.set_last_log_index(last_log_id.index);
done->request.set_last_log_term(last_log_id.term);
RaftService_Stub stub(&channel);
stub.pre_vote(&done->cntl, &done->request, &done->response, done);
}
_pre_vote_ctx.grant(_server_id);
if (_pre_vote_ctx.granted()) {
elect_self(lck);
}
}
它会遍历所有peer,向它们发送preVote Rpc请求,回调为 OnPreVoteRPCDone,其 run 函数会调用 NodeImpl::handle_pre_vote_response。
收到rpc的节点处理请求
下面先看一下RaftService里面的pre_vote收到RPC之后做了什么:
void RaftServiceImpl::pre_vote(google::protobuf::RpcController* cntl_base,
const RequestVoteRequest* request,
RequestVoteResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl =
static_cast<:controller>(cntl_base);
PeerId peer_id;
if (0 != peer_id.parse(request->peer_id())) {
cntl->SetFailed(EINVAL, "peer_id invalid");
return;
}
scoped_refptr node_ptr = NodeManager::GetInstance()->get(request->group_id(),
peer_id);
NodeImpl* node = node_ptr.get();
if (!node) {
cntl->SetFailed(ENOENT, "peer_id not exist");
return;
}
// TODO: should return butil::Status
int rc = node->handle_pre_vote_request(request, response);
if (rc != 0) {
cntl->SetFailed(rc, "%s", berror(rc));
return;
}
}
通过request里面的group_id和peer_id得到对应的node,然后调用它的handle_pre_vote_request。该函数从request中取出candidateId,然后比较request里面的term和自己的当前term。如果对方的term比自己的小,则设置response的granted为false。否则比较两者的日志,对方的日志较新才会granted。
int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request,
RequestVoteResponse* response) {
std::unique_lock lck(_mutex);
...
PeerId candidate_id;
if (0 != candidate_id.parse(request->server_id())) {
...
return EINVAL;
}
bool granted = false;
do {
if (request->term() < _current_term) {
// ignore older term
...
break;
}
// get last_log_id outof node mutex
lck.unlock();
LogId last_log_id = _log_manager->last_log_id(true);
lck.lock();
// pre_vote not need ABA check after unlock&lock
granted = (LogId(request->last_log_index(), request->last_log_term())
>= last_log_id);
...
} while (0);
response->set_term(_current_term);
response->set_granted(granted);
return 0;
}
发起rpc的节点收到response后处理请求
发起prevote的node在收到RPC响应后会调用回调,也就是NodeImpl::handle_pre_vote_response。看一下它做了什么:
void NodeImpl::handle_pre_vote_response(const PeerId& peer_id, const int64_t term,
const RequestVoteResponse& response) {
std::unique_lock lck(_mutex);
...
// check response term
if (response.term() > _current_term) {
...
butil::Status status;
status.set_error(EHIGHERTERMRESPONSE, "Raft node receives higher term "
"pre_vote_response.");
step_down(response.term(), false, status);
return;
}
...
// check if the quorum granted
if (response.granted()) {
_pre_vote_ctx.grant(peer_id);
if (_pre_vote_ctx.granted()) {
elect_self(&lck);
}
}
}
检查response中的term,如果大于自身的term,则会通过 step_down 退位成 follower 状态,并设置 term 值。如果response.granted为true,说明收到选票,更新_pre_vote_ctx,_pre_vote_ctx中存储着当前的投票情况,它的成员_quorum初始化为peer数量的一半加一,每次被grant就会减一,_pre_vote_ctx.granted是检查_quorum是否小于等于0。_pre_vote_ctx.granted为true,说明获得了大多数的选票,调用elect_self开始选举。
2.3开始选举
candidate节点发起rpc请求
elect_self和prevote很像,下面分块介绍该函数的工作:
1、如果当前是follower状态,则停止_election_timer:
// cancel follower election timer
if (_state == STATE_FOLLOWER) {
BRAFT_VLOG << "node " << _group_id << ":" << _server_id
<< " term " << _current_term << " stop election_timer";
_election_timer.stop();
}
2、将leader设置为空:
// reset leader_id before vote
PeerId empty_id;
butil::Status status;
status.set_error(ERAFTTIMEDOUT,
"A follower's leader_id is reset to NULL "
"as it begins to request_vote.");
reset_leader_id(empty_id, status);
3、把状态设置为candidate,_current_term加一,然后给自己投票
_state = STATE_CANDIDATE;
_current_term++;
_voted_id = _server_id;
4、启动_vote_timer,该定时器负责选举阶段的超时。
_vote_timer.start();
_vote_ctx.init(_conf.conf, _conf.stable() ? NULL : &_conf.old_conf);
5、获取最新的log:
int64_t old_term = _current_term;
// get last_log_id outof node mutex
lck->unlock();
const LogId last_log_id = _log_manager->last_log_id(true);
lck->lock();
// vote need defense ABA after unlock&lock
if (old_term != _current_term) {
// term changed cause by step_down
...
return;
}
6、接下来的部分和prevote类似,获取所有peer,并向其他节点发起RequestVoteRPC。不过它调用的是RaftService的request_vote函数,回调是OnRequestVoteRPCDone。 7、给自己投票,并检查投票结果:
_vote_ctx.grant(_server_id);
if (_vote_ctx.granted()) {
become_leader();
}
收到RequestVoteRPC的节点处理请求
下面先看一下RaftService里面的request_vote函数收到请求后,会和prevote一样的方法获取到对应的node,然后调用node的handle_request_vote_request。下面分步讲解handle_request_vote_request做的工作:
1、从request中获取candidate_id:
PeerId candidate_id;
if (0 != candidate_id.parse(request->server_id())) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " received RequestVote from " << request->server_id()
<< " server_id bad format";
return EINVAL;
}
2、如果request中的term大于自己的term,则回退到follower状态并重启election_timeout,如果等于,就继续,如果小于就直接跳到5:
if (request->term() >= _current_term) {
...
// incress current term, change state to follower
if (request->term() > _current_term) {
butil::Status status;
status.set_error(EHIGHERTERMREQUEST, "Raft node receives higher term "
"request_vote_request.");
step_down(request->term(), false, status);
}
3、获取最新log_id:
// get last_log_id outof node mutex
lck.unlock();
LogId last_log_id = _log_manager->last_log_id(true);
lck.lock();
// vote need ABA check after unlock&lock
if (request->term() != _current_term) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " raise term " << _current_term << " when get last_log_id";
break;
}
4、如果request中的log比自身的新,而且当前节点还没投票的话,就回退到Follower并给candidate投票。
bool log_is_ok = (LogId(request->last_log_index(), request->last_log_term())
>= last_log_id);
// save
if (log_is_ok && _voted_id.is_empty()) {
butil::Status status;
status.set_error(EVOTEFORCANDIDATE, "Raft node votes for some candidate, "
"step down to restart election_timer.");
step_down(request->term(), false, status);
_voted_id = candidate_id;
//TODO: outof lock
_meta_storage->set_votedfor(candidate_id);
}
5、设置response中的term和granted
response->set_term(_current_term);
response->set_granted(request->term() == _current_term && _voted_id == candidate_id);
发起rpc的节点收到response后处理请求
发起选举的node在收到RPC响应后会调用回调,也就是NodeImpl::handle_request_vote_response。看一下它做了什么:
1、首先要确认当前状态是不是candidate(因为可能选举已经成功,节点已经成为leader了)。然后检查term是不是等于当前term(有可能收到来自上一次rpc的response)。
BAIDU_SCOPED_LOCK(_mutex);
// check state
if (_state != STATE_CANDIDATE) {
...
return;
}
// check stale response
if (term != _current_term) {
...
return;
}
2、如果收到的term大于自身的term,则回退到follower状态。
// check response term
if (response.term() > _current_term) {
...
butil::Status status;
status.set_error(EHIGHERTERMRESPONSE, "Raft node receives higher term "
"request_vote_response.");
step_down(response.term(), false, status);
return;
}
3、最后检查response的granted,设置 _vote_ctx的granted,然后检查当前是否赢得选举(策略和prevote一样),如果赢了,就调用become_leader
// check if the quorum granted
if (response.granted()) {
_vote_ctx.grant(peer_id);
if (_vote_ctx.granted()) {
become_leader();
}
}
投票超时
如果vote_timer被触发的时候节点还处于candidate状态(也就是没选出leader)的话,就会调用handle_vote_timeout:
void NodeImpl::handle_vote_timeout() {
std::unique_lock lck(_mutex);
// check state
if (_state != STATE_CANDIDATE) {
return;
}
if (FLAGS_raft_step_down_when_vote_timedout) {
// step down to follower
...
butil::Status status;
status.set_error(ERAFTTIMEDOUT, "Fail to get quorum vote-granted");
step_down(_current_term, false, status);
pre_vote(&lck);
} else {
// retry vote
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " term " << _current_term << " retry elect";
elect_self(&lck);
}
}
如果设置了raft_step_down_when_vote_timedout,就回退到follower开始新的prevote,否则就直接开始新的选举。
三、Leader心跳保持
3.1 become_leader
candidate赢得选举之后会调用become_leader,become_leader做的工作如下:
void NodeImpl::become_leader() {
CHECK(_state == STATE_CANDIDATE);
...
// cancel candidate vote timer
_vote_timer.stop();
_state = STATE_LEADER;
_leader_id = _server_id;
_replicator_group.reset_term(_current_term);
std::set peers;
_conf.list_peers(&peers);
for (std::set::const_iterator
iter = peers.begin(); iter != peers.end(); ++iter) {
if (*iter == _server_id) {
continue;
}
BRAFT_VLOG << "node " << _group_id << ":" << _server_id
<< " term " << _current_term
<< " add replicator " << *iter;
//TODO: check return code
_replicator_group.add_replicator(*iter);
}
// init commit manager
_ballot_box->reset_pending_index(_log_manager->last_log_index() + 1);
// Register _conf_ctx to reject configuration changing before the first log
// is committed.
CHECK(!_conf_ctx.is_busy());
_conf_ctx.flush(_conf.conf, _conf.old_conf);
_stepdown_timer.start();
}
首先停止vote_timer,然后把state设置为leader,_leader_id设置为自己。然后把_replicator_group的term设置为当前term,并把其他peer添加到_replicator_group里面。添加的时候会给每个peer分配一个Replicator,并调用Replicator::start。
3.2Replicator::start
下面看一下Replicator::start做了什么:
int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) {
...
Replicator* r = new Replicator();
brpc::ChannelOptions channel_opt;
//channel_opt.connect_timeout_ms = *options.heartbeat_timeout_ms;
channel_opt.timeout_ms = -1; // We don't need RPC timeout
if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) {
LOG(ERROR) << "Fail to init sending channel"
<< ", group " << options.group_id;
delete r;
return -1;
}
// bind lifecycle with node, AddRef
// Replicator stop is async
options.node->AddRef();
r->_options = options;
r->_next_index = r->_options.log_manager->last_log_index() + 1;
if (bthread_id_create(&r->_id, r, _on_error) != 0) {
LOG(ERROR) << "Fail to create bthread_id"
<< ", group " << options.group_id;
delete r;
return -1;
}
bthread_id_lock(r->_id, NULL);
if (id) {
*id = r->_id.value;
}
LOG(INFO) << "Replicator=" << r->_id << "@" << r->_options.peer_id << " is started"
<< ", group " << r->_options.group_id;
r->_catchup_closure = NULL;
r->_last_rpc_send_timestamp = butil::monotonic_time_ms();
r->_start_heartbeat_timer(butil::gettimeofday_us());
// Note: r->_id is unlock in _send_empty_entries, don't touch r ever after
r->_send_empty_entries(false);
return 0;
}
开始heartbeat_timer并发送空的entries通知follower自己的leader身份。其他节点收到第一个空的entry就会回退成follower并把自己的leader_id设置成request里面包含的serverId。
3.3发送心跳
心跳定时器
在Replicator::start里面开始了heartbeat_timer,它是个bthread_timer,在超时的时候会调用Replicator::_on_timedout,该函数会把对应的id设置为ETIMEDOUT:
void Replicator::_on_timedout(void* arg) {
bthread_id_t id = { (uint64_t)arg };
bthread_id_error(id, ETIMEDOUT);
}
bthread_id_error会去调用_on_error,然后开始_send_heartbeat:
else if (error_code == ETIMEDOUT) {
// This error is issued in the TimerThread, start a new bthread to avoid
// blocking the caller.
// Unlock id to remove the context-switch out of the critical section
CHECK_EQ(0, bthread_id_unlock(id)) << "Fail to unlock" << id;
bthread_t tid;
if (bthread_start_urgent(&tid, NULL, _send_heartbeat,
reinterpret_cast(id.value)) != 0) {
PLOG(ERROR) << "Fail to start bthread";
_send_heartbeat(reinterpret_cast(id.value));
}
return 0;
发送心跳
_send_heartbeat会去调用Replicator::_send_empty_entries发送心跳:
void Replicator::_send_empty_entries(bool is_heartbeat) {
std::unique_ptr<:controller> cntl(new brpc::Controller);
std::unique_ptr request(new AppendEntriesRequest);
std::unique_ptr response(new AppendEntriesResponse);
...
if (is_heartbeat) {
_heartbeat_in_fly = cntl->call_id();
_heartbeat_counter++;
// set RPC timeout for heartbeat, how long should timeout be is waiting to be optimized.
cntl->set_timeout_ms(*_options.election_timeout_ms / 2);
} else {
...
}
...
google::protobuf::Closure* done = brpc::NewCallback(
is_heartbeat ? _on_heartbeat_returned : _on_rpc_returned,
_id.value, cntl.get(), request.get(), response.get(),
butil::monotonic_time_ms());
RaftService_Stub stub(&_sending_channel);
stub.append_entries(cntl.release(), request.release(),
response.release(), done);
CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
}
收到心跳
对应的服务收到RPC后,会获取对应的node,并调用handle_append_entries_request。该函数的步骤如下:
1、从request中获取server_id并检查term,如果request中的term小于自己的term,则将response的success设置为false,term设置为自己的term并返回 2、否则回退成leader并将_leader_id设置为server_id 3、更新_last_leader_timestamp(election_timout被触发的时候会通过检查这个时间来判断是否超时)
if (!from_append_entries_cache) {
// Requests from cache already updated timestamp
_last_leader_timestamp = butil::monotonic_time_ms();
}
}
4、如果节点正在安装快照,则返回失败
if (request->entries_size() > 0 &&
(_snapshot_executor
&& _snapshot_executor->is_installing_snapshot())) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " received append entries while installing snapshot";
cntl->SetFailed(EBUSY, "Is installing snapshot");
return;
}
5、设置response并返回
if (request->entries_size() == 0) {
response->set_success(true);
response->set_term(_current_term);
response->set_last_log_index(_log_manager->last_log_index());
response->set_readonly(_node_readonly);
lck.unlock();
// see the comments at FollowerStableClosure::run()
_ballot_box->set_last_committed_index(
std::min(request->committed_index(),
prev_log_index));
return;
}
收到心跳响应
收到心跳之后会调用_on_heartbeat_returned回调,如果response里面的term大于当前term,则更新term并回退到follower。否则重启heartbeat_timer开始下一轮heartbeat。
参考文献
Notes
如有理解和描述上有疏漏或者错误的地方,欢迎共同交流;参考已经在参考文献中注明,但仍有可能有疏漏的地方,有任何侵权或者不明确的地方,欢迎指出,必定及时更正或者删除;文章供于学习交流,转载注明出处。
最后
以上就是粗心方盒最近收集整理的关于brafteditor防抖_braft源码分析(一)选举和心跳保持部分的全部内容,更多相关brafteditor防抖_braft源码分析(一)选举和心跳保持部分内容请搜索靠谱客的其他文章。
发表评论 取消回复