面試官:說說Kafka處理請求的全流程
掃描二維碼
隨時隨地手機(jī)看文章
大家好,我是 yes。
這是我的第三篇Kafka源碼分析文章,前兩篇講了日志段的讀寫和二分算法在kakfa索引上的應(yīng)用
今天來講講 Kafka Broker
端處理請求的全流程,剖析下底層的網(wǎng)絡(luò)通信是如何實現(xiàn)的、Reactor
在kafka
上的應(yīng)用。
再說說社區(qū)為何在2.3版本將請求類型劃分成兩大類,又是如何實現(xiàn)兩類請求處理的優(yōu)先級。
叨叨
不過在進(jìn)入今天主題之前我想先叨叨幾句,就源碼這個事兒,不同人有不同的看法。
有些人聽到源碼這兩個詞就被嚇到了,這么多代碼怎么看。奔進(jìn)去就像無頭蒼蠅,一路斷點(diǎn)跟下來,跳來跳去,算了拜拜了您嘞。
而有些人覺得源碼有啥用,看了和沒看一樣,看了也用不上。
其實上面兩種想法我都有過,哈哈哈。那為什么我會開始看Kafka
源碼呢?
其實就是我有個同事在自學(xué)go
,然后想用go寫個消息隊列,在畫架構(gòu)圖的時候就來問我,這消息隊列好像有點(diǎn)東西啊,消息收發(fā),元數(shù)據(jù)管理,消息如何持久一堆問題過來,我直呼頂不住。
這市面上Kafka
、RocketMQ
都是現(xiàn)成的方案,于是乎我就看起了源碼。
所以促使我看源碼的初始動力,竟然是為了在同事前面裝逼?。?/p>
我是先看了RocketMQ
,因為畢竟是Java
寫的,而Kafka Broker
都是scala
寫的。
梳理了一波RocketMQ
之后,我又想看看Kafka
是怎么做的,于是乎我又看起了Kafka
。
在源碼分析之前我先總結(jié)性的說了說Kafka
底層的通信模型。應(yīng)對面試官詢問Kafka
請求全過程已經(jīng)夠了。
其實源碼分析在手機(jī)上看效果欠佳,建議電腦端打開觀看。
Reactor模式
在扯到Kafka
之前我們先來說說Reactor模式
,基本上只要是底層的高性能網(wǎng)絡(luò)通信就離不開Reactor模式
。像Netty、Redis都是使用Reactor模式
。
像我們以前剛學(xué)網(wǎng)絡(luò)編程的時候以下代碼可是非常的熟悉,新來一個請求,要么在當(dāng)前線程直接處理了,要么新起一個線程處理。
在早期這樣的編程是沒問題的,但是隨著互聯(lián)網(wǎng)的快速發(fā)展,單線程處理不過來,也不能充分的利用計算機(jī)資源。
而每個請求都新起一個線程去處理,資源的要求就太高了,并且創(chuàng)建線程也是一個重操作。
說到這有人想到了,那搞個線程池不就完事了嘛,還要啥Reactor
。
池化技術(shù)確實能緩解資源的問題,但是池子是有限的,池子里的一個線程不還是得候著某個連接,等待指示嘛。現(xiàn)在的互聯(lián)網(wǎng)時代早已突破C10K
了。
因此引入的IO多路復(fù)用
,由一個線程來監(jiān)視一堆連接,同步等待一個或多個IO事件的到來,然后將事件分發(fā)給對應(yīng)的Handler
處理,這就叫Reactor模式
。
網(wǎng)絡(luò)通信模型的發(fā)展如下
單線程 => 多線程 => 線程池 => Reactor模型
Kafka所采用的Reactor模型
如下
Kafka Broker 網(wǎng)絡(luò)通信模型
簡單來說就是,Broker 中有個Acceptor(mainReactor)
監(jiān)聽新連接的到來,與新連接建連之后輪詢選擇一個Processor(subReactor)
管理這個連接。
而Processor
會監(jiān)聽其管理的連接,當(dāng)事件到達(dá)之后,讀取封裝成Request
,并將Request
放入共享請求隊列中。
然后IO線程池不斷的從該隊列中取出請求,執(zhí)行真正的處理。處理完之后將響應(yīng)發(fā)送到對應(yīng)的Processor
的響應(yīng)隊列中,然后由Processor
將Response
返還給客戶端。
每個listener
只有一個Acceptor線程
,因為它只是作為新連接建連再分發(fā),沒有過多的邏輯,很輕量,一個足矣。
Processor
在Kafka中稱之為網(wǎng)絡(luò)線程,默認(rèn)網(wǎng)絡(luò)線程池有3個線程,對應(yīng)的參數(shù)是num.network.threads
。并且可以根據(jù)實際的業(yè)務(wù)動態(tài)增減。
還有個 IO 線程池,即KafkaRequestHandlerPool
,執(zhí)行真正的處理,對應(yīng)的參數(shù)是num.io.threads
,默認(rèn)值是 8。IO線程處理完之后會將Response
放入對應(yīng)的Processor
中,由Processor
將響應(yīng)返還給客戶端。
可以看到網(wǎng)絡(luò)線程和IO線程之間利用的經(jīng)典的生產(chǎn)者 - 消費(fèi)者模式,不論是用于處理Request的共享請求隊列,還是IO處理完返回的Response。
這樣的好處是什么?生產(chǎn)者和消費(fèi)者之間解耦了,可以對生產(chǎn)者或者消費(fèi)者做獨(dú)立的變更和擴(kuò)展。并且可以平衡兩者的處理能力,例如消費(fèi)不過來了,我多加些IO線程。
如果你看過其他中間件源碼,你會發(fā)現(xiàn)生產(chǎn)者-消費(fèi)者模式真的是太常見了,所以面試題經(jīng)常會有手寫一波生產(chǎn)者-消費(fèi)者。
源碼級別剖析網(wǎng)絡(luò)通信模型
Kafka 網(wǎng)絡(luò)通信組件主要由兩大部分構(gòu)成:
SocketServer 和 KafkaRequestHandlerPool。
SocketServer
可以看出SocketServer
旗下管理著,Acceptor 線程
、Processor 線程
和 RequestChannel
等對象。
data-plane
和control-plane
稍后再做分析,先看看RequestChannel
是什么。
RequestChannel
關(guān)鍵的屬性和方法都已經(jīng)在下面代碼中注釋了,可以看出這個對象主要就是管理Processor
和作為傳輸Request
和Response
的中轉(zhuǎn)站。
Acceptor
接下來我們再看看Acceptor
可以看到它繼承了AbstractServerThread
,接下來再看看它run些啥
再來看看accept(key)
做了啥
很簡單,標(biāo)準(zhǔn)selector
的處理,獲取準(zhǔn)備就緒事件,調(diào)用serverSocketChannel.accept()
得到socketChannel
,將socketChannel
交給通過輪詢選擇出來的Processor
,之后由它來處理IO事件。
Processor
接下來我們再看看Processor
,相對而言比Acceptor
復(fù)雜一些。
先來看看三個關(guān)鍵的成員
再來看看主要的處理邏輯。
可以看到Processor
主要是將底層讀事件IO數(shù)據(jù)封裝成Request
存入隊列中,然后將IO線程塞入的Response
,返還給客戶端,并處理Response
的回調(diào)邏輯。
KafkaRequestHandlerPool
IO線程池,實際處理請求的線程。
再來看看IO線程都干了些啥
很簡單,核心就是不斷的從requestChannel
拿請求,然后調(diào)用handle處理請求。
handle
方法是位于KafkaApis
類中,可以理解為通過switch
,根據(jù)請求頭里面不同的apikey
調(diào)用不同的handle
來處理請求。
我們再舉例看下較為簡單的處理LIST_OFFSETS
的過程,即handleListOffsetRequest
,來完成一個請求的閉環(huán)。
我用紅色箭頭標(biāo)示了調(diào)用鏈。表明處理完請求之后是塞給對應(yīng)的Processor
的。
最后再來個更詳細(xì)的總覽圖,把源碼分析到的類基本上都對應(yīng)的加上去了。
請求處理優(yōu)先級
上面提到的data-plane
和control-plane
是時候揭開面紗了。這兩個對應(yīng)的就是數(shù)據(jù)類請求和控制類請求。
為什么需要分兩類請求呢?直接在請求里面用key標(biāo)明請求是要讀寫數(shù)據(jù)啊還是更新元數(shù)據(jù)不就行了嗎?
簡單點(diǎn)的說比如我們想刪除某個topic,我們肯定是想這個topic馬上被刪除的,而此時producer還一直往這個topic寫數(shù)據(jù)。
那這個情況可能是我們的刪除請求排在第N個...等前面的寫入請求處理好了才輪到刪除的請求。實際上前面那些往這個topic寫入的請求都是沒用的,平白的消耗資源。
再或者說進(jìn)行Preferred Leader
選舉時候,producer
將ack
設(shè)置為all
時候,老leader
還在等著follower
寫完數(shù)據(jù)向他報告呢,誰知follower
已經(jīng)成為了新leader
。
而通知它leader已經(jīng)變更的請求由于被一堆數(shù)據(jù)類型請求堵著呢,老leader
就傻傻的在等著,直到超時。
就是為了解決這種情況,社區(qū)將請求分為兩類。
那如何讓控制類的請求優(yōu)先被處理?優(yōu)先隊列?
社區(qū)采取的是兩套Listener
,即數(shù)據(jù)類型一個listener
,控制類一個listener
。
對應(yīng)的就是我們上面講的網(wǎng)絡(luò)通信模型,在kafka中有兩套! kafka通過兩套監(jiān)聽變相的實現(xiàn)了請求優(yōu)先級,畢竟數(shù)據(jù)類型請求肯定很多,控制類肯定少,這樣看來控制類肯定比大部分?jǐn)?shù)據(jù)類型先被處理!
迂回戰(zhàn)術(shù)啊。
控制類的和數(shù)據(jù)類區(qū)別就在于:就一個Porcessor線程
,并且請求隊列寫死的長度為20,社區(qū)覺得這樣夠了。
最后
看源碼主要就是得耐心,耐心跟下去。然后再跳出來看。你會發(fā)現(xiàn)不過如此,哈哈哈。
前兩篇由于授權(quán)給他人了,因此公眾號上發(fā)不了,貼下連接,有興趣的同學(xué)可以看下。
Kafka日志段讀寫分析:https://juejin.im/post/5ef6b94ae51d4534a1236cb0
Kafka索引在設(shè)計有什么亮點(diǎn):https://juejin.im/post/5efdeae7f265da22d017e58d
我是yes,一個在互聯(lián)網(wǎng)摸爬滾打且莫得感情的工具人。
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點(diǎn),不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!