概述
一、Eureka Client 的工作
- 应用启动阶段
- 读取与Eureka Server 交互的配置信息,封装成EurekaClientConfig
- 读取自身服务实例配置信息,封装成EurekaInstanceConfig
- 从Eureka Server 中拉取注册表信息,并缓存到本地
- 服务注册
- 初始化发送心跳、缓存刷新(拉取注册表信息更新本地缓存)和按需注册(监控服务实例信息变化,决定是否重新发起注册,更新注册表中的服务实例元数据)定时任务
- 应用执行阶段
- 定时发送心跳到Eureka Server 中,维持在注册表的租约。
- 定时从Eureka Server 中拉取注册表信息,更新本地注册表缓存。
- 监控应用自身信息变化,若发生变化,需要重新发起服务注册。
- 应用销毁阶段
- 从Eureka Server 注销自身服务实例。
二、源码解析
Eureka Client 是通过Starter的方式引入依赖,Spring Boot 将会为项目使用以下自动配置类:
1. EurekaClientAutoConfiguration :Eureka Client 自动配置类,负责 Eureka Client 中关键 Beans 的配置和初始化,如 ApplicationInfoManager 和 EurekaClientConfig等。
2. RibbonEurekaAutoConfiguration: Ribbon 负载均衡相关配置。
3. EurekaDiscoveryClientConfiguration: 配置自动注册和应用的健康检查器。
2.1 读取应用自身配置信息
通过EurekaDiscoverClientConfig 配置类, Spring Boot 帮助 Eureka Client 完成很多必要 Bean 的属性读取和配置, 现列出 EurekaDiscoveryClientConfiguation 中的属性读取和配置类。
类名 | 作用和介绍 |
EurekaClientConfig | 封装 Eureka Client 与 Eureka Server 交互所需要的配置信息。 Spring Cloud 为其提供了一个默认配置类的 EurekaClientConfigBean ,可以在配置文件中通过前缀 eureka.client+属性名进行属性覆盖 |
ApplicationInfoManager | 作为应用信息管理器,管理服务实例的信息类 InstanceInfo 和服务实例的配置信息类 EurekaInstanceConfig |
InstanceInfo | 封装将被发送到Eureka Server 进行服务注册的服务实例元数据。它在 Eureka Server 的注册中代表一个服务实例,其他服务实例可以通过 InstanceInfo 了解该服务实例的相关信息从而发起服务请求 |
EurekaInstanceConfig | 封装 Eureka Client 自身服务实例的配置信息,主要用于构建 InstanceInfo 通常这些信息在配置文件中的 eureka.instance 前缀下进行设置, Spring Cloud 通过EurekaInstanceConfigBean 配置类提供了默认配置 |
DiscoveryClient | Spring Cloud 中定义用来服务发现的客户端接口 |
2.2 服务发现客户端
2.1.1 DiscoveryClient 职责
DiscoveryClient 是 Eureka Client 的核心类,包括与 Eureka Server 交互的关键逻辑,具备了以下职能:
- 注册服务实例到 Eureka Server 中
- 发送心跳更新与 Eureka Server 的租约
- 在服务关闭时从 Eureka Server 中取消租约,服务下线
- 查询在 Eureka Server 中注册的服务实例列表
2.1.2 DiscoveryClient 构造函数
构造函数中初始化发送心跳、缓存刷新等定时任务
// com.netflix.discovery.DiscoveryClient#DiscoveryClient()
// 对应配置为 eureka.client.fetch-register , true 表示Eureka Client 将从 Eureka Server 中拉取注册表信息
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// 对应配置为 eureka.client.register-with-eureka , true 表示Eureka Client 将注册到 Eureka Server 中
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
接着定义一个基于线程池的定时器线程池 ScheduledExecutorService,线程池大小为2,一个线程用于发送心跳,另一个线程用于缓存刷新,同时定义了发送心跳和缓存刷新线程池,代码如下所示:
// com.netflix.discovery.DiscoveryClient#DiscoveryClient()
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(...);
// 内部类,封装 http 调用的 Jersey 客户端
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
接着从Eureka Server 中拉取注册表信息,代码如下所示:
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
拉取完 Eureka Server 中的注册表信息后,将对服务实例进行注册,代码如下所示:
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
// 发起服务注册
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// 初始化定时任务
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
最后总结一下,在 DiscoveryClient 的构造函数中,主要依次做了以下的事情:
- 相关配置的赋值,类似 ApplicationInfoManager 、 EurekaClientConfig 等
- 备份注册中心的初始化,默认没有实现
- 拉取Eureka Server 注册表中的信息
- 注册前预处理
- 向 Eureka Server 注册自身
- 初始化心跳定时任务、缓存刷新和按需注册等定时任务
2.3 拉取注册表信息
com.netflix.discovery.DiscoveryClient#fetchRegistry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
//如果增量式拉取被禁止,或者Applications 为 null ,进行全是拉取
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
// 全量拉取注册表信息
getAndStoreFullRegistry();
} else {
// 增量拉取注册表信息
getAndUpdateDelta(applications);
}
// 计算应用集合一致性哈希码
applications.setAppsHashCode(applications.getReconcileHashCode());
// 打印注册表上所有服务实例的总数量
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// 在更新远程实例状态之前推送缓存刷新事件,但是Eureka中并没有提供默认的事件监听器
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// 基于缓存中被刷新的数据更新远程实例状态
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
2.3.1 全量拉取注册表信息
com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry
接口: http://localhost:20000/eureka/apps/
getAndStoreFullRegistry 方法可能被多个线程同时调用,导致新拉取的注册表被旧的注册表覆盖,产生脏数据,对此,Eureka 通过类型为 AtomicLong 的 currentUpdateGeneration 对 apps 的更新版本进行跟踪。如果更新版本不一致,说明本次拉取的注册信息已过时,不需要保存到本地。拉取到注册表信息之后会对获取到的apps进行筛选,只保留状态为 UP 的服务实例信息。
2.3.2 增量式拉取注册表信息
com.netflix.discovery.DiscoveryClient#getAndUpdateDelta
接口: http://localhost:20000/eureka/apps/delta
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// 获取增量拉取失败,则进行全量拉取
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 更新本地缓存
updateDelta(delta);
// 计算应用集合一致性哈希码
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// 比较应用集合一致性哈希码,如果不一致将认为本次增量式拉取数据已脏,将发起全量拉取更新本地注册表信息
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
appsHashCode 的一般表示方式为:
appsHashCode = ${status}_${count}_
它通过将应用状态和数量拼接成字符串,表示了当前注册表中服务实例状态的统计信息。举个简单的例子,有10个应用实例的状态为UP,有5个应用实例状态为DOWN,其他的状态数据为0(不进行表示),那么appsHashCode 的形式将是:
appsHashCode = UP_10_DOWN_5_
2.4 服务注册
com.netflix.discovery.DiscoveryClient#register
接口: http://localhost:20000/eureka/apps/${APP_NAME}
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
// 将自身服务实例元数据封装到 InstanceInfo 中,发送到 Eureka Server中请求服务注册, 当Eureka Server 返回 204 状态码时,说明服务注册成功。
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
2.5 初始化定时任务
com.netflix.discovery.DiscoveryClient#initScheduledTasks
这个方法中初始化三个定时器任务,一个用于向 Eureka Server 拉取注册表信息刷新本地缓存;一个用于向 Eureka Server 发送心跳;一个用于进行按需注册的操作。代码如下所示:
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// 注册表缓存刷新定时器
// 获取配置文件中刷新间隔,默认为30秒,可以通过 eureka.client.registry-fetch-interval-seconds 进行设置
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds,
TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// 发送心跳定时器,默认30秒发送一次心跳
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs,
TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// 按需注册定时器 。。。。
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
2.5.1 按需注册定时任务
按需注册定时任务的作用是当 Eureka Client 中的 InstanceInfo 或者 status 发生变化时,重新向 Eureka Server 发起注册请求,更新注册表中的服务实例信息,保证 Eureka Server 注册表中服务实例信息有效和可用。
// 按需注册定时器 。。。。
// InstanceInfo replicator
// 定时检查刷新服务实例信息,检查是否有变化,是否需要重新注册
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 监控应用的 status 变化,发生变化即可发起重新注册
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
// 注册应用状态改变监控器
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 启动定时器按需注册定时任务
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
2.6 服务下线
com.netflix.discovery.DiscoveryClient#shutdown
/**
* Shuts down Eureka Client. Also sends a deregistration request to the
* eureka server.
*/
@PreDestroy
@Override
public synchronized void shutdown() {
// 同步方法
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
// 原子操作,确保只会执行一次
if (statusChangeListener != null && applicationInfoManager != null) {
// 注销状态监听器
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// 取消定时任务
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
// 服务下线
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
// 关闭 Jersy 客户端
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
// 关闭相关 Monitor
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
最后
以上就是安详枕头为你收集整理的Eureka Client 服务发现原理的全部内容,希望文章能够帮你解决Eureka Client 服务发现原理所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复