背景
我們的支付場景下,要求消費的業(yè)務(wù)消息絕不能丟失,且能充分利用高規(guī)格的服務(wù)器的性能,比如用線程池對業(yè)務(wù)消息進(jìn)行快速處理。有同學(xué)可能沒太理解這個問題有啥不好處理,讓我一步步分析下。
MQ的優(yōu)勢和缺點
MQ是我們在應(yīng)對高并發(fā)場景最常用的一種措施,它可以幫我們對業(yè)務(wù)解耦、對流程異步化以及削峰填谷的妙用。
但是,由于引入了這一額外的中間件,也增加了系統(tǒng)的復(fù)雜度和不穩(wěn)定因素。
消息可靠性的應(yīng)對
消息的可靠性保證需要從消息流轉(zhuǎn)的每個環(huán)節(jié)進(jìn)行保障,比如生產(chǎn)端的事務(wù)型消息,broker的實時刷盤持久化,消費端的手動ACK 。
這里,我們對生產(chǎn)端和存儲端的保障措施不作討論,重點關(guān)注消費端的手動ACK機制。
手動ACK的問題
手動ACK可以保證消息一定被消費,但是需要確保手動ACK的順序和消息順序一致,為什么?
消息隊列之所以性能高處理快,是因為采用了文件順序讀寫方式,系統(tǒng)在拉取消息進(jìn)行消費時,是按順序文件的offset進(jìn)行拉取的,如果commit offset的順序錯亂,會使得服務(wù)端的消息狀態(tài)錯亂,比如消息重發(fā)。
因此,如果我們在本地啟動了線程池,對消息進(jìn)行拉取處理,由于各線程的處理速度不一定一致,所以無法保證各線程處理完之后對各自消息的ACK操作是順序的,怎么辦,難道只能同步拉消費取然后ACK么。
解決方案
最不濟,可以提交一批任務(wù),批量等待統(tǒng)一提交。不過總覺得不優(yōu)雅。
某次看JUC中的AQS的時候,啟發(fā)了我。
我們平時用的類似CountDownLauch這些并發(fā)工具類,不也是處理的多線程協(xié)作的問題么。
我們的場景完全沒有AQS復(fù)雜,借鑒它的思路,應(yīng)該是沒有問題的。
- 創(chuàng)建雙端隊列,隊列節(jié)點中需要維護(hù)自身處理狀態(tài)state,和對應(yīng)msg的offset。
- 服務(wù)從消息中心拉取消息,在提交本地線程池執(zhí)行之前,先入隊列。
- 消息消費完之后,通知隊列中對應(yīng)的節(jié)點,更新狀態(tài)為完成。
- 隊列頭被更新后出隊列,提交offset,并判斷新的隊列頭的狀態(tài),直到遇到state是未完成的head時阻塞。undefined
方案解析
該方案可以有效利用本地線程的資源,并行的處理,并通過隊列和異步通知機制保證最終commit offset時有序。
在最差情況下(即head節(jié)點對應(yīng)的msg最后一個被處理完),相當(dāng)于等待一批線程處理完成后統(tǒng)一提交。除此之外等待性能都要更優(yōu)。
異步通知的實現(xiàn)
public class MSGFuture { /*全局變量,存放msg對應(yīng)的future對象*/ private static final MapFUTURES = new ConcurrentHashMap (); /*全局不變唯一標(biāo)識*/ private final long id; /*最長等待時間*/ private final int timeout; /*并發(fā)鎖*/ private final Lock lock = new ReentrantLock(); /*通知條件*/ private final Condition done = lock.newCondition(); /*開始時間*/ private final long start = System.currentTimeMillis(); /*業(yè)務(wù)結(jié)果*/ private volatile Object response; }
//構(gòu)造函數(shù) public MSGFuture(Request request, int timeout) { /*全局自增ID*/ this.id = request.getrId(); /*超時時間*/ this.timeout = timeout > 0 ? timeout : 1000; /*放入全局變量*/ FUTURES.put(id, this); }
//業(yè)務(wù)處理結(jié)果更新 public static void received(long id, Object response) { MSGFuture future = FUTURES.remove(id); if (future != null) { future.doReceived(response); } else { logger.warn("response return timeout,id:"+id); } }
//結(jié)果更新,通知等待條件 private void doReceived(Object res) { lock.lock(); try { response = res; done.signal(); } finally { lock.unlock(); } }
//異步等待獲取結(jié)果 public Object get(int timeout) throws TimeoutException { if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(); } } return returnFromResponse(); }
總結(jié)
看到這里,有同學(xué)會說,這個和AQS有啥關(guān)系呀~
其實,只是處理思路的一種借鑒,比如state狀態(tài),比如鎖機制和通知等待。既然都是多線程任務(wù)協(xié)調(diào),那總有相似之處。
總之一句話,別說背八股文沒用,多多了解會有大幫助~
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!