我是靠谱客的博主 内向大白,这篇文章主要介绍【源码】Spring Cloud —— Hystrix:HystrixCommandAspect AbstractCommand HystrixCircuitBreaker前言版本HystrixCommandAspectAbstractCommand#toObservableAbstractCommand#applyHystrixSemanticsHystrixCircuitBreakerAbstractCommand#executeCommandAndObserveAbstractComman,现在分享给大家,希望可以做个参考。

【源码】Spring Cloud —— Hystrix:HystrixCommandAspect AbstractCommand HystrixCircuitBreaker

  • 前言
  • 版本
  • HystrixCommandAspect
    • HystrixCommandAspect
    • ExecutionType
  • AbstractCommand#toObservable
  • AbstractCommand#applyHystrixSemantics
  • HystrixCircuitBreaker
    • HystrixCircuitBreakerImpl
  • AbstractCommand#executeCommandAndObserve
  • AbstractCommand#executeCommandWithSpecifiedIsolation
  • 总结
  • 参考

前言

Hystrix,以类似 断路器 的机制保证失效服务的隔离,同时提供 快速失败、服务降级 等功能,spring-cloud-netflix-hystrix 对其进行适配

本章节结合部分源码了解 Hystrix 的实现

版本

hystrix:1.5.18

HystrixCommandAspect

Hystrix 的实现使用到了 命令设计模式命令模式命令者、命令、命令 的实现解耦开来

Hystrix 借助 Aspect(切面),将服务的调用封装成对应的 命令,它们可以拥有自己的 线程资源 以实现 资源隔离

HystrixCommandAspect

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 是个 Aspect,可以被 spring-aop 解析 @Aspect public class HystrixCommandAspect { private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP; /** * 静态块维护解析 @HystrixCommand 和 @HystrixCollapser * 对应的 MetaHolder 的 MetaHolderFactory */ static { META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder() .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory()) .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory()) .build(); } // @HystrixCommand 切点 @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() { } // HystrixCollapser 切点 @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() { } @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { // 获取切点方法 Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint); // 不能同时注解 @HystrixCommand 和 @HystrixCollapser if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time"); } // 获取注解对应 MetaHolderFactory MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); // 解析对应的 MetaHolder MetaHolder metaHolder = metaHolderFactory.create(joinPoint); // 构建对应的 HystrixInvokable(命令) HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); // 命令类型 ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); /** * 根据不同的命令类型,执行命令,返回结果 */ Object result; try { if (!metaHolder.isObservable()) { result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { // ... } return result; } // ... }

HystrixCommandAspect 是一个 Aspect,拦截对应的方法,生成对应的 HystrixInvokable(命令),方法的执行委托给 命令 的执行

可以理解为,Hystrix 对所有注解了 @HystrixCommand@HystrixCollapser 的方法进行代理,将其行为委托给基于切点方法构造的 HystrixInvokable

ExecutionType

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public enum ExecutionType { // 异步 ASYNCHRONOUS, // 同步 SYNCHRONOUS, // 响应式(异步回调) OBSERVABLE; /** * 命令的类型取决于切点方法的返回值 * Future -> ASYNCHRONOUS * Observable -> OBSERVABLE * 其他 -> SYNCHRONOUS */ public static ExecutionType getExecutionType(Class<?> type) { if (Future.class.isAssignableFrom(type)) { return ExecutionType.ASYNCHRONOUS; } else if (Observable.class.isAssignableFrom(type)) { return ExecutionType.OBSERVABLE; } else { return ExecutionType.SYNCHRONOUS; } } }

构造命令的类型取决于切点方法的返回值:

  • ASYNCHRONOUS异步, 命令的返回值为 Future,可使用 Future#get 方法 阻塞式 获取执行结果
  • OBSERVABLE响应式,可获取发射执行结果的 Observable,进行订阅,订阅对应的异步回调方法
  • SYNCHRONOUS同步,同步获取对应的结果

整体上,命令的结果都基于 AbstractCommand#toObservable 方法,将对应的 Observable 转换成 Future,又或者直接 Future#get 同步返回结果,以满足对应的 命令类型

因此,整个命令的封装基于 rxjava 实现:

  • rx响应式编程思想,异步式回调,通过 Observer 订阅 Ovservable 的方式,无需类似 Future#get 方法阻塞式等待结果返回,而是回调式异步获取结果
  • rxjavarx 思想的 java 实现
复制代码
1
2
更多关于 rxjava 的内容,传送门:

【从零学 RxJava】RxJava 1 Hot Observable & Cold Observable、Subject

AbstractCommand#toObservable

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; // terminate 回调 final Action0 terminateCommandCleanup = new Action0() { // ... }; // unsubscribe 回调 final Action0 unsubscribeCommandCleanup = new Action0() { // ... }; // 定义 Func0 创建对应的 Observale final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } return applyHystrixSemantics(_cmd); } }; final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() { // ... }; // OnComplete 回调 final Action0 fireOnCompletedHook = new Action0() { // ... }; return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // ... final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); /** * 如果配置允许缓存,试图获取,默认 false */ if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } /** * defer 操作符会为每一个订阅者 * 返回一个全新的 Obervable * 对应的 Obervable 由 applyHystrixSemantics * 函数创建 */ Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; /** * 如果允许缓存,缓存对应的 Observable */ if (requestCacheEnabled && cacheKey != null) { // ... } else { afterCache = hystrixObservable; } // 生命周期回调设置 return afterCache .doOnTerminate(terminateCommandCleanup) .doOnUnsubscribe(unsubscribeCommandCleanup) .doOnCompleted(fireOnCompletedHook); } }); }

该方法主要:

  • 缓存相关,试图从缓存获取以及对生成的 Observable 进行缓存
  • 生命周期相关,定义了部分 生命周期回调,比如 doOnTerminate doOnUnsubscribe doOnCompleted
  • 对应的 ObservableapplyHystrixSemantics 函数创建,该函数委托方法 applyHystrixSemantics 创建,此处用到 defer 操作符,可以为每个 订阅者 创建一个全新的 Observable

AbstractCommand#applyHystrixSemantics

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { /** * circuitBreaker:断路器的实现 * allowRequest:是否允许请求,即断路器是否闭合 */ if (circuitBreaker.allowRequest()) { // 信号量获取 final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); // 信号释放回调 final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { // 信号释放 executionSemaphore.release(); } } }; // OnError 回调 final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { // ... }; /** * 获取信号,并返回对应的 Observable */ if (executionSemaphore.tryAcquire()) { try { executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { // 信号量获取失败回调 return handleSemaphoreRejectionViaFallback(); } } else { // 断路器断开回调处理 return handleShortCircuitViaFallback(); } }
  • 判断是否允许发送请求,这基于 断路器 实现,如果 断路器 打开,则进行对应回调处理(失败或降级)
  • 如果 断路器 关闭,则进行请求,先获取 信号,获取失败则处理对应回调
  • 获取成功,则由方法 executeCommandAndObserve
    创建对应的 Observable 实现 线程隔离、请求发送 等操作,同时注册了对应的 生命周期回调

HystrixCircuitBreaker

Hystrix 使用断路器,对请求失败达到 阈值 的服务,打开 断路器,断路器有如下状态:

  • 闭合,正常发送请求,当请求失败达到 阈值 后,断开
  • 断开,此时不进行服务调用,直接返回失败(或者降级处理),并设置一个 重置时间,在该时间后,断路器半开
  • 半开,允许发送请求,在调用成功达到一定条件后关闭断路器,否则再次打开

断路器 的逻辑主要由 HystrixCircuitBreaker 实现

复制代码
1
2
3
4
5
6
7
8
9
10
public interface HystrixCircuitBreaker { // 是否允许请求发送 public boolean allowRequest(); // 断路器是否打开 public boolean isOpen(); // 断路器半开时关闭 /* package */void markSuccess(); // ... }

定义了三个方法,同时提供了两个内部实现类 HystrixCircuitBreakerImplNoOpCircuitBreaker,后者空实现,默认可以发送请求,重点关注 HystrixCircuitBreakerImpl

HystrixCircuitBreakerImpl

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/* package */static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { // HystrixCommand 配置 private final HystrixCommandProperties properties; // HystrixCommand 监控指标 private final HystrixCommandMetrics metrics; // 断路器开关 private AtomicBoolean circuitOpen = new AtomicBoolean(false); // 断路器打开时间 private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong(); /** * CAS 操作关闭断路器 */ public void markSuccess() { if (circuitOpen.get()) { if (circuitOpen.compareAndSet(true, false)) { metrics.resetStream(); } } } @Override public boolean allowRequest() { /** * 如果配置强制打开断路器, * 返回 false */ if (properties.circuitBreakerForceOpen().get()) { return false; } /** * 如果配置强制关闭断路器,返回 true,但会执行 isOpen 方法 * 以保证强制关闭接触后,断路器状态仍然正确 * (失效服务的断路器是开启的) */ if (properties.circuitBreakerForceClosed().get()) { isOpen(); return true; } /** * 如果断路器关闭,返回 true * 如果断路器打开,则执行 allowSingleTest, * 判断是否超过重置时间,如果是则半开 */ return !isOpen() || allowSingleTest(); } /** * 如果断开超过设置的 重置时间,则半开 */ public boolean allowSingleTest() { long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) { if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { return true; } } return false; } @Override public boolean isOpen() { // 断路器打开,则直接返回 if (circuitOpen.get()) { return true; } // 获取服务的健康指标 HealthCounts health = metrics.getHealthCounts(); // 请求数不到阈值,返回 false if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { return false; } // 失败请求数未达到阈值,返回 false if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { return false; } else { /** * 失败请求达到阈值,打开断路器并更新打开时间 * 此处是 CAS 操作,如果竞争失败,其实 * 并不影响结果(断路器打开了,时间设置了) * 因此直接返回 true 即可 */ if (circuitOpen.compareAndSet(false, true)) { circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); return true; } else { return true; } } } }

HystrixCircuitBreakerImpl 的实现并不复杂,因此此处贴出了所有代码以及详细注释,这里再进行简单的总结:

  • markSuccess:半开状态下关闭断路器,即对 开关 进行 CAS 关闭操作

  • allowRequest:是否允许请求,取决于 断路器 状态

    • 如果配置强制开启 断路器,则返回 false,拒绝发送请求
    • 如果配置强制关闭 断路器,则返回 true,允许发送请求,但是同时也要记录真正的 断路器 状态,在该设置被取消时,可以正确返回 断路器 状态
    • 正常状态下,如果 断路器 关闭,则返回 true,但如果断开,则由 allowSingleTest 方法判断是否需要 半开
    • allowSingleTest:断路器 打开时间超过配置的 重置时间,则 半开,返回 true
  • isOpen:如果 断路器 打开,直接返回 true,如果是关闭的,则根据其服务指标,决定是否打开断路器

AbstractCommand#executeCommandAndObserve

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { // ... // doOnCompleted 回调 final Action0 markOnCompleted = new Action0() { // ... }; /** * 各种失败回调处理 */ final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { // 线程调度失败回调 return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { // 超时回调 return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { // HystrixBadRequestException 异常回调 return handleBadRequestByEmittingError(e); } else { if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } // 降级处理 return handleFailureViaFallback(e); } } }; // doOnEach 回调 final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(currentRequestContext); } }; /** * executeCommandWithSpecifiedIsolation 方法 * 创建对应的 Observable,实现 线程隔离、请求发送 * 等操作 * 如果需要超时监控,借助 HystrixObservableTimeoutOperator * 转换对应的 Observable */ Observable<R> execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } // 回调设置 return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }

此方法主要针对各种异常进行了 回调处理,比如 线程调度异常 服务调用超时 等,同时由 executeCommandWithSpecifiedIsolation 创建对应的 Observable 实现 线程隔离、请求发送

对于需要 超时监控Observable,借助 HystrixObservableTimeoutOperator 转换成对应的 Observable

AbstractCommand#executeCommandWithSpecifiedIsolation

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { // 线程隔离 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // ... if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { // ... try { // ... // 请求服务 return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { // ... } } // OnTerminate 回调 }).doOnTerminate(new Action0() { // ... // OnUnsubscribe 回调 }).doOnUnsubscribe(new Action0() { // ... /** * subscribeOn 通过接受一个 Scheduler,来指定 * 对数据的处理运行在特定的 Scheduler 上 * 此处的 Scheduler 由 threadPool.getScheduler * 方法提供 */ }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); // 非线程隔离 } else { // ... } }

该方法主要实现服务请求的 线程隔离,它借助 Observable#subscribeOn 实现,subscribeOn 通过接受一个 Scheduler,来指定对数据的处理运行在特定的 Scheduler 上,该 SchedulerthreadPool.getScheduler 提供

最后,服务的调用由方法 getUserExecutionObservable 实现,其本质便是返回一个发射服务调用结果的 Observable,不再深究

总结

至此,我们结合部分源码对 Hystrix 的实现及原理做了简单解析,Hystrix 源码大量使用 RxJava,大量的回调方法导致代码十分庞大,是 RxJava 教科书般的使用案例

同时,Hystrix 源码用到了并不常用的设计模式:命令设计模式,将 服务调用 封装成对应的 命令,实现了 命令调用者、命令和命令实现 的解耦

参考

《Spring Cloud 微服务架构进阶》 —— 朱荣鑫 张天 黄迪璇

最后

以上就是内向大白最近收集整理的关于【源码】Spring Cloud —— Hystrix:HystrixCommandAspect AbstractCommand HystrixCircuitBreaker前言版本HystrixCommandAspectAbstractCommand#toObservableAbstractCommand#applyHystrixSemanticsHystrixCircuitBreakerAbstractCommand#executeCommandAndObserveAbstractComman的全部内容,更多相关【源码】Spring内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(71)

评论列表共有 0 条评论

立即
投稿
返回
顶部