我是靠谱客的博主 粗心方盒,这篇文章主要介绍brafteditor防抖_braft源码分析(一)选举和心跳保持部分,现在分享给大家,希望可以做个参考。

RAFT是一种新型易于理解的分布式一致性复制协议,由斯坦福大学的Diego Ongaro和John Ousterhout提出,作为RAMCloud项目中的中心协调组件。Raft是一种Leader-Based的Multi-Paxos变种,相比Paxos、Zab、View Stamped Replication等协议提供了更完整更清晰的协议描述,并提供了清晰的节点增删描述。RAFT是一种新型易于理解的分布式一致性复制协议,由斯坦福大学的Diego Ongaro和John Ousterhout提出,作为RAMCloud项目中的中心协调组件。Raft是一种Leader-Based的Multi-Paxos变种,相比Paxos、Zab、View Stamped Replication等协议提供了更完整更清晰的协议描述,并提供了清晰的节点增删描述。

一、使用braft

1.1注册并启动Server

braft需要运行在具体的brpc server里面,你可以让braft和你的业务共享同样的端口, 也可以将braft启动到不同的端口中。 brpc允许一个端口上注册多个逻辑Service, 如果你的Service同样运行在brpc Server里面,你可以管理brpc Server并且调用以下任意一个接口将braft相关的Service加入到你的Server中。这样能让braft和你的业务跑在同样的端口里面, 降低运维的复杂度。

int add_service(brpc::Server* server, const butil::EndPoint& listen_addr);

int add_service(brpc::Server* server, int port);

int add_service(brpc::Server* server, const char* const butil::EndPoint& listen_addr);

add_service的部分代码如下,负责把braft相关的service添加到brpc server里面。RaftServiceImpl主要和raft协议有关,有一些选举、append_entries和快照相关的接口。CliServiceImpl负责管理braft相关的工作,比如add_peer,get_leader,transfer_leader等操作。

int NodeManager::add_service(brpc::Server* server,

const butil::EndPoint& listen_address) {

...

if (0 != server->AddService(

new RaftServiceImpl(listen_address),

brpc::SERVER_OWNS_SERVICE)) {

LOG(ERROR) << "Fail to add RaftService";

return -1;

}

...

if (0 != server->AddService(new CliServiceImpl, brpc::SERVER_OWNS_SERVICE)) {

LOG(ERROR) << "Fail to add CliService";

return -1;

}

{

BAIDU_SCOPED_LOCK(_mutex);

_addr_set.insert(listen_address);

}

return 0;

}

1.2实现业务状态机

需要继承braft::StateMachine并且实现里面的接口

#include

class YourStateMachineImple : public braft::StateMachine {

protected:

// on_apply是*必须*实现的

// on_apply会在一条或者多条日志被多数节点持久化之后调用, 通知用户将这些日志所表示的操作应用到业务状态机中.

// 通过iter, 可以从遍历所有未处理但是已经提交的日志, 如果你的状态机支持批量更新,可以一次性获取多

// 条日志提高状态机的吞吐.

//

void on_apply(braft::Iterator& iter) {

for (; iter.valid(); iter.next()) {

// This guard helps invoke iter.done()->Run() asynchronously to

// avoid that callback blocks the StateMachine.

braft::AsyncClosureGuard closure_guard(iter.done());

// Parse operation from iter.data() and execute this operation

// op = parse(iter.data());

// result = process(op)

// The purpose of following logs is to help you understand the way

// this StateMachine works.

// Remove these logs in performance-sensitive servers.

LOG_IF(INFO, FLAGS_log_applied_task)

<< "Exeucted operation " << op

<< " and the result is " << result

<< " at log_index=" << iter.index();

}

}

// 当这个braft节点被shutdown之后, 当所有的操作都结束, 会调用on_shutdown, 来通知用户这个状态机不再被使用。

// 这时候你可以安全的释放一些资源了.

virtual void on_shutdown() {

// Cleanup resources you'd like

}

1.3构造braft::Node

一个Node代表了一个RAFT实例, Node的ID由两个部分组成:GroupId: 为一个string, 表示这个复制组的ID.

PeerId, 结构是一个EndPoint表示对外服务的端口, 外加一个index(默认为0). 其中index的作用是让不同的副本能运行在同一个进程内, 在下面几个场景中,这个值不能忽略:

Node(const GroupId& group_id, const PeerId& peer_id);

启动这个节点:

// Starts this node

int start() {

butil::EndPoint addr(butil::my_ip(), FLAGS_port);

braft::NodeOptions node_options;

if (node_options.initial_conf.parse_from(FLAGS_conf) != 0) {

LOG(ERROR) << "Fail to parse configuration `" << FLAGS_conf << ''';

return -1;

}

node_options.election_timeout_ms = FLAGS_election_timeout_ms;

node_options.fsm = this;

node_options.node_owns_fsm = false;

node_options.snapshot_interval_s = FLAGS_snapshot_interval;

std::string prefix = "local://" + FLAGS_data_path;

node_options.log_uri = prefix + "/log";

node_options.raft_meta_uri = prefix + "/raft_meta";

node_options.snapshot_uri = prefix + "/snapshot";

node_options.disable_cli = FLAGS_disable_cli;

braft::Node* node = new braft::Node(FLAGS_group, braft::PeerId(addr));

if (node->init(node_options) != 0) {

LOG(ERROR) << "Fail to init raft node";

delete node;

return -1;

}

_node = node;

return 0;

}initial_conf只有在这个复制组从空节点启动才会生效,当有snapshot和log里的数据不为空的时候的时候从其中恢复Configuration。initial_conf只用于创建复制组,第一个节点将自己设置进initial_conf,再调用add_peer添加其他节点,其他节点initial_conf设置为空;也可以多个节点同时设置相同的inital_conf(多个节点的ip:port)来同时启动空节点。

RAFT需要三种不同的持久存储, 分别是:RaftMetaStorage, 用来存放一些RAFT算法自身的状态数据, 比如term, vote_for等信息.

LogStorage, 用来存放用户提交的WAL

SnapshotStorage, 用来存放用户的Snapshot以及元信息. 用三个不同的uri来表示, 并且提供了基于本地文件系统的默认实现,type为local, 比如 local://data 就是存放到当前文件夹的data目录,local:///home/disk1/data 就是存放在 /home/disk1/data中。libraft中有默认的local://实现,用户可以根据需要继承实现相应的Storage。

1.4将操作提交到复制组

你需要将你的操作序列化成IOBuf, 这是一个非连续零拷贝的缓存结构。构造一个Task, 并且向braft::Node提交

#include

...

void function(op, callback) {

butil::IOBuf data;

serialize(op, &data);

braft::Task task;

task.data = &data;

task.done = make_closure(callback);

task.expected_term = expected_term;

return _node->apply(task);

}

二、选举阶段

2.1选举超时

在调用raft node的init初始化节点的时候,会初始化四个定时器。

CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms));

CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms));

CHECK_EQ(0, _stepdown_timer.init(this, options.election_timeout_ms));

CHECK_EQ(0, _snapshot_timer.init(this, options.snapshot_interval_s * 1000));

如果设置的conf不为空,就会调用step_down将自己_state初始化为follower并启动_election_timer。

如果conf里面只有自己一个server,就会立马调用elect_self成为leader。

选举超时的时候_election_timer会被触发,然后去调用NodeImpl::handle_election_timeout函数:

// Reset leader as the leader is uncerntain on election timeout.

PeerId empty_id;

butil::Status status;

status.set_error(ERAFTTIMEDOUT, "Lost connection from leader %s",

_leader_id.to_string().c_str());

reset_leader_id(empty_id, status);

return pre_vote(&lck);

该函数重置当前leader为空,并发起prevote.

2.2prevote

与论文不同,在 braft 代码中,选举之前会有一次预选举(prevote)的过程,来源于 raft 作者的博士论文。

在基础的 raft 算法中,当一个 follower 节点与其他节点发生网络分区时,由于心跳超时,会主动发起一次选举,每次选举时会把 term 加一。由于网络分区的存在,每次 RequestVote RPC 都会超时,结果是,一直不断地发起新的选举,term 会不断增大。

在网络分区恢复,重新加入集群后,其 term 值会被其他节点知晓,导致其他节点更新自己的 term,并变为 follower。然后触发重新选举,但被隔离的节点日志不是最新,并不会竞选成功,整个集群的状态被该节点扰乱。

Prevote 算法是 raft 作者在其博士论文中提出的,在节点发起一次选举时,会先发起一次 prevote 请求,判断是否能够赢得选举,赢得选举的条件与正常选举相同。如果可以,则增加 term 值,并发起正常的选举。

超时节点发起prevote

prevote的代码如下:

void NodeImpl::pre_vote(std::unique_lock* lck) {

LOG(INFO) << "node " << _group_id << ":" << _server_id

<< " term " << _current_term << " start pre_vote";

...

int64_t old_term = _current_term;

// get last_log_id outof node mutex

lck->unlock();

const LogId last_log_id = _log_manager->last_log_id(true);

lck->lock();

// pre_vote need defense ABA after unlock&lock

if (old_term != _current_term) {

LOG(WARNING) << "node " << _group_id << ":" << _server_id

<< " raise term " << _current_term << " when get last_log_id";

return;

}

_pre_vote_ctx.init(_conf.conf, _conf.stable() ? NULL : &_conf.old_conf);

std::set peers;

_conf.list_peers(&peers);

for (std::set::const_iterator

iter = peers.begin(); iter != peers.end(); ++iter) {

if (*iter == _server_id) {

continue;

}

brpc::ChannelOptions options;

options.connection_type = brpc::CONNECTION_TYPE_SINGLE;

options.max_retry = 0;

brpc::Channel channel;

if (0 != channel.Init(iter->addr, &options)) {

LOG(WARNING) << "node " << _group_id << ":" << _server_id

<< " channel init failed, addr " << iter->addr;

continue;

}

OnPreVoteRPCDone* done = new OnPreVoteRPCDone(*iter, _current_term, this);

done->cntl.set_timeout_ms(_options.election_timeout_ms);

done->request.set_group_id(_group_id);

done->request.set_server_id(_server_id.to_string());

done->request.set_peer_id(iter->to_string());

done->request.set_term(_current_term + 1); // next term

done->request.set_last_log_index(last_log_id.index);

done->request.set_last_log_term(last_log_id.term);

RaftService_Stub stub(&channel);

stub.pre_vote(&done->cntl, &done->request, &done->response, done);

}

_pre_vote_ctx.grant(_server_id);

if (_pre_vote_ctx.granted()) {

elect_self(lck);

}

}

它会遍历所有peer,向它们发送preVote Rpc请求,回调为 OnPreVoteRPCDone,其 run 函数会调用 NodeImpl::handle_pre_vote_response。

收到rpc的节点处理请求

下面先看一下RaftService里面的pre_vote收到RPC之后做了什么:

void RaftServiceImpl::pre_vote(google::protobuf::RpcController* cntl_base,

const RequestVoteRequest* request,

RequestVoteResponse* response,

google::protobuf::Closure* done) {

brpc::ClosureGuard done_guard(done);

brpc::Controller* cntl =

static_cast<:controller>(cntl_base);

PeerId peer_id;

if (0 != peer_id.parse(request->peer_id())) {

cntl->SetFailed(EINVAL, "peer_id invalid");

return;

}

scoped_refptr node_ptr = NodeManager::GetInstance()->get(request->group_id(),

peer_id);

NodeImpl* node = node_ptr.get();

if (!node) {

cntl->SetFailed(ENOENT, "peer_id not exist");

return;

}

// TODO: should return butil::Status

int rc = node->handle_pre_vote_request(request, response);

if (rc != 0) {

cntl->SetFailed(rc, "%s", berror(rc));

return;

}

}

通过request里面的group_id和peer_id得到对应的node,然后调用它的handle_pre_vote_request。该函数从request中取出candidateId,然后比较request里面的term和自己的当前term。如果对方的term比自己的小,则设置response的granted为false。否则比较两者的日志,对方的日志较新才会granted。

int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request,

RequestVoteResponse* response) {

std::unique_lock lck(_mutex);

...

PeerId candidate_id;

if (0 != candidate_id.parse(request->server_id())) {

...

return EINVAL;

}

bool granted = false;

do {

if (request->term() < _current_term) {

// ignore older term

...

break;

}

// get last_log_id outof node mutex

lck.unlock();

LogId last_log_id = _log_manager->last_log_id(true);

lck.lock();

// pre_vote not need ABA check after unlock&lock

granted = (LogId(request->last_log_index(), request->last_log_term())

>= last_log_id);

...

} while (0);

response->set_term(_current_term);

response->set_granted(granted);

return 0;

}

发起rpc的节点收到response后处理请求

发起prevote的node在收到RPC响应后会调用回调,也就是NodeImpl::handle_pre_vote_response。看一下它做了什么:

void NodeImpl::handle_pre_vote_response(const PeerId& peer_id, const int64_t term,

const RequestVoteResponse& response) {

std::unique_lock lck(_mutex);

...

// check response term

if (response.term() > _current_term) {

...

butil::Status status;

status.set_error(EHIGHERTERMRESPONSE, "Raft node receives higher term "

"pre_vote_response.");

step_down(response.term(), false, status);

return;

}

...

// check if the quorum granted

if (response.granted()) {

_pre_vote_ctx.grant(peer_id);

if (_pre_vote_ctx.granted()) {

elect_self(&lck);

}

}

}

检查response中的term,如果大于自身的term,则会通过 step_down 退位成 follower 状态,并设置 term 值。如果response.granted为true,说明收到选票,更新_pre_vote_ctx,_pre_vote_ctx中存储着当前的投票情况,它的成员_quorum初始化为peer数量的一半加一,每次被grant就会减一,_pre_vote_ctx.granted是检查_quorum是否小于等于0。_pre_vote_ctx.granted为true,说明获得了大多数的选票,调用elect_self开始选举。

2.3开始选举

candidate节点发起rpc请求

elect_self和prevote很像,下面分块介绍该函数的工作:

1、如果当前是follower状态,则停止_election_timer:

// cancel follower election timer

if (_state == STATE_FOLLOWER) {

BRAFT_VLOG << "node " << _group_id << ":" << _server_id

<< " term " << _current_term << " stop election_timer";

_election_timer.stop();

}

2、将leader设置为空:

// reset leader_id before vote

PeerId empty_id;

butil::Status status;

status.set_error(ERAFTTIMEDOUT,

"A follower's leader_id is reset to NULL "

"as it begins to request_vote.");

reset_leader_id(empty_id, status);

3、把状态设置为candidate,_current_term加一,然后给自己投票

_state = STATE_CANDIDATE;

_current_term++;

_voted_id = _server_id;

4、启动_vote_timer,该定时器负责选举阶段的超时。

_vote_timer.start();

_vote_ctx.init(_conf.conf, _conf.stable() ? NULL : &_conf.old_conf);

5、获取最新的log:

int64_t old_term = _current_term;

// get last_log_id outof node mutex

lck->unlock();

const LogId last_log_id = _log_manager->last_log_id(true);

lck->lock();

// vote need defense ABA after unlock&lock

if (old_term != _current_term) {

// term changed cause by step_down

...

return;

}

6、接下来的部分和prevote类似,获取所有peer,并向其他节点发起RequestVoteRPC。不过它调用的是RaftService的request_vote函数,回调是OnRequestVoteRPCDone。 7、给自己投票,并检查投票结果:

_vote_ctx.grant(_server_id);

if (_vote_ctx.granted()) {

become_leader();

}

收到RequestVoteRPC的节点处理请求

下面先看一下RaftService里面的request_vote函数收到请求后,会和prevote一样的方法获取到对应的node,然后调用node的handle_request_vote_request。下面分步讲解handle_request_vote_request做的工作:

1、从request中获取candidate_id:

PeerId candidate_id;

if (0 != candidate_id.parse(request->server_id())) {

LOG(WARNING) << "node " << _group_id << ":" << _server_id

<< " received RequestVote from " << request->server_id()

<< " server_id bad format";

return EINVAL;

}

2、如果request中的term大于自己的term,则回退到follower状态并重启election_timeout,如果等于,就继续,如果小于就直接跳到5:

if (request->term() >= _current_term) {

...

// incress current term, change state to follower

if (request->term() > _current_term) {

butil::Status status;

status.set_error(EHIGHERTERMREQUEST, "Raft node receives higher term "

"request_vote_request.");

step_down(request->term(), false, status);

}

3、获取最新log_id:

// get last_log_id outof node mutex

lck.unlock();

LogId last_log_id = _log_manager->last_log_id(true);

lck.lock();

// vote need ABA check after unlock&lock

if (request->term() != _current_term) {

LOG(WARNING) << "node " << _group_id << ":" << _server_id

<< " raise term " << _current_term << " when get last_log_id";

break;

}

4、如果request中的log比自身的新,而且当前节点还没投票的话,就回退到Follower并给candidate投票。

bool log_is_ok = (LogId(request->last_log_index(), request->last_log_term())

>= last_log_id);

// save

if (log_is_ok && _voted_id.is_empty()) {

butil::Status status;

status.set_error(EVOTEFORCANDIDATE, "Raft node votes for some candidate, "

"step down to restart election_timer.");

step_down(request->term(), false, status);

_voted_id = candidate_id;

//TODO: outof lock

_meta_storage->set_votedfor(candidate_id);

}

5、设置response中的term和granted

response->set_term(_current_term);

response->set_granted(request->term() == _current_term && _voted_id == candidate_id);

发起rpc的节点收到response后处理请求

发起选举的node在收到RPC响应后会调用回调,也就是NodeImpl::handle_request_vote_response。看一下它做了什么:

1、首先要确认当前状态是不是candidate(因为可能选举已经成功,节点已经成为leader了)。然后检查term是不是等于当前term(有可能收到来自上一次rpc的response)。

BAIDU_SCOPED_LOCK(_mutex);

// check state

if (_state != STATE_CANDIDATE) {

...

return;

}

// check stale response

if (term != _current_term) {

...

return;

}

2、如果收到的term大于自身的term,则回退到follower状态。

// check response term

if (response.term() > _current_term) {

...

butil::Status status;

status.set_error(EHIGHERTERMRESPONSE, "Raft node receives higher term "

"request_vote_response.");

step_down(response.term(), false, status);

return;

}

3、最后检查response的granted,设置 _vote_ctx的granted,然后检查当前是否赢得选举(策略和prevote一样),如果赢了,就调用become_leader

// check if the quorum granted

if (response.granted()) {

_vote_ctx.grant(peer_id);

if (_vote_ctx.granted()) {

become_leader();

}

}

投票超时

如果vote_timer被触发的时候节点还处于candidate状态(也就是没选出leader)的话,就会调用handle_vote_timeout:

void NodeImpl::handle_vote_timeout() {

std::unique_lock lck(_mutex);

// check state

if (_state != STATE_CANDIDATE) {

return;

}

if (FLAGS_raft_step_down_when_vote_timedout) {

// step down to follower

...

butil::Status status;

status.set_error(ERAFTTIMEDOUT, "Fail to get quorum vote-granted");

step_down(_current_term, false, status);

pre_vote(&lck);

} else {

// retry vote

LOG(WARNING) << "node " << _group_id << ":" << _server_id

<< " term " << _current_term << " retry elect";

elect_self(&lck);

}

}

如果设置了raft_step_down_when_vote_timedout,就回退到follower开始新的prevote,否则就直接开始新的选举。

三、Leader心跳保持

3.1 become_leader

candidate赢得选举之后会调用become_leader,become_leader做的工作如下:

void NodeImpl::become_leader() {

CHECK(_state == STATE_CANDIDATE);

...

// cancel candidate vote timer

_vote_timer.stop();

_state = STATE_LEADER;

_leader_id = _server_id;

_replicator_group.reset_term(_current_term);

std::set peers;

_conf.list_peers(&peers);

for (std::set::const_iterator

iter = peers.begin(); iter != peers.end(); ++iter) {

if (*iter == _server_id) {

continue;

}

BRAFT_VLOG << "node " << _group_id << ":" << _server_id

<< " term " << _current_term

<< " add replicator " << *iter;

//TODO: check return code

_replicator_group.add_replicator(*iter);

}

// init commit manager

_ballot_box->reset_pending_index(_log_manager->last_log_index() + 1);

// Register _conf_ctx to reject configuration changing before the first log

// is committed.

CHECK(!_conf_ctx.is_busy());

_conf_ctx.flush(_conf.conf, _conf.old_conf);

_stepdown_timer.start();

}

首先停止vote_timer,然后把state设置为leader,_leader_id设置为自己。然后把_replicator_group的term设置为当前term,并把其他peer添加到_replicator_group里面。添加的时候会给每个peer分配一个Replicator,并调用Replicator::start。

3.2Replicator::start

下面看一下Replicator::start做了什么:

int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) {

...

Replicator* r = new Replicator();

brpc::ChannelOptions channel_opt;

//channel_opt.connect_timeout_ms = *options.heartbeat_timeout_ms;

channel_opt.timeout_ms = -1; // We don't need RPC timeout

if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) {

LOG(ERROR) << "Fail to init sending channel"

<< ", group " << options.group_id;

delete r;

return -1;

}

// bind lifecycle with node, AddRef

// Replicator stop is async

options.node->AddRef();

r->_options = options;

r->_next_index = r->_options.log_manager->last_log_index() + 1;

if (bthread_id_create(&r->_id, r, _on_error) != 0) {

LOG(ERROR) << "Fail to create bthread_id"

<< ", group " << options.group_id;

delete r;

return -1;

}

bthread_id_lock(r->_id, NULL);

if (id) {

*id = r->_id.value;

}

LOG(INFO) << "Replicator=" << r->_id << "@" << r->_options.peer_id << " is started"

<< ", group " << r->_options.group_id;

r->_catchup_closure = NULL;

r->_last_rpc_send_timestamp = butil::monotonic_time_ms();

r->_start_heartbeat_timer(butil::gettimeofday_us());

// Note: r->_id is unlock in _send_empty_entries, don't touch r ever after

r->_send_empty_entries(false);

return 0;

}

开始heartbeat_timer并发送空的entries通知follower自己的leader身份。其他节点收到第一个空的entry就会回退成follower并把自己的leader_id设置成request里面包含的serverId。

3.3发送心跳

心跳定时器

在Replicator::start里面开始了heartbeat_timer,它是个bthread_timer,在超时的时候会调用Replicator::_on_timedout,该函数会把对应的id设置为ETIMEDOUT:

void Replicator::_on_timedout(void* arg) {

bthread_id_t id = { (uint64_t)arg };

bthread_id_error(id, ETIMEDOUT);

}

bthread_id_error会去调用_on_error,然后开始_send_heartbeat:

else if (error_code == ETIMEDOUT) {

// This error is issued in the TimerThread, start a new bthread to avoid

// blocking the caller.

// Unlock id to remove the context-switch out of the critical section

CHECK_EQ(0, bthread_id_unlock(id)) << "Fail to unlock" << id;

bthread_t tid;

if (bthread_start_urgent(&tid, NULL, _send_heartbeat,

reinterpret_cast(id.value)) != 0) {

PLOG(ERROR) << "Fail to start bthread";

_send_heartbeat(reinterpret_cast(id.value));

}

return 0;

发送心跳

_send_heartbeat会去调用Replicator::_send_empty_entries发送心跳:

void Replicator::_send_empty_entries(bool is_heartbeat) {

std::unique_ptr<:controller> cntl(new brpc::Controller);

std::unique_ptr request(new AppendEntriesRequest);

std::unique_ptr response(new AppendEntriesResponse);

...

if (is_heartbeat) {

_heartbeat_in_fly = cntl->call_id();

_heartbeat_counter++;

// set RPC timeout for heartbeat, how long should timeout be is waiting to be optimized.

cntl->set_timeout_ms(*_options.election_timeout_ms / 2);

} else {

...

}

...

google::protobuf::Closure* done = brpc::NewCallback(

is_heartbeat ? _on_heartbeat_returned : _on_rpc_returned,

_id.value, cntl.get(), request.get(), response.get(),

butil::monotonic_time_ms());

RaftService_Stub stub(&_sending_channel);

stub.append_entries(cntl.release(), request.release(),

response.release(), done);

CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;

}

收到心跳

对应的服务收到RPC后,会获取对应的node,并调用handle_append_entries_request。该函数的步骤如下:

1、从request中获取server_id并检查term,如果request中的term小于自己的term,则将response的success设置为false,term设置为自己的term并返回 2、否则回退成leader并将_leader_id设置为server_id 3、更新_last_leader_timestamp(election_timout被触发的时候会通过检查这个时间来判断是否超时)

if (!from_append_entries_cache) {

// Requests from cache already updated timestamp

_last_leader_timestamp = butil::monotonic_time_ms();

}

}

4、如果节点正在安装快照,则返回失败

if (request->entries_size() > 0 &&

(_snapshot_executor

&& _snapshot_executor->is_installing_snapshot())) {

LOG(WARNING) << "node " << _group_id << ":" << _server_id

<< " received append entries while installing snapshot";

cntl->SetFailed(EBUSY, "Is installing snapshot");

return;

}

5、设置response并返回

if (request->entries_size() == 0) {

response->set_success(true);

response->set_term(_current_term);

response->set_last_log_index(_log_manager->last_log_index());

response->set_readonly(_node_readonly);

lck.unlock();

// see the comments at FollowerStableClosure::run()

_ballot_box->set_last_committed_index(

std::min(request->committed_index(),

prev_log_index));

return;

}

收到心跳响应

收到心跳之后会调用_on_heartbeat_returned回调,如果response里面的term大于当前term,则更新term并回退到follower。否则重启heartbeat_timer开始下一轮heartbeat。

参考文献

Notes

如有理解和描述上有疏漏或者错误的地方,欢迎共同交流;参考已经在参考文献中注明,但仍有可能有疏漏的地方,有任何侵权或者不明确的地方,欢迎指出,必定及时更正或者删除;文章供于学习交流,转载注明出处。

最后

以上就是粗心方盒最近收集整理的关于brafteditor防抖_braft源码分析(一)选举和心跳保持部分的全部内容,更多相关brafteditor防抖_braft源码分析(一)选举和心跳保持部分内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部