上一篇博客《Spring Cloud学习--Spring Cloud Eureka服务注册》中我们介绍了客户端注册到注册中心的操作,接下来我们了解学习一下Eureka client和server服务续约和保活机制。Eureka的服务续约保活使用注册者主动定期调用的,类似于hearbeant,每隔一段时间调用Eureka Server对外提供的保活接口,告诉注册中心注册者还是正常运行的。
服务续约保活
Renew操作会在Service Provider定时发起,用来通知Eureka Server自己还活着。 这里有两个比较重要的配置需要如下,可以在Run之前配置。1eureka.instance.leaseRenewalIntervalInSeconds
1eureka.instance.leaseExpirationDurationInSeconds
在DiscoveryClient 初始化的时候会创建定时任务定时执行HeartbeatThread线程完成服务续约保活。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private void initScheduledTasks() { // Heartbeat timer //省略部分代码 scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); //省略部分代码 }
接下来我们看看HeartbeatThread中到底实现了什么操作。
在HeartbeatThread中实现的操作还是很简单的,最终调用了DiscoveryClient.renew方法来实现服务续约保活操作。
1
2
3
4
5
6
7
8
9private class HeartbeatThread implements Runnable { public void run() { //调用renew实现续约保活操作 if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }
在renew中调用续约保活接口完成续约保活操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { //发送心跳信息 httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e); return false; } }
通过PUT协议调用接口http://localhost:1001/apps/COMPUTER-SERVICE/127.0.0.1:computer-service:3333进行服务保活操作。
注册中心服务续约保活
Eureka Server对外提供PUT方法用于接收服务注册者的保活信息,更新服务注册者的相关状态信息。
在InstanceResource中提供renewLease方法接收注册者的保活信息,进行服务信息保活操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34@PUT public Response renewLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); //根据服务信息进行服务保活操作 boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); // Not found in the registry, immediately ask for a register if (!isSuccess) { logger.warn("Not Found (Renew): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } // Check if we need to sync based on dirty time stamp, the client // instance might have changed some value Response response = null; //进行服务信息更新操作 if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) { response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode); // Store the overridden status since the validation found out the node that replicates wins if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode() && (overriddenStatus != null) && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus)) && isFromReplicaNode) { registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus)); } } else { response = Response.ok().build(); } logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus()); return response; }
在InstanceRegistry中调用renew方法,首先会通过spring 的Event机制通知本地进行更新,然后调用父类PeerAwareInstanceRegistryImpl的renew方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22@Override public boolean renew(final String appName, final String serverId, boolean isReplication) { log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication); List<Application> applications = getSortedApplications(); for (Application input : applications) { if (input.getName().equals(appName)) { InstanceInfo instance = null; for (InstanceInfo info : input.getInstances()) { if (info.getId().equals(serverId)) { instance = info; break; } } publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication)); break; } } return super.renew(appName, serverId, isReplication); }
在父类PeerAwareInstanceRegistryImpl中调用renew方法更新状态信息并发送通知
1
2
3
4
5
6
7
8
9public boolean renew(final String appName, final String id, final boolean isReplication) { //调用父类的renew更新状态信息 if (super.renew(appName, id, isReplication)) { //给每个节点发送更新信息 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; }
在父类AbstractInstanceRegistry中完成时间戳等信息的更新操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); //更加appName获取相同的app Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null; if (gMap != null) { //根据id获取实例信息 leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { Object[] args = { instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId() }; logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", args); //更新实例信息 instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } renewsLastMin.increment(); //更新时间戳信息 leaseToRenew.renew(); return true; } }
总结:
服务注册者定时主动调用注册中心的保活接口完成服务的保活工作,注册中心处理的逻辑也很简单。
盗图:
最后
以上就是虚幻长颈鹿最近收集整理的关于Spring Cloud学习--Spring Cloud Eureka服务续约保活的全部内容,更多相关Spring内容请搜索靠谱客的其他文章。
发表评论 取消回复