概述
前言
服务续约
默认情况下,客户端的服务实例每隔30秒向Eureka服务端发送一次心跳。如果90秒之内Eureka服务端没有收到服务实例的心跳,该服务实例会被执行剔除任务的线程(每隔60秒执行一次)从注册的服务实例列表中剔除。
自我保护机制
如果最近一分钟内Eureka服务端收到的续约(心跳)次数低于或等于阈值,就会触发Eureka服务端的自我保护机制。阈值即期望续约次数的85%,相当于心跳失败的比例达到约15%;该阈值默认每15分钟重新计算一次。此时Eureka不会剔除通信不正常的服务实例,并且仍然接收客户端的服务注册与服务查询请求,但是不会与其它Eureka服务端节点进行同步。自我保护机制是一种针对网络异常波动的安全保护措施,可以使Eureka集群更加健壮、稳定地运行。
Peer to peer 架构
Eureka对于多个副本之间的复制方式并没有采用主从复制,而是选择了对等复制,即peer to peer的模式。副本之间没有主从,每个副本都可以处理读写请求,通过彼此之间的数据复制使数据进行同步更新。但是数据同步可能会出现冲突。Eureka对此的解决方案是校验lastDirtyTimestamp。
校验lastDirtyTimestamp:在peer节点之间的复制请求中,如果其它peer节点传递的服务实例的lastDirtyTimestamp大于当前服务端本地存储的该实例的lastDirtyTimestamp,说明本地数据已过期,此时返回404,要求将该服务实例重新注册到当前节点。如果小于,则返回409,并在响应中携带本地存储的实例信息,要求发送方同步最新的数据。对于客户端直接发来的(非复制)续约请求,即使本地时间戳更新,也只返回200。
源码分析
接下来对Eureka的服务续约的分析分为客户端逻辑和服务端逻辑两个部分。
服务续约(客户端逻辑)
一、DiscoveryClient
在 DiscoveryClient 构造器所调用的 initScheduledTasks 方法中,有如下内容:
// Excerpt from DiscoveryClient#initScheduledTasks: sets up the client's heartbeat (lease renewal) task.
// Interval at which the client sends heartbeats to the Eureka server (30 seconds by default).
// Override with eureka.instance.leaseRenewalIntervalInSeconds in the configuration file.
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
// Exponential back-off bound for the heartbeat task (10 by default), handed to TimedSupervisorTask;
// how it caps the back-off delay is decided in that class's constructor (not shown here).
// Override with eureka.client.heartbeatExecutorExponentialBackOffBound in the configuration file.
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Create the heartbeat task: a TimedSupervisorTask that runs HeartbeatThread on heartbeatExecutor
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
// One-shot schedule: the first heartbeat runs after renewalIntervalInSecs (30 s by default).
// Subsequent heartbeats are not scheduled here; TimedSupervisorTask#run re-schedules itself
// on `scheduler` at the end of every execution.
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
二、TimedSupervisorTask
/**
 * Supervises one execution of the wrapped task, then re-schedules this supervisor.
 *
 * <p>The task is submitted to {@code executor} and awaited for at most {@code timeoutMillis}.
 * On success the next delay is reset to {@code timeoutMillis}. On timeout the delay is doubled,
 * capped at {@code maxDelay}. Rejections and any other Throwable are logged and counted,
 * never propagated. Unless {@code scheduler} has been shut down, the finally block schedules
 * this supervisor again after {@code delay} ms; this is what makes the task periodic.
 * NOTE(review): presumably timeoutMillis is the interval passed to the constructor
 * (renewalIntervalInSecs for the heartbeat task). The constructor is not shown; confirm.
 */
@Override
public void run() {
Future<?> future = null;
try {
// Submit the wrapped task; for the heartbeat task this runs HeartbeatThread#run
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
// Wait at most timeoutMillis; a slow task surfaces as TimeoutException below
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
// Success: reset the next delay to the base value, undoing any earlier back-off
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
// Exponential back-off: double the next delay, but never beyond maxDelay.
// compareAndSet only applies the new value if delay still holds the value read here.
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
// The executor refused the task; the log distinguishes shutdown from other rejections
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
// Anything else (e.g. an ExecutionException wrapping an exception thrown by the task)
// is logged and counted, not propagated, so the rescheduling below still happens
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
// Cancel the task if it is still running (e.g. after a timeout); true = interrupt it
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
// Re-schedule after `delay` ms: the base interval after a success (30 s for heartbeats
// by default), or the backed-off, longer delay after timeouts
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
三、HeartbeatThread
/**
 * Heartbeat job executed by the heartbeat TimedSupervisorTask.
 * Each run performs one lease renewal. Only a successful renewal updates
 * lastSuccessfulHeartbeatTimestamp (to the current wall-clock time).
 */
private class HeartbeatThread implements Runnable {

    @Override
    public void run() {
        boolean renewed = renew();
        if (!renewed) {
            return;
        }
        lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
    }
}
/**
 * Sends one heartbeat (lease renewal) for this instance to the Eureka server.
 *
 * <p>A 404 answer means the server holds no lease for this instance. In that case the
 * instance is marked dirty and registered again; the dirty flag is cleared only if that
 * registration succeeds.
 *
 * @return {@code true} if the server answered 200, or if the 404-triggered re-registration
 *     succeeded; {@code false} for any other status or if anything was thrown
 */
boolean renew() {
    try {
        EurekaHttpResponse<InstanceInfo> heartbeatResponse = eurekaTransport.registrationClient
                .sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, heartbeatResponse.getStatusCode());

        boolean serverLostInstance = heartbeatResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode();
        if (!serverLostInstance) {
            // Ordinary renewal: only a 200 counts as success
            return heartbeatResponse.getStatusCode() == Status.OK.getStatusCode();
        }

        // 404: the server does not know this instance (e.g. its lease was evicted) -> register again
        REREGISTER_COUNTER.increment();
        logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
        long dirtyTimestamp = instanceInfo.setIsDirtyWithTime();
        boolean reRegistered = register();
        if (reRegistered) {
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
        return reRegistered;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
服务续约(服务端逻辑)
一、InstanceResource
/**
 * JAX-RS PUT handler for a lease renewal (heartbeat) of instance {@code id}.
 *
 * @param isReplication      replication header value; "true" when the request comes from a peer node
 * @param overriddenStatus   overridden status from the query string; stored in the registry only
 *                           for replicated requests whose timestamp check ends in 404
 * @param status             status query parameter (not read by this method)
 * @param lastDirtyTimestamp instance's last dirty timestamp as a decimal string, may be null;
 *                           a non-numeric value makes Long.valueOf throw NumberFormatException
 * @return 404 if no lease could be renewed; otherwise the result of validateDirtyTimestamp when
 *         timestamp syncing applies, else 200
 */
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean replicated = "true".equals(isReplication);

    // Renewal failed (lease unknown or status UNKNOWN): 404 tells the sender to re-register
    if (!registry.renew(app.getName(), id, replicated)) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }

    // Compare timestamps only when one was sent and eureka.server.syncWhenTimestampDiffers is on (default)
    boolean mustValidate = lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers();
    Response response = mustValidate
            ? this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), replicated)
            : Response.ok().build();

    // A replicated renewal that is stale here still carries the peer's status override: keep it
    boolean storeOverride = mustValidate
            && response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
            && overriddenStatus != null
            && !InstanceStatus.UNKNOWN.name().equals(overriddenStatus)
            && replicated;
    if (storeOverride) {
        registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
    }

    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}
validateDirtyTimestamp
/**
 * Compares the lastDirtyTimestamp sent with a renewal against the one stored locally for this
 * instance and decides the HTTP response.
 *
 * @param lastDirtyTimestamp timestamp from the request; may be null
 * @param isReplication      true when the request came from a peer Eureka node
 * @return 404 if the incoming timestamp is newer (the local copy is stale);
 *         409 carrying the local InstanceInfo if the local timestamp is newer and the request is
 *         a replication; 200 otherwise (instance unknown locally, null or equal timestamps, or a
 *         newer local timestamp on a non-replicated request)
 */
private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
boolean isReplication) {
// Look up the locally stored instance by application name and instance id
InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
if (appInfo != null) {
// NOTE(review): the comparisons below unbox appInfo.getLastDirtyTimestamp(); presumably it is
// never null for a registered instance. Confirm.
if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
// Incoming timestamp newer than the local one: local data is stale -> 404, which asks
// the sender to register the instance here again
if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
logger.debug(
"Time to sync, since the last dirty timestamp differs -"
+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
args);
return Response.status(Status.NOT_FOUND).build();
// Local timestamp newer than the incoming one
} else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
// Replication from a peer: 409 with the local InstanceInfo so the peer can catch up
if (isReplication) {
logger.debug(
"Time to sync, since the last dirty timestamp differs -"
+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
args);
return Response.status(Status.CONFLICT).entity(appInfo).build();
// Request sent directly by the client: accept it with 200
} else {
return Response.ok().build();
}
}
}
}
// All remaining cases: 200
return Response.ok().build();
}
二、AbstractInstanceRegistry
/**
 * Renews the lease of instance {@code id} of application {@code appName} on this server.
 *
 * @param appName       application name, the first-level key of {@code registry}
 * @param id            instance id, e.g. "192.168.124.3:spring-cloud-order:8081"
 * @param isReplication true if the renewal was replicated from a peer node; used for the metric
 *                      counters and passed to getOverriddenInstanceStatus
 * @return true if a lease was found and renewed; false if no lease exists or the resolved status
 *         is UNKNOWN (InstanceResource#renewLease then answers 404, and the client re-registers)
 */
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
// Look up this application's leases (instance id -> lease) in the registry
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
// Application is known to the registry
if (gMap != null) {
// Then look up the lease of this particular instance
// id has the form "192.168.124.3:spring-cloud-order:8081"
leaseToRenew = gMap.get(id);
}
// No lease for this instance
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
// Renewal failed
return false;
// Lease found
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// Resolve the effective status of the instance, taking status overrides into account
// (getOverriddenInstanceStatus is not shown here)
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
// Resolved status is UNKNOWN (per the log message, possibly a deleted override):
// fail the renewal so that the instance has to re-register
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
// If the status stored on the registered instance (the lease holder, not the request)
// differs from the resolved status, overwrite it via setStatusWithoutDirty
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
overriddenInstanceStatus.name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
// Count this renewal in the "renewals in the last minute" counter
// (presumably what self-preservation compares with numberOfRenewsPerMinThreshold)
renewsLastMin.increment();
// Refresh the lease. Per the original author, Lease#renew (not shown) sets
// lastUpdateTimestamp to now + lease duration (90 s by default, taken from the
// client's eureka.instance.leaseExpirationDurationInSeconds)
leaseToRenew.renew();
// Renewal succeeded
return true;
}
}
自我保护机制
一、EurekaServerInitializerConfiguration
SmartLifecycle 接口的实现类:Spring容器加载完所有的bean并完成初始化之后,会回调其生命周期方法,比如 start、stop 等。
/**
 * SmartLifecycle callback that bootstraps the Eureka server on a new thread, so that
 * start() itself returns immediately.
 * The thread initializes the server (EurekaServerBootstrap#contextInitialized), publishes
 * EurekaRegistryAvailableEvent, marks this bean as running, and finally publishes
 * EurekaServerStartedEvent. Any Exception is logged and not rethrown.
 */
@Override
public void start() {
    Runnable bootstrapEurekaServer = () -> {
        try {
            eurekaServerBootstrap.contextInitialized(
                    EurekaServerInitializerConfiguration.this.servletContext);
            log.info("Started Eureka Server");
            publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
            EurekaServerInitializerConfiguration.this.running = true;
            publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
        } catch (Exception ex) {
            log.error("Could not initialize Eureka servlet context", ex);
        }
    };
    new Thread(bootstrapEurekaServer).start();
}
二、EurekaServerBootstrap
/**
 * Initializes the Eureka environment and server context. Per the original author, the server
 * context initialization includes PeerAwareInstanceRegistryImpl#openForTraffic.
 * The resulting server context is then exposed as a servlet-context attribute keyed by
 * EurekaServerContext's class name.
 *
 * @param context servlet context that receives the server-context attribute
 * @throws RuntimeException wrapping any Throwable raised during bootstrap (after logging it)
 */
public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();
        initEurekaServerContext();
        String serverContextKey = EurekaServerContext.class.getName();
        context.setAttribute(serverContextKey, this.serverContext);
    } catch (Throwable failure) {
        log.error("Cannot bootstrap eureka server :", failure);
        throw new RuntimeException("Cannot bootstrap eureka server :", failure);
    }
}
三、PeerAwareInstanceRegistryImpl
/**
 * Opens this registry for traffic at the end of server start-up.
 *
 * @param applicationInfoManager manager of this server's own instance info; its status is set to UP
 * @param count                  number of instances obtained from neighboring (peer) nodes, as the
 *                               log message says; becomes the expected number of renewing clients
 */
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Every instance received from peers is expected to send renewals
this.expectedNumberOfClientsSendingRenews = count;
// Recompute the per-minute renewal threshold from the new expected client count
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
// Record when the registry was opened for traffic
this.startupTime = System.currentTimeMillis();
// A positive count means peers did transfer instances during start-up
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
// On AWS, optionally prime the connections to all peer replicas
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
// Mark this server's own instance UP; per the original author this also makes the
// StatusChangeListener handle the resulting StatusChangeEvent
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// Base-class post-init: starts the last-minute renewal counter and schedules the eviction task
super.postInit();
}
/**
 * Starts the last-minute renewal counter and (re)schedules the periodic eviction task.
 * A previously scheduled EvictionTask, if any, is cancelled before it is replaced.
 */
protected void postInit() {
// Start counting renewals received per minute
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
// Cancel the previously scheduled eviction task before replacing it
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
// Initial delay and period are both evictionIntervalTimerInMs (60 s by default;
// eureka.server.evictionIntervalTimerInMs), presumably via java.util.Timer#schedule(task, delay, period)
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
/**
 * Recomputes the minimum number of renewals expected per minute:
 * expected clients * (60 / expected client renewal interval in seconds) * renewal percent threshold,
 * truncated to an int.
 */
protected void updateRenewsPerMinThreshold() {
    double expectedClients = this.expectedNumberOfClientsSendingRenews;
    double renewsPerClientPerMinute = 60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds();
    double percentThreshold = serverConfig.getRenewalPercentThreshold();
    this.numberOfRenewsPerMinThreshold =
            (int) (expectedClients * renewsPerClientPerMinute * percentThreshold);
}
值得注意的是,PeerAwareInstanceRegistryImpl#init 方法中也会调用该方法,默认每隔15分钟重新计算一次每分钟应该收到的续约数量的阈值。该间隔可以通过配置文件中的 eureka.server.renewalThresholdUpdateIntervalMs 属性修改。
四、EvictionTask
/**
 * One eviction pass: evicts expired leases. The grace period is extended by how late this run
 * is (getCompensationTimeMs). Any Throwable is logged so the timer thread keeps running.
 */
@Override
public void run() {
    try {
        final long lateByMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", lateByMs);
        evict(lateByMs);
    } catch (Throwable failure) {
        logger.error("Could not run the evict task", failure);
    }
}
五、AbstractInstanceRegistry
/**
 * Returns how many milliseconds later than scheduled this eviction run is.
 * The value is the time since the previous run minus the eviction interval
 * (eureka.server.evictionIntervalTimerInMs, 60 s by default), never negative.
 * The first call only records the current time and returns 0.
 */
long getCompensationTimeMs() {
    final long nowNanos = getCurrentTimeNano();
    // Store "now" for the next run and fetch the previous run's time in one step
    final long previousNanos = lastExecutionNanosRef.getAndSet(nowNanos);
    if (previousNanos == 0L) {
        return 0L;
    }
    final long sinceLastRunMs = TimeUnit.NANOSECONDS.toMillis(nowNanos - previousNanos);
    final long lateByMs = sinceLastRunMs - serverConfig.getEvictionIntervalTimerInMs();
    return Math.max(lateByMs, 0L);
}
/**
 * Evicts instances whose leases have expired, unless lease expiration is currently disabled.
 *
 * <p>At most {@code registrySize - (int) (registrySize * renewalPercentThreshold)} instances are
 * evicted per call. When more leases have expired, a random subset of that size is chosen.
 *
 * @param additionalLeaseMs extra slack given to Lease#isExpired: the compensation time computed by
 *                          getCompensationTimeMs when the eviction task runs late
 */
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
// Return immediately while lease expiration is disabled. Per the original author, this is the case
// when self-preservation is enabled (default; eureka.server.enableSelfPreservation) and triggered,
// i.e. renewals in the last minute <= threshold. isLeaseExpirationEnabled is not shown here.
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// Past this point lease expiration is enabled: collect every expired lease
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
// Iterate over each application's lease map in the registry
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
// Iterate over the instance leases of this application
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// Expired lease (no renewal received in time, with additionalLeaseMs as extra slack)
// that still has an instance attached: candidate for eviction
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// Cap evictions so that at least renewalPercentThreshold of the local registry survives this pass
// (with the 85% default, at most ~15% of the instances per run).
// NOTE(review): getLocalRegistrySize presumably counts registered instances; not shown here.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Partial Fisher-Yates shuffle: move a random not-yet-chosen lease into slot i, so a
// random subset is evicted rather than always the first leases found
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
// Take the chosen lease
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
// Remove the expired instance from the registry (isReplication = false: local eviction)
internalCancel(appName, id, false);
}
}
}
/**
 * Removes the lease of instance {@code id} of application {@code appName} from the registry
 * (called by evict above with isReplication = false).
 *
 * <p>When a lease was found, it is cancelled, the instance is recorded as DELETED in
 * recentlyChangedQueue, and cached responses for the app and its VIP addresses are invalidated.
 * The expected number of renewing clients is then decremented and the per-minute threshold
 * recomputed.
 *
 * @return false if no lease was registered for appName/id, true otherwise
 */
protected boolean internalCancel(String appName, String id, boolean isReplication) {
// NOTE(review): the registry is modified under the *read* lock here; how it pairs with the write
// lock elsewhere is not shown in this excerpt. Confirm before changing.
read.lock();
try {
CANCEL.increment(isReplication);
// Look up this application's lease map in the registry
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
// Remove the instance's lease from the application's map
leaseToCancel = gMap.remove(id);
}
// Record the cancellation (timestamp, "app(id)") in recentCanceledQueue
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
// Drop any status override stored for this instance
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
// Early return: finally still releases the lock; the renewal bookkeeping below is skipped
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
// Queue the removal as a recent change, wrapped in a RecentlyChangedItem
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// Invalidate cached responses for this app and its VIP / secure VIP addresses
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
}
} finally {
read.unlock();
}
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// One fewer client is now expected to send renewals
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
// Recompute the per-minute renewal threshold accordingly
updateRenewsPerMinThreshold();
}
}
return true;
}
todo
服务续约-(服务端逻辑)
1、AbstractInstanceRegistry#getOverriddenInstanceStatus
最后
以上就是任性小伙为你收集整理的《Eureka架构篇 - 服务续约与自我保护机制》的全部内容,希望文章能够帮你解决Eureka服务续约与自我保护机制相关的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复