概述
摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-collapser-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Hystrix 1.5.X 版本
- 1. 概述
- 2. HystrixCollapser
- 2.1 构造方法
- 2.2 执行命令方式
- 2.3 核心方法
- 3. RequestCollapserFactory
- 4. RequestCollapser
- 4.1 构造方法
- 4.2 RequestBatch
- 4.3 #submitRequest(arg)
- 4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)
- 5. CollapserTimer
- 5.1 RealCollapserTimer
- 5.2 CollapsedTask
1. 概述
本文主要分享 Hystrix 命令合并执行。
在 《【翻译】Hystrix文档-实现原理》「请求合并」 中,对 Hystrix 命令合并执行的概念、原理、使用场景、优缺点已经做了非常详细透彻的分享,所以胖友可以先认真阅读学习下。
命令合并执行整体流程如下图 :
FROM 《【翻译】Hystrix文档-实现原理》「请求合并」
- 第一步,提交单个命令请求到请求队列( RequestQueue )
- 第二部,定时任务( TimerTask ) 固定周期从请求队列获取多个命令执行,合并执行。
在官方提供的示例中,我们通过 CommandCollapserGetValueForKey 熟悉命令合并执行的使用。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与Docker微服务架构实战》
- 两书齐买,京东包邮。
2. HystrixCollapser
com.netflix.hystrix.HystrixCollapser
,命令合并器抽象父类。
NOTE :
com.netflix.hystrix.HystrixObservableCollapser
,另一种命令合并器抽象父类,本文暂不解析。
2.1 构造方法
HystrixCollapser 构造方法,代码如下 :
|
- BatchReturnType 泛型,多个命令合并执行返回结果类型。
- ResponseType 泛型,单个命令执行返回结果类型。
- RequestArgumentType 泛型,单个命令参数类型。
collapserFactory
属性,RequestCollapser 工厂,在 「3. RequestCollapserFactory」 详细解析。requestCache
属性,TODO 【2012】【请求上下文】collapserInstanceWrapper
属性,命令合并器包装器。com.netflix.hystrix.collapser.HystrixCollapserBridge
接口,点击 链接 查看代码。- HystrixCollapserBridge ,为 RequestBatch 透明调用 HystrixCollapser 或 HystrixObservableCollapser 的方法不同的实现。参见 《桥接模式》 。
metrics
属性,TODO 【2002】【metrics】
2.2 执行命令方式
在 《Hystrix 源码解析 —— 执行命令方式》 中,我们已经看了 HystrixCommand 提供的四种执行命令方式。
HystrixCollapser 类似于 HystrixCommand ,也提供四种相同的执行命令方式,其中如下三种方式代码基本类似,我们就给下传送门,就不重复啰嗦了 :
#observe()
方法 :传送门 。#queue()
方法 :传送门 。#execute()
方法 :传送门 。
下面一起来看看 #toObservable()
方法的实现,代码如下 :
|
observeOn
方法参数,实际方法暂未用到,跳过无视。- 第 11 至 13 行 :缓存存开关、KEY 。
- 【反向】第 32 至 41 行 :获得【缓存 Observable】。这块代码和
AbstractCommand#toObservavle(...)
类似,在 《Hystrix 源码解析 —— 执行结果缓存》「4. AbstractCommand#toObservavle(…)」 有详细解析。 - 【反向】第 44 行 :获得【非缓存 Observable】。
- 注意 :返回的 Observable ,很可能命令实际并未执行,或者说并未执行完成,此时在
#queue()
/#execute()
方法,通过 BlockingObservable 阻塞等待执行完成。BlockingObservable 在 《RxJava 源码解析 —— BlockingObservable》 有详细解析。 - 第 26 行 :调用
RequestCollapserFactory#getRequestCollapser()
,获得 RequestCollapser 。在 「3. RequestCollapserFactory」 详细解析。 - 第 29 行 :提交单个命令请求到请求队列( RequestQueue ),即命令合并执行整体流程第一步。在 「4. RequestCollapser」 详细解析。
2.3 核心方法
#getRequestArgument(...)
抽象方法,获得单个命令参数。代码如下 :public abstract RequestArgumentType getRequestArgument();
#createCommand(...)
抽象方法,将多个命令请求合并,创建一个 HystrixCommand 。代码如下 :protected abstract HystrixCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
#mapResponseToRequests(...)
抽象方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
#shardRequests(...)
方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。代码如下 :protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {return Collections.singletonList(requests);}
在未重写
#shardRequests(...)
的情况下,整体方法流程如下 :在重写
#shardRequests(...)
的情况下,整体方法流程如下 :- 本图中命令请求分片仅仅是例子,实际根据重写的逻辑不同而不同。
3. RequestCollapserFactory
com.netflix.hystrix.collapser.RequestCollapserFactory
,RequestCollapser 工厂。
|
timer
属性,命令合并器的定时器,在 「5. CollapserTimer」 详细解析。collapserKey
属性,命令合并器标识,实现类似 HystrixThreadPoolKey 。- HystrixCollapserKey ,点击 链接 查看代码。
- HystrixThreadPoolKey ,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「3. HystrixThreadPoolKey」 有详细解析。
properties
属性,命令合并器属性配置。concurrencyStrategy
属性,并发策略,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「4. HystrixConcurrencyStrategy」 有详细解析。scope
属性,命令请求作用域。目前有两种作用域 :REQUEST
:请求上下文( HystrixRequestContext )。Typically this means that requests within a single user-request (ie. HTTP request) are collapsed.
No interaction with other user requests.
1 queue per user request.GLOBAL
:全局。Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed.
1 queue for entire app.
调用 #getRequestCollapser()
方法,获得 RequestCollapser 。代码如下 :
|
- 根据
scope
不同,调用两个不同方法,获得 RequestCollapser 。这两个方法大体逻辑相同,优先从缓存中查找满足条件的 RequestCollapser 返回;若不存在,则创建满足条件的 RequestCollapser 添加到缓存并返回。REQUEST
:调用#getCollapserForUserRequest()
方法,TODO 【2012】【请求上下文】。GLOBAL
:调用#getCollapserForGlobalScope()
方法,点击 链接 查看中文注释的代码。
4. RequestCollapser
com.netflix.hystrix.collapser.RequestCollapser
,命令请求合并器。主要用于 :
- 提交单个命令请求到请求队列( RequestQueue )。
- 接收来自定时任务提交的多个命令,合并执行。
4.1 构造方法
RequestCollapser 构造方法,代码如下 :
|
commandCollapser
属性,命令合并器包装器。batch
属性,RequestBatch,即是本文一直说的请求队列。在 「4.2 RequestBatch」 也会详细解析。timerListenerReference
属性,注册在命令合并器的定时器的监听器。每个 RequestCollapser 独有一个监听器。该监听器( 实际上会使用该监听器创建定时任务 )固定周期从请求队列获取多个命令执行,提交 RequestCollapser 合并执行。在 「5. CollapserTimer」 也会详细解析。timerListenerRegistered
属性,timerListenerReference
是否已经注册。timer
属性,命令合并器的定时器。properties
属性,命令合并器属性配置。concurrencyStrategy
属性,并发策略。
4.2 RequestBatch
com.netflix.hystrix.collapser.RequestBatch
,命令请求队列。提供如下功能 :
- 命令请求的添加
- 命令请求的移除
- 命令请求的批量执行。笔者把 RequestBatch 解释成 “命令请求队列”,主要方便大家理解。
- 那可能有胖友有疑问,为啥该功能不在 RequestCollapser 直接实现,这样 RequestBatch 成为纯粹的队列呢?在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。
RequestBatch 构造方法,代码如下 :
|
commandCollapser
属性,命令合并器包装器。maxBatchSize
属性,队列最大长度。batchStarted
属性,执行是否开始。argumentMap
属性,命令请求参数映射( 队列 )。properties
属性,命令合并器属性配置。batchLock
属性,argumentMap
操作的读写锁。
RequestBatch 实现队列具体的操作方法,在 「4.3 #submitRequest(arg)」/「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 一起解析。
4.3 #submitRequest(arg)
在 #toObservable()
方法里,调用 #submitRequest(arg)
方法,提交单个命令请求到 RequestBatch 。代码如下 :
|
- 第 5 至 8 行 :当 RequestCollapser 的监听任务( CollapsedTask )还未创建,进行初始化。
- 第 11 至 35 行 :死循环,直到提交单个命令请求到 RequestBatch 成功。
- 第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被
#shutdown()
后才会出现为null
的情况。 - 第 19 至 24 行 :调动
RequestBatch#offer(...)
方法,提交单个命令请求到 RequestBatch ,并获得 Observable 。这里对arg == null
做了特殊处理,因为RequestBatch.argumentMap
是 ConcurrentHashMap ,不允许值为null
。另外,RequestBatch#offer(...)
方法的实现代码,在结束了当前方法,详细解析。 - 第 28 至 29 行 :添加成功,返回 Observable 。
- 第 30 至 34 行 :添加失败,执行当前 RequestBatch 的多个命令合并执行,并创建新的 RequestBatch 。在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。
- 第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被
RequestBatch#offer(...)
方法,代码如下 :
|
- 第 4 至 6 行 :执行已经开始,添加失败。在
RequestBatch#executeBatchIfNotAlreadyStarted(...)
方法的开头,优先 CAS 使batchStarted = true
。 - 第 11 行 :获得读锁。
The 'read' just means non-exclusive even though we are writing.
,即使该方法实际在做“写操作”,不排他,线程安全,所以可以使用读锁。 - 第 15 至 17 行 :
double-check
,执行已经开始,添加失败。在RequestBatch#executeBatchIfNotAlreadyStarted(...)
方法,优先 CAS 使batchStarted = true
,再获取写锁,所以会出现该情况。 - 第 20 至 21 行 :超过队列最大长度,添加失败。
第 24 至 25 行 :创建
com.netflix.hystrix.collapser.CollapsedRequestSubject
,并将它添加到队列(argumentMap
) 。- CollapsedRequestSubject 实现
com.netflix.hystrix.HystrixCollapser.CollapsedRequest
接口,定义了批量命令执行的请求,不仅限于获得请求参数(#getArgument()
方法 ),也包括对批量命令执行结束后,每个请求的结果设置(#setResponse(...)
/#emitResponse(...)
/#setException(...)
/#setComplete()
方法 ),点击 链接 查看该接口的代码。 CollapsedRequestSubject 构造方法,代码如下:
/* package */ class CollapsedRequestSubject<T, R> implements CollapsedRequest<T, R> {/*** 参数*/private final R argument;/*** 结果( response ) 是否设置*/private AtomicBoolean valueSet = new AtomicBoolean( false);/*** 可回放的 ReplaySubject*/private final ReplaySubject<T> subject = ReplaySubject.create();/*** 带订阅数量的 ReplaySubject*/private final Observable<T> subjectWithAccounting;/*** 订阅数量*/private volatile int outstandingSubscriptions = 0;public CollapsedRequestSubject(final R arg, final RequestBatch<?, T, R> containingBatch) {// 设置 argumentif (arg == RequestCollapser.NULL_SENTINEL) {this.argument = null;} else {this.argument = arg;}// 设置 带订阅数量的 ReplaySubjectthis.subjectWithAccounting = subject.doOnSubscribe( new Action0() {public void call() {outstandingSubscriptions++;}}).doOnUnsubscribe( new Action0() {public void call() {outstandingSubscriptions--;if (outstandingSubscriptions == 0) {containingBatch.remove(arg);}}});}}argument
属性,单个命令请求参数。valueSet
属性,结果( Response ) 是否设置,通过#setResponse()
/#emitResponse()
方法设置。subject
属性,可回放执行结果的 Subject 。此处使用 ReplaySubject 的主要目的,当 HystrixCollapser 开启缓存功能时,通过回放执行结果,在 《Hystrix 源码解析 —— 执行结果缓存》「5. HystrixCachedObservable」 也有相同的实现。另外,这里有一点要注意下,ReplaySubject 并没有向任何 Observable 订阅结果,而是通过#setResponse()
/#emitResponse()
方法设置结果。outstandingSubscriptions
属性,订阅数量。subjectWithAccounting
属性,带订阅数量的 ReplaySubject 。当取消订阅时,调用RequestBatch#remove(arg)
方法,移除单个命令请求。
- CollapsedRequestSubject 实现
第 38 至 47 行 :返回 Observable 。
- 当
argumentMap
已经存在arg
对应的 Observable 时,必须开启缓存 (HystrixCollapserProperties.requestCachingEnabled = true
) 功能。原因是,如果在相同的arg
,并且未开启缓存,同时第 43 行实现的是collapsedRequest.toObservable()
,那么相同的arg
将有多个 Observable 执行命令,此时HystrixCollapserBridge#mapResponseToRequests(...)
方法无法将执行( Response )赋值到arg
对应的命令请求( CollapsedRequestSubject ) 。更多讨论,见 https://github.com/Netflix/Hystrix/pull/1176 。 - 回过头看
HystrixCollapser#toObservable()
方法的第 32 至 41 行的代码,这里也有对缓存功能,是不是重复了呢?argumentMap
针对的是 RequestBatch 级的缓存,HystrixCollapser : RequestCollapser : RequestBatch 是1 : 1 : N
的关系,通过HystrixCollapser#toObservable()
对缓存的处理逻辑,保证 RequestBatch 切换后,依然有缓存。
- 当
RequestBatch#remove()
方法,代码如下 :
|
- 当 RequestBatch 开始执行,不允许移除单个命令请求。
4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)
本小节建议在 「5. CollapserTimer」 后,再回过头看。
#createNewBatchAndExecutePreviousIfNeeded(previousBatch)
方法,代码如下 :
|
- 第 5 行 :通过 CAS 修改
batch
,保证并发情况下的线程安全。同时注意,此处也进行了新的 RequestBatch ,切换掉老的 RequestBatch 。 - 第 6 行 :使用老的 RequestBatch ,调用
RequestBatch#executeBatchIfNotAlreadyStarted()
方法,命令合并执行。
RequestBatch#executeBatchIfNotAlreadyStarted()
方法,代码如下 :
|
- 代码看起来是有点长哈,请对照着官方示例 CommandCollapserGetValueForKey 一起看,临门一脚了,胖友!
- 第 7 行 :通过 CAS 修改
batchStarted
,保证并发情况下的线程安全。 - 第 10 行 :获得写锁。等待调用
#offer(...)
/#remove(...)
方法的线程执行完成,以保证命令合并执行时,不再有新的请求添加或移除。 - 第 15 行 :调用
HystrixCollapserBridge#shardRequests(...)
方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。点击 链接 查看代码。 - 第 17 行 :循环 N 个【多个命令请求】。
- 第 21 行 :调用
HystrixCollapserBridge#createObservableCommand(...)
方法,将多个命令请求合并,创建一个 HystrixCommand 。点击 链接 查看代码。 - 第 24 行 :调用
HystrixCollapserBridge#mapResponseToRequests(...)
方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。点击 链接 查看代码。Observable#single()
方法,如果 Observable 终止时只发射了一个值,返回那个值,否则抛出异常。在 《ReactiveX文档中文翻译》「single」 有相关分享。Observable#ignoreElements()
方法,抑制原始 Observable 发射的所有数据,只允许它的终止通知(#onError()
或#onCompleted()
)通过。在 《ReactiveX文档中文翻译》「IgnoreElements」 有相关分享。也推荐点击rx.internal.operators.OperatorIgnoreElements
看下源码,可能更加易懂。Observable#cast()
方法,将原始 Observable 发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是map
的一个特殊版本。在 《ReactiveX文档中文翻译》「cast」 有相关分享。也推荐点击rx.internal.operators.OperatorCast
看下源码,可能更加易懂。- 使用
Observable#ignoreElements()
/Observable#cast()
方法,用于将 Observable 变成不再继续向下发射数据项,只给现有方法里Observable#doNext()
处理数据项,调用HystrixCollapser#mapResponseToRequests(...)
方法。 - 点击 链接 ,查看
CollapsedRequestSubject#setResponse(response)
方法的代码。
- 第 24 至 50 行 :调用
Observable#doError(Action1)
方法,当命令合并执行发生异常时,设置每个 CollapsedRequestSubject 的执行结果为异常。- 点击 链接,查看
CollapsedRequestSubject#setResponse(response)
方法的代码。
- 点击 链接,查看
- 第 52 至 68 行 :调用
Observable#doOnCompleted(Action0)
方法,当命令合并执行完成时,检查每个 CollapsedRequestSubject 是否都有返回结果。设置没有返回结果的 CollapsedRequestSubject 的执行结果为异常。一般情况下,是用户实现HystrixCollapser#mapResponseToRequests(...)
方法存在 BUG 。另外,如果不设置,将导致无结果的单个命令请求无限阻塞。 - 第 70 行 :调用
Observable#subscribe()
方法,触发 HystrixCommand 执行。 - 第 72 至 96 行 :发生异常,设置每个 CollapsedRequestSubject 的执行结果为异常。
- 点击 链接,查看
CollapsedRequestSubject#setException(response)
方法的代码。
- 点击 链接,查看
- 第 97 至 99 行 :释放写锁。
5. CollapserTimer
com.netflix.hystrix.collapser.CollapserTimer
,命令合并器的定时器接口,定义了提交定时监听器,生成定时任务的接口方法,代码如下 :
|
5.1 RealCollapserTimer
com.netflix.hystrix.collapser.RealCollapserTimer
,命令合并器的定时器实现类,代码如下 :
|
- 实际上,使用的是 HystrixTimer 提供的单例。在 《Hystrix 源码解析 —— 执行结果缓存》「3. HystrixTimer
」 有详细解析。
5.2 CollapsedTask
com.netflix.hystrix.collapser.RequestCollapser.CollapsedTask
,定时任务,固定周期( 可配,默认 HystrixCollapserProperties.timerDelayInMilliseconds = 10ms
) 轮询其对应的一个 RequestCollapser 当前 RequestBatch 。若有命令需要执行,则提交 RequestCollapser 合并执行。
代码比较简单,点击 链接 直接看代码。
最后
以上就是爱笑黑猫为你收集整理的Hystrix 源码解析 —— 命令合并执行1. 概述2. HystrixCollapser3. RequestCollapserFactory4. RequestCollapser5. CollapserTimer 的全部内容,希望文章能够帮你解决Hystrix 源码解析 —— 命令合并执行1. 概述2. HystrixCollapser3. RequestCollapserFactory4. RequestCollapser5. CollapserTimer 所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复