從源碼分析 Hystrix 工作機制
作者:vivo 互聯網服務器團隊 - Pu Shuai
一、Hystrix 解決了什麼問題?
在複雜的分佈式應用中有着許多的依賴,各個依賴都難免會在某個時刻失敗,如果應用不隔離各個依賴,降低外部的風險,那容易拖垮整個應用。
舉個電商場景中常見的例子,比如訂單服務調用了庫存服務、商品服務、積分服務、支付服務,系統均正常情況下,訂單模塊正常運行。
但是當積分服務發生異常時且會阻塞 30s 時,訂單服務就會有部分請求失敗,且工作線程阻塞在調用積分服務上。
流量高峯時,問題會更加嚴重,訂單服務的所有請求都會阻塞在調用積分服務上,工作線程全部掛起,導致機器資源耗盡,訂單服務也不可用,造成級聯影響,整個集羣宕機,這種稱爲雪崩效應。
所以需要一種機制,使得單個服務出現故障時,整個集羣可用性不受到影響。Hystrix 就是實現這種機制的框架,下面我們分析一下 Hystrix 整體的工作機制。
二、整體機制
-
**【入口】**Hystrix 的執行入口是 HystrixCommand 或 HystrixObservableCommand 對象,通常在 Spring 應用中會通過註解和 AOP 來實現對象的構造,以降低對業務代碼的侵入性;
-
**【緩存】**HystrixCommand 對象實際開始執行後,首先是否開啓緩存,若開啓緩存且命中,則直接返回;
-
**【熔斷】**若熔斷器打開,則執行短路,直接走降級邏輯;若熔斷器關閉,繼續下一步,進入隔離邏輯。熔斷器的狀態主要基於窗口期內執行失敗率,若失敗率過高,則熔斷器自動打開;
-
**【隔離】**用戶可配置走線程池隔離或信號量隔離,判斷線程池任務已滿(或信號量),則進入降級邏輯;否則繼續下一步,實際由線程池任務線程執行業務調用;
-
**【執行】**實際開始執行業務調用,若執行失敗或異常,則進入降級邏輯;若執行成功,則正常返回;
-
**【超時】**通過定時器延時任務檢測業務調用執行是否超時,若超時則取消業務執行的線程,進入降級邏輯;若未超時,則正常返回。線程池、信號量兩種策略均隔離方式支持超時配置(信號量策略存在缺陷);
-
**【降級】**進入降級邏輯後,當業務實現了 HystrixCommand.getFallback() 方法,則返回降級處理的數據;當未實現時,則返回異常;
-
**【統計】**業務調用執行結果成功、失敗、超時等均會進入統計模塊,通過健康統計結果來決定熔斷器打開或關閉。
都說源碼裏沒有祕密,下面我們來分析下核心功能源碼,看看 Hystrix 如何實現整體的工作機制。
三、熔斷
家用電路中都有保險絲,保險絲的作用場景是,當電路發生故障或異常時,伴隨着電流不斷升高,並且升高的電流有可能損壞電路中的某些重要器件或貴重器件,也有可能燒燬電路甚至造成火災。
若電路中正確地安置了保險絲,那麼保險絲就會在電流異常升高到一定程度的時候,自身熔斷切斷電流,從而起到保護電路安全運行的作用。Hystrix 提供的熔斷器就有類似功能,應用調用某個服務提供者,當一定時間內請求總數超過配置的閾值,且窗口期內錯誤率過高,那 Hystrix 就會對調用請求熔斷,後續的請求直接短路,直接進入降級邏輯,執行本地的降級策略。
Hystrix 具有自我調節的能力,熔斷器打開在一定時間後,會嘗試通過一個請求,並根據執行結果調整熔斷器狀態,讓熔斷器在 closed,open,half-open 三種狀態之間自動切換。
【HystrixCircuitBreaker】
boolean attemptExecution():
每次 HystrixCommand 執行,都要調用這個方法,判斷是否可以繼續執行,若熔斷器狀態爲打開且超過休眠窗口,更新熔斷器狀態爲 half-open;通過 CAS 原子變更熔斷器狀態來保證只放過一條業務請求實際調用提供方,並根據執行結果調整狀態。
public boolean attemptExecution() {
//判斷配置是否強制打開熔斷器
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
//判斷配置是否強制關閉熔斷器
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
//判斷熔斷器開關是否關閉
if (circuitOpened.get() == -1) {
return true;
} else {
//判斷請求是否在休眠窗口後
if (isAfterSleepWindow()) {
//更新開關爲半開,並允許本次請求通過
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
return true;
} else {
return false;
}
} else {
//拒絕請求
return false;
}
}
}
【HystrixCircuitBreaker】
void markSuccess():HystrixCommand 執行成功後調用,當熔斷器狀態爲 half-open,更新熔斷器狀態爲 closed。此種情況爲熔斷器原本爲 open,放過單條請求實際調用服務提供者,並且後續執行成功,Hystrix 自動調節熔斷器爲 closed。
public void markSuccess() {
//更新熔斷器開關爲關閉
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//重置訂閱健康統計
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
//更新熔斷器開關爲關閉
circuitOpened.set(-1L);
}
}
【HystrixCircuitBreaker】
void markNonSuccess():HystrixCommand 執行成功後調用,若熔斷器狀態爲 half-open,更新熔斷器狀態爲 open。此種情況爲熔斷器原本爲 open,放過單條請求實際調用服務提供者,並且後續執行失敗,Hystrix 繼續保持熔斷器打開,並把此次請求作爲休眠窗口期開始時間。
public void markNonSuccess() {
//更新熔斷器開關,從半開變爲打開
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
//記錄失敗時間,作爲休眠窗口開始時間
circuitOpened.set(System.currentTimeMillis());
}
}
【HystrixCircuitBreaker】
void subscribeToStream():熔斷器訂閱健康統計結果,若當前請求數據大於一定值且錯誤率大於閾值,自動更新熔斷器狀態爲 opened,後續請求短路,不再實際調用服務提供者,直接進入降級邏輯。
private Subscription subscribeToStream() {
//訂閱監控統計信息
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(HealthCounts hc) {
// 判斷總請求數量是否超過配置閾值,若未超過,則不改變熔斷器狀態
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
} else {
//判斷請求錯誤率是否超過配置錯誤率閾值,若未超過,則不改變熔斷器狀態;若超過,則錯誤率過高,更新熔斷器狀態未打開,拒絕後續請求
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
} else {
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
四、資源隔離
在貨船中,爲了防止漏水和火災的擴散,一般會將貨倉進行分割,避免了一個貨倉出事導致整艘船沉沒的悲劇。同樣的,在 Hystrix 中,也採用了這樣的艙壁模式,將系統中的服務提供者隔離起來,一個服務提供者延遲升高或者失敗,並不會導致整個系統的失敗,同時也能夠控制調用這些服務的併發度。如下圖,訂單服務調用下游積分、庫存等服務使用不同的線程池,當積分服務故障時,只會把對應線程池打滿,而不會影響到其他服務的調用。Hystrix 隔離模式支持線程池和信號量兩種方式。
4.1 信號量模式
信號量模式控制單個服務提供者執行併發度,比如單個 CommondKey 下正在請求數爲 N,若 N 小於 maxConcurrentRequests,則繼續執行;若大於等於 maxConcurrentRequests,則直接拒絕,進入降級邏輯。信號量模式使用請求線程本身執行,沒有線程上下文切換,開銷較小,但超時機制失效。
【AbstractCommand】
ObservableapplyHystrixSemantics(finalAbstractCommand _cmd):嘗試獲取信號量,若能獲取到,則繼續調用服務提供者;若不能獲取到,則進入降級策略。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);
//判斷熔斷器是否通過
if (circuitBreaker.attemptExecution()) {
//獲取信號量
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
//嘗試獲取信號量
if (executionSemaphore.tryAcquire()) {
try {
//記錄業務執行開始時間
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
//繼續執行業務
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
//信號量拒絕,進入降級邏輯
return handleSemaphoreRejectionViaFallback();
}
} else {
//熔斷器拒絕,直接短路,進入降級邏輯
return handleShortCircuitViaFallback();
}
}
【AbstractCommand】
TryableSemaphore getExecutionSemaphore():
獲取信號量實例,若當前隔離模式爲信號量,則根據 commandKey 獲取信號量,不存在時初始化並緩存;若當前隔離模式爲線程池,則使用默認信號量 TryableSemaphoreNoOp.DEFAULT,全部請求可通過。
protected TryableSemaphore getExecutionSemaphore() {
//判斷隔離模式是否爲信號量
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
if (executionSemaphoreOverride == null) {
//獲取信號量
TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
//初始化信號量並緩存
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
//返回信號量
return executionSemaphorePerCircuit.get(commandKey.name());
} else {
return _s;
}
} else {
return executionSemaphoreOverride;
}
} else {
//返回默認信號量,任何請求均可通過
return TryableSemaphoreNoOp.DEFAULT;
}
}
4.2 線程池模式
線程池模式控制單個服務提供者執行併發度,代碼上都會先走獲取信號量,只是使用默認信號量,全部請求可通過,然後實際調用線程池邏輯。線程池模式下,比如單個 CommondKey 下正在請求數爲 N,若 N 小於 maximumPoolSize,會先從 Hystrix 管理的線程池裏面獲得一個線程,然後將參數傳遞給任務線程去執行真正調用,如果併發請求數多於線程池線程個數,就有任務需要進入隊列排隊,但排隊隊列也有上限,如果排隊隊列也滿,則進去降級邏輯。線程池模式可以支持異步調用,支持超時調用,存在線程切換,開銷大。
**【AbstractCommand】**ObservableexecuteCommandWithSpecifiedIsolation(final AbstractCommand _cmd):從線程池中獲取線程,並執行,過程中記錄線程狀態。
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
//判斷是否爲線程池隔離模式
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
//統計信息
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
//判斷是否超時,若超時,直接拋出異常
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
//更新線程狀態爲已開始
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
//執行hook,若異常,則直接拋出異常
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//空返回
return Observable.empty();
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
//結束邏輯,省略
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
//取消訂閱邏輯,省略
}
//從線程池中獲取業務執行線程
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
//判斷是否超時
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else {
//信號量模式
//省略
}
}
【HystrixThreadPool】
Subscription schedule(final Action0 action):HystrixContextScheduler 是 Hystrix 對 rx 中 Scheduler 調度器的重寫,主要爲了實現在 Observable 未被訂閱時,不執行命令,以及支持在命令執行過程中能夠打斷運行。在 rx 中,Scheduler 將生成對應的 Worker 給 Observable 用於執行命令,由 Worker 具體負責相關執行線程的調度,ThreadPoolWorker 是 Hystrix 自行實現的 Worker,執行調度的核心方法。
public Subscription schedule(final Action0 action) {
//若無訂閱,則不執行直接返回
if (subscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
//獲取線程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
//提交執行任務
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
五、超時檢測
Hystrix 超時機制降低了第三方依賴項延遲過高對調用方的影響,使請求快速失敗。主要通過延遲任務機制實現,包括註冊延時任務過程和執行延時任務過程。
當隔離策略爲線程池時,主線程訂閱執行結果,線程池中任務線程調用提供者服務端,同時會有定時器線程在一定時間後檢測任務是否完成,若未完成則表示任務超時,拋出超時異常,並且後續任務線程的執行結果也會跳過不再發布;若已完成則表示任務在超時時間內完成執行完成,定時器檢測任務結束。
當隔離策略爲信號量時,主線程訂閱執行結果並實際調用提供者服務端(沒有任務線程),當超出指定時間,主線程仍然會執行完業務調用,然後拋出超時異常。信號量模式下超時配置有一定缺陷,不能取消在執行的調用,並不能限制主線程返回時間。
【AbstractCommand】
ObservableexecuteCommandAndObserve(finalAbstractCommand _cmd):超時檢測入口,執行 lift(new HystrixObservableTimeout
-Operator(_cmd)) 關聯超時檢測任務。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
//省略
Observable<R> execution;
//判斷是否開啓超時檢測
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
//增加超時檢測操作
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
//正常執行
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
【HystrixObservableTimeoutOperator】
Subscriber<? super R> call(final Subscriber<? super R> child):創建檢測任務,並關聯延遲任務;若檢測任務執行時仍未執行完成,則拋出超時異常;若已執行完成或異常,則清除檢測任務。
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
child.add(s);
final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
//實列化監聽器
TimerListener listener = new TimerListener() {
@Override
public void tick() {
//若任務未執行完成,則更新爲超時
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// 上報超時失敗
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// 取消訂閱
s.unsubscribe();
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
//拋出超時異常
timeoutRunnable.run();
}
}
//超時時間配置
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
//註冊監聽器,關聯檢測任務
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
originalCommand.timeoutTimer.set(tl);
Subscriber<R> parent = new Subscriber<R>() {
@Override
public void onCompleted() {
if (isNotTimedOut()) {
// 未超時情況下,任務執行完成,清除超時檢測任務
tl.clear();
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (isNotTimedOut()) {
// 未超時情況下,任務執行異常,清除超時檢測任務
tl.clear();
child.onError(e);
}
}
@Override
public void onNext(R v) {
//未超時情況下,發佈執行結果;超時時則直接跳過發佈執行結果
if (isNotTimedOut()) {
child.onNext(v);
}
}
//判斷是否超時
private boolean isNotTimedOut() {
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}
};
s.add(parent);
return parent;
}
}
【HystrixTimer】
ReferenceaddTimerListener(finalTimerListener listener):addTimerListener 通過 java 的定時任務服務 scheduleAtFixedRate 在延遲超時時間後執行。
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
//初始化xian
startThreadIfNeeded();
//構造檢測任務
Runnable r = new Runnable() {
@Override
public void run() {
try {
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
//延遲執行檢測任務
ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
return new TimerReference(listener, f);
}
六、降級
Hystrix 降級邏輯作爲兜底的策略,當出現業務執行異常、線程池或信號量已滿、執行超時等情況時,會進入降級邏輯。降級邏輯中應從內存或靜態邏輯獲取通用返回,儘量不依賴依賴網絡調用,如果未實現降級方法或降級方法中也出現異常,則業務線程中會引發異常。
**【AbstractCommand】**Observable getFallbackOrThrowException(finalAbstractCommand _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException):首先判斷是否爲不可恢復異常,若是則不走降級邏輯,直接異常返回;其次判斷是否能獲取到降級信號量,然後走降級邏輯;當降級邏輯中也發生異常或者沒有降級方法實現時,則異常返回。
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
executionResult = executionResult.addEvent((int) latency, eventType);
//判斷是否爲不可恢復異常,如棧溢出、OOM等
if (isUnrecoverable(originalException)) {
logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
Exception e = wrapWithOnErrorHook(failureType, originalException);
//直接返回異常
return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
} else {
//判斷爲是否可恢復錯誤
if (isRecoverableError(originalException)) {
logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
}
//判斷降級配置是否打開
if (properties.fallbackEnabled().get()) {
/**
* 省略
*/
final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
Exception e = wrapWithOnErrorHook(failureType, originalException);
Exception fe = getExceptionFromThrowable(t);
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
Exception toEmit;
//是否是不支持操作異常,當業務中沒有覆寫getFallBack方法時,會拋出此異常
if (fe instanceof UnsupportedOperationException) {
logger.debug("No fallback for HystrixCommand. ", fe);
eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
} else {
//執行降級邏輯時發生異常
logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
}
//判斷異常是否包裝
if (shouldNotBeWrapped(originalException)) {
//拋出異常
return Observable.error(e);
}
//拋出異常
return Observable.error(toEmit);
}
};
//獲取降級信號量
final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
fallbackSemaphore.release();
}
}
};
Observable<R> fallbackExecutionChain;
// 嘗試獲取降級信號量
if (fallbackSemaphore.tryAcquire()) {
try {
//判斷是否定義了fallback方法
if (isFallbackUserDefined()) {
executionHook.onFallbackStart(this);
//執行降級邏輯
fallbackExecutionChain = getFallbackObservable();
} else {
//執行降級邏輯
fallbackExecutionChain = getFallbackObservable();
}
} catch (Throwable ex) {
fallbackExecutionChain = Observable.error(ex);
}
return fallbackExecutionChain
.doOnEach(setRequestContext)
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} else {
//處理降級信號量拒絕異常
return handleFallbackRejectionByEmittingError();
}
} else {
//處理降級配置關閉時異常
return handleFallbackDisabledByEmittingError(originalException, failureType, message);
}
}
}
【HystrixCommand】
R getFallback():HystrixCommand 默認拋出操作不支持異常,需要子類覆寫 getFalBack 方法實現降級邏輯。
protected R getFallback() {
throw new UnsupportedOperationException("No fallback available.");
}
七、健康統計
Hystrix 基於通過滑動窗口的數據統計判定服務失敗佔比選擇性熔斷,能夠實現快速失敗並走降級邏輯。步驟如下:
-
AbstractCommand 執行完成後調⽤ handleCommandEnd ⽅法將執行結果 HystrixCommandCompletion 事件發佈到事件流中;
-
事件流通過 Observable.window() ⽅法將事件按時間分組,並通過 flatMap() ⽅法將事件按類型(成功、失敗等)聚合成桶,形成桶流;
-
再將各個桶使⽤ Observable.window() 按窗口內桶數量聚合成滑動窗⼝數據;
-
將滑動窗口數據聚合成數據對象(如健康數據流、累計數據等);
-
熔斷器 CircuitBreaker 初始化時訂閱健康數據流,根據健康情況修改熔斷器的開關。
【AbstractCommand】
void handleCommandEnd(boolean command
-ExecutionStarted):在業務執行完畢後,會調用 handleCommandEnd 方法,在此方法中,上報執行結果 executionResult,這也是健康統計的入口。
private void handleCommandEnd(boolean commandExecutionStarted) {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
}
long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
//執行結果上報健康統計
if (executionResultAtTimeOfCancellation == null) {
metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
} else {
metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
}
if (endCurrentThreadExecutingCommand != null) {
endCurrentThreadExecutingCommand.call();
}
}
**【BucketedRollingCounterStream】**BucketedRollingCounterStream(HystrixEventStream stream, final int numBuckets, int bucketSizeInMs,final Func2<Bucket, Event, Bucket> appendRawEventToBucket,final Func2<Output, Bucket, Output> re-duceBucket)
健康統計類 HealthCountsStream 的滑動窗口實現主要是在父類 BucketedRollingCounterStream,首先父類 BucketedCounterStream 將事件流處理成桶流,BucketedRollingCounterStream 處理成滑動窗口,然後由 HealthCountsStream 傳入的 reduceBucket 函數處理成健康統計信息
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
final Func2<Output, Bucket, Output> reduceBucket) {
//調用父類,數據處理成桶流
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
//根據傳入的reduceBucket函數,處理滑動窗口內數據
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
//對父類桶流數據進行操作
this.sourceStream = bucketedStream
//窗口內桶數量爲numBuckets,每次移動1個桶
.window(numBuckets, 1)
//滑動窗口內數據處理
.flatMap(reduceWindowToSummary)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}
【HealthCounts】
HealthCounts plus(long[] eventTypeCounts):對桶內數據按事件類型累計,生成統計數據 HealthCounts;
public HealthCounts plus(long[] eventTypeCounts) {
long updatedTotalCount = totalCount;
long updatedErrorCount = errorCount;
long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
//總數
updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
//失敗數
updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
return new HealthCounts(updatedTotalCount, updatedErrorCount);
}
八、總結
在分佈式環境中,不可避免地會有許多服務的依賴項中有的失敗。Hystrix 作爲一個庫,可通過添加熔斷、隔離、降級等邏輯來幫助用戶控制分佈式服務之間的交互,以提高系統的整體彈性。主要功能如下:
-
保護系統,控制來自訪問第三方依賴項(通常是通過網絡)的延遲和失敗
-
阻止複雜分佈式系統中的級聯故障
-
快速失敗並快速恢復
-
平滑降級
-
近乎實時的監控,警報和控制
Hystrix 使用過程中,有一些要注意的點:
-
覆寫的 getFallback() 方法,儘量不要有網絡依賴。如果有網絡依賴,建議採用多次降級,即在 getFallback() 內實例化 HystrixCommand,並執行 Command。getFallback() 儘量保證高性能返回,快速降級。
-
HystrixCommand 建議採用的是線程隔離策略。
-
hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize 設置爲 true 時,hystrix.threadpool.default.maximumSize 纔會生效。最大線程數需要根據業務自身情況和性能測試結果來考量,儘量初始時設置小一些,支持動態調整大小,因爲它是減少負載並防止資源在延遲發生時被阻塞的主要工具。
-
信號隔離策略下,執行業務邏輯時,使用的是應用服務的父級線程(如 Tomcat 容器線程)。所以,一定要設置好併發量,有網絡開銷的調用,不建議使用該策略,容易導致容器線程排隊堵塞,從而影響整個應用服務。
另外 Hystrix 高度依賴 RxJava 這個響應式函數編程框架,簡單瞭解 RxJava 的使用方式,有利於理解源碼邏輯。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/0j5F7_IIeTx6BEOKJdxkXg