1、問題現(xiàn)象
首先接到項目反饋使用 RocketMQ 會出現(xiàn)如下錯誤:

由于項目組并沒有對消息發(fā)送失敗做任何補償,導致丟失消息發(fā)送失敗,故需要對這個問題進行深層次的探討,并加以解決。
2、問題分析
首先我們根據(jù)關鍵字:TIMEOUT_CLEAN_QUEUE 去 RocketMQ 中查詢,去探究在什么時候會拋出如上錯誤。根據(jù)全文搜索如下圖所示:

Broker 端快速失敗其原理圖如下:

消息發(fā)送者向 Broker 發(fā)送消息寫入請求,Broker 端在接收到請求后會首先放入一個隊列中(SendThreadPoolQueue),默認容量為 10000。
Broker 會專門使用一個線程池(SendMessageExecutor)去從隊列中獲取任務并執(zhí)行消息寫入請求,為了保證消息的順序處理,該線程池默認線程個數(shù)為1。
如果 Broker 端受到垃圾回收等等因素造成單條寫入數(shù)據(jù)發(fā)生抖動,單個 Broker 端積壓的請求太多從而得不到及時處理,會極大的造成客戶端消息發(fā)送的時間延長。
設想一下,如果由于 Broker 壓力增大,寫入一條消息需要500ms甚至超過1s,并且隊列中積壓了5000條消息,消息發(fā)送端的默認超時時間為3s,如果按照這樣的速度,這些請求在輪到 Broker 執(zhí)行寫入請求時,客戶端已經(jīng)將這個請求超時了,這樣不僅會造成大量的無效處理,還會導致客戶端發(fā)送超時。
故 RocketMQ 為了解決該問題,引入 Broker 端快速失敗機制,即開啟一個定時調(diào)度線程,每隔10毫秒去檢查隊列中的第一個排隊節(jié)點,如果該節(jié)點的排隊時間已經(jīng)超過了 200ms,就會取消該隊列中所有已超過 200ms 的請求,立即向客戶端返回失敗,這樣客戶端能盡快進行重試,因為 Broker 都是集群部署,下次重試可以發(fā)送到其他 Broker 上,這樣能最大程度保證消息發(fā)送在默認 3s 的時間內(nèi)經(jīng)過重試機制,能有效避免某一臺 Broker 由于瞬時壓力大而造成的消息發(fā)送不可用,從而實現(xiàn)消息發(fā)送的高可用。
從 Broker 端快速失敗機制引入的初衷來看,快速失敗后會發(fā)起重試,除非同一時刻集群內(nèi)所有的 Broker 都繁忙,不然消息會發(fā)送成功,用戶是不會感知這個錯誤的,那為什么用戶感知了呢?難道 TIMEOUT_ CLEAN _ QUEUE 錯誤,Broker 不重試?
為了解開這個謎團,接下來會采用源碼分析的手段去探究真相。接下來將以消息同步發(fā)送為例揭示其消息發(fā)送處理流程中的核心關鍵點。
MQ Client 消息發(fā)送端首先會利用網(wǎng)絡通道將請求發(fā)送到 Broker,然后接收到請求結(jié)果后并調(diào)用 processSendResponse 方法對響應結(jié)果進行解析,如下圖所示:

我們從 proccessSendResponse 方法中可以得知如果 code 為 SYSTEM_BUSY,該方法會拋出 MQBrokerException,響應 code 為 SYSTEM_BUSY,其錯誤描述為開頭部分的錯誤信息。
那我們沿著該方法的調(diào)用鏈路,可以找到其直接調(diào)用方:DefaultMQProducerImpl 的 sendKernelImpl,我們重點考慮如果底層方法拋出 MQBrokerException 該方法會如何處理。
其關鍵代碼如下圖所示:

sendKernelImpl 方法被 DefaultMQProducerImpl 的 sendDefaultImpl 方法調(diào)用,下面是其核心實現(xiàn)截圖:

這里非常令人意外的是連 SYSTEM_ERROR 都會重試,卻沒有包含 SYSTEM_BUSY,顯然違背了快速失敗的設計初衷,故筆者斷定,這是 RocketMQ 的一個BUG,將 SYSTEM_BUSY 遺漏了,后續(xù)會提一個 PR,增加一行代碼,將 SYSTEM_BUSY 加上即可。
問題分析到這里,該問題應該就非常明了。
3、解決方案
如果大家在網(wǎng)上搜索 TIMEOUT_CLEAN_QUEUE 的解決方法,大家不約而同提出的解決方案是增加 waitTimeMillsInSendQueue 的值,該值默認為 200ms,例如將其設置為 1000s 等等,以前我是反對的,因為我的認知里 Broker 會重試,但現(xiàn)在發(fā)現(xiàn) Broker 不會重試,所以我現(xiàn)在認為該 BUG未解決的情況下適當提高該值能有效的緩解。
但這是并不是好的解決方案,我會在近期向官方提交一個PR,將這個問題修復,建議大家在公司盡量對自己使用的版本進行修改,重新打一個包即可,因為這已經(jīng)違背了 Broker 端快速失敗的設計初衷。
但在消息發(fā)送的業(yè)務方,盡量自己實現(xiàn)消息的重試機制,即不依賴 RocketMQ 本身提供的重試機制,因為受制于網(wǎng)絡等因素,消息發(fā)送不可能百分之百成功,建議大家在消息發(fā)送時捕獲一下異常,如果發(fā)送失敗,可以將消息存入數(shù)據(jù)庫,再結(jié)合定時任務對消息進行重試,盡最大程度保證消息不丟失。
特別推薦一個分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關注的小伙伴,可以長按關注一下:
長按訂閱更多精彩▼
如有收獲,點個在看,誠摯感謝
免責聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!
ckquote>