我是靠谱客的博主 悲凉柜子,最近开发中收集的这篇文章主要介绍soul 源码分析 —— 插件解析之hystrix插件,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

hystrix作用

  • 资源隔离:包括线程池隔离和信号量隔离,限制调用分布式服务的资源使用,某一个调用的服务出现问题不会影响其他服务调用。

  • 降级机制:超时降级、资源不足时(线程或信号量)降级,降级后可以配合降级接口返回托底数据。

  • 融断:当失败率达到阀值自动触发降级(如因网络故障/超时造成的失败率高),熔断器触发的快速失败会进行快速恢复。

hystrix隔离模式

Hystrix提供了两种隔离模式:线程池隔离模式、信号量隔离模式。

线程池隔离模式:使用一个线程池来存储当前请求,线程池对请求作处理,设置任务返回处理超时时间,堆积的请求先入线程池队列。这种方式要为每个依赖服务申请线程池,有一定的资源消耗,好处是可以应对突发流量(流量洪峰来临时,处理不完可将数据存储到线程池队里慢慢处理)

信号量隔离模式:使用一个原子计数器(或信号量)记录当前有多少个线程在运行,请求来先判断计数器的数值,若超过设置的最大线程个数则丢弃该类型的新请求,若不超过则执行计数操作请求来计数器+1,请求返回计数器-1。这种方式是严格的控制线程且立即返回模式,无法应对突发流量(流量洪峰来临时,处理的线程超过数量,其他的请求会直接返回,不继续去请求依赖的服务)

区别:

线程池信号量
线程请求线程和调用provider线程不是同一条线程请求线程和调用provider线程是同一条线程
开销排队、调度、上下文切换等无线程切换,开销低
异步支持不支持
并发支持支持:最大线程池大小支持:最大信号量上限
传递Header不支持支持
支持超时支持不支持

hystrix使用场景

线程池隔离
请求并发量大,并且耗时长(一般是计算量大或者读数据库):采用线程池隔离,这样的话,可以保证大量的容器线程可用,不会由于服务原因,一直处于阻塞或者等待状态,快速失败返回。

信号量隔离
请求并发量大,并且耗时短(一般是计算量小,或读缓存):采用信号量隔离:因为这类服务的返回往往非常快,不会占用容器线程太长时间,并且减少了线程切换的一些开销,提高了缓存服务的效率

进入 fallback时机

1)执行方法抛出了异常
2)方法执行超时了
3)断路器打开了
4)hystrix线程池拒绝了

HystrixPlugin源码解析

请求进入到HystrixPlugindoExecute方法:

@Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        final HystrixHandle hystrixHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), HystrixHandle.class);
        if (StringUtils.isBlank(hystrixHandle.getGroupKey())) {
            hystrixHandle.setGroupKey(Objects.requireNonNull(soulContext).getModule());
        }
        if (StringUtils.isBlank(hystrixHandle.getCommandKey())) {
            hystrixHandle.setCommandKey(Objects.requireNonNull(soulContext).getMethod());
        }
        // HystrixCommand:配置降级方法
        Command command = fetchCommand(hystrixHandle, exchange, chain);
        return Mono.create(s -> {
            // 注册完整执行生命周期事件 :onCompleted、onError、onNext
            Subscription sub = command.fetchObservable().subscribe(s::success,
                    s::error, s::success);
            s.onCancel(sub::unsubscribe);
            // 断路器是否已经打开
            if (command.isCircuitBreakerOpen()) {
                log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey());
            }
        }).doOnError(throwable -> {
            // doExecute执行异常、超时
            log.error("hystrix execute exception:", throwable);
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
            chain.execute(exchange);
        }).then();
    }

doExecute方法做了如下操作:
1)判断groupKeycommandKey是否为空,如果为空,设置默认值

if (StringUtils.isBlank(hystrixHandle.getGroupKey())) {
            hystrixHandle.setGroupKey(Objects.requireNonNull(soulContext).getModule());
}
if (StringUtils.isBlank(hystrixHandle.getCommandKey())) {
            hystrixHandle.setCommandKey(Objects.requireNonNull(soulContext).getMethod());
}

默认值:
groupKey:默认值是使用@HystrixCommand标注的方法所在的类名
commandKey:默认值是@HystrixCommand标注的方法名

2)使用HystrixCommand来构建隔离级别
信号量隔离级别对象HystrixObservableCommand:

public class HystrixCommand extends HystrixObservableCommand<Void> implements Command {
    private final ServerWebExchange exchange;
    private final SoulPluginChain chain;
    private final URI callBackUri;
    
    public HystrixCommand(final Setter setter,
                   final ServerWebExchange exchange,
                   final SoulPluginChain chain,
                   final String callBackUri) {
        super(setter);
        this.exchange = exchange;
        this.chain = chain;
        this.callBackUri = UriUtils.createUri(callBackUri);
    }

    @Override
    protected Observable<Void> construct() {
        return RxReactiveStreams.toObservable(chain.execute(exchange));
    }

    @Override
    protected Observable<Void> resumeWithFallback() {
        return RxReactiveStreams.toObservable(doFallback());
    }

    private Mono<Void> doFallback() {
        if (isFailedExecution()) {
            log.error("hystrix execute have error: ", getExecutionException());
        }
        final Throwable exception = getExecutionException();
        return doFallback(exchange, exception);
    }

    @Override
    public Observable<Void> fetchObservable() {
        return this.toObservable();
    }

    @Override
    public URI getCallBackUri() {
        return callBackUri;
    }
}

使用RxjavaObservable来操作
设置降级方法doFallback

线程池隔离级别对象:

public class HystrixCommandOnThread extends HystrixCommand<Mono<Void>> implements Command {
    private final ServerWebExchange exchange;
    private final SoulPluginChain chain;
    private final URI callBackUri;
    
    public HystrixCommandOnThread(final HystrixCommand.Setter setter,
                          final ServerWebExchange exchange,
                          final SoulPluginChain chain,
                          final String callBackUri) {
        super(setter);
        this.exchange = exchange;
        this.chain = chain;
        this.callBackUri = UriUtils.createUri(callBackUri);
    }

    @Override
    protected Mono<Void> run() {
        RxReactiveStreams.toObservable(chain.execute(exchange)).toBlocking().subscribe();
        return Mono.empty();
    }

    @Override
    protected Mono<Void> getFallback() {
        if (isFailedExecution()) {
            log.error("hystrix execute have error: ", getExecutionException());
        }
        final Throwable exception = getExecutionException();
        return doFallback(exchange, exception);
    }

    @Override
    public Observable<Void> fetchObservable() {
        return RxReactiveStreams.toObservable(this.execute());
    }

    @Override
    public URI getCallBackUri() {
        return callBackUri;
    }
}

使用RxReactiveStreams进行处理
设置降级方法doFallback

3)设置HystrixCommandProperties隔离的相关参数
页面设置的具体参数为:

跳闸最小请求数量 :最小的请求量,至少要达到这个量才会触发熔断
错误半分比阀值 : 这段时间内,发生异常的百分比。
最大并发量 : 最大的并发量
跳闸休眠时间(ms) :熔断以后恢复的时间。
分组Key: 一般设置为:contextPath
命令Key: 一般设置为具体的 路径接口。

public static HystrixObservableCommand.Setter build(final HystrixHandle hystrixHandle) {
        initHystrixHandleOnRequire(hystrixHandle);
        // HystrixCommand 命令所属的组的名称:默认注解方法类的名称
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(hystrixHandle.getGroupKey());
        // HystrixCommand 命令的key值,默认值为注解方法的名称
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(hystrixHandle.getCommandKey());
        // 配置hystrix命令的参数
        HystrixCommandProperties.Setter propertiesSetter =
                HystrixCommandProperties.Setter()
                        // 超时时间
                        .withExecutionTimeoutInMilliseconds((int) hystrixHandle.getTimeout())
                        // 打开断路器
                        .withCircuitBreakerEnabled(true)
                        // 使用信号量进行隔离
                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                        .withExecutionIsolationSemaphoreMaxConcurrentRequests(hystrixHandle.getMaxConcurrentRequests())
                        // 【失败率】如果在一个采样时间窗口内,失败率超过该配置,则自动打开熔断开关实现降级处理,即快速失败。默认配置下采样周期为10s,失败率为50%
                        .withCircuitBreakerErrorThresholdPercentage(hystrixHandle.getErrorThresholdPercentage())
                        // 【失败次数】在断路器闭合情况下,在进行失败率判断之前,一个采样周期内必须进行至少N个请求才能进行采样统计,目的是有足够的采样使得失败率计算正确,默认为20
                        .withCircuitBreakerRequestVolumeThreshold(hystrixHandle.getRequestVolumeThreshold())
                        // 【监控时间】断路器闭合的重试时间窗口,且在该时间窗口内只允许一次重试。即在熔断开关打开后,在该时间窗口允许有一次重试,如果重试成功,则将重置Health采样统计并闭合断路器开关实现快速恢复,否则断路器开关还是打开状态,执行快速失败。
                        .withCircuitBreakerSleepWindowInMilliseconds(hystrixHandle.getSleepWindowInMilliseconds());
        return HystrixObservableCommand.Setter
                .withGroupKey(groupKey)
                .andCommandKey(commandKey)
                .andCommandPropertiesDefaults(propertiesSetter);
    }

    /**
     * this is build HystrixCommand.Setter.
     * @param hystrixHandle {@linkplain HystrixHandle}
     * @return {@linkplain HystrixCommand.Setter}
     */
    public static HystrixCommand.Setter buildForHystrixCommand(final HystrixHandle hystrixHandle) {
        initHystrixHandleOnRequire(hystrixHandle);
        // HystrixCommand 命令所属的组的名称:默认注解方法类的名称
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(hystrixHandle.getGroupKey());
        // HystrixCommand 命令的key值,默认值为注解方法的名称
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(hystrixHandle.getCommandKey());
        // 配置hystrix命令的参数
        HystrixCommandProperties.Setter propertiesSetter =
                HystrixCommandProperties.Setter()
                        // 超时时间
                        .withExecutionTimeoutInMilliseconds((int) hystrixHandle.getTimeout())
                        // 打开断路器
                        .withCircuitBreakerEnabled(true)
                        // 如果在一个采样时间窗口内,失败率超过该配置,则自动打开熔断开关实现降级处理,即快速失败。默认配置下采样周期为10s,失败率为50%
                        .withCircuitBreakerErrorThresholdPercentage(hystrixHandle.getErrorThresholdPercentage())
                        // 在断路器闭合情况下,在进行失败率判断之前,一个采样周期内必须进行至少N个请求才能进行采样统计,目的是有足够的采样使得失败率计算正确,默认为20
                        .withCircuitBreakerRequestVolumeThreshold(hystrixHandle.getRequestVolumeThreshold())
                        // 断路器闭合的重试时间窗口,且在该时间窗口内只允许一次重试。即在熔断开关打开后,在该时间窗口允许有一次重试,如果重试成功,则将重置Health采样统计并闭合断路器开关实现快速恢复,否则断路器开关还是打开状态,执行快速失败。
                        .withCircuitBreakerSleepWindowInMilliseconds(hystrixHandle.getSleepWindowInMilliseconds());
        // 配置hystrix依赖的线程池的参数
        HystrixThreadPoolConfig hystrixThreadPoolConfig = hystrixHandle.getHystrixThreadPoolConfig();
        HystrixThreadPoolProperties.Setter threadPoolPropertiesSetter =
                HystrixThreadPoolProperties.Setter()
                        // 配置核心线程数的大小
                        .withCoreSize(hystrixThreadPoolConfig.getCoreSize())
                        // 配置最大的线程数大小
                        .withMaximumSize(hystrixThreadPoolConfig.getMaximumSize())
                        // 配置线程池队列的最大大小
                        .withMaxQueueSize(hystrixThreadPoolConfig.getMaxQueueSize())
                        // 线程池中空闲线程的生存时间
                        .withKeepAliveTimeMinutes(hystrixThreadPoolConfig.getKeepAliveTimeMinutes())
                        // 允许最大线程数超过核心线程数,默认是 false ,如果这个值不为 true ,则上方配置的 withMaximumSize() 不会生效
                        .withAllowMaximumSizeToDivergeFromCoreSize(true);
        return HystrixCommand.Setter
                .withGroupKey(groupKey)
                .andCommandKey(commandKey)
                .andCommandPropertiesDefaults(propertiesSetter)
                .andThreadPoolPropertiesDefaults(threadPoolPropertiesSetter);
    }

4)注册完整执行生命周期事件 :onCompleted、onError、onNext
处理调用链里的doOnSubscribe、doOnNext、doOnComplete、doOnError,它们关联到最近的上一个Observable,当Observable的emitter发送onSubscribe、onNext、onComplete、onError时它们就会被触发。

return Mono.create(s -> {
            // 注册完整执行生命周期事件 :onCompleted、onError、onNext
            Subscription sub = command.fetchObservable().subscribe(s::success,
                    s::error, s::success);
            s.onCancel(sub::unsubscribe);
            // 断路器是否已经打开
            if (command.isCircuitBreakerOpen()) {
                log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey());
            }
        }).doOnError(throwable -> {
            // doExecute执行异常、超时
            log.error("hystrix execute exception:", throwable);
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
            chain.execute(exchange);
        }).then();
    }

至此,HystrixPlugin处理完成。

最后

以上就是悲凉柜子为你收集整理的soul 源码分析 —— 插件解析之hystrix插件的全部内容,希望文章能够帮你解决soul 源码分析 —— 插件解析之hystrix插件所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(61)

评论列表共有 0 条评论

立即
投稿
返回
顶部