我是靠谱客的博主 内向大白,最近开发中收集的这篇文章主要介绍【源码】Spring Cloud —— Hystrix:HystrixCommandAspect AbstractCommand HystrixCircuitBreaker前言版本HystrixCommandAspectAbstractCommand#toObservableAbstractCommand#applyHystrixSemanticsHystrixCircuitBreakerAbstractCommand#executeCommandAndObserveAbstractComman,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

【源码】Spring Cloud —— Hystrix:HystrixCommandAspect AbstractCommand HystrixCircuitBreaker

  • 前言
  • 版本
  • HystrixCommandAspect
    • HystrixCommandAspect
    • ExecutionType
  • AbstractCommand#toObservable
  • AbstractCommand#applyHystrixSemantics
  • HystrixCircuitBreaker
    • HystrixCircuitBreakerImpl
  • AbstractCommand#executeCommandAndObserve
  • AbstractCommand#executeCommandWithSpecifiedIsolation
  • 总结
  • 参考

前言

Hystrix,以类似 断路器 的机制保证失效服务的隔离,同时提供 快速失败、服务降级 等功能,spring-cloud-netflix-hystrix 对其进行适配

本章节结合部分源码了解 Hystrix 的实现

版本

hystrix:1.5.18

HystrixCommandAspect

Hystrix 的实现使用到了 命令设计模式命令模式命令者、命令、命令 的实现解耦开来

Hystrix 借助 Aspect(切面),将服务的调用封装成对应的 命令,它们可以拥有自己的 线程资源 以实现 资源隔离

HystrixCommandAspect

// 是个 Aspect,可以被 spring-aop 解析
@Aspect
public class HystrixCommandAspect {
private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
/**
* 静态块维护解析 @HystrixCommand 和 @HystrixCollapser
*
对应的 MetaHolder 的 MetaHolderFactory
*/
static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
// @HystrixCommand 切点
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
// HystrixCollapser 切点
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
// 获取切点方法
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
// 不能同时注解 @HystrixCommand 和 @HystrixCollapser
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
// 获取注解对应 MetaHolderFactory
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
// 解析对应的 MetaHolder
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
// 构建对应的 HystrixInvokable(命令)
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
// 命令类型
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
/**
* 根据不同的命令类型,执行命令,返回结果
*/
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
// ...
}
return result;
}
// ...
}

HystrixCommandAspect 是一个 Aspect,拦截对应的方法,生成对应的 HystrixInvokable(命令),方法的执行委托给 命令 的执行

可以理解为,Hystrix 对所有注解了 @HystrixCommand@HystrixCollapser 的方法进行代理,将其行为委托给基于切点方法构造的 HystrixInvokable

ExecutionType

public enum ExecutionType {
// 异步
ASYNCHRONOUS,
// 同步
SYNCHRONOUS,
// 响应式(异步回调)
OBSERVABLE;
/**
* 命令的类型取决于切点方法的返回值
* Future -> ASYNCHRONOUS
* Observable -> OBSERVABLE
* 其他 -> SYNCHRONOUS
*/
public static ExecutionType getExecutionType(Class<?> type) {
if (Future.class.isAssignableFrom(type)) {
return ExecutionType.ASYNCHRONOUS;
} else if (Observable.class.isAssignableFrom(type)) {
return ExecutionType.OBSERVABLE;
} else {
return ExecutionType.SYNCHRONOUS;
}
}
}

构造命令的类型取决于切点方法的返回值:

  • ASYNCHRONOUS异步, 命令的返回值为 Future,可使用 Future#get 方法 阻塞式 获取执行结果
  • OBSERVABLE响应式,可获取发射执行结果的 Observable,进行订阅,订阅对应的异步回调方法
  • SYNCHRONOUS同步,同步获取对应的结果

整体上,命令的结果都基于 AbstractCommand#toObservable 方法,将对应的 Observable 转换成 Future,又或者直接 Future#get 同步返回结果,以满足对应的 命令类型

因此,整个命令的封装基于 rxjava 实现:

  • rx响应式编程思想,异步式回调,通过 Observer 订阅 Ovservable 的方式,无需类似 Future#get 方法阻塞式等待结果返回,而是回调式异步获取结果
  • rxjavarx 思想的 java 实现
更多关于 rxjava 的内容,传送门:

【从零学 RxJava】RxJava 1 Hot Observable & Cold Observable、Subject

AbstractCommand#toObservable

	public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// terminate 回调
final Action0 terminateCommandCleanup = new Action0() {
// ...
};
// unsubscribe 回调
final Action0 unsubscribeCommandCleanup = new Action0() {
// ...
};
// 定义 Func0 创建对应的 Observale
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
// ...
};
// OnComplete 回调
final Action0 fireOnCompletedHook = new Action0() {
// ...
};
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// ...
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
/**
* 如果配置允许缓存,试图获取,默认 false
*/
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
/**
* defer 操作符会为每一个订阅者
*
返回一个全新的 Obervable
* 对应的 Obervable 由 applyHystrixSemantics
*
函数创建
*/
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
/**
* 如果允许缓存,缓存对应的 Observable
*/
if (requestCacheEnabled && cacheKey != null) {
// ...
} else {
afterCache = hystrixObservable;
}
// 生命周期回调设置
return afterCache
.doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
}
});
}

该方法主要:

  • 缓存相关,试图从缓存获取以及对生成的 Observable 进行缓存
  • 生命周期相关,定义了部分 生命周期回调,比如 doOnTerminate doOnUnsubscribe doOnCompleted
  • 对应的 ObservableapplyHystrixSemantics 函数创建,该函数委托方法 applyHystrixSemantics 创建,此处用到 defer 操作符,可以为每个 订阅者 创建一个全新的 Observable

AbstractCommand#applyHystrixSemantics

	private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
/**
* circuitBreaker:断路器的实现
*
allowRequest:是否允许请求,即断路器是否闭合
*/
if (circuitBreaker.allowRequest()) {
// 信号量获取
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
// 信号释放回调
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
// 信号释放
executionSemaphore.release();
}
}
};
// OnError 回调
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
// ...
};
/**
* 获取信号,并返回对应的 Observable
*/
if (executionSemaphore.tryAcquire()) {
try {
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// 信号量获取失败回调
return handleSemaphoreRejectionViaFallback();
}
} else {
// 断路器断开回调处理
return handleShortCircuitViaFallback();
}
}
  • 判断是否允许发送请求,这基于 断路器 实现,如果 断路器 打开,则进行对应回调处理(失败或降级)
  • 如果 断路器 关闭,则进行请求,先获取 信号,获取失败则处理对应回调
  • 获取成功,则由方法 executeCommandAndObserve
    创建对应的 Observable 实现 线程隔离、请求发送 等操作,同时注册了对应的 生命周期回调

HystrixCircuitBreaker

Hystrix 使用断路器,对请求失败达到 阈值 的服务,打开 断路器,断路器有如下状态:

  • 闭合,正常发送请求,当请求失败达到 阈值 后,断开
  • 断开,此时不进行服务调用,直接返回失败(或者降级处理),并设置一个 重置时间,在该时间后,断路器半开
  • 半开,允许发送请求,在调用成功达到一定条件后关闭断路器,否则再次打开

断路器 的逻辑主要由 HystrixCircuitBreaker 实现

public interface HystrixCircuitBreaker {
// 是否允许请求发送
public boolean allowRequest();
// 断路器是否打开
public boolean isOpen();
// 断路器半开时关闭
/* package */void markSuccess();
// ...
}

定义了三个方法,同时提供了两个内部实现类 HystrixCircuitBreakerImplNoOpCircuitBreaker,后者空实现,默认可以发送请求,重点关注 HystrixCircuitBreakerImpl

HystrixCircuitBreakerImpl

	/* package */static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
// HystrixCommand 配置
private final HystrixCommandProperties properties;
// HystrixCommand 监控指标
private final HystrixCommandMetrics metrics;
// 断路器开关
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
// 断路器打开时间
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
/**
* CAS 操作关闭断路器
*/
public void markSuccess() {
if (circuitOpen.get()) {
if (circuitOpen.compareAndSet(true, false)) {
metrics.resetStream();
}
}
}
@Override
public boolean allowRequest() {
/**
* 如果配置强制打开断路器,
*
返回 false
*/
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
/**
* 如果配置强制关闭断路器,返回 true,但会执行 isOpen 方法
*
以保证强制关闭接触后,断路器状态仍然正确
*
(失效服务的断路器是开启的)
*/
if (properties.circuitBreakerForceClosed().get()) {
isOpen();
return true;
}
/**
* 如果断路器关闭,返回 true
* 如果断路器打开,则执行 allowSingleTest,
*
判断是否超过重置时间,如果是则半开
*/
return !isOpen() || allowSingleTest();
}
/**
* 如果断开超过设置的 重置时间,则半开
*/
public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
return true;
}
}
return false;
}
@Override
public boolean isOpen() {
// 断路器打开,则直接返回
if (circuitOpen.get()) {
return true;
}
// 获取服务的健康指标
HealthCounts health = metrics.getHealthCounts();
// 请求数不到阈值,返回 false
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
return false;
}
// 失败请求数未达到阈值,返回 false
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
/**
* 失败请求达到阈值,打开断路器并更新打开时间
*
此处是 CAS 操作,如果竞争失败,其实
*
并不影响结果(断路器打开了,时间设置了)
*
因此直接返回 true 即可
*/
if (circuitOpen.compareAndSet(false, true)) {
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
return true;
}
}
}
}

HystrixCircuitBreakerImpl 的实现并不复杂,因此此处贴出了所有代码以及详细注释,这里再进行简单的总结:

  • markSuccess:半开状态下关闭断路器,即对 开关 进行 CAS 关闭操作

  • allowRequest:是否允许请求,取决于 断路器 状态

    • 如果配置强制开启 断路器,则返回 false,拒绝发送请求
    • 如果配置强制关闭 断路器,则返回 true,允许发送请求,但是同时也要记录真正的 断路器 状态,在该设置被取消时,可以正确返回 断路器 状态
    • 正常状态下,如果 断路器 关闭,则返回 true,但如果断开,则由 allowSingleTest 方法判断是否需要 半开
    • allowSingleTest:断路器 打开时间超过配置的 重置时间,则 半开,返回 true
  • isOpen:如果 断路器 打开,直接返回 true,如果是关闭的,则根据其服务指标,决定是否打开断路器

AbstractCommand#executeCommandAndObserve

	private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// ...
// doOnCompleted 回调
final Action0 markOnCompleted = new Action0() {
// ...
};
/**
* 各种失败回调处理
*/
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
// 线程调度失败回调
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
// 超时回调
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
// HystrixBadRequestException 异常回调
return handleBadRequestByEmittingError(e);
} else {
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
// 降级处理
return handleFailureViaFallback(e);
}
}
};
// doOnEach 回调
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
/**
* executeCommandWithSpecifiedIsolation 方法
*
创建对应的 Observable,实现 线程隔离、请求发送
*
等操作
* 如果需要超时监控,借助 HystrixObservableTimeoutOperator
*
转换对应的 Observable
*/
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// 回调设置
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}

此方法主要针对各种异常进行了 回调处理,比如 线程调度异常 服务调用超时 等,同时由 executeCommandWithSpecifiedIsolation 创建对应的 Observable 实现 线程隔离、请求发送

对于需要 超时监控Observable,借助 HystrixObservableTimeoutOperator 转换成对应的 Observable

AbstractCommand#executeCommandWithSpecifiedIsolation

	private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 线程隔离
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// ...
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
// ...
try {
// ...
// 请求服务
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
// ...
}
}
// OnTerminate 回调
}).doOnTerminate(new Action0() {
// ...
// OnUnsubscribe 回调
}).doOnUnsubscribe(new Action0() {
// ...
/**
* subscribeOn 通过接受一个 Scheduler,来指定
*
对数据的处理运行在特定的 Scheduler 上
* 此处的 Scheduler 由 threadPool.getScheduler
*
方法提供
*/
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
// 非线程隔离
} else {
// ...
}
}

该方法主要实现服务请求的 线程隔离,它借助 Observable#subscribeOn 实现,subscribeOn 通过接受一个 Scheduler,来指定对数据的处理运行在特定的 Scheduler 上,该 SchedulerthreadPool.getScheduler 提供

最后,服务的调用由方法 getUserExecutionObservable 实现,其本质便是返回一个发射服务调用结果的 Observable,不再深究

总结

至此,我们结合部分源码对 Hystrix 的实现及原理做了简单解析,Hystrix 源码大量使用 RxJava,大量的回调方法导致代码十分庞大,是 RxJava 教科书般的使用案例

同时,Hystrix 源码用到了并不常用的设计模式:命令设计模式,将 服务调用 封装成对应的 命令,实现了 命令调用者、命令和命令实现 的解耦

参考

《Spring Cloud 微服务架构进阶》 —— 朱荣鑫 张天 黄迪璇

最后

以上就是内向大白为你收集整理的【源码】Spring Cloud —— Hystrix:HystrixCommandAspect AbstractCommand HystrixCircuitBreaker前言版本HystrixCommandAspectAbstractCommand#toObservableAbstractCommand#applyHystrixSemanticsHystrixCircuitBreakerAbstractCommand#executeCommandAndObserveAbstractComman的全部内容,希望文章能够帮你解决【源码】Spring Cloud —— Hystrix:HystrixCommandAspect AbstractCommand HystrixCircuitBreaker前言版本HystrixCommandAspectAbstractCommand#toObservableAbstractCommand#applyHystrixSemanticsHystrixCircuitBreakerAbstractCommand#executeCommandAndObserveAbstractComman所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(61)

评论列表共有 0 条评论

立即
投稿
返回
顶部