概述
- 目的:
- 维持provider和consumer之间的长连接
- 实现:
- dubbo心跳时间heartbeat默认是60s,超过heartbeat时间没有收到消息,就发送心跳消息(provider,consumer一样),如果连着3次(heartbeatTimeout为heartbeat*3)没有收到心跳响应,provider会关闭channel,而consumer会进行重连;不论是provider还是consumer的心跳检测都是通过启动定时任务的方式实现;
- provider绑定和consumer连接的入口:
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
- provider启动心跳检测
public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
}
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
//心跳超时时间默认为心跳时间的3倍
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
//如果心跳超时时间小于心跳时间的两倍则抛异常
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
startHeatbeatTimer();
}
- startHeatbeatTimer的实现
- 先停止已有的定时任务,启动新的定时任务
private void startHeatbeatTimer() {
// 停止原有定时任务
stopHeartbeatTimer();
// 发起新的定时任务
if (heartbeat > 0) {
heatbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
- HeartBeatTask的实现
- 遍历所有的channel,检测心跳间隔,如果超过心跳间隔没有读或写,则发送需要回复的心跳消息,最有判断是否心跳超时(heartbeatTimeout),如果超时,provider关闭channel,consumer进行重连
public void run() {
try {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try {
Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
// 读写的时间,任一超过心跳间隔,发送心跳
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true); // 需要响应的心跳事件
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
// 最后读的时间,超过心跳超时时间
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
// 客户端侧,重新连接服务端
if (channel instanceof Client) {
try {
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
// 服务端侧,关闭客户端连接
} else {
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
- consumer端的实现
- 默认需要心跳检测
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
// 创建 HeaderExchangeChannel 对象
this.channel = new HeaderExchangeChannel(client);
// 读取心跳相关配置
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) { // 避免间隔太短
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
// 发起心跳定时器
if (needHeartbeat) {
startHeatbeatTimer();
}
}
我们可以看到dubbo的心跳检测,服务端会发送发送心跳包,客户端也会发送心跳包,与一般只有客户端发送心跳包,服务端接受心跳是有所不同的;
最后
以上就是仁爱帽子为你收集整理的dubbo心跳检测机制的全部内容,希望文章能够帮你解决dubbo心跳检测机制所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复