正常逻辑执行
AbstractCommand.toObservable()
方法中,当缓存特性未开启或者缓存未命中时,将applyHystrixSemantics
传入Observable.defer
方法中,声明执行命令的Observable
。
applyHystrixSemantics变量
创建applyHystrixSemantics
的代码如下:
1 | final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { |
applyHystrixSemantics方法
1 | private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { |
- 调用
getExecutionSemaphore()
方法获得信号量(TryableSemaphore
)对象 - 创建信号量释放Action,用于下面命令执行Observable的
doOnTerminate
和doOnUnsubscribe
方法 - 创建命令执行失败Action,用于下面命令执行Observable的
doOnError
方法 - 调用
TryableSemaphore.tryAcquire
方法获取信号量 - 标记
executionResult
调用的开始时间 - 调用
executeCommandAndObserve
方法,获取命令执行Observable - 若发生异常,调用
Observable.error
方法返回Observable
- 若信号量(
TryableSemaphore
)使用失败,调用handleSemaphoreRejectionViaFallback()
方法,处理信号量拒绝的失败回退逻辑 - 若链路处于熔断状态,调用
handleShortCircuitViaFallback()
方法,处理链路熔断的失败回退逻辑。
TryableSemaphore
com.netflix.hystrix.AbstractCommand.TryableSemaphore
是Hystrix定义的信号量接口。代码如下:
1 | interface TryableSemaphore { |
TryableSemaphore共有两个子类实现:
- TryableSemaphoreNoOp
- TryableSemaphoreActual
TryableSemaphoreNoOp
com.netflix.hystrix.AbstractCommand.TryableSemaphoreNoOp
是无操作的信号量。代码如下:
1 | static class TryableSemaphoreNoOp implements TryableSemaphore { |
tryAcquire()
方法每次都返回true
,release()
方法无任何操作。这是为什么?在Hystrix里提供了两种执行隔离策略:
- Thread。该方式不使用信号量,因此使用TryableSemaphoreNoOp,这样每次调用
tryAcquire
都能返回true
。 - Semaphore。该方式使用信号量,因此使用TryableSemaphoreActual,这样每次调用
tryAcquire
根据情况返回true/false
。
TryableSemaphoreActual
com.netflix.hystrix.AbstractCommand.TryableSemaphoreActual
是真正的信号量实现。不过实际上,TryableSemaphoreActual
更加像一个计数器。代码如下:
1 | static class TryableSemaphoreActual implements TryableSemaphore { |
numberOfPermits
属性:信号量上限。com.netflix.hystrix.strategy.properties.HystrixProperty
是一个接口,当其使用类似com.netflix.hystrix.strategy.properties.archaius.IntegerDynamicProperty
动态属性的实现时,可以实现动态调整信号量的上限,这就是为什么不使用java.util.concurrent.Semaphore
的原因之一count
属性:信号量使用数量。这是为什么说TryableSemaphoreActual
更加像一个计数器的原因- 另一个不使用
java.util.concurrent.Semaphore
的原因,TryableSemaphoreActual
无阻塞获取信号量的需求,使用AtomicInteger
可以达到更加轻量级的实现。
getExecutionSemaphore
1 | /** |
getExecutionSemaphore
方法根据执行隔离策略的不同,返回不同的信号量实现:
Thread
。该方式不使用信号量,因此返回TryableSemaphoreNoOp
。Semaphore
。该方式使用信号量,因此返回TryableSemaphoreActual
。不同命令的信号量都保存在executionSemaphorePerCircuit
。
executeCommandAndObserve
executeCommandAndObserve
方法返回命令执行的Observable。代码如下:
1 | private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { |
- 获取请求上下文
- 分别定义doOnNext中的回调、doOnCompleted中的回调、onErrorResumeNext中的回调、doOnEach中的回调
- 调用
executeCommandWithSpecifiedIsolation
方法,获得命令执行Observable - 若执行命令超时特性开启,调用
Observable.lift
方法实现执行命令超时功能。
executeCommandWithSpecifiedIsolation
executeCommandWithSpecifiedIsolation
方法返回执行命令Observable。代码如下:
1 | private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { |
根据执行隔离策略的不同,创建不同的命令执行Observable
。
执行隔离策略为Thread:
- 调用
executionResult.setExecutionOccurred()
,标记executionResult执行已发生 - 将
commandState
设置为USER_CODE_EXECUTED
。若设置失败,调用Observable.error
方法返回Observable - 检查是否超时。若超时,调用
Observable.error
方法返回Observable - 将
threadState
设置为ThreadState.STARTED
。若设置失败,调用Observable.error
方法返回Observable - 调用
getUserExecutionObervable
方法创建命令执行Observable。若发生异常,调用Observable.error
方法返回Observable - 调用
doOnTerminate
方法添加Action0 - 调用
doOnUnsubscribe
方法添加Action0 - 调用
Observable.subscribeOn
方法指定Observable自身在哪个调度器上执行。ThreadPool.getScheduler
方法获得Hystrix自定义实现的RxJava Scheduler
执行隔离策略为Semaphore:
- 调用
executionResult.setExecutionOccurred()
,标记executionResult执行已发生 - 将
commandState
设置为USER_CODE_EXECUTED
。若设置失败,调用Observable.error
方法返回Observable - 调用
getUserExecutionObervable
方法创建命令执行Observable。若发生异常,调用Observable.error
方法返回Observable
getUserExecutionObervable
getUserExecutionObervable
方法创建命令执行Observable
1 | private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) { |
调用getExecutionObservable
方法创建命令执行Observable
。getExecutionObservable
方法是个抽象方法,HystrixCommand
实现了该方法。
若发生异常,调用Observable.error
方法返回Observable
HystrixCommand.getExecutionObservable
调用HystrixCommand.getExecutionObservable
方法创建命令执行Observable
1 | final protected Observable<R> getExecutionObservable() { |
调用Observable.defer方法创建命令执行Observable
调用
run
方法,运行正常执行逻辑。通过Observable.just
方法返回创建的Observable调用
doOnSubscribe
方法,添加Action。该操作记录执行线程(executionThread
)。executionThread
用于HystrixCommand.queue()
方法,返回的Future结果,可以调用Future.cancel
方法run()
抽象方法,实现该方法,运行正常逻辑。
run()
抽象方法的实现为GenericCommand.run()
GenericCommand.run()
GenericCommand.run()
方法的代码如下:
1 | protected Object run() throws Exception { |
AbstractHystrixCommand.process
方法中调用Action
的execute()
方法。
execute()
方法:
- 首先调用
getCommandAction()
方法获取CommandAction
,我们的示例中获取到的是MethodExecutionAction
。 - 然后调用
MethodExecutionAction.execute
方法,传入ExecutionType
参数,我们的示例中传入的是ExecutionType.SYNCHRONOUS
。
代码如下:
1 | public Object execute(ExecutionType executionType) throws CommandActionExecutionException { |
我们看到最终在MethodExecutionAction.execute
方法中通过反射调用其中的Method
,返回执行结果。