概述
编写不易,转载请注明(http://shihlei.iteye.com/blog/2428557)!
一 概述
近些年,微服务架构模式在企业中的得到应用推广,基于SpringBoot,SpringCloud快速实现微服务的技术也得到广泛应用。
SpringCloud是什么?简单的说是一套组件框架,提供服务化需要的一些基础能力,如分布式/版本化配置、服务注册和发现、路由、负载均衡、断路器、分布式消息等。
出于Hystrix实现的好奇,对Hystrix源码进行了研究。本文算是之前SpringCloud系列的番外篇,旨在介绍微服务中“断路器” 及 Netflix Hystrix的基本使用和配置。关于Hystrix的细节,会在之后《Hystrix 源码分析篇》做介绍。
关于SpringCloud的CircuitBreaker使用会在后面的文章进行总结。对SpringCloud系列感兴趣,可以阅读下已完成的部分:《SpringCloud》。
关于Hystrix,底层基于RxJava及观察者模式实现,有兴趣可以读一下之前的文章:《响应式编程 RxJava》、《RxJava2.x 操作Demo》。
二 断路器
1)场景概述
服务或方法调用过程,如遇处理过慢等待时间长,会造成调用线程长时阻塞。高并发的情况,这种长久的阻塞会造成资源耗尽,无法响应其他请求。
特别在微服务架构下,服务级联调用,常常因为一个耗时处理,产生级联失败,继而引发雪崩。
所以需要一套 “ 保护机制 ”,在服务访问性能不达标的情况,阻止无限等待。(注,短时的失效,重试即可)
2)解决思路
(1)常规思路
异步(配合线程池控制资源)执行并计时,超时interrupt异步任务,(throw TimeoutException),业务端进行容错处理。
优点:简单,封装的好可以解决一定的问题
缺点:任然会请求,调用端任然等待,仍有大量请求到达后端。
(2)断路器:
常规思路的升级版,思路引入快速失效概念,当大量超时后,进入断路状态,返回某个异常或默认值,之后一段时间内不再请求源服务(快速失效)。一定时间后尝试开启,确定服务源是否可用(自我恢复)。
基于这个定位,断路器需要提供如下功能:失效监控;状态(关闭,打开,半开)管理;自动修复;
基本状态说明:
(a)关闭(closed):访问正常,未达到失效的阈值,断路机制未启用
(b)打开(open):一段时间内失效次数达到阈值,断路机制开启,访问直接返回异常或默认值
(c)半开(half-open):断路器打开状态,指定时间后,分流部分请求尝试调用服务,如果成功,关闭断路器
三 Hystrix
1)概述
Hystrix:实现的断路器功能的lib,基于AOP模式,底层基于RxJava;通过隔离服务之间的访问点,阻止级联故障并提供提fallback执行,以提高系统的整体弹性。
核心功能:
(1)超时失效断路:阻止级联故障,降级,快速失效,快速恢复。
(2)实时操作:实时监控,实时配置,发现属性变化快速生效。
(3)并发性:并行执行,并发感知缓存请求,自动批量处理请求断路。
附加功能:报表,报警。
github:https://github.com/Netflix/Hystrix
2)使用过程
(1)继承:
HystrixCommand:返回单个响应
或
HystrixObservableCommand:返回多个响应
(2)重写执行和回退方法:
(a)执行方法:重写如下方法实现监控业务逻辑
HystrixCommand.run() :返回单个响应;
HystrixObservableCommand.construct():返回 Observable 用于提交多个结果;
(b)fallback:重写如下方法方法,实现断路后的默认返回
HystrixCommand.getFallback() :返回单个fallback响应;
HystrixObservableCommand.resumeWithFallback():返回 Observable 用于提交多个fallback结果;
(3)执行业务:
同步执行方式:String s = new CommandHelloWorld("Bob").execute();
异步执行方式:Future<String> s = new CommandHelloWorld("Bob").queue();
响应式:Observable<String> s = new CommandHelloWorld("Bob").observe();
3)简单demo
(1)依赖:
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.12</version>
</dependency>
(2)demo:
package x.demo.netflix.hystrix;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import rx.Observable;
/**
* Hystrix demo:
*
* @author shilei
*/
public class HystrixDemo {
private Service service = new ServiceCircuitBreakerProxy();
public static void main(String[] args) throws Exception {
HystrixDemo demo = new HystrixDemo();
//请求服务
for (int i = 0; i < 50; i++) {
demo.run();
}
//等待任务结束
TimeUnit.MINUTES.sleep(1);
}
void run() {
service.service();
}
/**
* 待监测的服务接口
*/
interface Service {
boolean service();
}
/**
* 断路器代理
*/
static class ServiceCircuitBreakerProxy extends HystrixCommand<Boolean> implements Service {
/**
* 配置HystrixCommand:
* 滑动窗口:1000 毫秒
* 桶数:1
* --- 则每个统计周期1000毫秒
* 超时时间:100毫秒
* 断路器打开错误率:50%
* --- 则一个滑动窗口内全部超时约执行10次(有其他是爱你消耗),预计执行8~9次run() 断路器打开,之后请求直接进入getFallback
*/
public ServiceCircuitBreakerProxy() {
super(Setter
/*
一般情况相同业务功能会使用相同的CommandGroupKey。对CommandKey分组,进行逻辑隔离。相同CommandGroupKey会使用同一个线程池或者信号量
*/
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(ServiceCircuitBreakerProxy.class.getSimpleName()))
/*
一般同一监控服务使用相同的CommandKey,目的把HystrixCommand,HystrixCircuitBreaker,HytrixCommandMerics
以及其他相关对象关联在一起,形成一个原子组。采用原生接口的话,默认值为类名;采用注解形式的话,默认值为方法名
*/
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
/*
隔离级别,默认线程
*/
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
/*
线程执行超时时间,默认1000,一般选择所服务tp99的时间
*/
.withExecutionTimeoutInMilliseconds(50)
/*
默认20;一个滑动窗口内“触发断路”要达到的最小访问次数。低于该次数,技术错误率达到,也不会触发断路操作,用于测试压力是否满足要求。
*/
.withCircuitBreakerRequestVolumeThreshold(1)
/*
一个窗口内“触发断路”错误率。满足则进入断路状态,快速失效。
*/
.withCircuitBreakerErrorThresholdPercentage(50)
/*
默认 5000(即5s);断路器打开后过多久调用时间服务进行重试。
*/
.withCircuitBreakerSleepWindowInMilliseconds(10000)
)
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
/*
线程池大小,默认10
*/
.withCoreSize(1)
/*
任务队列大小,使用BlockingQueue,默认-1
*/
.withMaxQueueSize(-1)
/*
默认1000(即10);设置统计的滑动窗口大小,毫秒值。每一个滑动窗口是决策周期,用于CircuitBreaker计算错误率,做状态改变。
*/
.withMetricsRollingStatisticalWindowInMilliseconds(1000)
/*
默认10;设置一个滑动窗口内桶的数量,一个bucket的时间周期=timeInMilliseconds/numBuckets,
是统计的最小时间单元,独立计数。Hystrix的滑动窗口按照一个bucket的时间周期向前滑动,合并最近的n个bucket的统计数据,即为一个时间窗口,计算错误率,改变状态。
*/
.withMetricsRollingStatisticalWindowBuckets(1)
)
);
}
/**
* 代理实际业务
*/
@Override
public boolean service() {
return doExecute();
}
/**
* 同步方式调用:以同步堵塞方式执行的run()。
* 调用execute()后,hystrix先创建一个新线程运行run(),接着调用程序要在execute()调用处一直堵塞着,直到run()运行完成
*
* @return 结果
*/
private boolean doExecute() {
return new ServiceCircuitBreakerProxy().execute();
}
@Override
protected Boolean run() throws Exception {
System.out.println("Thread " + Thread.currentThread().getId() + " :run()");
//模拟超时
TimeUnit.SECONDS.sleep(1);
return true;
}
@Override
protected Boolean getFallback() {
System.out.println("Thread " + Thread.currentThread().getId() + " :getFallback()");
return false;
}
}
}
(3)结果:
Thread 13 :run() Thread 12 :getFallback() Thread 13 :run() Thread 12 :getFallback() Thread 13 :run() Thread 14 :getFallback() Thread 13 :run() Thread 12 :getFallback() Thread 13 :run() Thread 15 :getFallback() Thread 13 :run() Thread 14 :getFallback() Thread 13 :run() Thread 16 :getFallback() Thread 13 :run() Thread 12 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback() Thread 1 :getFallback()
注:
滑动窗口:1000 毫秒;桶数:1 --- 则每个统计周期1000毫秒
超时时间:100毫秒;断路器打开错误率:50% --- 则一个滑动窗口内全部超时约执行10次(有其消耗时间操作),预计执行8~9次run() 断路器打开,之后请求直接进入getFallback
4)处理流程分析
解释下图中的各个步骤:
(1)创建HystrixCommand or HystrixObservableCommand 代表需要监控断路的服务
(2)执行命令:4种执行方法
(a)K value = command.execute(); //阻塞,直到获得这侧请求的结果或抛出异常
(b)Future<K> fValue = command.queue(); //返回Future,用于之后获得这次请求的结果
(c)Observable<K> ohValue = command.observe(); //hot observable,返回Observable用于观察和获得结果
(d)Observable<K> ocValue = command.toObservable(); //cold observable,返回Observable,用于观察和获得结果
注:所有的HystrixCommand都是基于Observable实现的,同步方式调用,只是框架帮我们调用了toObservable().toBlocking().toFuture()。
(3)如果开启请求缓存,请求条件相同会立即从cache重返回。
(4)判断断路器是否打开
断路器进入打开状态的判断依据:
(a)断路器访问量达到阈值:HystrixCommandProperties.circuitBreakerRequestVolumeThreshold():
(b)错误百分比超过所设置错误百分比阈值:HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()
一个时间窗口内,满足(a)(b)断路器进入打开状态经过一段时间(HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds())后,请求成功。如果请求失败,断路器会在休眠窗口期间返回OPEN状态。如果请求成功,断路器将切换到CLOSED,并且逻辑1将再次接通。
(5)判断Thread Pool/Queue/Semaphore 是否满
如果full,说明资源耗尽,则rejected,可以直接调用getFallback()
(6)执行HystrixCommand.run()/HystrixObservableCommand.construct()
如果超时,则中断线程,并抛出InterruptedExceptions ,如果客户端会抛出该异常要小心。
(7)计算断路器状态
Hystrix会报告执行结果,包括 successes, failures, rejections, and timeouts,用于更新时间窗口内的统计信息,并更新断路器状态。
(8)执行fallback
用途:
实现通用返回,包括从内存cache或其方式读取这次请求的值。
触发方式:
(a)由construct() or run()抛出了一个异常
(b)断路器已经打开的时候
(c)没有空闲的线程池和队列或者信号量
(d)一次命令执行超时
注:如果fallback中做复杂网络操作,或抛异常也会做断路
(9)获得正常返回结果
5)核心配置说明(如demo)
(1)CommandKey:一般同一监控服务使用相同的CommandKey,目的把HystrixCommand,HystrixCircuitBreaker,HytrixCommandMerics以及其他相关对象关联在一起,形成一个原子组。采用原生接口的话,默认值为类名;采用注解形式的话,默认值为方法名。
(2)GroupKey:一般情况相同业务功能会使用相同的CommandGroupKey。对CommandKey分组,进行逻辑隔离。相同CommandGroupKey会使用同一个线程池或者信号量。
(3)ThreadPoolKey:物理隔离(相对于GroupKey逻辑隔离),当没有设置ThreadPoolKey的时候,线程池或者信号量的划分按照CommandGroupKey,当设置了ThreadPoolKey,那么线程池和信号量的划分就按照ThreadPoolKey来处理,相同ThreadPoolKey采用同一个线程池或者信号量。
(4)命令类
(a)Execution:
execution.isolation.strategy:隔离级别,默认线程
execution.isolation.thread.timeoutInMilliseconds:线程执行超时时间,默认1000,一般选择所服务tp99的时间。
(b)Circuit Breaker:
circuitBreaker.requestVolumeThreshold: 默认20;一个滑动窗口内“触发断路”要达到的最小访问次数。低于该次数,技术错误率达到,也不会触发断路操作,用于测试压力是否满足要求。
circuitBreaker.errorThresholdPercentage:默认 50(即50%);一个窗口内“触发断路”错误率。满足则进入断路状态,快速失效。
circuitBreaker.sleepWindowInMilliseconds:默认 5000(即5s);断路器打开后过多久调用时间服务进行重试。
(5)ThreadPool 线程池:
coreSize:线程池大小,默认10
maxQueueSize:任务队列大小,使用BlockingQueue,默认-1
metrics.rollingStats.timeInMilliseconds:默认1000(即10);设置统计的滑动窗口大小,毫秒值。每一个滑动窗口是决策周期,用于CircuitBreaker计算错误率,做状态改变。
metrics.rollingStats.numBuckets:默认10;设置一个滑动窗口内桶的数量,一个bucket的时间周期=timeInMilliseconds/numBuckets,是统计的最小时间单元,独立计数。Hystrix的滑动窗口按照一个bucket的时间周期向前滑动,合并最近的n个bucket的统计数据,即为一个时间窗口,计算错误率,改变状态。
滑动窗口样子如下:
解释一下:滑动窗口模式,相对于固定窗口概念
(a)固定时间窗口:选取一个时间段,进行统计错误率,触发操作;如果错误集中在两个时间窗口的临界点,虽然临界点时间段内满足阈值,但是在各自时间窗口内没有达到错误阈值,操作不会执行。
如:1分钟错误不超过100次,如果在左后10s错误了50次,在下一分钟开始10s错误了50次,其实阈值已经满足,但是固定窗口与统计不到,无法做响应的操作。
(b)滑动时间窗口:将固定窗口切成更小的n时间段,以每个小的时间段作为统计周期,每次向前滑动一个小时间段,统计最近的n个小时间段的统计,作为触发决策条件。
如:1分钟错误不超过100次,分6个bucket,每个bucket 10秒作为统计周期,每过10秒,统计一次最近的6个bucket的统计信息查看错误率,刚才的场景,就能处理了。
四 hystrix + hystrix-javanica + aop 组合使用
1)概述
Java本身的提供反射和注解特性可以明显降低开发复杂度,提高灵活性,hystrix-javanica 提供了 Hystrix基于注解的开发模式。
github:https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-javanica
2)使用流程
(1)@HystrixCommand注释需要监控断路的方法
(2)提供fallback方法
3)简单demo
(1)maven
<dependencies> <dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-core</artifactId> <version>1.5.12</version> </dependency> <dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-javanica</artifactId> <version>1.5.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>5.0.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.0.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>5.0.6.RELEASE</version> </dependency> </dependencies>
(2)demo
package x.demo.netflix.hystrix;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.stereotype.Component;
/**
* StringHystrixJavanicaDemo
*
* @author shilei
*/
@EnableAspectJAutoProxy
public class StringHystrixJavanicaDemo {
@Resource
private Service service;
public static void main(String[] args) throws Exception {
try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();) {
context.register(StringHystrixJavanicaDemo.class);
context.refresh();
StringHystrixJavanicaDemo demo = context.getBean(StringHystrixJavanicaDemo.class);
for (int i = 0; i < 20; i++) {
demo.run();
}
}
}
void run() throws InterruptedException {
service.doService();
}
@Component
static class Service {
/**
* 被管理的方法
*
* @return 是否调用成功
*/
@HystrixCommand(
groupKey = "StringHystrixJavanicaDemo",
commandKey = "HelloWorld",
defaultFallback = "doFallack",
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "100"),
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "1"),
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50"),
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "10000")
},
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "1"),
@HystrixProperty(name = "maxQueueSize", value = "-1"),
@HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "1000"),
@HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "1")})
public boolean doService() throws InterruptedException {
System.out.println("Thread " + Thread.currentThread().getId() + " :run()");
//模拟超时
TimeUnit.SECONDS.sleep(1);
return true;
}
/**
* 断路器开启后,控制默认返回
*
* @return 期望的默认值
*/
public boolean doFallack() {
System.out.println("Thread " + Thread.currentThread().getId() + " :getFallback()");
return false;
}
}
@Configuration
static class HystrixConfiguration {
/**
* 处理代理类
*
* @return 代理
*/
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
}
}
(3)结果
Thread 14 :run()
Thread 13 :getFallback()
Thread 14 :run()
Thread 13 :getFallback()
Thread 14 :run()
Thread 15 :getFallback()
Thread 14 :run()
Thread 13 :getFallback()
Thread 14 :run()
Thread 16 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
Thread 1 :getFallback()
基于javanica 和 spring 框架,确实少写了不少代码
五 附录
最后
以上就是烂漫舞蹈为你收集整理的SpringCloud(四)番外篇(一):Hystrix 断路器的全部内容,希望文章能够帮你解决SpringCloud(四)番外篇(一):Hystrix 断路器所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复