www.久久久久|狼友网站av天堂|精品国产无码a片|一级av色欲av|91在线播放视频|亚洲无码主播在线|国产精品草久在线|明星AV网站在线|污污内射久久一区|婷婷综合视频网站

當前位置:首頁 > 公眾號精選 > 架構師社區(qū)
[導讀]本文來源: https://juejin.im/post/5ea159e4f265da47f0794da5 MQ的主要特點為解耦、異步、削峰,該文章主要記錄與分享個人在實際項目中的RocketMQ削峰用法,用于減少數(shù)據(jù)庫壓力的業(yè)務場景,其中RocketMQ的核心組件概念如下: Producer:生產發(fā)送消息 Broker

高并發(fā):RocketMQ 削峰實戰(zhàn)!

ckground-color: rgb(255, 255, 255);text-align: left;box-sizing: border-box !important;overflow-wrap: break-word !important;">文來源:

https://juejin.im/post/5ea159e4f265da47f0794da5

MQ的主要特點為解耦、異步、削峰,該文章主要記錄與分享個人在實際項目中的RocketMQ削峰用法,用于減少數(shù)據(jù)庫壓力的業(yè)務場景,其中RocketMQ的核心組件概念如下:
  • Producer:生產發(fā)送消息
  • Broker:存儲Producer發(fā)送過來的消息
  • Consumer:從Broker拉取消息并進行消費
  • NameServer:為Producer或Consumer路由到Broker
    高并發(fā):RocketMQ 削峰實戰(zhàn)!
其中消費流程有以下幾點是必須注意的:
  • RocketMQ的Consumer獲取消息是通過向Broker發(fā)送拉取請求獲取的,而不是由Broker發(fā)送Consumer接收的方式。
  • Consumer每次拉取消息時消息都會被均勻分發(fā)到消息隊列再進行傳輸,所以RocketMQ中的很多參數(shù)都是針對隊列而不是Topic的(這個是重點,順便吐槽下源碼的文檔講的真不清晰,很多都需要自己試錯,但Dashboard做得很好),其中每個Broker消息隊列(ConsumeQueue)的數(shù)量都可以通過RocketMQ DashBoard實時更改調整。

rocketmq-spring-boot-starter用法簡介

當開發(fā)中需要快速集成RocketMQ時可以考慮使用 rocketmq-spring-boot-starter 搭建RocketMQ的集成環(huán)境,但該框架并不完全具備RocketMQ所有的配置簡化,如需批量消費消息便需要自定義一個DefaultMQPushConsumer bean去消費了。個人在開發(fā)中常用的 rocketmq-spring-boot-starter 相關類:
  • RocketMQListener 接口:消費者都需實現(xiàn)該接口的消費方法 onMessage(msg) 。
  • RocketMQPushConsumerLifecycleListener 接口:當 @RocketMQMessageListener 中的配置不足以滿足我們的需求時,可以實現(xiàn)該接口直接更改消費者類 DefaultMQPushConsumer 配置
  • @RocketMQMessageListener :被該注解標注并實現(xiàn)了接口 RocketMQListener 的bean為一個消費者并監(jiān)聽指定topic隊列中的消息,該注解中包含消費者的一些常用配置(大部分按默認即可),一般只需更改consumerGroup(消費組)與topic。 RocketMQMessageListener 中的屬性配置是可以使用Placeholder(占位符)從配置文件或配置中心獲取的,如下圖:
    高并發(fā):RocketMQ 削峰實戰(zhàn)!

業(yè)務案例

有一個點贊業(yè)務,不限制用戶的點贊數(shù)只需進行記錄(產品需求,開發(fā)提議無效),當每個用戶都進行x連擊享受數(shù)量猛增的快感時如果數(shù)據(jù)庫都需要進行x個點贊數(shù)據(jù)的插入,數(shù)據(jù)庫毫無疑問會塞死導致崩潰。于是想到可以嘗試下MQ削峰,比如每秒來了5000消息但數(shù)據(jù)庫只能承受2000,那我消費時每次只拉取消費1600就好了,剩下的放在Broker堆積慢慢消費就好。由于之前的消息中心也在用RocketMQ,于是確認使用RocketMQ來進行削峰。
高并發(fā):RocketMQ 削峰實戰(zhàn)!

環(huán)境配置

文章例子環(huán)境:1NameServer + 2Broker + 1Consumer

添加maven依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
復制代碼

application.yml配置

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: praise-group
server:
  port: 10000

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: tiger
    url: jdbc:mysql://localhost:3306/wilson
swagger:
  docket:
    base-package: io.rocket.consumer.controller
復制代碼

點贊接口

PraiseRecord(點贊記錄):

@Data
public class PraiseRecord implements Serializable {
    private Long id;
    private Long uid;
    private Long liveId;
    private LocalDateTime createTime;
}
復制代碼

MessageController(簡單的測試接口):

RestController
@RequestMapping("/message")
public class MessageController {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @PostMapping("/praise")
    public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
        rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build());
        return ServerResponse.success();
    }

    // ......

}
復制代碼
由于用戶可以連續(xù)點贊,所以考慮可以在點贊消息的處理上寬松一點(容許消息丟失)以追求更高的性能,因此選擇使用 sendOneyWay() 進行消息發(fā)送。
RocketMQ的消息發(fā)送方式主要含syncSend()同步發(fā)送、asyncSend()異步發(fā)送、sendOneWay()三種方式,sendOneWay()也是異步發(fā)送,區(qū)別在于不需等待Broker返回確認,所以可能會存在信息丟失的狀況,但吞吐量更高,具體需根據(jù)業(yè)務情況選用。

性能:sendOneWay > asyncSend > syncSend RocketMQTemplate的send()方法默認是同步(syncSend)的,更多可看源碼實現(xiàn)。

PraiseListener:點贊消息消費者

@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>, RocketMQPushConsumerLifecycleListener {
    @Resource
    private PraiseRecordService praiseRecordService;

    @Override
    public void onMessage(PraiseRecordVO vo) {
        praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // 每次拉取的間隔,單位為毫秒
        consumer.setPullInterval(2000);
        // 設置每次從隊列中拉取的消息數(shù)為16
        consumer.setPullBatchSize(16);
    }
}
單次pull消息的最大數(shù)目受broker存儲的 MessageStoreConfig.maxTransferCountOnMessageInMemory (默認為32)值限制,即若想要消費者從隊列拉取的消息數(shù)大于32有效(pullBatchSize>32)則需更改Broker的啟動參數(shù) maxTransferCountOnMessageInMemory 值。在MQ削峰的配置參數(shù)里,以下幾個 DefaultMQPushConsumer 的參數(shù)是需要注意一下的:
  • pullInterval:每次從Broker拉取消息的間隔,單位為毫秒
  • pullBatchSize:每次從Broker隊列拉取到的消息數(shù),該參數(shù)很容易讓人誤解,一開始我以為是每次拉取的消息總數(shù),但測試過幾次后確認了實質上是從每個隊列的拉取數(shù)(源碼上的注釋文檔真的很差,跟沒有一樣),即Consume每次拉取的消息總數(shù)如下: EachPullTotal=所有Broker上的寫隊列數(shù)和(writeQueueNums=readQueueNums) * pullBatchSize
  • consumeMessageBatchMaxSize:每次消費(即將多條消息合并為List消費)的最大消息數(shù)目,默認值為1,rocketmq-spring-boot-starter 目前不支持批量消費(2.1.0版本)
在消費者開始消息消費時會先從各隊列中拉取一條消息進行消費,消費成功后再以每次pullBatchSize的數(shù)目進行拉取。
PraiseListener中設置了每次拉取的間隔為2s,每次從隊列拉取的消息數(shù)為16,在搭建了2master broker且broker上writeQueueNums=readQueueNums=4的環(huán)境下每次拉取的消息理論數(shù)值為16 * 2 * 4 = 128,在第一次從各隊列拉取1條消息(即共8條)后消費成功后會每次就會拉取最多128條消息進行消費,想驗證下的可以把onMessage()的insert()改為log.info("1")然后統(tǒng)計單位秒內打印的日志數(shù)是否為128。
高并發(fā):RocketMQ 削峰實戰(zhàn)!
根據(jù)以上配置單Conumer情況下每2s理論消費為128,即每2秒數(shù)據(jù)庫新增的點贊數(shù)據(jù)大概為128條左右,有20%偏差都在個人可接受范圍內,然后對點贊接口進行簡單壓測1s 2000請求校驗MQ效果,根據(jù)消費配置理論上需要16次拉取即需32s才能消費完,壓測后查看數(shù)據(jù)庫校驗效果:
高并發(fā):RocketMQ 削峰實戰(zhàn)!
高并發(fā):RocketMQ 削峰實戰(zhàn)!
由上圖可以看出除第一次2s和最后一次2s外數(shù)據(jù)庫每2s的插入數(shù)據(jù)數(shù)和一般都在128附近波動,也用了34s(因第一次拉取數(shù)較少所以比理論多花費一次拉取)消費的偏差大小可能會受每次拉取數(shù)pullBatchSize、Broker上的消息隊列數(shù)、網(wǎng)絡波動等情況影響,但需要的目的已經(jīng)達到了,我只想把單位時間內過多的數(shù)據(jù)庫操作交給MQ做分隔成多個單位時間內的小批量操作,消息過多就堆積,當請求峰值過了后直到MQ堆積的消息消費完前數(shù)據(jù)庫的插入數(shù)依舊會與峰值期的插入數(shù)相差不大,達到了MQ削峰填谷的效果。

上線了但消費效率預估失誤如何動態(tài)更改消費效率 ?

當把拉取數(shù)pullBatchSize設置Broker的默認最大傳輸值32了,線上又不想重啟Broker更改maxTransferCountOnMessageInMemory參數(shù),如有2個Broker且queue都為4,那么拉取消費效率才為32 * 2 * 4 = 256,如果想要動態(tài)調整,可以從Broker數(shù)或Broker隊列數(shù)下手,可以將Broker的writeQueueNums、readQueueNums增大,如都改為8,那么效率就成了32 * 2 * 8 = 512。
需要注意的是更改完queues后必須去Dashboard的Topic下的CONSUMER MANAGER查看新增的隊列上是否都有Consumer成功注冊上去了,因為遇到了在測試與生產上使用rocketmq-spring-boot-starter @RocketMQListener標注消費者不會自動注冊到新隊列上的情況,但沒排除是不是RocketMQ版本的原因(個人本地的版本比環(huán)境上的高了一個小版本0.0.1,本地沒出現(xiàn)沒消費者注冊到新隊列上的問題),而是使用了自定義DefaultMQPushConsumer bean(原生的方式都是沒有問題的)的備用方案。當再啟動新的消費者應用時CONSUMER MANAGER(下圖)中就會出現(xiàn) 新Consumer數(shù) * 各Broker隊列數(shù)和的隊列行。
高并發(fā):RocketMQ 削峰實戰(zhàn)!

如何使用RocketMQ批量消費 ?

雖然點贊業(yè)務使用MQ單條插入后TPS已經(jīng)達到當前業(yè)務指標要求了,但考慮到如果后續(xù)要求在不添加機器數(shù)的情況下增加TPS,且數(shù)據(jù)量還沒到分庫分表的程度,個人就打算從批量消費下手,由一次插入一條點贊記錄改為一次性插入多條(insertBatch)。當然能滿足現(xiàn)有需求能不做肯定不做的,過度優(yōu)化過分礙事,但想多點方案不會壞事。
rocketmq-spring-boot-starter并沒有提供批量消費的功能,所以要批量消費消息需要自定義 DefaultMQPushConsumer 并配置其 consumeMessageBatchMaxSize 屬性。 consumeMessageBatchMaxSize 屬性默認值為1,即每次只消費一條消息,需要注意的是該屬性也會受 pullBatchSize 影響,如果 consumeMessageBatchMaxSize 為32但 pullBatchSize 只為12,那么每次批量消費的最大消息數(shù)也就只有12。如下為個人測試批量消費Consumer的測試bean:

@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
    consumer.setNamesrvAddr(nameServer);
    consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");
    // 設置每次消息拉取的時間間隔,單位毫秒
    consumer.setPullInterval(1000);
    // 設置每個隊列每次拉取的最大消息數(shù)
    consumer.setPullBatchSize(24);
    // 設置消費者單次批量消費的消息數(shù)目上限
    consumer.setConsumeMessageBatchMaxSize(12);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)
            -> {
        List<UserInfo> userInfos = new ArrayList<>(msgs.size());
        Map<Integer, Integer> queueMsgMap = new HashMap<>(8);
        msgs.forEach(msg -> {
            userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class));
            queueMsgMap.compute(msg.getQueueId(), (key, val) -> val == null ? 1 : ++val);
        });
        log.info("userInfo size: {}, content: {}", userInfos.size(), userInfos);
        /*
          處理批量消息,如批量插入:userInfoMapper.insertBatch(userInfos);
         */

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    return consumer;
}

如果默認配置情況下log打印出的userInfo size恒為1,但由于設置了 consumeMessageBatchMaxSize pullBatchSize ,且 pullBatchSize 較小,所以每次消費的消息數(shù)最大值為12,如下圖:
高并發(fā):RocketMQ 削峰實戰(zhàn)!

附本文相關信息

  • 確保mqnamesrv與mqbroker已啟動成功,如該文章環(huán)境的啟動:
         
    mqnamesrv -n 127.0.0.1:9876
    mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-a.properties
    mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-b.properties
  • RocketMQ DashBoard啟動流程可參考官方github文檔或到我的資源里下載jar包運行
  • 源碼地址(https://github.com/Wilson-He/spring-boot-series/tree/master/spring-rocketmq),2m-noslave目錄是該文章中例子中的2master broker配置與啟動腳本,spring-boot-consumer-peak目錄為包含該文章相關代碼的實際例子

特別推薦一個分享架構+算法的優(yōu)質內容,還沒關注的小伙伴,可以長按關注一下:

高并發(fā):RocketMQ 削峰實戰(zhàn)!

長按訂閱更多精彩▼

高并發(fā):RocketMQ 削峰實戰(zhàn)!

如有收獲,點個在看,誠摯感謝

免責聲明:本文內容由21ic獲得授權后發(fā)布,版權歸原作者所有,本平臺僅提供信息存儲服務。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!

ckquote>
本站聲明: 本文章由作者或相關機構授權發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內容真實性等。需要轉載請聯(lián)系該專欄作者,如若文章內容侵犯您的權益,請及時聯(lián)系本站刪除。
換一批
延伸閱讀

9月2日消息,不造車的華為或將催生出更大的獨角獸公司,隨著阿維塔和賽力斯的入局,華為引望愈發(fā)顯得引人矚目。

關鍵字: 阿維塔 塞力斯 華為

加利福尼亞州圣克拉拉縣2024年8月30日 /美通社/ -- 數(shù)字化轉型技術解決方案公司Trianz今天宣布,該公司與Amazon Web Services (AWS)簽訂了...

關鍵字: AWS AN BSP 數(shù)字化

倫敦2024年8月29日 /美通社/ -- 英國汽車技術公司SODA.Auto推出其旗艦產品SODA V,這是全球首款涵蓋汽車工程師從創(chuàng)意到認證的所有需求的工具,可用于創(chuàng)建軟件定義汽車。 SODA V工具的開發(fā)耗時1.5...

關鍵字: 汽車 人工智能 智能驅動 BSP

北京2024年8月28日 /美通社/ -- 越來越多用戶希望企業(yè)業(yè)務能7×24不間斷運行,同時企業(yè)卻面臨越來越多業(yè)務中斷的風險,如企業(yè)系統(tǒng)復雜性的增加,頻繁的功能更新和發(fā)布等。如何確保業(yè)務連續(xù)性,提升韌性,成...

關鍵字: 亞馬遜 解密 控制平面 BSP

8月30日消息,據(jù)媒體報道,騰訊和網(wǎng)易近期正在縮減他們對日本游戲市場的投資。

關鍵字: 騰訊 編碼器 CPU

8月28日消息,今天上午,2024中國國際大數(shù)據(jù)產業(yè)博覽會開幕式在貴陽舉行,華為董事、質量流程IT總裁陶景文發(fā)表了演講。

關鍵字: 華為 12nm EDA 半導體

8月28日消息,在2024中國國際大數(shù)據(jù)產業(yè)博覽會上,華為常務董事、華為云CEO張平安發(fā)表演講稱,數(shù)字世界的話語權最終是由生態(tài)的繁榮決定的。

關鍵字: 華為 12nm 手機 衛(wèi)星通信

要點: 有效應對環(huán)境變化,經(jīng)營業(yè)績穩(wěn)中有升 落實提質增效舉措,毛利潤率延續(xù)升勢 戰(zhàn)略布局成效顯著,戰(zhàn)新業(yè)務引領增長 以科技創(chuàng)新為引領,提升企業(yè)核心競爭力 堅持高質量發(fā)展策略,塑強核心競爭優(yōu)勢...

關鍵字: 通信 BSP 電信運營商 數(shù)字經(jīng)濟

北京2024年8月27日 /美通社/ -- 8月21日,由中央廣播電視總臺與中國電影電視技術學會聯(lián)合牽頭組建的NVI技術創(chuàng)新聯(lián)盟在BIRTV2024超高清全產業(yè)鏈發(fā)展研討會上宣布正式成立。 活動現(xiàn)場 NVI技術創(chuàng)新聯(lián)...

關鍵字: VI 傳輸協(xié)議 音頻 BSP

北京2024年8月27日 /美通社/ -- 在8月23日舉辦的2024年長三角生態(tài)綠色一體化發(fā)展示范區(qū)聯(lián)合招商會上,軟通動力信息技術(集團)股份有限公司(以下簡稱"軟通動力")與長三角投資(上海)有限...

關鍵字: BSP 信息技術
關閉
關閉