概述
- Register: 注册者
- Get Registry: 得到注册表
- Renew: 更新
- Cancel: 取消
- Make Remote call: 进行远程呼叫
文章目录
- 1.源码分析入口
- 2. EurekaClientAutoConfiguration
- 2.1 RefreshableEurekaClientConfiguration
- 2.2 EurekaClient
- 2.3 CloudEurekaClient
- 2.4 DiscoveryClient
- 3.从服务端获取注册表
- 3.1 全量获取
- 3.2 增量获取
- 4.注册信息
- 5.initScheduledTasks
- 5.1 定时更新客户端注册表
- 5.2 定时续约
- 5.3 定时更新Client信息给Server
1.源码分析入口
2. EurekaClientAutoConfiguration
EurekaClientAutoConfiguration -> RefreshableEurekaClientConfiguration -> EurekaClient ->
2.1 RefreshableEurekaClientConfiguration
RefreshableEurekaClientConfiguration()
2.2 EurekaClient
eurekaClient() -> CloudEurekaClient cloudEurekaClient = new CloudEurekaClient()
2.3 CloudEurekaClient
2.4 DiscoveryClient
4个参数:
3个参数:
3.从服务端获取注册表
5参数中间的方法:
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
第一次获取为全量获取, 后面都是增量获取。
全量获取的都放在本地Region中, 增量获取分本地Region和远程Region两种。
3.1 全量获取
这里是一个判断: 如果得到的VIP用户全量为空的话获取全量, 如果不为空的话获取VIP全量
AbstractJerseyEurekaHttpClient.getApplications():
最后发送的是get请求, 获取到注册表
3.2 增量获取
- getDelta(): 获取注册表
- getAndStoreFullRegistry: 如果之前获取的注册表为空的话, 进行全量获取。
- updateDelta(): 从Server获取所有变更的信息到本地缓存, 这些信息为两类Region, 本地Region和远程Region, 本地Region分为两类, 缓存本地Region的applications与缓存所有远程Region的注册信息的map(key为远程Region, value为远程Region的注册表)。
远程Region的获取
本地Region: 分为添加修改和删除
通过instanceInfo中的instance的id完成添加和修改功能。
4.注册信息
DiscoveryClient -> registry()
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
发送post请求
5.initScheduledTasks
- 定时更新客户端注册表
- 定时续约
- 定时更新客户端信息
5.1 定时更新客户端注册表
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
TimedSupervisorTask.class
本来任务只执行一次, 但是通过异步的执行任务多次
CacheRefreshThread.class
这里加了锁, 因为这是共享数据, 为迭代稳定性代码。
如果是全量注册的话:同上
如果是增强注册的话:同上
5.2 定时续约
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
先续约, 如果没有完成续约的话进行注册
sendHeartBeat(): 完成心跳续约, 发生put请求, 存在的信息为status和lastDirtyTimestamp, lastDirtyTimestamp是在client端修改的时间戳。overriddenStatus是处理续约和注册请求时候, 需要在服务端的instanceInfo的status进行重新计算。
register(): 注册注册表
5.3 定时更新Client信息给Server
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
1.instanceInfoReplicator: instanceInfo的备份的事件
2.statusChangeListener: 监听器
3.instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()): instanceInfo的备份的事件启动
启动监听器的话, 调用instanceInfoReplicator.onDemandUpdate();中的InstanceInfoReplicator.this.run();方法, 按需修改客户端注册信息
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
因为存在cancel定时任务的操作, 所以不会出现多条线程去执行注册信息的修改, 多条线程是由于一个是定时更新一个是按需更新。
1.discoveryClient.refreshInstanceInfo();
2.discoveryClient.register();
3.Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
1.discoveryClient.refreshInstanceInfo();
2.discoveryClient.register();
3.Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); 进行重复的上面的定时任务
Client提交注册的情况:
- 应用启动时候直接register(), 需要提前配置配置文件
- 续约renew时, server端返回的是NOT_FOUND, 则提交register()
- 当Client的配置文件发生变更, 则提交register()
最后
以上就是魁梧钢笔为你收集整理的SpringCloud源码解析 (Eureka-Client源码解析-更新Server的注册信息) (三)的全部内容,希望文章能够帮你解决SpringCloud源码解析 (Eureka-Client源码解析-更新Server的注册信息) (三)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复