我是靠谱客的博主 仁爱帽子,这篇文章主要介绍dubbo心跳检测机制,现在分享给大家,希望可以做个参考。

  • 目的:
    • 维持provider和consumer之间的长连接
  • 实现:
    • dubbo心跳时间heartbeat默认是60s,超过heartbeat时间没有收到消息,就发送心跳消息(provider,consumer一样),如果连着3次(heartbeatTimeout为heartbeat*3)没有收到心跳响应,provider会关闭channel,而consumer会进行重连;不论是provider还是consumer的心跳检测都是通过启动定时任务的方式实现;
  • provider绑定和consumer连接的入口:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; @Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
  • provider启动心跳检测
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); //心跳超时时间默认为心跳时间的3倍 this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); //如果心跳超时时间小于心跳时间的两倍则抛异常 if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } startHeatbeatTimer(); }
  • startHeatbeatTimer的实现
    • 先停止已有的定时任务,启动新的定时任务
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void startHeatbeatTimer() { // 停止原有定时任务 stopHeartbeatTimer(); // 发起新的定时任务 if (heartbeat > 0) { heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { public Collection<Channel> getChannels() { return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels()); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } }
  • HeartBeatTask的实现
    • 遍历所有的channel,检测心跳间隔,如果超过心跳间隔没有读或写,则发送需要回复的心跳消息,最有判断是否心跳超时(heartbeatTimeout),如果超时,provider关闭channel,consumer进行重连
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public void run() { try { long now = System.currentTimeMillis(); for (Channel channel : channelProvider.getChannels()) { if (channel.isClosed()) { continue; } try { Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP); Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); // 读写的时间,任一超过心跳间隔,发送心跳 if ((lastRead != null && now - lastRead > heartbeat) || (lastWrite != null && now - lastWrite > heartbeat)) { Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); // 需要响应的心跳事件 req.setEvent(Request.HEARTBEAT_EVENT); channel.send(req); if (logger.isDebugEnabled()) { logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); } } // 最后读的时间,超过心跳超时时间 if (lastRead != null && now - lastRead > heartbeatTimeout) { logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); // 客户端侧,重新连接服务端 if (channel instanceof Client) { try { ((Client) channel).reconnect(); } catch (Exception e) { //do nothing } // 服务端侧,关闭客户端连接 } else { channel.close(); } } } catch (Throwable t) { logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); } } } catch (Throwable t) { logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t); } }
  • consumer端的实现
    • 默认需要心跳检测
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public HeaderExchangeClient(Client client, boolean needHeartbeat) { if (client == null) { throw new IllegalArgumentException("client == null"); } this.client = client; // 创建 HeaderExchangeChannel 对象 this.channel = new HeaderExchangeChannel(client); // 读取心跳相关配置 String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { // 避免间隔太短 throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } // 发起心跳定时器 if (needHeartbeat) { startHeatbeatTimer(); } }

我们可以看到dubbo的心跳检测,服务端会发送发送心跳包,客户端也会发送心跳包,与一般只有客户端发送心跳包,服务端接受心跳是有所不同的;

最后

以上就是仁爱帽子最近收集整理的关于dubbo心跳检测机制的全部内容,更多相关dubbo心跳检测机制内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(59)

评论列表共有 0 条评论

立即
投稿
返回
顶部