概述
本博客是基于Dalston.SR2版本编写,理解有不对的地方希望读者指出,留言评论,一起学习了解。
参考文章:【Spring Cloud源码分析(一)Eureka 】及【深入理解Eureka之源码解析】
我们将一个普通的Spring-Boot应用注册到Eureka Servcer或者从Eureka Server中获取服务列表时,主要做了以下两件事:
在应用主类中配置了@EnableDiscoveryClient
在application.properties中用eureka.client.service-url.defaultZone参数指定了服务注册中心的位置
首先我们来看看@EnableDiscoveryClient的源码:
/**
* Annotation to enable a DiscoveryClient implementation.
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
/**
* If true, the ServiceRegistry will automatically register the local server.
*/
boolean autoRegister() default true;
}
注释解释:
@Target:指定程序元定义的注释所使用的地方,它使用了另一个类:ElementType,是一个枚举类定义了注释类型可以应用到不同的程序元素以免使用者误用
ElementType是一个枚举类型,指明注释可以使用的地方,看看ElementType类:
public enum ElementType {
TYPE, // 指定适用点为 class, interface, enum
FIELD, // 指定适用点为 field
METHOD, // 指定适用点为 method
PARAMETER, // 指定适用点为 method 的 parameter
CONSTRUCTOR, // 指定适用点为 constructor
LOCAL_VARIABLE, // 指定使用点为 局部变量
ANNOTATION_TYPE, //指定适用点为 annotation 类型
PACKAGE // 指定适用点为 package
}
@Retention:这个元注释和java编译器处理注释的注释类型方式相关,告诉编译器在处理自定义注释类型的几种不同的选择,需要使用RetentionPolicy枚举类
枚举类只有一个成员变量,可以不用指明成名名称而赋值,看Retention的源代码:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.ANNOTATION_TYPE)
public @interface Retention {
RetentionPolicy value();
}
类中有个RetentionPolicy类,也是一个枚举类,具体看代码:
public enum RetentionPolicy {
SOURCE, // 编译器处理完Annotation后不存储在class中
CLASS, // 编译器把Annotation存储在class中,这是默认值
RUNTIME // 编译器把Annotation存储在class中,可以由虚拟机读取,反射需要
}
@Documented:是一个标记注释,表示注释应该出现在类的javadoc中,因为在默认情况下注释时不包括在javadoc中的。
注意他与@Retention(RetentionPolicy.RUNTIME)配合使用,因为只有将注释保留在编译后的类文件中由虚拟机加载, 然后javadoc才能将其抽取出来添加至javadoc中。
@Inherited:将注释同样继承至使用了该注释类型的方法中(表达有点问题,就是如果一个方法使用了的注释用了@inherited, 那么其子类的该方法同样继承了该注释)
由此EnableDiscoveryClient注释,我们可以看出,它主要用来开启DiscoveryClient的实例。我们搜索DiscoveryClient可以发现有一个类与一个接口。在com.netflix.discovery包下的DiscoveryClient类,该类包含了Eureka Client向Eureka Server的相关方法。其中DiscoveryClient实现了EurekaClient接口,并且它是一个单例模式,而EurekaClient继承了LookupService接口。它们之间的关系如图所示。
继续看源码,在DiscoveryClient类中,我们看到有一个register(),其源码如下:
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
// Eureka Client客户端,调用Eureka服务端的入口
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
我们找到了服务注册的代码,那么我们由此继续追踪它被调用的地方,追踪发现它被com.netflix.discovery.InstanceInfoReplicator 类的run()方法调用,其中InstanceInfoReplicator实现了Runnable接口,run()方法代码如下:
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
InstanceInfoReplicator类是在DiscoveryClient初始化过程中使用的
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return;
// no need to setup up an network tasks and we are done
}
try {
scheduler = Executors.newScheduledThreadPool(3,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
);
// use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
);
// use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// 启动的时候初始化的一个定时任务,把本地的服务配置信息,自动刷新到注册服务器上。
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
追踪源码initScheduledTasks():
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 服务获取
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
// 服务续约
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// 服务注册
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
该方法主要开启了获取服务注册列表的信息,如果需要向Eureka Server注册,则开启注册,同时开启了定时向Eureka Server服务续约的定时任务。在源码中我们可以发现,“服务获取”任务相对于“服务注册”和“服务续约”任务则更为独立。服务注册”和“服务续约”任务在同一个if语句中,这个是很容易理解的,服务注册到Eureka Server后,自然需要区续约,告诉Eureka Server它还活着,防止被剔除,所以他们会出现在一块儿。
“服务续约”:
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
继续查看源码,我们可以发现“服务续约”及“服务获取”都是以REST请求的方式实现的,就不多阐述了,有兴趣的朋友可以去看下源码。
服务注册中心处理
Eureka Servcer对于各类REST请求的定义都是位于com.netflix.eureka.resources包下
Eureka server服务端请求入口
ApplicationResource.java文件中,如下所示,可以看出Eureka是通过http post的方式去服务注册
/**
* Registers information about a particular instance for an
* {@link com.netflix.discovery.shared.Application}.
*
* @param info
*
{@link InstanceInfo} information of the instance.
* @param isReplication
*
a header parameter containing information whether this is
*
replicated from other nodes.
*/
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
registry.register(info, "true".equals(isReplication));
return Response.status(204).build();
// 204 to be backwards compatible
}
在进行很多次校验以后,它会调用com.netflix.eureka.registry.PeerAwareInstanceRegistry的实现类
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl中的public void register(final InstanceInfo info, final boolean isReplication)来进行服务注册
/**
* Registers the information about the {@link InstanceInfo} and replicates
* this information to all peer eureka nodes. If this is replication event
* from other replica nodes then it is not replicated.
*
* @param info
*
the {@link InstanceInfo} to be registered and replicated.
* @param isReplication
*
true if this is a replication event from other replica nodes,
*
false otherwise.
*/
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// 默认的服务失效时间
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
在此方法中,我们可以看到它加入了一个服务默认90秒的失效时间,若Eureka Client中没有“服务续约”,那么在90秒后Eureka Service会将此服务剔除,在这里我们可以看到,我们在自定义设置“服务续约”间隔时间时,应该设置为小于服务失效时间。
其中 super.register(info, leaseDuration, isReplication)方法,点击进去到子类AbstractInstanceRegistry可以发现更多细节,其中注册列表的信息被保存在一个Map中。replicateToPeers()方法,即同步到其他Eureka Server的其他Peers节点,它会遍历循环向所有的Peers注册,最终执行类com.netflix.eureka.cluster.PeerEurekaNode的register()方法,该方法通过执行一个任务向其他节点同步该注册信息:
/**
* Sends the registration information of {@link InstanceInfo} receiving by
* this node to the peer node represented by this class.
*
* @param info
*
the instance information {@link InstanceInfo} of any instance
*
that is send to this instance.
* @throws Exception
*/
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
刚开始学习Spring-Cloud 如果有什么写的不对的地方,请提出指正。不喜勿喷,谢谢。
最后
以上就是优秀哈密瓜为你收集整理的Spring-Cloud学习之路-Eureka服务注册的全部内容,希望文章能够帮你解决Spring-Cloud学习之路-Eureka服务注册所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复