我是靠谱客的博主 安详枕头,最近开发中收集的这篇文章主要介绍Eureka Client 服务发现原理,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

 

一、Eureka Client 的工作

  1. 应用启动阶段
    1. 读取与Eureka Server 交互的配置信息,封装成EurekaClientConfig
    2. 读取自身服务实例配置信息,封装成EurekaInstanceConfig
    3. 从Eureka Server 中拉取注册表信息,并缓存到本地
    4. 服务注册
    5. 初始化发送心跳、缓存刷新(拉取注册表信息更新本地缓存)和按需注册(监控服务实例信息变化,决定是否重新发起注册,更新注册表中的服务实例元数据)定时任务
  2. 应用执行阶段
    1. 定时发送心跳到Eureka Server 中,维持在注册表的租约。
    2. 定时从Eureka Server 中拉取注册表信息,更新本地注册表缓存。
    3. 监控应用自身信息变化,若发生变化,需要重新发起服务注册。
  3. 应用销毁阶段
    1. 从Eureka Server 注销自身服务实例。

二、源码解析

Eureka Client 是通过Starter的方式引入依赖,Spring Boot 将会为项目使用以下自动配置类:

1. EurekaClientAutoConfiguration :Eureka Client 自动配置类,负责 Eureka Client 中关键 Beans 的配置和初始化,如 ApplicationInfoManager 和 EurekaClientConfig等。

2. RibbonEurekaAutoConfiguration: Ribbon 负载均衡相关配置。

3. EurekaDiscoveryClientConfiguration: 配置自动注册和应用的健康检查器。

2.1 读取应用自身配置信息

通过EurekaDiscoverClientConfig 配置类, Spring Boot 帮助 Eureka Client 完成很多必要 Bean 的属性读取和配置, 现列出 EurekaDiscoveryClientConfiguation 中的属性读取和配置类。

类名

作用和介绍

EurekaClientConfig

封装 Eureka Client 与 Eureka Server 交互所需要的配置信息。 Spring Cloud 为其提供了一个默认配置类的 EurekaClientConfigBean ,可以在配置文件中通过前缀 eureka.client+属性名进行属性覆盖

ApplicationInfoManager

作为应用信息管理器,管理服务实例的信息类 InstanceInfo 和服务实例的配置信息类 EurekaInstanceConfig

InstanceInfo

封装将被发送到Eureka Server 进行服务注册的服务实例元数据。它在 Eureka Server 的注册中代表一个服务实例,其他服务实例可以通过 InstanceInfo 了解该服务实例的相关信息从而发起服务请求

EurekaInstanceConfig

封装 Eureka Client 自身服务实例的配置信息,主要用于构建 InstanceInfo 通常这些信息在配置文件中的 eureka.instance 前缀下进行设置, Spring Cloud 通过EurekaInstanceConfigBean 配置类提供了默认配置

DiscoveryClient

Spring Cloud 中定义用来服务发现的客户端接口

2.2 服务发现客户端

2.1.1 DiscoveryClient 职责

DiscoveryClient 是 Eureka Client 的核心类,包括与 Eureka Server 交互的关键逻辑,具备了以下职能:

  1. 注册服务实例到 Eureka Server 中
  2. 发送心跳更新与 Eureka Server 的租约
  3. 在服务关闭时从 Eureka Server 中取消租约,服务下线
  4. 查询在 Eureka Server 中注册的服务实例列表

2.1.2 DiscoveryClient 构造函数

构造函数中初始化发送心跳、缓存刷新等定时任务

// com.netflix.discovery.DiscoveryClient#DiscoveryClient()
// 对应配置为 eureka.client.fetch-register , true 表示Eureka Client 将从 Eureka Server 中拉取注册表信息
if (config.shouldFetchRegistry()) {
    this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
    this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// 对应配置为 eureka.client.register-with-eureka , true 表示Eureka Client 将注册到 Eureka Server 中
if (config.shouldRegisterWithEureka()) {
    this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
    this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}

接着定义一个基于线程池的定时器线程池 ScheduledExecutorService,线程池大小为2,一个线程用于发送心跳,另一个线程用于缓存刷新,同时定义了发送心跳和缓存刷新线程池,代码如下所示:

// com.netflix.discovery.DiscoveryClient#DiscoveryClient()

scheduler = Executors.newScheduledThreadPool(2,
        new ThreadFactoryBuilder()
                .setNameFormat("DiscoveryClient-%d")
                .setDaemon(true)
                .build());

heartbeatExecutor = new ThreadPoolExecutor(
        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadFactoryBuilder()
                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                .setDaemon(true)
                .build()
);  // use direct handoff

cacheRefreshExecutor = new ThreadPoolExecutor(...);

// 内部类,封装 http 调用的 Jersey 客户端
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);

接着从Eureka Server 中拉取注册表信息,代码如下所示:

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
    fetchRegistryFromBackup();
}

拉取完 Eureka Server 中的注册表信息后,将对服务实例进行注册,代码如下所示:

if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
    try {
        // 发起服务注册
        if (!register() ) {
            throw new IllegalStateException("Registration error at startup. Invalid server response.");
        }
    } catch (Throwable th) {
        logger.error("Registration error at startup: {}", th.getMessage());
        throw new IllegalStateException(th);
    }
}
// 初始化定时任务
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();

最后总结一下,在 DiscoveryClient 的构造函数中,主要依次做了以下的事情:

  1. 相关配置的赋值,类似 ApplicationInfoManager 、 EurekaClientConfig 等
  2. 备份注册中心的初始化,默认没有实现
  3. 拉取Eureka Server 注册表中的信息
  4. 注册前预处理
  5. 向 Eureka Server 注册自身
  6. 初始化心跳定时任务、缓存刷新和按需注册等定时任务

2.3 拉取注册表信息

com.netflix.discovery.DiscoveryClient#fetchRegistry

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        //如果增量式拉取被禁止,或者Applications 为 null ,进行全是拉取
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            // 全量拉取注册表信息
            getAndStoreFullRegistry();
        } else {
            // 增量拉取注册表信息
            getAndUpdateDelta(applications);
        }
        // 计算应用集合一致性哈希码
        applications.setAppsHashCode(applications.getReconcileHashCode());
        // 打印注册表上所有服务实例的总数量
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // 在更新远程实例状态之前推送缓存刷新事件,但是Eureka中并没有提供默认的事件监听器
    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();
    
    // 基于缓存中被刷新的数据更新远程实例状态
    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}

2.3.1 全量拉取注册表信息

com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry

接口: http://localhost:20000/eureka/apps/

getAndStoreFullRegistry 方法可能被多个线程同时调用,导致新拉取的注册表被旧的注册表覆盖,产生脏数据,对此,Eureka 通过类型为 AtomicLong 的 currentUpdateGeneration 对 apps 的更新版本进行跟踪。如果更新版本不一致,说明本次拉取的注册信息已过时,不需要保存到本地。拉取到注册表信息之后会对获取到的apps进行筛选,只保留状态为 UP 的服务实例信息。

2.3.2 增量式拉取注册表信息

com.netflix.discovery.DiscoveryClient#getAndUpdateDelta

接口: http://localhost:20000/eureka/apps/delta

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    // 获取增量拉取失败,则进行全量拉取
    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // 更新本地缓存
                updateDelta(delta);
                // 计算应用集合一致性哈希码
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // 比较应用集合一致性哈希码,如果不一致将认为本次增量式拉取数据已脏,将发起全量拉取更新本地注册表信息
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

appsHashCode 的一般表示方式为:

appsHashCode = ${status}_${count}_

它通过将应用状态和数量拼接成字符串,表示了当前注册表中服务实例状态的统计信息。举个简单的例子,有10个应用实例的状态为UP,有5个应用实例状态为DOWN,其他的状态数据为0(不进行表示),那么appsHashCode 的形式将是:

appsHashCode = UP_10_DOWN_5_

2.4 服务注册

com.netflix.discovery.DiscoveryClient#register

接口: http://localhost:20000/eureka/apps/${APP_NAME}


/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        // 将自身服务实例元数据封装到 InstanceInfo 中,发送到 Eureka Server中请求服务注册, 当Eureka Server 返回 204 状态码时,说明服务注册成功。
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

2.5 初始化定时任务

com.netflix.discovery.DiscoveryClient#initScheduledTasks

这个方法中初始化三个定时器任务,一个用于向 Eureka Server 拉取注册表信息刷新本地缓存;一个用于向 Eureka Server 发送心跳;一个用于进行按需注册的操作。代码如下所示:


/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // 注册表缓存刷新定时器
        // 获取配置文件中刷新间隔,默认为30秒,可以通过 eureka.client.registry-fetch-interval-seconds 进行设置
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,  expBackOffBound, new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
        
        // 发送心跳定时器,默认30秒发送一次心跳
        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor,  renewalIntervalInSecs,
                        TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // 按需注册定时器 。。。。

    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

2.5.1 按需注册定时任务

按需注册定时任务的作用是当 Eureka Client 中的 InstanceInfo 或者 status 发生变化时,重新向 Eureka Server 发起注册请求,更新注册表中的服务实例信息,保证 Eureka Server 注册表中服务实例信息有效和可用。

      // 按需注册定时器 。。。。
         // InstanceInfo replicator
        // 定时检查刷新服务实例信息,检查是否有变化,是否需要重新注册
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
        // 监控应用的 status 变化,发生变化即可发起重新注册
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            // 注册应用状态改变监控器
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }
        // 启动定时器按需注册定时任务
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

2.6 服务下线

com.netflix.discovery.DiscoveryClient#shutdown


/**
 * Shuts down Eureka Client. Also sends a deregistration request to the
 * eureka server.
 */
@PreDestroy
@Override
public synchronized void shutdown() {
    // 同步方法
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");
        // 原子操作,确保只会执行一次
        if (statusChangeListener != null && applicationInfoManager != null) {
            // 注销状态监听器
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
        // 取消定时任务
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            // 服务下线
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }
        // 关闭 Jersy 客户端
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        // 关闭相关 Monitor
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        logger.info("Completed shut down of DiscoveryClient");
    }
}

最后

以上就是安详枕头为你收集整理的Eureka Client 服务发现原理的全部内容,希望文章能够帮你解决Eureka Client 服务发现原理所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(53)

评论列表共有 0 条评论

立即
投稿
返回
顶部