内容简介:上文中介绍了Hystrix的由来,本文会深入分析Hystrix的执行过程。Hystrix的大部分逻辑基于RxJava,其实现让很热多人望而却步,停留在了仅仅使用的地步,从一个简单的HelloWorld开始。通过简单的实现run方法和getFallback 方法,
上文中介绍了Hystrix的由来,本文会深入分析Hystrix的执行过程。
Hystrix的大部分逻辑基于RxJava,其实现让很热多人望而却步,停留在了仅仅使用的地步,从一个简单的HelloWorld开始。
public class CommandHelloWorld extends HystrixCommand<String> { private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { // 省略业务逻辑 // 该方法可能会抛出异常 return "Hello " + name + "!"; } @Override protected String getFallback() { return "Hello Failure " + name + "!"; } }
通过简单的实现run方法和getFallback 方法, CommandHelloWorld
具备了熔断降级的功能,其中 HystrixCommand
提供了4个方法:execute(),queue(),observe(),toObservable(),平时只需要关注execute和queue即可。
- queue():
异步调用,返回一个Future对象,后面可以通过Future获取结果。 - execute()
同步调用,调用后直接block住,直到依赖服务返回结果,或者抛出异常。
public R execute() { try { return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } }
其实同步方式是通过直接执行Future的get方法进行实现的,这里需要知道这个返回的Future对象到底是什么?
通过实现可以发现,返回的Futrue对象只是对 toObservable
返回结果的封装代理,把注意力转移到 toObservable
方法的实现。
第一次看到这个方法的实现,也许会很崩溃,方法内部初始化了一系列的Action0对象,这是RxJava的内部对象,可以看作是订阅一个事件后的回调。
一开始,不要太过在意这些Action,不然会陷入迷失,在debug的时候,通过不断的回调,经常会不知身处何处。
直接看到 toObservable
方法的return处,又是一坨回调,崩溃。
如果不熟悉RxJava语法,看这种代码真心的累,call方法的前面部分主要做两件事。
1、记录请求日志(日志功能开启)
2、从缓存中返回结果(定义了cacheKey方法,并且缓存功能开启)
如果缓存没有开启,或者返回null,那只能执行正常逻辑从下游服务拿取数据,这里通过上面定义的 applyHystrixSemantics
Action进行回调,最终执行的是 applyHystrixSemantics
方法,这个方法才是精华所在,想调试的同学,直接在这个方法入口打个断点,事半功倍。
其中 circuitBreaker.attemptExecution()
的返回结果,决定了接下去是执行正常逻辑、还是降级逻辑,这才是精华的精华。
看一下熔断器的 attemptExecution
方法,内部涉及了多个开关
- forceOpen
强制开启,所以请求都执行降级逻辑 - forceClose
强制关闭,所以请求都执行正常逻辑 - circuitOpened
熔断开关,默认为-1,请求执行正常逻辑,如果发生熔断,该值会被修改成0,请求执行降级逻辑 - HALF_OPEN
熔断半开,即熔断之后,每隔一段会进行试探
个人觉得,这里的实现过于复杂。
如果没有发生熔断,还有一道门槛,Hystrix提供了一个信号量限流器,限制进入熔断器最大并发数,可以控制请求下游的并发量,如果超过这个阈值,会被降级处理,有效的保护下游服务不会被突发流量给攻击。
通过的请求,继续调用 executeCommandAndObserve
方法,在该方法中,又定义了一堆让人迷惑的Action,不过这次通过名字和实现,可以知道个大概,先把这些Action放一边,看看接下去会执行的方法。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { // 忽略一堆Action的定义 Observable<R> execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
Hystrix内部提供了超时检查的机制,如果参数 executionTimeoutEnabled
开启,则每次请求都会提交一个任务到线程池中延迟执行。由于Hystrix实现中考虑的东西太多,所以在实现上还是很复杂。
这里只给出了关键逻辑,如果配置了超时时间10ms,会提交一个延迟10ms执行的任务,其中 tick
方法会通过CAS机制保证超时状态的变更,最终对应command的会执行onError方法,这里加入的 HystrixContextRunnable
主要为了跨线程的上下文数据传递。
在执行正常逻辑的实现中,Hystrix内部提供了信号量、线程池两种模式,默认使用线程池模式。在 executeCommandWithSpecifiedIsolation
方法中,分别对这两种模式进行了处理,而且可以根据参数随时进行切换,这就是为什么线程池一开始就要初始化的原因,虽然有资源的消耗,但是带来了更好的灵活性,在需要的时候可以从信号量模式变成线程池模式进行隔离。
下面以信号量的方式为例,分析下如何执行用户自定义的 run
方法.
又想吐槽Hystrix的代码,又是这种回调,在call实现中,暂时忽略前面的一大坨逻辑,跟进 getUserExecutionObservable
方法。
继续查看 getExecutionObservable
方法,该方法在 HystrixCommand
中被重写实现。
run
方法终于执行了。 run
方法执行之后,如果正常返回、抛出异常、或者其它情况,都需要对应的后续处理,这时之前 executeCommandAndObserve
方法中定义的Action,就开始起作用了。
execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext);
-
markEmits
run方法正常返回时执行,主要记录执行耗时;触发执行成功的通知事件,可以通过扩展插件做更多事情;如果当前是熔断状态,则关闭熔断。
-
handleFallback
run方法发生异常时执行,最终执行降级逻辑,但是整个过程实现还是很复杂的。
后续文章
Hystrix系列之自动熔断和恢复
Hystrix系列之插件实现
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Redis Lua脚本的执行原理
- ThinkPHP远程命令执行漏洞原理及复现
- nginx+php执行请求的工作原理
- Go的执行原理以及Go的命令
- Kafka简介、基本原理、执行流程与使用场景
- Kafka简介、基本原理、执行流程与使用场景
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Master Switch
Tim Wu / Knopf / 2010-11-2 / USD 27.95
In this age of an open Internet, it is easy to forget that every American information industry, beginning with the telephone, has eventually been taken captive by some ruthless monopoly or cartel. Wit......一起来看看 《The Master Switch》 这本书的介绍吧!