摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-collapser-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Hystrix 1.5.X 版本
- 1. 概述
- 2. HystrixCollapser
- 2.1 构造方法
- 2.2 执行命令方式
- 2.3 核心方法
- 3. RequestCollapserFactory
- 4. RequestCollapser
- 4.1 构造方法
- 4.2 RequestBatch
- 4.3 #submitRequest(arg)
- 4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)
- 5. CollapserTimer
- 5.1 RealCollapserTimer
- 5.2 CollapsedTask
1. 概述
本文主要分享 Hystrix 命令合并执行。
在 《【翻译】Hystrix文档-实现原理》「请求合并」 中,对 Hystrix 命令合并执行的概念、原理、使用场景、优缺点已经做了非常详细透彻的分享,所以胖友可以先认真阅读学习下。
命令合并执行整体流程如下图 :
FROM 《【翻译】Hystrix文档-实现原理》「请求合并」
- 第一步,提交单个命令请求到请求队列( RequestQueue )
- 第二部,定时任务( TimerTask ) 固定周期从请求队列获取多个命令执行,合并执行。
在官方提供的示例中,我们通过 CommandCollapserGetValueForKey 熟悉命令合并执行的使用。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与Docker微服务架构实战》
- 两书齐买,京东包邮。
2. HystrixCollapser
com.netflix.hystrix.HystrixCollapser
,命令合并器抽象父类。
NOTE :
com.netflix.hystrix.HystrixObservableCollapser
,另一种命令合并器抽象父类,本文暂不解析。
2.1 构造方法
HystrixCollapser 构造方法,代码如下 :
|
- BatchReturnType 泛型,多个命令合并执行返回结果类型。
- ResponseType 泛型,单个命令执行返回结果类型。
- RequestArgumentType 泛型,单个命令参数类型。
collapserFactory
属性,RequestCollapser 工厂,在 「3. RequestCollapserFactory」 详细解析。requestCache
属性,TODO 【2012】【请求上下文】collapserInstanceWrapper
属性,命令合并器包装器。com.netflix.hystrix.collapser.HystrixCollapserBridge
接口,点击 链接 查看代码。- HystrixCollapserBridge ,为 RequestBatch 透明调用 HystrixCollapser 或 HystrixObservableCollapser 的方法不同的实现。参见 《桥接模式》 。
metrics
属性,TODO 【2002】【metrics】
2.2 执行命令方式
在 《Hystrix 源码解析 —— 执行命令方式》 中,我们已经看了 HystrixCommand 提供的四种执行命令方式。
HystrixCollapser 类似于 HystrixCommand ,也提供四种相同的执行命令方式,其中如下三种方式代码基本类似,我们就给下传送门,就不重复啰嗦了 :
#observe()
方法 :传送门 。#queue()
方法 :传送门 。#execute()
方法 :传送门 。
下面一起来看看 #toObservable()
方法的实现,代码如下 :
|
observeOn
方法参数,实际方法暂未用到,跳过无视。- 第 11 至 13 行 :缓存存开关、KEY 。
- 【反向】第 32 至 41 行 :获得【缓存 Observable】。这块代码和
AbstractCommand#toObservavle(...)
类似,在 《Hystrix 源码解析 —— 执行结果缓存》「4. AbstractCommand#toObservavle(…)」 有详细解析。 - 【反向】第 44 行 :获得【非缓存 Observable】。
- 注意 :返回的 Observable ,很可能命令实际并未执行,或者说并未执行完成,此时在
#queue()
/#execute()
方法,通过 BlockingObservable 阻塞等待执行完成。BlockingObservable 在 《RxJava 源码解析 —— BlockingObservable》 有详细解析。 - 第 26 行 :调用
RequestCollapserFactory#getRequestCollapser()
,获得 RequestCollapser 。在 「3. RequestCollapserFactory」 详细解析。 - 第 29 行 :提交单个命令请求到请求队列( RequestQueue ),即命令合并执行整体流程第一步。在 「4. RequestCollapser」 详细解析。
2.3 核心方法
#getRequestArgument(...)
抽象方法,获得单个命令参数。代码如下 :public abstract RequestArgumentType getRequestArgument();
#createCommand(...)
抽象方法,将多个命令请求合并,创建一个 HystrixCommand 。代码如下 :protected abstract HystrixCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
#mapResponseToRequests(...)
抽象方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
#shardRequests(...)
方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。代码如下 :protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {return Collections.singletonList(requests);}
在未重写
#shardRequests(...)
的情况下,整体方法流程如下 :在重写
#shardRequests(...)
的情况下,整体方法流程如下 :- 本图中命令请求分片仅仅是例子,实际根据重写的逻辑不同而不同。
3. RequestCollapserFactory
com.netflix.hystrix.collapser.RequestCollapserFactory
,RequestCollapser 工厂。
|
timer
属性,命令合并器的定时器,在 「5. CollapserTimer」 详细解析。collapserKey
属性,命令合并器标识,实现类似 HystrixThreadPoolKey 。- HystrixCollapserKey ,点击 链接 查看代码。
- HystrixThreadPoolKey ,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「3. HystrixThreadPoolKey」 有详细解析。
properties
属性,命令合并器属性配置。concurrencyStrategy
属性,并发策略,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「4. HystrixConcurrencyStrategy」 有详细解析。scope
属性,命令请求作用域。目前有两种作用域 :REQUEST
:请求上下文( HystrixRequestContext )。Typically this means that requests within a single user-request (ie. HTTP request) are collapsed.
No interaction with other user requests.
1 queue per user request.GLOBAL
:全局。Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed.
1 queue for entire app.
调用 #getRequestCollapser()
方法,获得 RequestCollapser 。代码如下 :
|
- 根据
scope
不同,调用两个不同方法,获得 RequestCollapser 。这两个方法大体逻辑相同,优先从缓存中查找满足条件的 RequestCollapser 返回;若不存在,则创建满足条件的 RequestCollapser 添加到缓存并返回。REQUEST
:调用#getCollapserForUserRequest()
方法,TODO 【2012】【请求上下文】。GLOBAL
:调用#getCollapserForGlobalScope()
方法,点击 链接 查看中文注释的代码。
4. RequestCollapser
com.netflix.hystrix.collapser.RequestCollapser
,命令请求合并器。主要用于 :
- 提交单个命令请求到请求队列( RequestQueue )。
- 接收来自定时任务提交的多个命令,合并执行。
4.1 构造方法
RequestCollapser 构造方法,代码如下 :
|
commandCollapser
属性,命令合并器包装器。batch
属性,RequestBatch,即是本文一直说的请求队列。在 「4.2 RequestBatch」 也会详细解析。timerListenerReference
属性,注册在命令合并器的定时器的监听器。每个 RequestCollapser 独有一个监听器。该监听器( 实际上会使用该监听器创建定时任务 )固定周期从请求队列获取多个命令执行,提交 RequestCollapser 合并执行。在 「5. CollapserTimer」 也会详细解析。timerListenerRegistered
属性,timerListenerReference
是否已经注册。timer
属性,命令合并器的定时器。properties
属性,命令合并器属性配置。concurrencyStrategy
属性,并发策略。
4.2 RequestBatch
com.netflix.hystrix.collapser.RequestBatch
,命令请求队列。提供如下功能 :
- 命令请求的添加
- 命令请求的移除
- 命令请求的批量执行。笔者把 RequestBatch 解释成 “命令请求队列”,主要方便大家理解。
- 那可能有胖友有疑问,为啥该功能不在 RequestCollapser 直接实现,这样 RequestBatch 成为纯粹的队列呢?在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。
RequestBatch 构造方法,代码如下 :
|
commandCollapser
属性,命令合并器包装器。maxBatchSize
属性,队列最大长度。batchStarted
属性,执行是否开始。argumentMap
属性,命令请求参数映射( 队列 )。properties
属性,命令合并器属性配置。batchLock
属性,argumentMap
操作的读写锁。
RequestBatch 实现队列具体的操作方法,在 「4.3 #submitRequest(arg)」/「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 一起解析。
4.3 #submitRequest(arg)
在 #toObservable()
方法里,调用 #submitRequest(arg)
方法,提交单个命令请求到 RequestBatch 。代码如下 :
|
- 第 5 至 8 行 :当 RequestCollapser 的监听任务( CollapsedTask )还未创建,进行初始化。
- 第 11 至 35 行 :死循环,直到提交单个命令请求到 RequestBatch 成功。
- 第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被
#shutdown()
后才会出现为null
的情况。 - 第 19 至 24 行 :调动
RequestBatch#offer(...)
方法,提交单个命令请求到 RequestBatch ,并获得 Observable 。这里对arg == null
做了特殊处理,因为RequestBatch.argumentMap
是 ConcurrentHashMap ,不允许值为null
。另外,RequestBatch#offer(...)
方法的实现代码,在结束了当前方法,详细解析。 - 第 28 至 29 行 :添加成功,返回 Observable 。
- 第 30 至 34 行 :添加失败,执行当前 RequestBatch 的多个命令合并执行,并创建新的 RequestBatch 。在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。
- 第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被
RequestBatch#offer(...)
方法,代码如下 :
|
- 第 4 至 6 行 :执行已经开始,添加失败。在
RequestBatch#executeBatchIfNotAlreadyStarted(...)
方法的开头,优先 CAS 使batchStarted = true
。 - 第 11 行 :获得读锁。
The 'read' just means non-exclusive even though we are writing.
,即使该方法实际在做“写操作”,不排他,线程安全,所以可以使用读锁。 - 第 15 至 17 行 :
double-check
,执行已经开始,添加失败。在RequestBatch#executeBatchIfNotAlreadyStarted(...)
方法,优先 CAS 使batchStarted = true
,再获取写锁,所以会出现该情况。 - 第 20 至 21 行 :超过队列最大长度,添加失败。
第 24 至 25 行 :创建
com.netflix.hystrix.collapser.CollapsedRequestSubject
,并将它添加到队列(argumentMap
) 。- CollapsedRequestSubject 实现
com.netflix.hystrix.HystrixCollapser.CollapsedRequest
接口,定义了批量命令执行的请求,不仅限于获得请求参数(#getArgument()
方法 ),也包括对批量命令执行结束后,每个请求的结果设置(#setResponse(...)
/#emitResponse(...)
/#setException(...)
/#setComplete()
方法 ),点击 链接 查看该接口的代码。 CollapsedRequestSubject 构造方法,代码如下:
/* package */ class CollapsedRequestSubject<T, R> implements CollapsedRequest<T, R> {/*** 参数*/private final R argument;/*** 结果( response ) 是否设置*/private AtomicBoolean valueSet = new AtomicBoolean( false);/*** 可回放的 ReplaySubject*/private final ReplaySubject<T> subject = ReplaySubject.create();/*** 带订阅数量的 ReplaySubject*/private final Observable<T> subjectWithAccounting;/*** 订阅数量*/private volatile int outstandingSubscriptions = 0;public CollapsedRequestSubject(final R arg, final RequestBatch<?, T, R> containingBatch) {// 设置 argumentif (arg == RequestCollapser.NULL_SENTINEL) {this.argument = null;} else {this.argument = arg;}// 设置 带订阅数量的 ReplaySubjectthis.subjectWithAccounting = subject.doOnSubscribe( new Action0() {public void call() {outstandingSubscriptions++;}}).doOnUnsubscribe( new Action0() {public void call() {outstandingSubscriptions--;if (outstandingSubscriptions == 0) {containingBatch.remove(arg);}}});}}argument
属性,单个命令请求参数。valueSet
属性,结果( Response ) 是否设置,通过#setResponse()
/#emitResponse()
方法设置。subject
属性,可回放执行结果的 Subject 。此处使用 ReplaySubject 的主要目的,当 HystrixCollapser 开启缓存功能时,通过回放执行结果,在 《Hystrix 源码解析 —— 执行结果缓存》「5. HystrixCachedObservable」 也有相同的实现。另外,这里有一点要注意下,ReplaySubject 并没有向任何 Observable 订阅结果,而是通过#setResponse()
/#emitResponse()
方法设置结果。outstandingSubscriptions
属性,订阅数量。subjectWithAccounting
属性,带订阅数量的 ReplaySubject 。当取消订阅时,调用RequestBatch#remove(arg)
方法,移除单个命令请求。
- CollapsedRequestSubject 实现
第 38 至 47 行 :返回 Observable 。
- 当
argumentMap
已经存在arg
对应的 Observable 时,必须开启缓存 (HystrixCollapserProperties.requestCachingEnabled = true
) 功能。原因是,如果在相同的arg
,并且未开启缓存,同时第 43 行实现的是collapsedRequest.toObservable()
,那么相同的arg
将有多个 Observable 执行命令,此时HystrixCollapserBridge#mapResponseToRequests(...)
方法无法将执行( Response )赋值到arg
对应的命令请求( CollapsedRequestSubject ) 。更多讨论,见 https://github.com/Netflix/Hystrix/pull/1176 。 - 回过头看
HystrixCollapser#toObservable()
方法的第 32 至 41 行的代码,这里也有对缓存功能,是不是重复了呢?argumentMap
针对的是 RequestBatch 级的缓存,HystrixCollapser : RequestCollapser : RequestBatch 是1 : 1 : N
的关系,通过HystrixCollapser#toObservable()
对缓存的处理逻辑,保证 RequestBatch 切换后,依然有缓存。
- 当
RequestBatch#remove()
方法,代码如下 :
|
- 当 RequestBatch 开始执行,不允许移除单个命令请求。
4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)
本小节建议在 「5. CollapserTimer」 后,再回过头看。
#createNewBatchAndExecutePreviousIfNeeded(previousBatch)
方法,代码如下 :
|
- 第 5 行 :通过 CAS 修改
batch
,保证并发情况下的线程安全。同时注意,此处也进行了新的 RequestBatch ,切换掉老的 RequestBatch 。 - 第 6 行 :使用老的 RequestBatch ,调用
RequestBatch#executeBatchIfNotAlreadyStarted()
方法,命令合并执行。
RequestBatch#executeBatchIfNotAlreadyStarted()
方法,代码如下 :
|
- 代码看起来是有点长哈,请对照着官方示例 CommandCollapserGetValueForKey 一起看,临门一脚了,胖友!
- 第 7 行 :通过 CAS 修改
batchStarted
,保证并发情况下的线程安全。 - 第 10 行 :获得写锁。等待调用
#offer(...)
/#remove(...)
方法的线程执行完成,以保证命令合并执行时,不再有新的请求添加或移除。 - 第 15 行 :调用
HystrixCollapserBridge#shardRequests(...)
方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。点击 链接 查看代码。 - 第 17 行 :循环 N 个【多个命令请求】。
- 第 21 行 :调用
HystrixCollapserBridge#createObservableCommand(...)
方法,将多个命令请求合并,创建一个 HystrixCommand 。点击 链接 查看代码。 - 第 24 行 :调用
HystrixCollapserBridge#mapResponseToRequests(...)
方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。点击 链接 查看代码。Observable#single()
方法,如果 Observable 终止时只发射了一个值,返回那个值,否则抛出异常。在 《ReactiveX文档中文翻译》「single」 有相关分享。Observable#ignoreElements()
方法,抑制原始 Observable 发射的所有数据,只允许它的终止通知(#onError()
或#onCompleted()
)通过。在 《ReactiveX文档中文翻译》「IgnoreElements」 有相关分享。也推荐点击rx.internal.operators.OperatorIgnoreElements
看下源码,可能更加易懂。Observable#cast()
方法,将原始 Observable 发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是map
的一个特殊版本。在 《ReactiveX文档中文翻译》「cast」 有相关分享。也推荐点击rx.internal.operators.OperatorCast
看下源码,可能更加易懂。- 使用
Observable#ignoreElements()
/Observable#cast()
方法,用于将 Observable 变成不再继续向下发射数据项,只给现有方法里Observable#doNext()
处理数据项,调用HystrixCollapser#mapResponseToRequests(...)
方法。 - 点击 链接 ,查看
CollapsedRequestSubject#setResponse(response)
方法的代码。
- 第 24 至 50 行 :调用
Observable#doError(Action1)
方法,当命令合并执行发生异常时,设置每个 CollapsedRequestSubject 的执行结果为异常。- 点击 链接,查看
CollapsedRequestSubject#setResponse(response)
方法的代码。
- 点击 链接,查看
- 第 52 至 68 行 :调用
Observable#doOnCompleted(Action0)
方法,当命令合并执行完成时,检查每个 CollapsedRequestSubject 是否都有返回结果。设置没有返回结果的 CollapsedRequestSubject 的执行结果为异常。一般情况下,是用户实现HystrixCollapser#mapResponseToRequests(...)
方法存在 BUG 。另外,如果不设置,将导致无结果的单个命令请求无限阻塞。 - 第 70 行 :调用
Observable#subscribe()
方法,触发 HystrixCommand 执行。 - 第 72 至 96 行 :发生异常,设置每个 CollapsedRequestSubject 的执行结果为异常。
- 点击 链接,查看
CollapsedRequestSubject#setException(response)
方法的代码。
- 点击 链接,查看
- 第 97 至 99 行 :释放写锁。
5. CollapserTimer
com.netflix.hystrix.collapser.CollapserTimer
,命令合并器的定时器接口,定义了提交定时监听器,生成定时任务的接口方法,代码如下 :
|
5.1 RealCollapserTimer
com.netflix.hystrix.collapser.RealCollapserTimer
,命令合并器的定时器实现类,代码如下 :
|
- 实际上,使用的是 HystrixTimer 提供的单例。在 《Hystrix 源码解析 —— 执行结果缓存》「3. HystrixTimer
」 有详细解析。
5.2 CollapsedTask
com.netflix.hystrix.collapser.RequestCollapser.CollapsedTask
,定时任务,固定周期( 可配,默认 HystrixCollapserProperties.timerDelayInMilliseconds = 10ms
) 轮询其对应的一个 RequestCollapser 当前 RequestBatch 。若有命令需要执行,则提交 RequestCollapser 合并执行。
代码比较简单,点击 链接 直接看代码。
最后
以上就是爱笑黑猫最近收集整理的关于Hystrix 源码解析 —— 命令合并执行1. 概述2. HystrixCollapser3. RequestCollapserFactory4. RequestCollapser5. CollapserTimer 的全部内容,更多相关Hystrix内容请搜索靠谱客的其他文章。
发表评论 取消回复