WebSocket?集群解決方案

-? ? ?問(wèn)題起因? ? -最近做項(xiàng)目時(shí)遇到了需要多用戶之間通信的問(wèn)題,涉及到了WebSocket握手請(qǐng)求,以及集群中WebSocket Session共享的問(wèn)題。
期間我經(jīng)過(guò)了幾天的研究,總結(jié)出了幾個(gè)實(shí)現(xiàn)分布式WebSocket集群的辦法,從zuul到spring cloud gateway的不同嘗試,總結(jié)出了這篇文章,希望能幫助到某些人,并且能一起分享這方面的想法與研究。以下是我的場(chǎng)景描述
- 資源:4臺(tái)服務(wù)器。其中只有一臺(tái)服務(wù)器具備ssl認(rèn)證域名,一臺(tái)redis mysql服務(wù)器,兩臺(tái)應(yīng)用服務(wù)器(集群)
- 應(yīng)用發(fā)布限制條件:由于場(chǎng)景需要,應(yīng)用場(chǎng)所需要ssl認(rèn)證的域名才能發(fā)布。因此ssl認(rèn)證的域名服務(wù)器用來(lái)當(dāng)api網(wǎng)關(guān),負(fù)責(zé)https請(qǐng)求與wss(安全認(rèn)證的ws)連接。俗稱https卸載,用戶請(qǐng)求https域名服務(wù)器(eg:https://oiscircle.com/xxx),但真實(shí)訪問(wèn)到的是http ip地址的形式。只要網(wǎng)關(guān)配置高,能handle多個(gè)應(yīng)用
- 需求:用戶登錄應(yīng)用,需要與服務(wù)器建立wss連接,不同角色之間可以單發(fā)消息,也可以群發(fā)消息
- 集群中的應(yīng)用服務(wù)類型:每個(gè)集群實(shí)例都負(fù)責(zé)http無(wú)狀態(tài)請(qǐng)求服務(wù)與ws長(zhǎng)連接服務(wù)

-? ? ?系統(tǒng)架構(gòu)圖? ? -

- Eureka 服務(wù)發(fā)現(xiàn)與注冊(cè)
- Redis Session共享
- Redis 消息訂閱
- Spring Boot
- Zuul 網(wǎng)關(guān)
- Spring Cloud Gateway 網(wǎng)關(guān)
- Spring WebSocket 處理長(zhǎng)連接
- Ribbon 負(fù)載均衡
- Netty 多協(xié)議NIO網(wǎng)絡(luò)通信框架
- Consistent Hash 一致性哈希算法

-? ? ?技術(shù)可行性分析? ? -下面我將描述session特性,以及根據(jù)這些特性列舉出n個(gè)解決分布式架構(gòu)中處理ws請(qǐng)求的集群方案
WebSocketSession與HttpSession在Spring所集成的WebSocket里面,每個(gè)ws連接都有一個(gè)對(duì)應(yīng)的session:WebSocketSession,在Spring WebSocket中,我們建立ws連接之后可以通過(guò)類似這樣的方式進(jìn)行與客戶端的通信:
protected?void?handleTextMessage(WebSocketSession?session,?TextMessage?message)?{
???System.out.println("服務(wù)器接收到的消息:?" ?message?);
???//send?message?to?client
???session.sendMessage(new?TextMessage("message"));
}
那么問(wèn)題來(lái)了:ws的session無(wú)法序列化到redis,因此在集群中,我們無(wú)法將所有WebSocketSession都緩存到redis進(jìn)行session共享。每臺(tái)服務(wù)器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前沒(méi)有websocket session共享的方案,因此走redis websocket session共享這條路是行不通的。有的人可能會(huì)想:我可不可以將sessin關(guān)鍵信息緩存到redis,集群中的服務(wù)器從redis拿取session關(guān)鍵信息然后重新構(gòu)建websocket session...我只想說(shuō)這種方法如果有人能試出來(lái),請(qǐng)告訴我一聲...以上便是websocket session與http session共享的區(qū)別,總的來(lái)說(shuō)就是http session共享已經(jīng)有解決方案了,而且很簡(jiǎn)單,只要引入相關(guān)依賴:spring-session-data-redis
和spring-boot-starter-redis
,大家可以從網(wǎng)上找個(gè)demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底層實(shí)現(xiàn)的方式,我們無(wú)法做到真正的websocket session共享。
-? ? ?解決方案的演變??? -Netty與Spring WebSocket
剛開始的時(shí)候,我嘗試著用netty實(shí)現(xiàn)了websocket服務(wù)端的搭建。在netty里面,并沒(méi)有websocket session這樣的概念,與其類似的是channel,每一個(gè)客戶端連接都代表一個(gè)channel。前端的ws請(qǐng)求通過(guò)netty監(jiān)聽的端口,走websocket協(xié)議進(jìn)行ws握手連接之后,通過(guò)一些列的handler(責(zé)鏈模式)進(jìn)行消息處理。與websocket session類似地,服務(wù)端在連接建立后有一個(gè)channel,我們可以通過(guò)channel進(jìn)行與客戶端的通信。
???/**
????*?TODO?根據(jù)服務(wù)器傳進(jìn)來(lái)的id,分配到不同的group
????*/
???private?static?final?ChannelGroup?GROUP?=?new?DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
?
???@Override
???protected?void?channelRead0(ChannelHandlerContext?ctx,?TextWebSocketFrame?msg)?throws?Exception?{
???????//retain增加引用計(jì)數(shù),防止接下來(lái)的調(diào)用引用失效
???????System.out.println("服務(wù)器接收到來(lái)自?"? ?ctx.channel().id()? ?"?的消息:?"? ?msg.text());
???????//將消息發(fā)送給group里面的所有channel,也就是發(fā)送消息給客戶端
???????GROUP.writeAndFlush(msg.retain());
???}
那么,服務(wù)端用netty還是用spring websocket?以下我將從幾個(gè)方面列舉這兩種實(shí)現(xiàn)方式的優(yōu)缺點(diǎn)。
-? ? ?使用 netty 實(shí)現(xiàn) websocket??? -
玩過(guò)netty的人都知道netty是的線程模型是nio模型,并發(fā)量非常高,spring5之前的網(wǎng)絡(luò)線程模型是servlet實(shí)現(xiàn)的,而servlet不是nio模型,所以在spring5之后,spring的底層網(wǎng)絡(luò)實(shí)現(xiàn)采用了netty。如果我們單獨(dú)使用netty來(lái)開發(fā)websocket服務(wù)端,速度快是絕對(duì)的,但是可能會(huì)遇到下列問(wèn)題:
- 與系統(tǒng)的其他應(yīng)用集成不方便,在rpc調(diào)用的時(shí)候,無(wú)法享受springcloud里feign服務(wù)調(diào)用的便利性
- 業(yè)務(wù)邏輯可能要重復(fù)實(shí)現(xiàn)
- 使用netty可能需要重復(fù)造輪子
- 怎么連接上服務(wù)注冊(cè)中心,也是一件麻煩的事情
- restful服務(wù)與ws服務(wù)需要分開實(shí)現(xiàn),如果在netty上實(shí)現(xiàn)restful服務(wù),有多麻煩可想而知,用spring一站式restful開發(fā)相信很多人都習(xí)慣了。

-? ? ?使用 spring websocket 實(shí)現(xiàn) ws 服務(wù)? ? -spring websocket已經(jīng)被springboot很好地集成了,所以在springboot上開發(fā)ws服務(wù)非常方便,做法非常簡(jiǎn)單第一步:添加依賴
<dependency>
???<groupId>org.springframework.bootgroupId>
???<artifactId>spring-boot-starter-websocketartifactId>
dependency>
第二步:添加配置類@Configuration
public?class?WebSocketConfig?implements?WebSocketConfigurer?{
@Override
public?void?registerWebSocketHandlers(WebSocketHandlerRegistry?registry)?{
????registry.addHandler(myHandler(),?"/")
????????.setAllowedOrigins("*");
}
?
@Bean
?public?WebSocketHandler?myHandler()?{
?????return?new?MessageHandler();
?}
}
第三步:實(shí)現(xiàn)消息監(jiān)聽類@Component
@SuppressWarnings("unchecked")
public?class?MessageHandler?extends?TextWebSocketHandler?{
???private?List?clients?=?new?ArrayList<>();
?
???@Override
???public?void?afterConnectionEstablished(WebSocketSession?session)?{
???????clients.add(session);
???????System.out.println("uri?:"? ?session.getUri());
???????System.out.println("連接建立:?"? ?session.getId());
???????System.out.println("current?seesion:?"? ?clients.size());
???}
?
???@Override
???public?void?afterConnectionClosed(WebSocketSession?session,?CloseStatus?status)?{
???????clients.remove(session);
???????System.out.println("斷開連接:?"? ?session.getId());
???}
?
???@Override
???protected?void?handleTextMessage(WebSocketSession?session,?TextMessage?message)?{
???????String?payload?=?message.getPayload();
???????Map?map?=?JSONObject.parseObject(payload,?HashMap.class);
???????System.out.println("接受到的數(shù)據(jù)"? ?map);
???????clients.forEach(s?->?{
???????????try?{
???????????????System.out.println("發(fā)送消息給:?"? ?session.getId());
???????????????s.sendMessage(new?TextMessage("服務(wù)器返回收到的信息,"? ?payload));
???????????}?catch?(Exception?e)?{
???????????????e.printStackTrace();
???????????}
???????});
???}
}
從這個(gè)demo中,使用spring websocket實(shí)現(xiàn)ws服務(wù)的便利性大家可想而知了。為了能更好地向spring cloud大家族看齊,我最終采用了spring websocket實(shí)現(xiàn)ws服務(wù)。因此我的應(yīng)用服務(wù)架構(gòu)是這樣子的:一個(gè)應(yīng)用既負(fù)責(zé)restful服務(wù),也負(fù)責(zé)ws服務(wù)。沒(méi)有將ws服務(wù)模塊拆分是因?yàn)椴鸱殖鋈ヒ褂胒eign來(lái)進(jìn)行服務(wù)調(diào)用。第一本人比較懶惰,第二拆分與不拆分相差在多了一層服務(wù)間的io調(diào)用,所以就沒(méi)有這么做了。
-? ? ?從zuul開始技術(shù)轉(zhuǎn)型? ? -要實(shí)現(xiàn)websocket集群,我們必不可免地得從zuul轉(zhuǎn)型到spring cloud gateway。原因如下:zuul1.0版本不支持websocket轉(zhuǎn)發(fā),zuul 2.0開始支持websocket,zuul2.0幾個(gè)月前開源了,但是2.0版本沒(méi)有被spring boot集成,而且文檔不健全。因此轉(zhuǎn)型是必須的,同時(shí)轉(zhuǎn)型也很容易實(shí)現(xiàn)。在gateway中,為了實(shí)現(xiàn)ssl認(rèn)證和動(dòng)態(tài)路由負(fù)載均衡,yml文件中以下的某些配置是必須的,在這里提前避免大家采坑
server:
??port:?443
??ssl:
????enabled:?true
????key-store:?classpath:xxx.jks
????key-store-password:?xxxx
????key-store-type:?JKS
????key-alias:?alias
spring:
??application:
????name:?api-gateway
??cloud:
????gateway:
??????httpclient:
????????ssl:
??????????handshake-timeout-millis:?10000
??????????close-notify-flush-timeout-millis:?3000
??????????close-notify-read-timeout-millis:?0
??????????useInsecureTrustManager:?true
??????discovery:
????????locator:
??????????enabled:?true
??????????lower-case-service-id:?true
??????routes:
??????-?id:?dc
????????uri:?lb://dc
????????predicates:
????????-?Path=/dc/**
??????-?id:?wecheck
????????uri:?lb://wecheck
????????predicates:
????????-?Path=/wecheck/**
如果要愉快地玩https卸載,我們還需要配置一個(gè)filter,否則請(qǐng)求網(wǎng)關(guān)時(shí)會(huì)出現(xiàn)錯(cuò)誤not an SSL/TLS record@Component
public?class?HttpsToHttpFilter?implements?GlobalFilter,?Ordered?{
??private?static?final?int?HTTPS_TO_HTTP_FILTER_ORDER?=?10099;
??@Override
??public?Mono?filter(ServerWebExchange?exchange,?GatewayFilterChain?chain)? {
??????URI?originalUri?=?exchange.getRequest().getURI();
??????ServerHttpRequest?request?=?exchange.getRequest();
??????ServerHttpRequest.Builder?mutate?=?request.mutate();
??????String?forwardedUri?=?request.getURI().toString();
??????if?(forwardedUri?!=?null?