我是靠谱客的博主 忧伤鸡翅,最近开发中收集的这篇文章主要介绍Nacos(1.4.2)注册中心原理及源码系列(五)- 心跳机制与服务健康检查设计原理及源码剖析心跳机制与服务健康检查设计原理及源码剖析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

心跳机制与服务健康检查设计原理及源码剖析

   客户端会调用/nacos/v1/ns/instance/beat接口进行心跳,主要逻辑有:

   1.如果在Nacos Server没有找到相对应的Instance,那么就构造一个Instance,源码如下:

    // com.alibaba.nacos.naming.controllers.InstanceController#beat
    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {
    // 省略其它代码。。。。。。。

    // 获取Instance
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // 如果Instance为null,那么就构造一个Instance进行注册
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                             + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);

        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
		// 把Client注册到Nacos
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }

    Service service = serviceManager.getService(namespaceId, serviceName);
	// 省略其它代码。。。。。。。
	// 处理Client端的心跳
    service.processClientBeat(clientBeat);

   // 省略其它代码。。。。。。。
    return result;
}

2.如果在Nacos Server中有当前实例,那么就更新一下lastBeat、healthy等,源码如下:

// com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor#run
@Override
public void run() {
    // 省略其它代码。。。。。。。

    for (Instance instance : instances) {
        // 从Service中找到需要更新的Instance
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // 把当前时间设置到Insance的lastBeat中
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    // 更新一下健康状态
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                        .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                              cluster.getService().getName(), ip, port, cluster.getName(),
                              UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}

    服务端也会启动线程去检测客户端心跳信息,来判断客户端是否存活,Nacos是怎么启动心跳检测的,怎么心跳检测的?

    请看源码:

// com.alibaba.nacos.naming.core.Service#init
// 在注册Instance的时候会调用这个方法
public void init() {
    // 
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
// com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck
public static void scheduleCheck(ClientBeatCheckTask task) {
    // 这里会启动一个定时任务来检测心跳,按照的是固定频率5s
    // 其实这里最后采用的JDK的scheduleWithFixedDelay
    futureMap.computeIfAbsent(task.taskKey(),
                              k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

// com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run
@Override
public void run() {
    try {
        // 判断是否是当前Nacos实例来启动心跳检测
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        List<Instance> instances = service.allIPs(true);

        // first set health status of instances:
        // 检测实例的健康状态
        for (Instance instance : instances) {
            // 判断心跳是否超时,默认超时时间为15秒
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        // 如果心跳超时,首先把此服务的健康状态设置为false
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                            .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                  instance.getIp(), instance.getPort(), instance.getClusterName(),
                                  service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                  instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());

                        // 发布一个服务变更事件
                        getPushService().serviceChanged(service);
                        // 发布一个心跳超时事件
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // then remove obsolete instances:
        // 删除过时的实例(默认30s)
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }

            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                                     JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }

}

    默认的心跳超时时间为15s,如果发现Instance中的lastBeat的时间与当前时间对比,小于当前时间15s以上那么就判断Instance为不健康的状态,首先会设置Instance中的healthy为false,然后发布一个服务变更事件,再发布一个心跳超时事件。

最后

以上就是忧伤鸡翅为你收集整理的Nacos(1.4.2)注册中心原理及源码系列(五)- 心跳机制与服务健康检查设计原理及源码剖析心跳机制与服务健康检查设计原理及源码剖析的全部内容,希望文章能够帮你解决Nacos(1.4.2)注册中心原理及源码系列(五)- 心跳机制与服务健康检查设计原理及源码剖析心跳机制与服务健康检查设计原理及源码剖析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部