概述
只分析使用@HystrixCommand
注解的情况。feign
结合暂时不考虑,也不考虑异步。
如果要使用hystrix
,可以通过@HystrixCommand
注解,注释需要熔断降级的方法,hystrix
会根据调用情况和配置值执行熔断、降级。
@HystrixCommand
注解解析就成了重要的一步,hystrix
内通过AOP
的方式,处理@HystrixCommand
注解。
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
...
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
...
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
}...
return result;
}
Hystrix
代理了将注解注释的方法,使用Invokable
封装。先忽略中间的封装处理过程,直接看CommandExecutor.execute(invokable, executionType, metaHolder)
方法执行逻辑。
在CommandExecutor.execute(invokable, executionType, metaHolder)
方法内,将Invokable
转为HystrixExecutable
对象,执行execute
方法。
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
...
switch (executionType) {
case SYNCHRONOUS: {
return castToExecutable(invokable, executionType).execute();
}
...
}
}
HystrixExecutable
是一个接口,在本文场景里它的实现类为HystrixCommand
对象,execute
方法内,先调用queue
返回一个Future
,阻塞等待结果。
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
queue
方法代码比较长,主要逻辑是调用HystrixCommand.toObservable
方法,返回RxJava
的Obserable
对象,将对象转为Future
对象。
toObservable
方法也是一长串的代码,都是围绕AbstractCommand.applyHystrixSemantics
方法展开,接下来就分析applyHystrixSemantics
方法。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
...
// 请求能否执行
if (circuitBreaker.attemptExecution()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
...
// 如果机遇 Thread,这里的信号量条件默认都是 true
if (executionSemaphore.tryAcquire()) {
try {
...
// 执行 command
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
在applyHystrixSemantics
方法内,首先通过熔断器判断请求能否被执行,有可能当前已经触发熔断,请求不会执行,直接调用handleShortCircuitViaFallback
方法,执行fallback
方法。
executeCommandAndObserve
方法执行HystrixCommand
,执行逻辑都是基于RxJava
实现。executeCommandAndObserve
代码在下面贴了出来,关键代码我已经加上了注释,Hystrix
执行方法基于RxJava
实现,executeCommandWithSpecifiedIsolation
方法执行请求,当调用失败时,触发onErrorResumeNext(handleFallback)
逻辑,调用降级方法。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
...
// 降级处理逻辑,因为代码基于 RxJava 实现,所以在这里封装了一层,当请求失败时,会调用 handleFallback,可以看到
// handleFallback 内,针对不同的异常,调用了不同的 fallback,但是最后都是调用 @HystrixCommand 内配置的 fallback 方法。
// 细节可以自己研究
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
...
Observable<R> execution;
// 执行请求
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
// 容错降级
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
executeCommandWithSpecifiedIsolation
方法内又是一堆面条状代码,代码里会调用AbstractCommand.getExecutionObservable
方法,在此方法内,调用run
抽象方法,执行。
@Override
final protected Observable<R> getExecutionObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}
run
方法由GenericCommand
类实现,getCommandAction().execute(getExecutionType())
逻辑很简单,就是执行被@HystrixCommand
注释的方法,不再展开。
@Override
protected Object run() throws Exception {
LOGGER.debug("execute command: {}", getCommandKey().name());
return process(new Action() {
@Override
Object execute() {
return getCommandAction().execute(getExecutionType());
}
});
}
回顾上面的代码,逻辑很简单,当@HystrixCommand
注释的方法被调用的时候,Hystrix
通过AOP
处理了请求,用RxJava
包装了方法的执行过程,当方法执行出错时,调用fallback
方法,同时根据hystrix
配置和实际调用情况,决定是否触发熔断。
最后
以上就是漂亮睫毛为你收集整理的Spring Cloud Hystrix 熔断实现的全部内容,希望文章能够帮你解决Spring Cloud Hystrix 熔断实现所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复