Eureka Architecture: Service Renewal and the Self-Preservation Mechanism (shared by 任性小伙 on 靠谱客)

Overview

Preface

Service renewal

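By default, each client service instance sends a heartbeat to the Eureka server every 30 seconds. If the Eureka server receives no heartbeat from an instance for 90 seconds, the instance is removed from the list of registered instances by the eviction task thread, which runs every 60 seconds.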

Self-preservation mechanism

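If the number of heartbeats (renewals) the Eureka server received in the last minute is at or below 85% of the number it expects, the server's self-preservation mechanism is triggered; the expected number is recalculated every 15 minutes by default. In this mode Eureka stops evicting instances whose heartbeats are failing, and it still accepts registrations and queries from clients. If the cause is a network partition, however, that data may not be synchronized to the other Eureka server nodes until the network recovers. Self-preservation is a safety measure against abnormal network fluctuations that helps a Eureka cluster run more robustly and stably.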
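The trigger condition can be sketched as a small predicate. This is a minimal illustration, assuming the check compares the renewals counted in the last minute with the per-minute threshold; the class name, method signature and parameters are hypothetical and are not Eureka's API.

```java
final class SelfPreservationSketch {

    /**
     * Returns true when expired leases may be evicted, false when
     * self-preservation is holding evictions back.
     * All three parameters are hypothetical stand-ins for server state.
     */
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            long renewsLastMin,
                                            int renewsPerMinThreshold) {
        if (!selfPreservationEnabled) {
            // Protection switched off: eviction is always allowed
            return true;
        }
        // Eviction is allowed only while renewals stay strictly above the threshold
        return renewsPerMinThreshold > 0 && renewsLastMin > renewsPerMinThreshold;
    }

    public static void main(String[] args) {
        // 10 instances at 2 heartbeats per minute and an 85% threshold give 17
        System.out.println(isLeaseExpirationEnabled(true, 20, 17));
        System.out.println(isLeaseExpirationEnabled(true, 15, 17));
    }
}
```

With a threshold of 17, receiving 20 renewals keeps eviction on, while receiving 17 or fewer switches the server into self-preservation.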

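Peer-to-peer architecture

Eureka does not replicate between its replicas in a master-slave fashion. It uses peer-to-peer replication instead: no replica is a master, every replica can handle both read and write requests, and the replicas keep their data up to date by replicating to one another. Because any replica can accept writes, the replicated data can conflict. Eureka resolves such conflicts by validating lastDirtyTimestamp.

Validating lastDirtyTimestamp: in a replication request between peer nodes, if the lastDirtyTimestamp that the other peer sends for a service instance is greater than the one stored locally on the current server, the server returns 404, which requires the instance to register again; if it is smaller, the server returns 409 together with its own copy, so that the sender synchronizes to the latest data.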
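The comparison rule can be condensed into one function. This is only a sketch of the decision table described above; the class and method names are hypothetical, and the real server builds full HTTP responses rather than returning bare status codes.

```java
final class DirtyTimestampSketch {

    /**
     * Maps a timestamp comparison to the HTTP status described above.
     * incoming:      lastDirtyTimestamp carried by the request
     * local:         lastDirtyTimestamp stored on this server
     * isReplication: true when the request comes from a peer node
     */
    static int statusFor(long incoming, long local, boolean isReplication) {
        if (incoming > local) {
            return 404; // the server's copy is stale: ask for re-registration
        }
        if (incoming < local && isReplication) {
            return 409; // the peer's copy is stale: it should take the server's data
        }
        return 200;     // equal timestamps, or a stale value sent by a plain client
    }

    public static void main(String[] args) {
        System.out.println(statusFor(200L, 100L, true));
        System.out.println(statusFor(100L, 200L, true));
        System.out.println(statusFor(100L, 200L, false));
    }
}
```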

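Source code analysis

The analysis of Eureka's service renewal below is split into two parts: client-side logic and server-side logic.

Service renewal (client-side logic)

1. DiscoveryClient

The initScheduledTasks method, called from the DiscoveryClient constructor, contains the following: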

// Interval at which the client sends heartbeats to the Eureka server; 30 seconds by default
// The default can be changed with eureka.instance.leaseRenewalIntervalInSeconds in the configuration file
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();

// Upper bound of the exponential back-off for the heartbeat task; 10 by default
// The default can be changed with eureka.client.heartbeatExecutorExponentialBackOffBound in the configuration file
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();

logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

// Create the heartbeat task
heartbeatTask = new TimedSupervisorTask(
        "heartbeat",
        scheduler,
        heartbeatExecutor,
        renewalIntervalInSecs,
        TimeUnit.SECONDS,
        expBackOffBound,
        new HeartbeatThread()
);
// Run the heartbeat task once after an initial delay of 30 seconds; TimedSupervisorTask then reschedules itself after every run
scheduler.schedule(
        heartbeatTask,
        renewalIntervalInSecs, TimeUnit.SECONDS);

2. TimedSupervisorTask

@Override
public void run() {
    Future<?> future = null;
    try {
        // Submit the wrapped task; for heartbeats this runs HeartbeatThread#run
        future = executor.submit(task);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        future.get(timeoutMillis, TimeUnit.MILLISECONDS); 
        delay.set(timeoutMillis);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        successCounter.increment();
    } catch (TimeoutException e) {
        logger.warn("task supervisor timed out", e);
        timeoutCounter.increment();
        long currentDelay = delay.get();
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        delay.compareAndSet(currentDelay, newDelay);
    } catch (RejectedExecutionException e) {
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, reject the task", e);
        } else {
            logger.warn("task supervisor rejected the task", e);
        }
        rejectedCounter.increment();
    } catch (Throwable e) {
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, can't accept the task");
        } else {
            logger.warn("task supervisor threw an exception", e);
        }
        throwableCounter.increment();
    } finally {
        if (future != null) {
            future.cancel(true);
        }
        if (!scheduler.isShutdown()) {
            // Reschedule this task: the delay is 30 seconds by default and doubles after each timeout, capped at maxDelay
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}
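The back-off behaviour in the TimeoutException branch is easier to see with numbers. The sketch below reproduces only the doubling step from the run method above. It assumes that maxDelay equals the timeout multiplied by expBackOffBound, which the excerpt does not show; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class HeartbeatBackoffSketch {

    /**
     * Returns the delay (in ms) scheduled after each of `timeouts` consecutive timeouts.
     * Assumption: maxDelay = timeoutMillis * expBackOffBound.
     */
    static List<Long> delaysAfterTimeouts(long timeoutMillis, int expBackOffBound, int timeouts) {
        long maxDelay = timeoutMillis * expBackOffBound;
        long delay = timeoutMillis; // the initial delay equals the renewal interval
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < timeouts; i++) {
            // Same step as the catch (TimeoutException e) branch: double, then cap
            delay = Math.min(maxDelay, delay * 2);
            delays.add(delay);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Defaults from the article: 30-second interval, back-off bound of 10
        System.out.println(delaysAfterTimeouts(30_000L, 10, 5));
    }
}
```

Under these assumptions, five consecutive timeouts yield delays of 60, 120, 240, 300 and 300 seconds. A successful run resets the delay to the original 30 seconds through delay.set(timeoutMillis).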

3. HeartbeatThread

private class HeartbeatThread implements Runnable {

    public void run() {
        // If the renewal succeeded
        if (renew()) {
            // Record the current time in lastSuccessfulHeartbeatTimestamp
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // Send the renewal (heartbeat) request to the server
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        // A 404 response means the instance has to register again
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            // Register the instance again
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        // Otherwise the renewal counts as successful only if the response status is 200
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
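Stripped of logging and counters, the client-side decision in renew() reduces to three outcomes. The sketch below models the HTTP call and the registration as functional interfaces so that it can run without a server; these parameters and the class name are hypothetical stand-ins, not part of the Eureka client.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

final class RenewSketch {

    /**
     * sendHeartbeat: hypothetical stand-in returning the HTTP status of the heartbeat
     * register:      hypothetical stand-in returning whether re-registration succeeded
     */
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        int status;
        try {
            status = sendHeartbeat.getAsInt();
        } catch (RuntimeException e) {
            return false; // the heartbeat could not be sent at all
        }
        if (status == 404) {
            // The server no longer knows this instance: register again
            return register.getAsBoolean();
        }
        return status == 200;
    }

    public static void main(String[] args) {
        System.out.println(renew(() -> 200, () -> false));
        System.out.println(renew(() -> 404, () -> true));
        System.out.println(renew(() -> 500, () -> true));
    }
}
```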

Service renewal (server-side logic)

1. InstanceResource

@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // Renew the lease
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    // If the renewal failed, return 404
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    Response response;
    // The client sent a lastDirtyTimestamp and the server has "sync when timestamp differs" enabled (the default)
    // The default can be changed with the eureka.server.syncWhenTimestampDiffers property
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        // Compare the lastDirtyTimestamp sent by the client with the server's local lastDirtyTimestamp
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    // In all other cases, return 200
    } else {
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}

validateDirtyTimestamp

private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                        boolean isReplication) {
    // Look up the service instance in the registry cache by application name and instance id
    InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
    if (appInfo != null) {
        if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
            Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
            // If the lastDirtyTimestamp sent by the client is greater than the server's local lastDirtyTimestamp, return 404
            if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                logger.debug(
                        "Time to sync, since the last dirty timestamp differs -"
                                + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                        args);
                return Response.status(Status.NOT_FOUND).build();
            // If the lastDirtyTimestamp sent by the client is less than the server's local lastDirtyTimestamp
            } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                // For a replication request from a peer node, return 409
                if (isReplication) {
                    logger.debug(
                            "Time to sync, since the last dirty timestamp differs -"
                                    + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                            args);
                    return Response.status(Status.CONFLICT).entity(appInfo).build();
                // Otherwise return 200
                } else {
                    return Response.ok().build();
                }
            }
        }

    }
    // In all remaining cases, return 200
    return Response.ok().build();
}

2. AbstractInstanceRegistry

public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // Get the gMap for the given application name from the registry cache
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    // If the application is present in the registry cache
    if (gMap != null) {
        // Get the Lease<InstanceInfo> for the given instance id from gMap
        // The id looks like "192.168.124.3:spring-cloud-order:8081"
        leaseToRenew = gMap.get(id);
    }
    // If no lease was found
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        // Return false: the renewal failed
        return false;
    // If a lease was found
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // TODO: getOverriddenInstanceStatus is not analyzed yet
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            // If the status resolved by the server is UNKNOWN, return false: the renewal failed
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            // If the instance's current status differs from the status resolved by the server, overwrite it with the resolved status
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                overriddenInstanceStatus.name(),
                                instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }
        renewsLastMin.increment();
        // Set lastUpdateTimestamp on the cached Lease<InstanceInfo> to the current time plus 90 seconds
        // The 90 seconds come from the eureka.instance.leaseExpirationDurationInSeconds value sent by the client
        leaseToRenew.renew();
        // Return true: the renewal succeeded
        return true;
    }
}
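Because renew() stores the current time plus the lease duration, the moment at which a lease counts as expired depends on how the expiry check is written. The sketch below is a simplified model and not the Netflix Lease class. It assumes the expiry check adds the duration once more on top of lastUpdateTimestamp, which the excerpts in this article do not show; time is passed in explicitly so the example is deterministic.

```java
final class LeaseSketch {
    private final long durationMs;
    private long lastUpdateTimestamp;

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs; // set at registration time
    }

    /** Mirrors the comment above: lastUpdateTimestamp = current time + duration. */
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    /** Assumed expiry rule: now > lastUpdateTimestamp + duration + additionalLeaseMs. */
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(90_000L, 0L);
        lease.renew(0L);
        // Under the assumed rule a renewed lease survives for 2 x 90 seconds
        System.out.println(lease.isExpired(180_000L, 0L));
        System.out.println(lease.isExpired(180_001L, 0L));
    }
}
```

If the assumption holds, an instance that stops sending heartbeats is kept for up to 180 seconds after its last renewal rather than 90, plus however long it waits for the next eviction run.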

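Self-preservation mechanism

1. EurekaServerInitializerConfiguration

This class implements the SmartLifecycle interface: once the Spring container has loaded and initialized all beans, it invokes the class's lifecycle methods, such as start and stop.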

@Override
public void start() {
   new Thread(() -> {
      try {
         // Core call: EurekaServerBootstrap#contextInitialized
         eurekaServerBootstrap.contextInitialized(
               EurekaServerInitializerConfiguration.this.servletContext);
         log.info("Started Eureka Server");
         // Publish the EurekaRegistryAvailableEvent
         publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
         EurekaServerInitializerConfiguration.this.running = true;
         // Publish the EurekaServerStartedEvent
         publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
      }
      catch (Exception ex) {
         log.error("Could not initialize Eureka servlet context", ex);
      }
   }).start();
}

2. EurekaServerBootstrap

public void contextInitialized(ServletContext context) {
   try {
      // Initialize Eureka's environment variables
      initEurekaEnvironment();
      // Initialize the Eureka server context (this includes the call to PeerAwareInstanceRegistryImpl#openForTraffic)
      initEurekaServerContext();

      context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
   }
   catch (Throwable e) {
      log.error("Cannot bootstrap eureka server :", e);
      throw new RuntimeException("Cannot bootstrap eureka server :", e);
   }
}

3. PeerAwareInstanceRegistryImpl

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    this.expectedNumberOfClientsSendingRenews = count;
    // Update the number of renewals the server should receive per minute
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // Set the instance status to UP, which triggers the StatusChangeListener handling of the StatusChangeEvent
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        // Cancel the previous eviction task if it has not finished
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    // The eviction task runs every 60 seconds by default
    // The default can be changed with the eureka.server.evictionIntervalTimerInMs property
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}

protected void updateRenewsPerMinThreshold() {
    // Compute the threshold for the number of renewals expected per minute
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

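Note that PeerAwareInstanceRegistryImpl#init also schedules a task that ends up calling this method, so the per-minute renewal threshold is recalculated periodically. The interval defaults to 15 minutes and can be changed with the eureka.server.renewalThresholdUpdateIntervalMs property in the configuration file.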
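A worked calculation makes the formula in updateRenewsPerMinThreshold concrete. The sketch below repeats the same arithmetic with plain parameters; the class and parameter names are hypothetical, and the values 30 and 0.85 are the defaults quoted in this article.

```java
final class RenewThresholdSketch {

    /** Same arithmetic as updateRenewsPerMinThreshold, with the config values passed in. */
    static int renewsPerMinThreshold(int expectedClients,
                                     int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        return (int) (expectedClients
                * (60.0 / renewalIntervalSeconds)
                * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        // 10 instances, one heartbeat every 30 seconds, 85% threshold
        System.out.println(renewsPerMinThreshold(10, 30, 0.85));
        // A single instance: 1 * 2 * 0.85 = 1.7, truncated by the int cast
        System.out.println(renewsPerMinThreshold(1, 30, 0.85));
    }
}
```

Ten instances give a threshold of 17 renewals per minute. The cast to int truncates, so a single instance gives a threshold of 1 rather than 2.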

4. EvictionTask

@Override
public void run() {
    try {
        // Get the compensation time
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}
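The compensation time passed to evict covers the case where the eviction timer fires late, for example after a long GC pause: the extra delay is added to every lease's allowance so that instances are not evicted merely because the server itself was slow. The sketch below models that calculation with the clock passed in explicitly. The class name and constructor parameter are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class CompensationSketch {
    private final AtomicLong lastExecutionNanos = new AtomicLong(0L);
    private final long evictionIntervalMs;

    CompensationSketch(long evictionIntervalMs) {
        this.evictionIntervalMs = evictionIntervalMs;
    }

    /** Returns how much later than the configured interval this run started, in ms. */
    long compensationTimeMs(long currNanos) {
        long lastNanos = lastExecutionNanos.getAndSet(currNanos);
        if (lastNanos == 0L) {
            return 0L; // first run: nothing to compare against
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
        return Math.max(0L, elapsedMs - evictionIntervalMs);
    }

    public static void main(String[] args) {
        CompensationSketch task = new CompensationSketch(60_000L);
        System.out.println(task.compensationTimeMs(1_000_000_000L));  // first run
        System.out.println(task.compensationTimeMs(66_000_000_000L)); // 65 s after the first run
    }
}
```

With a 60-second interval, a run that starts 65 seconds after the previous one yields a compensation of 5000 ms, and a run that starts early yields 0.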

5. AbstractInstanceRegistry

long getCompensationTimeMs() {
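    // Current time
    long currNanos = getCurrentTimeNano();
    // Store the new value in lastExecutionNanosRef and get the previous value back
    long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
    // If the previous value is 0 (first run), return 0
    if (lastNanos == 0l) {
        return 0l;
    }
    // Time elapsed since the previous run: current time minus the previous value
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
    // Compensation time: elapsed time minus the eviction task interval (60 seconds by default)
    // The default can be changed with the eureka.server.evictionIntervalTimerInMs property
    long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
    // Return the larger of the compensation time and 0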
    return compensationTime <= 0l ? 0l : compensationTime;
}

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
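    // If self-preservation is enabled on the server (the default) and has been triggered, i.e. renewals received in the last minute <= threshold, return without evicting
    // The default can be changed with the eureka.server.enableSelfPreservation property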
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
		
    // Reaching this point means self-preservation is either disabled, or enabled but not triggered
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    // Iterate over the registry cache
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            // Iterate over leaseMap
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                // If the instance's lease has expired (the server received no heartbeat from it within the allowed time)
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    // Add it to the list of expired leases
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // Total number of instances held in the registry cache
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            // Pick an instance from the list of expired leases
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // Evict the expired instance
            internalCancel(appName, id, false);
        }
    }
}
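Two details of evict are worth isolating: the cap on how many instances one run may remove, and the random choice among expired leases. The sketch below reproduces both with plain collections. The class and method names are hypothetical, and the shuffle works on a copy of the list, whereas the original swaps elements in place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class EvictionLimitSketch {

    /** At most (registrySize - registrySize * threshold) instances are evicted per run. */
    static int toEvict(int registrySize, int expiredCount, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        int evictionLimit = registrySize - registrySizeThreshold;
        return Math.min(expiredCount, evictionLimit);
    }

    /** Partial Fisher-Yates shuffle: picks `count` distinct elements uniformly at random. */
    static <T> List<T> pickRandom(List<T> expired, int count, Random random) {
        List<T> copy = new ArrayList<>(expired);
        for (int i = 0; i < count; i++) {
            int next = i + random.nextInt(copy.size() - i);
            Collections.swap(copy, i, next);
        }
        return new ArrayList<>(copy.subList(0, count));
    }

    public static void main(String[] args) {
        List<String> expired = Arrays.asList("a", "b", "c", "d", "e");
        // 20 registered instances, 5 expired, 85% threshold: the limit is 3
        int n = toEvict(20, expired.size(), 0.85);
        System.out.println(n + " of " + expired.size() + " expired leases evicted: "
                + pickRandom(expired, n, new Random()));
    }
}
```

Choosing victims at random means that when more leases have expired than the limit allows, the evictions are spread across applications instead of wiping out whichever application happens to be iterated first.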

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    read.lock();
    try {
        CANCEL.increment(isReplication);
        // Get the gMap for the given application name from the registry cache
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            // Remove the instance from gMap
            leaseToCancel = gMap.remove(id);
        }
        recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        // Remove the instance from the overriddenInstanceStatusMap cache
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                // Wrap the lease and add it to the recentlyChangedQueue
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // Invalidate the read-write cache
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
        }
    } finally {
        read.unlock();
    }

    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Decrement the number of clients expected to send renewals
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            // Update the threshold for the number of renewals expected per minute
            updateRenewsPerMinThreshold();
        }
    }

    return true;
}

TODO

Service renewal (server-side logic)

(1) AbstractInstanceRegistry#getOverriddenInstanceStatus
