摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-collapser-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Hystrix 1.5.X 版本
- 1. 概述
- 2. HystrixCollapser
- 2.1 构造方法
- 2.2 执行命令方式
- 2.3 核心方法
- 3. RequestCollapserFactory
- 4. RequestCollapser
- 4.1 构造方法
- 4.2 RequestBatch
- 4.3 #submitRequest(arg)
- 4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)
- 5. CollapserTimer
- 5.1 RealCollapserTimer
- 5.2 CollapsedTask
1. 概述
本文主要分享 Hystrix 命令合并执行。
在 《【翻译】Hystrix文档-实现原理》「请求合并」 中,对 Hystrix 命令合并执行的概念、原理、使用场景、优缺点已经做了非常详细透彻的分享,所以胖友可以先认真阅读学习下。
命令合并执行整体流程如下图 :
FROM 《【翻译】Hystrix文档-实现原理》「请求合并」
- 第一步,提交单个命令请求到请求队列( RequestQueue )
- 第二部,定时任务( TimerTask ) 固定周期从请求队列获取多个命令执行,合并执行。
在官方提供的示例中,我们通过 CommandCollapserGetValueForKey 熟悉命令合并执行的使用。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与Docker微服务架构实战》
- 两书齐买,京东包邮。
2. HystrixCollapser
com.netflix.hystrix.HystrixCollapser ,命令合并器抽象父类。
NOTE :
com.netflix.hystrix.HystrixObservableCollapser,另一种命令合并器抽象父类,本文暂不解析。
2.1 构造方法
HystrixCollapser 构造方法,代码如下 :
|
- BatchReturnType 泛型,多个命令合并执行返回结果类型。
- ResponseType 泛型,单个命令执行返回结果类型。
- RequestArgumentType 泛型,单个命令参数类型。
collapserFactory属性,RequestCollapser 工厂,在 「3. RequestCollapserFactory」 详细解析。requestCache属性,TODO 【2012】【请求上下文】collapserInstanceWrapper属性,命令合并器包装器。com.netflix.hystrix.collapser.HystrixCollapserBridge接口,点击 链接 查看代码。- HystrixCollapserBridge ,为 RequestBatch 透明调用 HystrixCollapser 或 HystrixObservableCollapser 的方法不同的实现。参见 《桥接模式》 。
metrics属性,TODO 【2002】【metrics】
2.2 执行命令方式
在 《Hystrix 源码解析 —— 执行命令方式》 中,我们已经看了 HystrixCommand 提供的四种执行命令方式。
HystrixCollapser 类似于 HystrixCommand ,也提供四种相同的执行命令方式,其中如下三种方式代码基本类似,我们就给下传送门,就不重复啰嗦了 :
#observe()方法 :传送门 。#queue()方法 :传送门 。#execute()方法 :传送门 。
下面一起来看看 #toObservable() 方法的实现,代码如下 :
|
observeOn方法参数,实际方法暂未用到,跳过无视。- 第 11 至 13 行 :缓存存开关、KEY 。
- 【反向】第 32 至 41 行 :获得【缓存 Observable】。这块代码和
AbstractCommand#toObservavle(...)类似,在 《Hystrix 源码解析 —— 执行结果缓存》「4. AbstractCommand#toObservavle(…)」 有详细解析。 - 【反向】第 44 行 :获得【非缓存 Observable】。
- 注意 :返回的 Observable ,很可能命令实际并未执行,或者说并未执行完成,此时在
#queue()/#execute()方法,通过 BlockingObservable 阻塞等待执行完成。BlockingObservable 在 《RxJava 源码解析 —— BlockingObservable》 有详细解析。 - 第 26 行 :调用
RequestCollapserFactory#getRequestCollapser(),获得 RequestCollapser 。在 「3. RequestCollapserFactory」 详细解析。 - 第 29 行 :提交单个命令请求到请求队列( RequestQueue ),即命令合并执行整体流程第一步。在 「4. RequestCollapser」 详细解析。
2.3 核心方法
#getRequestArgument(...)抽象方法,获得单个命令参数。代码如下 :public abstract RequestArgumentType getRequestArgument();
#createCommand(...)抽象方法,将多个命令请求合并,创建一个 HystrixCommand 。代码如下 :protected abstract HystrixCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
#mapResponseToRequests(...)抽象方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
#shardRequests(...)方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。代码如下 :protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {return Collections.singletonList(requests);}
在未重写
#shardRequests(...)的情况下,整体方法流程如下 :
在重写
#shardRequests(...)的情况下,整体方法流程如下 :
- 本图中命令请求分片仅仅是例子,实际根据重写的逻辑不同而不同。
3. RequestCollapserFactory
com.netflix.hystrix.collapser.RequestCollapserFactory ,RequestCollapser 工厂。
|
timer属性,命令合并器的定时器,在 「5. CollapserTimer」 详细解析。collapserKey属性,命令合并器标识,实现类似 HystrixThreadPoolKey 。- HystrixCollapserKey ,点击 链接 查看代码。
- HystrixThreadPoolKey ,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「3. HystrixThreadPoolKey」 有详细解析。
properties属性,命令合并器属性配置。concurrencyStrategy属性,并发策略,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「4. HystrixConcurrencyStrategy」 有详细解析。scope属性,命令请求作用域。目前有两种作用域 :REQUEST:请求上下文( HystrixRequestContext )。Typically this means that requests within a single user-request (ie. HTTP request) are collapsed.
No interaction with other user requests.
1 queue per user request.GLOBAL:全局。Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed.
1 queue for entire app.
调用 #getRequestCollapser() 方法,获得 RequestCollapser 。代码如下 :
|
- 根据
scope不同,调用两个不同方法,获得 RequestCollapser 。这两个方法大体逻辑相同,优先从缓存中查找满足条件的 RequestCollapser 返回;若不存在,则创建满足条件的 RequestCollapser 添加到缓存并返回。REQUEST:调用#getCollapserForUserRequest()方法,TODO 【2012】【请求上下文】。GLOBAL:调用#getCollapserForGlobalScope()方法,点击 链接 查看中文注释的代码。
4. RequestCollapser
com.netflix.hystrix.collapser.RequestCollapser ,命令请求合并器。主要用于 :
- 提交单个命令请求到请求队列( RequestQueue )。
- 接收来自定时任务提交的多个命令,合并执行。
4.1 构造方法
RequestCollapser 构造方法,代码如下 :
|
commandCollapser属性,命令合并器包装器。batch属性,RequestBatch,即是本文一直说的请求队列。在 「4.2 RequestBatch」 也会详细解析。timerListenerReference属性,注册在命令合并器的定时器的监听器。每个 RequestCollapser 独有一个监听器。该监听器( 实际上会使用该监听器创建定时任务 )固定周期从请求队列获取多个命令执行,提交 RequestCollapser 合并执行。在 「5. CollapserTimer」 也会详细解析。timerListenerRegistered属性,timerListenerReference是否已经注册。timer属性,命令合并器的定时器。properties属性,命令合并器属性配置。concurrencyStrategy属性,并发策略。
4.2 RequestBatch
com.netflix.hystrix.collapser.RequestBatch ,命令请求队列。提供如下功能 :
- 命令请求的添加
- 命令请求的移除
- 命令请求的批量执行。笔者把 RequestBatch 解释成 “命令请求队列”,主要方便大家理解。
- 那可能有胖友有疑问,为啥该功能不在 RequestCollapser 直接实现,这样 RequestBatch 成为纯粹的队列呢?在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。
RequestBatch 构造方法,代码如下 :
|
commandCollapser属性,命令合并器包装器。maxBatchSize属性,队列最大长度。batchStarted属性,执行是否开始。argumentMap属性,命令请求参数映射( 队列 )。properties属性,命令合并器属性配置。batchLock属性,argumentMap操作的读写锁。
RequestBatch 实现队列具体的操作方法,在 「4.3 #submitRequest(arg)」/「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 一起解析。
4.3 #submitRequest(arg)
在 #toObservable() 方法里,调用 #submitRequest(arg) 方法,提交单个命令请求到 RequestBatch 。代码如下 :
|
- 第 5 至 8 行 :当 RequestCollapser 的监听任务( CollapsedTask )还未创建,进行初始化。
- 第 11 至 35 行 :死循环,直到提交单个命令请求到 RequestBatch 成功。
- 第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被
#shutdown()后才会出现为null的情况。 - 第 19 至 24 行 :调动
RequestBatch#offer(...)方法,提交单个命令请求到 RequestBatch ,并获得 Observable 。这里对arg == null做了特殊处理,因为RequestBatch.argumentMap是 ConcurrentHashMap ,不允许值为null。另外,RequestBatch#offer(...)方法的实现代码,在结束了当前方法,详细解析。 - 第 28 至 29 行 :添加成功,返回 Observable 。
- 第 30 至 34 行 :添加失败,执行当前 RequestBatch 的多个命令合并执行,并创建新的 RequestBatch 。在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。
- 第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被
RequestBatch#offer(...) 方法,代码如下 :
|
- 第 4 至 6 行 :执行已经开始,添加失败。在
RequestBatch#executeBatchIfNotAlreadyStarted(...)方法的开头,优先 CAS 使batchStarted = true。 - 第 11 行 :获得读锁。
The 'read' just means non-exclusive even though we are writing.,即使该方法实际在做“写操作”,不排他,线程安全,所以可以使用读锁。 - 第 15 至 17 行 :
double-check,执行已经开始,添加失败。在RequestBatch#executeBatchIfNotAlreadyStarted(...)方法,优先 CAS 使batchStarted = true,再获取写锁,所以会出现该情况。 - 第 20 至 21 行 :超过队列最大长度,添加失败。
第 24 至 25 行 :创建
com.netflix.hystrix.collapser.CollapsedRequestSubject,并将它添加到队列(argumentMap) 。- CollapsedRequestSubject 实现
com.netflix.hystrix.HystrixCollapser.CollapsedRequest接口,定义了批量命令执行的请求,不仅限于获得请求参数(#getArgument()方法 ),也包括对批量命令执行结束后,每个请求的结果设置(#setResponse(...)/#emitResponse(...)/#setException(...)/#setComplete()方法 ),点击 链接 查看该接口的代码。 CollapsedRequestSubject 构造方法,代码如下:
/* package */ class CollapsedRequestSubject<T, R> implements CollapsedRequest<T, R> {/*** 参数*/private final R argument;/*** 结果( response ) 是否设置*/private AtomicBoolean valueSet = new AtomicBoolean( false);/*** 可回放的 ReplaySubject*/private final ReplaySubject<T> subject = ReplaySubject.create();/*** 带订阅数量的 ReplaySubject*/private final Observable<T> subjectWithAccounting;/*** 订阅数量*/private volatile int outstandingSubscriptions = 0;public CollapsedRequestSubject(final R arg, final RequestBatch<?, T, R> containingBatch) {// 设置 argumentif (arg == RequestCollapser.NULL_SENTINEL) {this.argument = null;} else {this.argument = arg;}// 设置 带订阅数量的 ReplaySubjectthis.subjectWithAccounting = subject.doOnSubscribe( new Action0() {public void call() {outstandingSubscriptions++;}}).doOnUnsubscribe( new Action0() {public void call() {outstandingSubscriptions--;if (outstandingSubscriptions == 0) {containingBatch.remove(arg);}}});}}argument属性,单个命令请求参数。valueSet属性,结果( Response ) 是否设置,通过#setResponse()/#emitResponse()方法设置。subject属性,可回放执行结果的 Subject 。此处使用 ReplaySubject 的主要目的,当 HystrixCollapser 开启缓存功能时,通过回放执行结果,在 《Hystrix 源码解析 —— 执行结果缓存》「5. HystrixCachedObservable」 也有相同的实现。另外,这里有一点要注意下,ReplaySubject 并没有向任何 Observable 订阅结果,而是通过#setResponse()/#emitResponse()方法设置结果。outstandingSubscriptions属性,订阅数量。subjectWithAccounting属性,带订阅数量的 ReplaySubject 。当取消订阅时,调用RequestBatch#remove(arg)方法,移除单个命令请求。
- CollapsedRequestSubject 实现
第 38 至 47 行 :返回 Observable 。
- 当
argumentMap已经存在arg对应的 Observable 时,必须开启缓存 (HystrixCollapserProperties.requestCachingEnabled = true) 功能。原因是,如果在相同的arg,并且未开启缓存,同时第 43 行实现的是collapsedRequest.toObservable(),那么相同的arg将有多个 Observable 执行命令,此时HystrixCollapserBridge#mapResponseToRequests(...)方法无法将执行( Response )赋值到arg对应的命令请求( CollapsedRequestSubject ) 。更多讨论,见 https://github.com/Netflix/Hystrix/pull/1176 。 - 回过头看
HystrixCollapser#toObservable()方法的第 32 至 41 行的代码,这里也有对缓存功能,是不是重复了呢?argumentMap针对的是 RequestBatch 级的缓存,HystrixCollapser : RequestCollapser : RequestBatch 是1 : 1 : N的关系,通过HystrixCollapser#toObservable()对缓存的处理逻辑,保证 RequestBatch 切换后,依然有缓存。
- 当
RequestBatch#remove() 方法,代码如下 :
|
- 当 RequestBatch 开始执行,不允许移除单个命令请求。
4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)
本小节建议在 「5. CollapserTimer」 后,再回过头看。
#createNewBatchAndExecutePreviousIfNeeded(previousBatch) 方法,代码如下 :
|
- 第 5 行 :通过 CAS 修改
batch,保证并发情况下的线程安全。同时注意,此处也进行了新的 RequestBatch ,切换掉老的 RequestBatch 。 - 第 6 行 :使用老的 RequestBatch ,调用
RequestBatch#executeBatchIfNotAlreadyStarted()方法,命令合并执行。
RequestBatch#executeBatchIfNotAlreadyStarted() 方法,代码如下 :
|
- 代码看起来是有点长哈,请对照着官方示例 CommandCollapserGetValueForKey 一起看,临门一脚了,胖友!
- 第 7 行 :通过 CAS 修改
batchStarted,保证并发情况下的线程安全。 - 第 10 行 :获得写锁。等待调用
#offer(...)/#remove(...)方法的线程执行完成,以保证命令合并执行时,不再有新的请求添加或移除。 - 第 15 行 :调用
HystrixCollapserBridge#shardRequests(...)方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。点击 链接 查看代码。 - 第 17 行 :循环 N 个【多个命令请求】。
- 第 21 行 :调用
HystrixCollapserBridge#createObservableCommand(...)方法,将多个命令请求合并,创建一个 HystrixCommand 。点击 链接 查看代码。 - 第 24 行 :调用
HystrixCollapserBridge#mapResponseToRequests(...)方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。点击 链接 查看代码。Observable#single()方法,如果 Observable 终止时只发射了一个值,返回那个值,否则抛出异常。在 《ReactiveX文档中文翻译》「single」 有相关分享。Observable#ignoreElements()方法,抑制原始 Observable 发射的所有数据,只允许它的终止通知(#onError()或#onCompleted())通过。在 《ReactiveX文档中文翻译》「IgnoreElements」 有相关分享。也推荐点击rx.internal.operators.OperatorIgnoreElements看下源码,可能更加易懂。Observable#cast()方法,将原始 Observable 发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是map的一个特殊版本。在 《ReactiveX文档中文翻译》「cast」 有相关分享。也推荐点击rx.internal.operators.OperatorCast看下源码,可能更加易懂。- 使用
Observable#ignoreElements()/Observable#cast()方法,用于将 Observable 变成不再继续向下发射数据项,只给现有方法里Observable#doNext()处理数据项,调用HystrixCollapser#mapResponseToRequests(...)方法。 - 点击 链接 ,查看
CollapsedRequestSubject#setResponse(response)方法的代码。
- 第 24 至 50 行 :调用
Observable#doError(Action1)方法,当命令合并执行发生异常时,设置每个 CollapsedRequestSubject 的执行结果为异常。- 点击 链接,查看
CollapsedRequestSubject#setResponse(response)方法的代码。
- 点击 链接,查看
- 第 52 至 68 行 :调用
Observable#doOnCompleted(Action0)方法,当命令合并执行完成时,检查每个 CollapsedRequestSubject 是否都有返回结果。设置没有返回结果的 CollapsedRequestSubject 的执行结果为异常。一般情况下,是用户实现HystrixCollapser#mapResponseToRequests(...)方法存在 BUG 。另外,如果不设置,将导致无结果的单个命令请求无限阻塞。 - 第 70 行 :调用
Observable#subscribe()方法,触发 HystrixCommand 执行。 - 第 72 至 96 行 :发生异常,设置每个 CollapsedRequestSubject 的执行结果为异常。
- 点击 链接,查看
CollapsedRequestSubject#setException(response)方法的代码。
- 点击 链接,查看
- 第 97 至 99 行 :释放写锁。
5. CollapserTimer
com.netflix.hystrix.collapser.CollapserTimer ,命令合并器的定时器接口,定义了提交定时监听器,生成定时任务的接口方法,代码如下 :
|
5.1 RealCollapserTimer
com.netflix.hystrix.collapser.RealCollapserTimer ,命令合并器的定时器实现类,代码如下 :
|
- 实际上,使用的是 HystrixTimer 提供的单例。在 《Hystrix 源码解析 —— 执行结果缓存》「3. HystrixTimer
」 有详细解析。
5.2 CollapsedTask
com.netflix.hystrix.collapser.RequestCollapser.CollapsedTask ,定时任务,固定周期( 可配,默认 HystrixCollapserProperties.timerDelayInMilliseconds = 10ms ) 轮询其对应的一个 RequestCollapser 当前 RequestBatch 。若有命令需要执行,则提交 RequestCollapser 合并执行。
代码比较简单,点击 链接 直接看代码。
最后
以上就是爱笑黑猫最近收集整理的关于Hystrix 源码解析 —— 命令合并执行1. 概述2. HystrixCollapser3. RequestCollapserFactory4. RequestCollapser5. CollapserTimer 的全部内容,更多相关Hystrix内容请搜索靠谱客的其他文章。

发表评论 取消回复