概述
hystrix作用
-
资源隔离:包括线程池隔离和信号量隔离,限制调用分布式服务的资源使用,某一个调用的服务出现问题不会影响其他服务调用。
-
降级机制:超时降级、资源不足时(线程或信号量)降级,降级后可以配合降级接口返回托底数据。
-
融断:当失败率达到阀值自动触发降级(如因网络故障/超时造成的失败率高),熔断器触发的快速失败会进行快速恢复。
hystrix隔离模式
Hystrix提供了两种隔离模式:线程池隔离模式、信号量隔离模式。
线程池隔离模式:使用一个线程池来存储当前请求,线程池对请求作处理,设置任务返回处理超时时间,堆积的请求先入线程池队列。这种方式要为每个依赖服务申请线程池,有一定的资源消耗,好处是可以应对突发流量(流量洪峰来临时,处理不完可将数据存储到线程池队里慢慢处理)
信号量隔离模式:使用一个原子计数器(或信号量)记录当前有多少个线程在运行,请求来先判断计数器的数值,若超过设置的最大线程个数则丢弃该类型的新请求,若不超过则执行计数操作请求来计数器+1,请求返回计数器-1。这种方式是严格的控制线程且立即返回模式,无法应对突发流量(流量洪峰来临时,处理的线程超过数量,其他的请求会直接返回,不继续去请求依赖的服务)
区别:
线程池 | 信号量 | |
---|---|---|
线程 | 请求线程和调用provider线程不是同一条线程 | 请求线程和调用provider线程是同一条线程 |
开销 | 排队、调度、上下文切换等 | 无线程切换,开销低 |
异步 | 支持 | 不支持 |
并发支持 | 支持:最大线程池大小 | 支持:最大信号量上限 |
传递Header | 不支持 | 支持 |
支持超时 | 支持 | 不支持 |
hystrix使用场景
线程池隔离
请求并发量大,并且耗时长(一般是计算量大或者读数据库):采用线程池隔离,这样的话,可以保证大量的容器线程可用,不会由于服务原因,一直处于阻塞或者等待状态,快速失败返回。
信号量隔离
请求并发量大,并且耗时短(一般是计算量小,或读缓存):采用信号量隔离:因为这类服务的返回往往非常快,不会占用容器线程太长时间,并且减少了线程切换的一些开销,提高了缓存服务的效率
进入 fallback时机
1)执行方法抛出了异常
2)方法执行超时了
3)断路器打开了
4)hystrix线程池拒绝了
HystrixPlugin源码解析
请求进入到HystrixPlugin
的doExecute
方法:
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
final HystrixHandle hystrixHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), HystrixHandle.class);
if (StringUtils.isBlank(hystrixHandle.getGroupKey())) {
hystrixHandle.setGroupKey(Objects.requireNonNull(soulContext).getModule());
}
if (StringUtils.isBlank(hystrixHandle.getCommandKey())) {
hystrixHandle.setCommandKey(Objects.requireNonNull(soulContext).getMethod());
}
// HystrixCommand:配置降级方法
Command command = fetchCommand(hystrixHandle, exchange, chain);
return Mono.create(s -> {
// 注册完整执行生命周期事件 :onCompleted、onError、onNext
Subscription sub = command.fetchObservable().subscribe(s::success,
s::error, s::success);
s.onCancel(sub::unsubscribe);
// 断路器是否已经打开
if (command.isCircuitBreakerOpen()) {
log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey());
}
}).doOnError(throwable -> {
// doExecute执行异常、超时
log.error("hystrix execute exception:", throwable);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
chain.execute(exchange);
}).then();
}
doExecute
方法做了如下操作:
1)判断groupKey
、commandKey
是否为空,如果为空,设置默认值
if (StringUtils.isBlank(hystrixHandle.getGroupKey())) {
hystrixHandle.setGroupKey(Objects.requireNonNull(soulContext).getModule());
}
if (StringUtils.isBlank(hystrixHandle.getCommandKey())) {
hystrixHandle.setCommandKey(Objects.requireNonNull(soulContext).getMethod());
}
默认值:
groupKey:默认值是使用@HystrixCommand标注的方法所在的类名
commandKey:默认值是@HystrixCommand标注的方法名
2)使用HystrixCommand来构建隔离级别
信号量隔离级别对象HystrixObservableCommand:
public class HystrixCommand extends HystrixObservableCommand<Void> implements Command {
private final ServerWebExchange exchange;
private final SoulPluginChain chain;
private final URI callBackUri;
public HystrixCommand(final Setter setter,
final ServerWebExchange exchange,
final SoulPluginChain chain,
final String callBackUri) {
super(setter);
this.exchange = exchange;
this.chain = chain;
this.callBackUri = UriUtils.createUri(callBackUri);
}
@Override
protected Observable<Void> construct() {
return RxReactiveStreams.toObservable(chain.execute(exchange));
}
@Override
protected Observable<Void> resumeWithFallback() {
return RxReactiveStreams.toObservable(doFallback());
}
private Mono<Void> doFallback() {
if (isFailedExecution()) {
log.error("hystrix execute have error: ", getExecutionException());
}
final Throwable exception = getExecutionException();
return doFallback(exchange, exception);
}
@Override
public Observable<Void> fetchObservable() {
return this.toObservable();
}
@Override
public URI getCallBackUri() {
return callBackUri;
}
}
使用
Rxjava
的Observable
来操作
设置降级方法doFallback
线程池隔离级别对象:
public class HystrixCommandOnThread extends HystrixCommand<Mono<Void>> implements Command {
private final ServerWebExchange exchange;
private final SoulPluginChain chain;
private final URI callBackUri;
public HystrixCommandOnThread(final HystrixCommand.Setter setter,
final ServerWebExchange exchange,
final SoulPluginChain chain,
final String callBackUri) {
super(setter);
this.exchange = exchange;
this.chain = chain;
this.callBackUri = UriUtils.createUri(callBackUri);
}
@Override
protected Mono<Void> run() {
RxReactiveStreams.toObservable(chain.execute(exchange)).toBlocking().subscribe();
return Mono.empty();
}
@Override
protected Mono<Void> getFallback() {
if (isFailedExecution()) {
log.error("hystrix execute have error: ", getExecutionException());
}
final Throwable exception = getExecutionException();
return doFallback(exchange, exception);
}
@Override
public Observable<Void> fetchObservable() {
return RxReactiveStreams.toObservable(this.execute());
}
@Override
public URI getCallBackUri() {
return callBackUri;
}
}
使用
RxReactiveStreams
进行处理
设置降级方法doFallback
3)设置HystrixCommandProperties隔离的相关参数
页面设置的具体参数为:
跳闸最小请求数量 :最小的请求量,至少要达到这个量才会触发熔断
错误半分比阀值 : 这段时间内,发生异常的百分比。
最大并发量 : 最大的并发量
跳闸休眠时间(ms) :熔断以后恢复的时间。
分组Key: 一般设置为:contextPath
命令Key: 一般设置为具体的 路径接口。
public static HystrixObservableCommand.Setter build(final HystrixHandle hystrixHandle) {
initHystrixHandleOnRequire(hystrixHandle);
// HystrixCommand 命令所属的组的名称:默认注解方法类的名称
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(hystrixHandle.getGroupKey());
// HystrixCommand 命令的key值,默认值为注解方法的名称
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(hystrixHandle.getCommandKey());
// 配置hystrix命令的参数
HystrixCommandProperties.Setter propertiesSetter =
HystrixCommandProperties.Setter()
// 超时时间
.withExecutionTimeoutInMilliseconds((int) hystrixHandle.getTimeout())
// 打开断路器
.withCircuitBreakerEnabled(true)
// 使用信号量进行隔离
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
.withExecutionIsolationSemaphoreMaxConcurrentRequests(hystrixHandle.getMaxConcurrentRequests())
// 【失败率】如果在一个采样时间窗口内,失败率超过该配置,则自动打开熔断开关实现降级处理,即快速失败。默认配置下采样周期为10s,失败率为50%
.withCircuitBreakerErrorThresholdPercentage(hystrixHandle.getErrorThresholdPercentage())
// 【失败次数】在断路器闭合情况下,在进行失败率判断之前,一个采样周期内必须进行至少N个请求才能进行采样统计,目的是有足够的采样使得失败率计算正确,默认为20
.withCircuitBreakerRequestVolumeThreshold(hystrixHandle.getRequestVolumeThreshold())
// 【监控时间】断路器闭合的重试时间窗口,且在该时间窗口内只允许一次重试。即在熔断开关打开后,在该时间窗口允许有一次重试,如果重试成功,则将重置Health采样统计并闭合断路器开关实现快速恢复,否则断路器开关还是打开状态,执行快速失败。
.withCircuitBreakerSleepWindowInMilliseconds(hystrixHandle.getSleepWindowInMilliseconds());
return HystrixObservableCommand.Setter
.withGroupKey(groupKey)
.andCommandKey(commandKey)
.andCommandPropertiesDefaults(propertiesSetter);
}
/**
* this is build HystrixCommand.Setter.
* @param hystrixHandle {@linkplain HystrixHandle}
* @return {@linkplain HystrixCommand.Setter}
*/
public static HystrixCommand.Setter buildForHystrixCommand(final HystrixHandle hystrixHandle) {
initHystrixHandleOnRequire(hystrixHandle);
// HystrixCommand 命令所属的组的名称:默认注解方法类的名称
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(hystrixHandle.getGroupKey());
// HystrixCommand 命令的key值,默认值为注解方法的名称
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(hystrixHandle.getCommandKey());
// 配置hystrix命令的参数
HystrixCommandProperties.Setter propertiesSetter =
HystrixCommandProperties.Setter()
// 超时时间
.withExecutionTimeoutInMilliseconds((int) hystrixHandle.getTimeout())
// 打开断路器
.withCircuitBreakerEnabled(true)
// 如果在一个采样时间窗口内,失败率超过该配置,则自动打开熔断开关实现降级处理,即快速失败。默认配置下采样周期为10s,失败率为50%
.withCircuitBreakerErrorThresholdPercentage(hystrixHandle.getErrorThresholdPercentage())
// 在断路器闭合情况下,在进行失败率判断之前,一个采样周期内必须进行至少N个请求才能进行采样统计,目的是有足够的采样使得失败率计算正确,默认为20
.withCircuitBreakerRequestVolumeThreshold(hystrixHandle.getRequestVolumeThreshold())
// 断路器闭合的重试时间窗口,且在该时间窗口内只允许一次重试。即在熔断开关打开后,在该时间窗口允许有一次重试,如果重试成功,则将重置Health采样统计并闭合断路器开关实现快速恢复,否则断路器开关还是打开状态,执行快速失败。
.withCircuitBreakerSleepWindowInMilliseconds(hystrixHandle.getSleepWindowInMilliseconds());
// 配置hystrix依赖的线程池的参数
HystrixThreadPoolConfig hystrixThreadPoolConfig = hystrixHandle.getHystrixThreadPoolConfig();
HystrixThreadPoolProperties.Setter threadPoolPropertiesSetter =
HystrixThreadPoolProperties.Setter()
// 配置核心线程数的大小
.withCoreSize(hystrixThreadPoolConfig.getCoreSize())
// 配置最大的线程数大小
.withMaximumSize(hystrixThreadPoolConfig.getMaximumSize())
// 配置线程池队列的最大大小
.withMaxQueueSize(hystrixThreadPoolConfig.getMaxQueueSize())
// 线程池中空闲线程的生存时间
.withKeepAliveTimeMinutes(hystrixThreadPoolConfig.getKeepAliveTimeMinutes())
// 允许最大线程数超过核心线程数,默认是 false ,如果这个值不为 true ,则上方配置的 withMaximumSize() 不会生效
.withAllowMaximumSizeToDivergeFromCoreSize(true);
return HystrixCommand.Setter
.withGroupKey(groupKey)
.andCommandKey(commandKey)
.andCommandPropertiesDefaults(propertiesSetter)
.andThreadPoolPropertiesDefaults(threadPoolPropertiesSetter);
}
4)注册完整执行生命周期事件 :onCompleted、onError、onNext
处理调用链里的doOnSubscribe、doOnNext、doOnComplete、doOnError,它们关联到最近的上一个Observable,当Observable的emitter发送onSubscribe、onNext、onComplete、onError时它们就会被触发。
return Mono.create(s -> {
// 注册完整执行生命周期事件 :onCompleted、onError、onNext
Subscription sub = command.fetchObservable().subscribe(s::success,
s::error, s::success);
s.onCancel(sub::unsubscribe);
// 断路器是否已经打开
if (command.isCircuitBreakerOpen()) {
log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey());
}
}).doOnError(throwable -> {
// doExecute执行异常、超时
log.error("hystrix execute exception:", throwable);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
chain.execute(exchange);
}).then();
}
至此,HystrixPlugin处理完成。
最后
以上就是悲凉柜子为你收集整理的soul 源码分析 —— 插件解析之hystrix插件的全部内容,希望文章能够帮你解决soul 源码分析 —— 插件解析之hystrix插件所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复