正常逻辑执行
AbstractCommand.toObservable()方法中,当缓存特性未开启或者缓存未命中时,将applyHystrixSemantics传入Observable.defer方法中,声明执行命令的Observable。
applyHystrixSemantics变量
创建applyHystrixSemantics的代码如下:
1 | final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { |
applyHystrixSemantics方法
1 | private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { |
- 调用
getExecutionSemaphore()方法获得信号量(TryableSemaphore)对象 - 创建信号量释放Action,用于下面命令执行Observable的
doOnTerminate和doOnUnsubscribe方法 - 创建命令执行失败Action,用于下面命令执行Observable的
doOnError方法 - 调用
TryableSemaphore.tryAcquire方法获取信号量 - 标记
executionResult调用的开始时间 - 调用
executeCommandAndObserve方法,获取命令执行Observable - 若发生异常,调用
Observable.error方法返回Observable - 若信号量(
TryableSemaphore)使用失败,调用handleSemaphoreRejectionViaFallback()方法,处理信号量拒绝的失败回退逻辑 - 若链路处于熔断状态,调用
handleShortCircuitViaFallback()方法,处理链路熔断的失败回退逻辑。
TryableSemaphore
com.netflix.hystrix.AbstractCommand.TryableSemaphore是Hystrix定义的信号量接口。代码如下:
1 | interface TryableSemaphore { |
TryableSemaphore共有两个子类实现:
- TryableSemaphoreNoOp
- TryableSemaphoreActual
TryableSemaphoreNoOp
com.netflix.hystrix.AbstractCommand.TryableSemaphoreNoOp是无操作的信号量。代码如下:
1 | static class TryableSemaphoreNoOp implements TryableSemaphore { |
tryAcquire()方法每次都返回true,release()方法无任何操作。这是为什么?在Hystrix里提供了两种执行隔离策略:
- Thread。该方式不使用信号量,因此使用TryableSemaphoreNoOp,这样每次调用
tryAcquire都能返回true。 - Semaphore。该方式使用信号量,因此使用TryableSemaphoreActual,这样每次调用
tryAcquire根据情况返回true/false。
TryableSemaphoreActual
com.netflix.hystrix.AbstractCommand.TryableSemaphoreActual是真正的信号量实现。不过实际上,TryableSemaphoreActual更加像一个计数器。代码如下:
1 | static class TryableSemaphoreActual implements TryableSemaphore { |
numberOfPermits属性:信号量上限。com.netflix.hystrix.strategy.properties.HystrixProperty是一个接口,当其使用类似com.netflix.hystrix.strategy.properties.archaius.IntegerDynamicProperty动态属性的实现时,可以实现动态调整信号量的上限,这就是为什么不使用java.util.concurrent.Semaphore的原因之一count属性:信号量使用数量。这是为什么说TryableSemaphoreActual更加像一个计数器的原因- 另一个不使用
java.util.concurrent.Semaphore的原因,TryableSemaphoreActual无阻塞获取信号量的需求,使用AtomicInteger可以达到更加轻量级的实现。
getExecutionSemaphore
1 | /** |
getExecutionSemaphore方法根据执行隔离策略的不同,返回不同的信号量实现:
Thread。该方式不使用信号量,因此返回TryableSemaphoreNoOp。Semaphore。该方式使用信号量,因此返回TryableSemaphoreActual。不同命令的信号量都保存在executionSemaphorePerCircuit。
executeCommandAndObserve
executeCommandAndObserve方法返回命令执行的Observable。代码如下:
1 | private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { |
- 获取请求上下文
- 分别定义doOnNext中的回调、doOnCompleted中的回调、onErrorResumeNext中的回调、doOnEach中的回调
- 调用
executeCommandWithSpecifiedIsolation方法,获得命令执行Observable - 若执行命令超时特性开启,调用
Observable.lift方法实现执行命令超时功能。
executeCommandWithSpecifiedIsolation
executeCommandWithSpecifiedIsolation方法返回执行命令Observable。代码如下:
1 | private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { |
根据执行隔离策略的不同,创建不同的命令执行Observable。
执行隔离策略为Thread:
- 调用
executionResult.setExecutionOccurred(),标记executionResult执行已发生 - 将
commandState设置为USER_CODE_EXECUTED。若设置失败,调用Observable.error方法返回Observable - 检查是否超时。若超时,调用
Observable.error方法返回Observable - 将
threadState设置为ThreadState.STARTED。若设置失败,调用Observable.error方法返回Observable - 调用
getUserExecutionObervable方法创建命令执行Observable。若发生异常,调用Observable.error方法返回Observable - 调用
doOnTerminate方法添加Action0 - 调用
doOnUnsubscribe方法添加Action0 - 调用
Observable.subscribeOn方法指定Observable自身在哪个调度器上执行。ThreadPool.getScheduler方法获得Hystrix自定义实现的RxJava Scheduler
执行隔离策略为Semaphore:
- 调用
executionResult.setExecutionOccurred(),标记executionResult执行已发生 - 将
commandState设置为USER_CODE_EXECUTED。若设置失败,调用Observable.error方法返回Observable - 调用
getUserExecutionObervable方法创建命令执行Observable。若发生异常,调用Observable.error方法返回Observable
getUserExecutionObervable
getUserExecutionObervable方法创建命令执行Observable
1 | private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) { |
调用getExecutionObservable方法创建命令执行Observable。getExecutionObservable方法是个抽象方法,HystrixCommand实现了该方法。
若发生异常,调用Observable.error方法返回Observable
HystrixCommand.getExecutionObservable
调用HystrixCommand.getExecutionObservable方法创建命令执行Observable
1 | final protected Observable<R> getExecutionObservable() { |
调用Observable.defer方法创建命令执行Observable
调用
run方法,运行正常执行逻辑。通过Observable.just方法返回创建的Observable调用
doOnSubscribe方法,添加Action。该操作记录执行线程(executionThread)。executionThread用于HystrixCommand.queue()方法,返回的Future结果,可以调用Future.cancel方法run()抽象方法,实现该方法,运行正常逻辑。
run()抽象方法的实现为GenericCommand.run()
GenericCommand.run()
GenericCommand.run()方法的代码如下:
1 | protected Object run() throws Exception { |
AbstractHystrixCommand.process方法中调用Action的execute()方法。
execute()方法:
- 首先调用
getCommandAction()方法获取CommandAction,我们的示例中获取到的是MethodExecutionAction。 - 然后调用
MethodExecutionAction.execute方法,传入ExecutionType参数,我们的示例中传入的是ExecutionType.SYNCHRONOUS。
代码如下:
1 | public Object execute(ExecutionType executionType) throws CommandActionExecutionException { |
我们看到最终在MethodExecutionAction.execute方法中通过反射调用其中的Method,返回执行结果。