我是靠谱客的博主 漂亮睫毛,最近开发中收集的这篇文章主要介绍Spring Cloud Hystrix 熔断实现,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

只分析使用@HystrixCommand注解的情况。feign结合暂时不考虑,也不考虑异步。

如果要使用hystrix,可以通过@HystrixCommand注解,注释需要熔断降级的方法,hystrix会根据调用情况和配置值执行熔断、降级。

@HystrixCommand注解解析就成了重要的一步,hystrix内通过AOP的方式,处理@HystrixCommand注解。

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
...
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
...
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
}...
return result;
}

Hystrix 代理了将注解注释的方法,使用Invokable封装。先忽略中间的封装处理过程,直接看CommandExecutor.execute(invokable, executionType, metaHolder)方法执行逻辑。

CommandExecutor.execute(invokable, executionType, metaHolder)方法内,将Invokable转为HystrixExecutable对象,执行execute方法。

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
...
switch (executionType) {
case SYNCHRONOUS: {
return castToExecutable(invokable, executionType).execute();
}
...
}
}

HystrixExecutable是一个接口,在本文场景里它的实现类为HystrixCommand对象,execute
方法内,先调用queue返回一个Future,阻塞等待结果。

public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}

queue方法代码比较长,主要逻辑是调用HystrixCommand.toObservable方法,返回RxJavaObserable对象,将对象转为Future对象。

toObservable方法也是一长串的代码,都是围绕AbstractCommand.applyHystrixSemantics方法展开,接下来就分析applyHystrixSemantics方法。

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
...
// 请求能否执行
if (circuitBreaker.attemptExecution()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
...
// 如果机遇 Thread,这里的信号量条件默认都是 true
if (executionSemaphore.tryAcquire()) {
try {
...
// 执行 command
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}

applyHystrixSemantics方法内,首先通过熔断器判断请求能否被执行,有可能当前已经触发熔断,请求不会执行,直接调用handleShortCircuitViaFallback方法,执行fallback方法。

executeCommandAndObserve方法执行HystrixCommand,执行逻辑都是基于RxJava实现。executeCommandAndObserve代码在下面贴了出来,关键代码我已经加上了注释,Hystrix执行方法基于RxJava实现,executeCommandWithSpecifiedIsolation方法执行请求,当调用失败时,触发onErrorResumeNext(handleFallback)逻辑,调用降级方法。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
...
// 降级处理逻辑,因为代码基于 RxJava 实现,所以在这里封装了一层,当请求失败时,会调用 handleFallback,可以看到
// handleFallback 内,针对不同的异常,调用了不同的 fallback,但是最后都是调用 @HystrixCommand 内配置的 fallback 方法。
// 细节可以自己研究
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
...
Observable<R> execution;
// 执行请求
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
// 容错降级
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}

executeCommandWithSpecifiedIsolation方法内又是一堆面条状代码,代码里会调用AbstractCommand.getExecutionObservable方法,在此方法内,调用run抽象方法,执行。

@Override
final protected Observable<R> getExecutionObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}

run方法由GenericCommand类实现,getCommandAction().execute(getExecutionType())逻辑很简单,就是执行被@HystrixCommand注释的方法,不再展开。

@Override
protected Object run() throws Exception {
LOGGER.debug("execute command: {}", getCommandKey().name());
return process(new Action() {
@Override
Object execute() {
return getCommandAction().execute(getExecutionType());
}
});
}

回顾上面的代码,逻辑很简单,当@HystrixCommand注释的方法被调用的时候,Hystrix通过AOP处理了请求,用RxJava包装了方法的执行过程,当方法执行出错时,调用fallback方法,同时根据hystrix配置和实际调用情况,决定是否触发熔断。

最后

以上就是漂亮睫毛为你收集整理的Spring Cloud Hystrix 熔断实现的全部内容,希望文章能够帮你解决Spring Cloud Hystrix 熔断实现所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(50)

评论列表共有 0 条评论

立即
投稿
返回
顶部