我是靠谱客的博主 烂漫舞蹈,最近开发中收集的这篇文章主要介绍SpringCloud(四)番外篇(一):Hystrix 断路器,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述


编写不易,转载请注明(http://shihlei.iteye.com/blog/2428557)!

 

一 概述 

近些年,微服务架构模式在企业中的得到应用推广,基于SpringBoot,SpringCloud快速实现微服务的技术也得到广泛应用。

 

SpringCloud是什么?简单的说是一套组件框架,提供服务化需要的一些基础能力,如分布式/版本化配置、服务注册和发现、路由、负载均衡、断路器、分布式消息等。

 

出于Hystrix实现的好奇,对Hystrix源码进行了研究。本文算是之前SpringCloud系列的番外篇,旨在介绍微服务中“断路器” 及 Netflix Hystrix的基本使用和配置。关于Hystrix的细节,会在之后《Hystrix 源码分析篇》做介绍。

 

关于SpringCloud的CircuitBreaker使用会在后面的文章进行总结。对SpringCloud系列感兴趣,可以阅读下已完成的部分:《SpringCloud》。

 

关于Hystrix,底层基于RxJava及观察者模式实现,有兴趣可以读一下之前的文章:《响应式编程 RxJava》、《RxJava2.x 操作Demo》。

 

二 断路器

 

1)场景概述

 

服务或方法调用过程,如遇处理过慢等待时间长,会造成调用线程长时阻塞。高并发的情况,这种长久的阻塞会造成资源耗尽,无法响应其他请求。

 

特别在微服务架构下,服务级联调用,常常因为一个耗时处理,产生级联失败,继而引发雪崩。

 

所以需要一套 “ 保护机制 ”,在服务访问性能不达标的情况,阻止无限等待。(注,短时的失效,重试即可)

 

2)解决思路

 

(1)常规思路

异步(配合线程池控制资源)执行并计时,超时interrupt异步任务,(throw TimeoutException),业务端进行容错处理。

 

优点:简单,封装的好可以解决一定的问题

缺点:任然会请求,调用端任然等待,仍有大量请求到达后端。

 

(2)断路器

常规思路的升级版,思路引入快速失效概念,当大量超时后,进入断路状态,返回某个异常或默认值,之后一段时间内不再请求源服务(快速失效)。一定时间后尝试开启,确定服务源是否可用(自我恢复)。

 

基于这个定位,断路器需要提供如下功能:失效监控;状态(关闭,打开,半开)管理;自动修复;

 

基本状态说明:

(a)关闭(closed):访问正常,未达到失效的阈值,断路机制未启用

(b)打开(open):一段时间内失效次数达到阈值,断路机制开启,访问直接返回异常或默认值

(c)半开(half-open):断路器打开状态,指定时间后,分流部分请求尝试调用服务,如果成功,关闭断路器

 

三 Hystrix

 

1)概述

Hystrix:实现的断路器功能的lib,基于AOP模式,底层基于RxJava;通过隔离服务之间的访问点,阻止级联故障并提供提fallback执行,以提高系统的整体弹性。

 

核心功能:

(1)超时失效断路:阻止级联故障,降级,快速失效,快速恢复。

(2)实时操作:实时监控,实时配置,发现属性变化快速生效。

(3)并发性:并行执行,并发感知缓存请求,自动批量处理请求断路。

 

附加功能:报表,报警。

 

github:https://github.com/Netflix/Hystrix

 

2)使用过程

(1)继承:

HystrixCommand:返回单个响应

HystrixObservableCommand:返回多个响应

 

(2)重写执行和回退方法:

(a)执行方法:重写如下方法实现监控业务逻辑

HystrixCommand.run() :返回单个响应;

HystrixObservableCommand.construct():返回 Observable 用于提交多个结果;

 

(b)fallback:重写如下方法方法,实现断路后的默认返回

HystrixCommand.getFallback() :返回单个fallback响应;

HystrixObservableCommand.resumeWithFallback():返回 Observable 用于提交多个fallback结果;

 

(3)执行业务:

同步执行方式:String s = new CommandHelloWorld("Bob").execute();

异步执行方式:Future<String> s = new CommandHelloWorld("Bob").queue();

响应式:Observable<String> s = new CommandHelloWorld("Bob").observe();

 

3)简单demo

(1)依赖:

		<dependency>
			<groupId>com.netflix.hystrix</groupId>
			<artifactId>hystrix-core</artifactId>
                        <version>1.5.12</version>
		</dependency>

 

(2)demo:

 

package x.demo.netflix.hystrix;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import rx.Observable;

/**
 * Hystrix demo:
 *
 * @author shilei
 */
public class HystrixDemo {

    private Service service = new ServiceCircuitBreakerProxy();

    public static void main(String[] args) throws Exception {
        HystrixDemo demo = new HystrixDemo();

        //请求服务
        for (int i = 0; i < 50; i++) {
            demo.run();
        }

        //等待任务结束
        TimeUnit.MINUTES.sleep(1);
    }

    void run() {
        service.service();
    }

    /**
     * 待监测的服务接口
     */
    interface Service {
        boolean service();
    }

    /**
     * 断路器代理
     */
    static class ServiceCircuitBreakerProxy extends HystrixCommand<Boolean> implements Service {

        /**
         * 配置HystrixCommand:
         * 滑动窗口:1000 毫秒
         * 桶数:1
         * --- 则每个统计周期1000毫秒
         * 超时时间:100毫秒
         * 断路器打开错误率:50%
         * --- 则一个滑动窗口内全部超时约执行10次(有其他是爱你消耗),预计执行8~9次run() 断路器打开,之后请求直接进入getFallback
         */
        public ServiceCircuitBreakerProxy() {
            super(Setter
                    /*
                    一般情况相同业务功能会使用相同的CommandGroupKey。对CommandKey分组,进行逻辑隔离。相同CommandGroupKey会使用同一个线程池或者信号量
                     */
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey(ServiceCircuitBreakerProxy.class.getSimpleName()))
                    /*
                    一般同一监控服务使用相同的CommandKey,目的把HystrixCommand,HystrixCircuitBreaker,HytrixCommandMerics
                    以及其他相关对象关联在一起,形成一个原子组。采用原生接口的话,默认值为类名;采用注解形式的话,默认值为方法名
                     */
                    .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
                    .andCommandPropertiesDefaults(
                            HystrixCommandProperties.Setter()
                                    /*
                                    隔离级别,默认线程
                                     */
                                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                                    /*
                                    线程执行超时时间,默认1000,一般选择所服务tp99的时间
                                     */
                                    .withExecutionTimeoutInMilliseconds(50)
                                    /*
                                    默认20;一个滑动窗口内“触发断路”要达到的最小访问次数。低于该次数,技术错误率达到,也不会触发断路操作,用于测试压力是否满足要求。
                                     */
                                    .withCircuitBreakerRequestVolumeThreshold(1)
                                    /*
                                    一个窗口内“触发断路”错误率。满足则进入断路状态,快速失效。
                                     */
                                    .withCircuitBreakerErrorThresholdPercentage(50)
                                    /*
                                    默认 5000(即5s);断路器打开后过多久调用时间服务进行重试。
                                     */
                                    .withCircuitBreakerSleepWindowInMilliseconds(10000)
                    )
                    .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                            /*
                            线程池大小,默认10
                             */
                            .withCoreSize(1)
                            /*
                            任务队列大小,使用BlockingQueue,默认-1
                             */
                            .withMaxQueueSize(-1)
                            /*
                            默认1000(即10);设置统计的滑动窗口大小,毫秒值。每一个滑动窗口是决策周期,用于CircuitBreaker计算错误率,做状态改变。
                             */
                            .withMetricsRollingStatisticalWindowInMilliseconds(1000)
                            /*
                            默认10;设置一个滑动窗口内桶的数量,一个bucket的时间周期=timeInMilliseconds/numBuckets,
                            是统计的最小时间单元,独立计数。Hystrix的滑动窗口按照一个bucket的时间周期向前滑动,合并最近的n个bucket的统计数据,即为一个时间窗口,计算错误率,改变状态。
                             */
                            .withMetricsRollingStatisticalWindowBuckets(1)
                    )
            );
        }

        /**
         * 代理实际业务
         */
        @Override
        public boolean service() {
            return doExecute();

        }

        /**
         * 同步方式调用:以同步堵塞方式执行的run()。
         * 调用execute()后,hystrix先创建一个新线程运行run(),接着调用程序要在execute()调用处一直堵塞着,直到run()运行完成
         *
         * @return 结果
         */
        private boolean doExecute() {
            return new ServiceCircuitBreakerProxy().execute();
        }

        @Override
        protected Boolean run() throws Exception {
            System.out.println("Thread " + Thread.currentThread().getId() + " :run()");
            //模拟超时
            TimeUnit.SECONDS.sleep(1);
            return true;
        }

        @Override
        protected Boolean getFallback() {
            System.out.println("Thread " + Thread.currentThread().getId() + " :getFallback()");
            return false;
        }
    }
}
 

 

(3)结果:

 

Thread 13 :run()
Thread 12 :getFallback()
Thread 13 :run()
Thread 12 :getFallback()
Thread 13 :run()
Thread 14 :getFallback()
Thread 13 :run()
Thread 12 :getFallback()
Thread 13 :run()
Thread 15 :getFallback()
Thread 13 :run()
Thread 14 :getFallback()
Thread 13 :run()
Thread 16 :getFallback()
Thread 13 :run()
Thread 12 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
 

注:

 滑动窗口:1000 毫秒;桶数:1 --- 则每个统计周期1000毫秒

 超时时间:100毫秒;断路器打开错误率:50% --- 则一个滑动窗口内全部超时约执行10次(有其消耗时间操作),预计执行8~9次run() 断路器打开,之后请求直接进入getFallback

 

4)处理流程分析

 

解释下图中的各个步骤:

(1)创建HystrixCommand or HystrixObservableCommand 代表需要监控断路的服务

 

(2)执行命令:4种执行方法

(a)K             value   = command.execute();           //阻塞,直到获得这侧请求的结果或抛出异常

(b)Future<K>     fValue  = command.queue();       //返回Future,用于之后获得这次请求的结果

(c)Observable<K> ohValue = command.observe();             //hot observable,返回Observable用于观察和获得结果

(d)Observable<K> ocValue = command.toObservable();    //cold observable,返回Observable,用于观察和获得结果

 

注:所有的HystrixCommand都是基于Observable实现的,同步方式调用,只是框架帮我们调用了toObservable().toBlocking().toFuture()。

 

(3)如果开启请求缓存,请求条件相同会立即从cache重返回。

 

(4)判断断路器是否打开

 断路器进入打开状态的判断依据:

(a)断路器访问量达到阈值:HystrixCommandProperties.circuitBreakerRequestVolumeThreshold():

(b)错误百分比超过所设置错误百分比阈值:HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()

一个时间窗口内,满足(a)(b)断路器进入打开状态经过一段时间(HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds())后,请求成功。如果请求失败,断路器会在休眠窗口期间返回OPEN状态。如果请求成功,断路器将切换到CLOSED,并且逻辑1将再次接通。

 

(5)判断Thread Pool/Queue/Semaphore 是否满

如果full,说明资源耗尽,则rejected,可以直接调用getFallback()

 

(6)执行HystrixCommand.run()/HystrixObservableCommand.construct()

如果超时,则中断线程,并抛出InterruptedExceptions ,如果客户端会抛出该异常要小心。

 

(7)计算断路器状态

Hystrix会报告执行结果,包括 successes, failures, rejections, and timeouts,用于更新时间窗口内的统计信息,并更新断路器状态。

 

(8)执行fallback

用途:

实现通用返回,包括从内存cache或其方式读取这次请求的值。

 

触发方式:

(a)由construct() or run()抛出了一个异常

(b)断路器已经打开的时候

(c)没有空闲的线程池和队列或者信号量

(d)一次命令执行超时

 

注:如果fallback中做复杂网络操作,或抛异常也会做断路

 

(9)获得正常返回结果 

 

 5)核心配置说明(如demo)

(1)CommandKey:一般同一监控服务使用相同的CommandKey,目的把HystrixCommand,HystrixCircuitBreaker,HytrixCommandMerics以及其他相关对象关联在一起,形成一个原子组。采用原生接口的话,默认值为类名;采用注解形式的话,默认值为方法名。

 

(2)GroupKey:一般情况相同业务功能会使用相同的CommandGroupKey。对CommandKey分组,进行逻辑隔离。相同CommandGroupKey会使用同一个线程池或者信号量。

 

(3)ThreadPoolKey:物理隔离(相对于GroupKey逻辑隔离),当没有设置ThreadPoolKey的时候,线程池或者信号量的划分按照CommandGroupKey,当设置了ThreadPoolKey,那么线程池和信号量的划分就按照ThreadPoolKey来处理,相同ThreadPoolKey采用同一个线程池或者信号量。

 

(4)命令类

(a)Execution:

execution.isolation.strategy:隔离级别,默认线程

execution.isolation.thread.timeoutInMilliseconds:线程执行超时时间,默认1000,一般选择所服务tp99的时间。

 

(b)Circuit Breaker:

circuitBreaker.requestVolumeThreshold: 默认20;一个滑动窗口内“触发断路”要达到的最小访问次数。低于该次数,技术错误率达到,也不会触发断路操作,用于测试压力是否满足要求。

circuitBreaker.errorThresholdPercentage:默认 50(即50%);一个窗口内“触发断路”错误率。满足则进入断路状态,快速失效。

circuitBreaker.sleepWindowInMilliseconds:默认 5000(即5s);断路器打开后过多久调用时间服务进行重试。

 

(5)ThreadPool 线程池:

coreSize:线程池大小,默认10

maxQueueSize:任务队列大小,使用BlockingQueue,默认-1 

      

metrics.rollingStats.timeInMilliseconds:默认1000(即10);设置统计的滑动窗口大小,毫秒值。每一个滑动窗口是决策周期,用于CircuitBreaker计算错误率,做状态改变。

metrics.rollingStats.numBuckets:默认10;设置一个滑动窗口内桶的数量,一个bucket的时间周期=timeInMilliseconds/numBuckets,是统计的最小时间单元,独立计数。Hystrix的滑动窗口按照一个bucket的时间周期向前滑动,合并最近的n个bucket的统计数据,即为一个时间窗口,计算错误率,改变状态。

 

滑动窗口样子如下:

 

 

解释一下:滑动窗口模式,相对于固定窗口概念

 

(a)固定时间窗口:选取一个时间段,进行统计错误率,触发操作;如果错误集中在两个时间窗口的临界点,虽然临界点时间段内满足阈值,但是在各自时间窗口内没有达到错误阈值,操作不会执行。

 如:1分钟错误不超过100次,如果在左后10s错误了50次,在下一分钟开始10s错误了50次,其实阈值已经满足,但是固定窗口与统计不到,无法做响应的操作。

 

(b)滑动时间窗口:将固定窗口切成更小的n时间段,以每个小的时间段作为统计周期,每次向前滑动一个小时间段,统计最近的n个小时间段的统计,作为触发决策条件。

如:1分钟错误不超过100次,分6个bucket,每个bucket 10秒作为统计周期,每过10秒,统计一次最近的6个bucket的统计信息查看错误率,刚才的场景,就能处理了。

 

 四 hystrix + hystrix-javanica + aop 组合使用

 

1)概述

 

Java本身的提供反射和注解特性可以明显降低开发复杂度,提高灵活性,hystrix-javanica 提供了 Hystrix基于注解的开发模式。

 

github:https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-javanica

 

2)使用流程

(1)@HystrixCommand注释需要监控断路的方法

(2)提供fallback方法

       

3)简单demo

(1)maven

    <dependencies>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>1.5.12</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-javanica</artifactId>
            <version>1.5.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.0.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.0.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.0.6.RELEASE</version>
        </dependency>
    </dependencies>

 

(2)demo

package x.demo.netflix.hystrix;

import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.stereotype.Component;

/**
 * StringHystrixJavanicaDemo
 *
 * @author shilei
 */
@EnableAspectJAutoProxy
public class StringHystrixJavanicaDemo {

    @Resource
    private Service service;

    public static void main(String[] args) throws Exception {
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();) {
            context.register(StringHystrixJavanicaDemo.class);
            context.refresh();

            StringHystrixJavanicaDemo demo = context.getBean(StringHystrixJavanicaDemo.class);
            for (int i = 0; i < 20; i++) {
                demo.run();
            }
        }
    }

    void run() throws InterruptedException {
        service.doService();
    }

    @Component
    static class Service {

        /**
         * 被管理的方法
         *
         * @return 是否调用成功
         */
        @HystrixCommand(
                groupKey = "StringHystrixJavanicaDemo",
                commandKey = "HelloWorld",
                defaultFallback = "doFallack",
                commandProperties = {
                        @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "100"),

                        @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "1"),
                        @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50"),
                        @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "10000")
                },
                threadPoolProperties = {
                        @HystrixProperty(name = "coreSize", value = "1"),
                        @HystrixProperty(name = "maxQueueSize", value = "-1"),
                        @HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "1000"),
                        @HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "1")})

        public boolean doService() throws InterruptedException {
            System.out.println("Thread " + Thread.currentThread().getId() + " :run()");
            //模拟超时
            TimeUnit.SECONDS.sleep(1);
            return true;
        }


        /**
         * 断路器开启后,控制默认返回
         *
         * @return 期望的默认值
         */
        public boolean doFallack() {
            System.out.println("Thread " + Thread.currentThread().getId() + " :getFallback()");
            return false;
        }
    }

    @Configuration
    static class HystrixConfiguration {

        /**
         * 处理代理类
         *
         * @return 代理
         */
        @Bean
        public HystrixCommandAspect hystrixCommandAspect() {
            return new HystrixCommandAspect();
        }
    }
}

 

(3)结果

Thread 14 :run()
Thread 13 :getFallback()
Thread 14 :run()
Thread 13 :getFallback()
Thread 14 :run()
Thread 15 :getFallback()
Thread 14 :run()
Thread 13 :getFallback()
Thread 14 :run()
Thread 16 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()

 

基于javanica 和 spring 框架,确实少写了不少代码

 

五 附录

最后

以上就是烂漫舞蹈为你收集整理的SpringCloud(四)番外篇(一):Hystrix 断路器的全部内容,希望文章能够帮你解决SpringCloud(四)番外篇(一):Hystrix 断路器所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(69)

评论列表共有 0 条评论

立即
投稿
返回
顶部