《跟閃電俠學Netty》閱讀筆記 - 聊天系統(tǒng)實現
Part1引言
本部分整合聊天系統(tǒng)有關的章節(jié),內容主要是介紹關鍵功能的實現邏輯,建議讀者先看看作者的博客項目,切換到不同分支看看各個細節(jié)功能如何實現。
篇幅有限,這里僅僅記錄一些個人學習過程的重點部分。
Part2思維導圖
https://www.mubu.com/doc/1dunN_7Luzl

Part3項目代碼
作者的倉庫代碼地址:https://github.com/lightningMan/flash-netty5
Part4通信協(xié)議設計和自定義編解碼實現
1什么是通信協(xié)議?
基于TCP通信均為二進制協(xié)議,底層都是通過字節(jié)進行傳輸的,在通信協(xié)議當中規(guī)定數據傳輸的每一個字節(jié)含義。
2通信過程
客戶端轉換數據為二進制。
網絡傳輸給服務端。
服務端根據協(xié)議規(guī)則讀取二進制數據。
服務端處理數據返回響應結果給客戶端。
3聊天系統(tǒng)的通信協(xié)議數據對象設計
在聊天系統(tǒng)當中通信協(xié)議的設計如下。
4字節(jié)魔數
比如Java的字節(jié)碼CafeBabe
,用于快速識別是否自定義協(xié)議,也可以方便快速提取數據。
public?static?final?int?MAGIC_NUMBER?=?0x12345678;
1 字節(jié)版本號
類似TCP的IPV4、IPV6。
/**??
?*?協(xié)議版本??
?*/??
(deserialize?=?false,?serialize?=?false)??
private?Byte?version?=?1;
1 字節(jié)序列化算法
使用1個字節(jié)來標識算法。
/**??
?*?序列化算法定義??
?*/??
public?interface?SerializerAlgorithm?{??
????/**??
?????*?json?序列化??
?????*/??
????byte?JSON?=?1;??
}
1 字節(jié)指令
一個字節(jié)最多表示256種指令。注意在設計上指令和版本號進行綁定關聯(lián),實現不同版本之間的指令兼容,提高程序的健壯性。
??
public?abstract?class?Packet?{??
????/**??
?????*?協(xié)議版本??
?????*/??
???? (deserialize?=?false,?serialize?=?false)??
????private?Byte?version?=?1;??
??
??
???? (serialize?=?false)??
????public?abstract?Byte?getCommand();??
}
4字節(jié)數據長度
數據長度是必要的,主要用于字節(jié)流這種連續(xù)不斷的數據形式進行切割。
byteBuf.writeInt(bytes.length);
int 基本數據類型在Java中默認占4個字節(jié),這4個字節(jié)用來存儲字節(jié)數組的長度。
N字節(jié)數據
數據部分。
4如何實現JAVA對象二進制互相轉化?
所謂互轉對應了網絡 Socket IO 的input/output
中的數據轉化部分,實體數據轉為字節(jié)流這個過程我們通常叫做編碼,反之則是解碼。
無論是編碼還是解碼,都是依賴Netty自定義的 MessageToMessageCodec實現的,聊天系統(tǒng)的編碼和解碼工作都是依賴 PacketCodecHandler 完成的。
.Sharable??
public?class?PacketCodecHandler?extends?MessageToMessageCodec<ByteBuf,?Packet>?{??
????public?static?final?PacketCodecHandler?INSTANCE?=?new?PacketCodecHandler();??
??
????private?PacketCodecHandler()?{??
??
????}??
??
???? ??
????protected?void?decode(ChannelHandlerContext?ctx,?ByteBuf?byteBuf,?List<Object>?out)?{??
????????out.add(PacketCodec.INSTANCE.decode(byteBuf));??
????}??
??
???? ??
????protected?void?encode(ChannelHandlerContext?ctx,?Packet?packet,?List<Object>?out)?{??
????????ByteBuf?byteBuf?=?ctx.channel().alloc().ioBuffer();??
????????PacketCodec.INSTANCE.encode(byteBuf,?packet);??
????????out.add(byteBuf);??
????}??
}
自定義邏輯處理器,在 Netty Server 中需要注冊到 pipeline 當中。
public?static?void?main(String[]?args)?{??
????NioEventLoopGroup?boosGroup?=?new?NioEventLoopGroup();??
????NioEventLoopGroup?workerGroup?=?new?NioEventLoopGroup();??
??
????final?ServerBootstrap?serverBootstrap?=?new?ServerBootstrap();??
????serverBootstrap??
????????????.group(boosGroup,?workerGroup)??
????????????.channel(NioServerSocketChannel.class)??
????????????.option(ChannelOption.SO_BACKLOG,?1024)??
????????????.childOption(ChannelOption.SO_KEEPALIVE,?true)??
????????????.childOption(ChannelOption.TCP_NODELAY,?true)??
????????????.childHandler(new?ChannelInitializer<NioSocketChannel>()?{??
????????????????protected?void?initChannel(NioSocketChannel?ch)?{??
?????????????????//......
????????????????????ch.pipeline().addLast(PacketCodecHandler.INSTANCE);?
??????????????//?......
????????????????}??
????????????});??
??
??
????bind(serverBootstrap,?PORT);??
}
這里解釋下為什么PacketCodecHandler
要被注解標記為“Sharable”,因為編碼和解碼可能在多個handler
中用到,為了提高效率,這里通過共享減少實例的創(chuàng)建。
這個優(yōu)化方式在原書后面的章節(jié)會提到
帶著疑問我們再看看@ChannelHandler.Sharable
這個注解的源碼解釋。
Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition. If this annotation is not specified, you have to create a new handler instance every time you add it to a pipeline because it has unshared state such as member variables. This annotation is provided for documentation purpose, just like the JCIP annotations
上面的內容翻譯過來就是:
被注解的Sharable
的同一個ChannelHandler實例,可以被多次添加到一個或多個ChannelPipeline
中,并且可以確保不會出現(多線程)競爭情況。
如果沒有指定這個注解,那么每次創(chuàng)建新的 Channel 都需要使用新的 Handler 實例。
注意在如果存在不共享變量或者狀態(tài),如有動態(tài)的成員變量,就不能用這個注解。簡單來說@ChannelHandler.Sharable
實現了Netty中的"Bean"單例和共享。
5實戰(zhàn)部分
數據編碼過程(思路)
下面是數據編碼的基本編寫思路。
添加編碼器。
ch.pipeline().addLast(new?PacketEncoder());
往
ByteBuf
逐個寫字段,實現編碼過程。
public?class?PacketEncoder?extends?MessageToByteEncoder<Packet>?{??
??
???? ??
????protected?void?encode(ChannelHandlerContext?ctx,?Packet?packet,?ByteBuf?out)?{??
????????PacketCodec.INSTANCE.encode(out,?packet);??
????}??
}
完整的自定義協(xié)議:PacketCodec#encode。
public?void?encode(ByteBuf?byteBuf,?Packet?packet)?{??
????//?1.?序列化?java?對象??
????byte[]?bytes?=?Serializer.DEFAULT.serialize(packet);??
??
????//?2.?實際編碼過程??
????byteBuf.writeInt(MAGIC_NUMBER);??
????byteBuf.writeByte(packet.getVersion());??
????byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());??
????byteBuf.writeByte(packet.getCommand());??
????byteBuf.writeInt(bytes.length);??
????byteBuf.writeBytes(bytes);??
}
解碼數據過程(思路)
下面是數據解碼的基本編寫思路:
在handler當中添加自定義邏輯處理器。
.handler(new?ChannelInitializer<SocketChannel>()?{??
???? ??
????public?void?initChannel(SocketChannel?ch)?{
??ch.pipeline().addLast(new?PacketDecoder());
?}??
});
定義解碼邏輯處理器。
public?class?PacketDecoder?extends?MessageToMessageDecoder<ByteBuf>?{??
??
???? ??
????protected?void?decode(ChannelHandlerContext?ctx,?ByteBuf?in,?List?out)?{??
????????out.add(PacketCodec.INSTANCE.decode(in));??
????}??
}
具體的解碼過程:
跳過魔數。
跳過協(xié)議版本號 ?。
讀取序列化算法。
讀取指令,數據包,算法標識等自定義協(xié)議的基本內容。
根據數據長度。
取出數據。
PacketCodec#decode
public?Packet?decode(ByteBuf?byteBuf)?{??
????//?跳過?magic?number????byteBuf.skipBytes(4);??
??
????//?跳過版本號??
????byteBuf.skipBytes(1);??
??
????//?序列化算法??
????byte?serializeAlgorithm?=?byteBuf.readByte();??
??
????//?指令??
????byte?command?=?byteBuf.readByte();??
??
????//?數據包長度??
????int?length?=?byteBuf.readInt();??
??
????byte[]?bytes?=?new?byte[length];??
????byteBuf.readBytes(bytes);??
??
????Class<??extends?Packet>?requestType?=?getRequestType(command);??
????Serializer?serializer?=?getSerializer(serializeAlgorithm);??
??
????if?(requestType?!=?null?&&?serializer?!=?null)?{??
????????return?serializer.deserialize(requestType,?bytes);??
????}??
??
????return?null;??
}
6思考
JSON序列化方式之外其他序列化方式如何實現?
Java原生序列化
類實現 Serializable 接口
具體底層由
ObjectOutputStream
和ObjectInputStream
實現
Hessian
Hessian 是動態(tài)類型、二進制、緊湊的,并且可跨語言移植的一種序列化框架
Hessian 協(xié)議要比 JDK、JSON 更加緊湊,性能上要比 JDK、JSON 序列化高效很多,而且生成的字節(jié)數也更小
Protobuf
谷歌實現的混合語言數據標準
輕便、高效的結構化數據存儲格式
支持 Java、Python、C++、Go 等語言
要求定義 IDL(Interface description language),并且使用對應語言的IDL生成序列化工具類
Thrift
Facebook于2007年開發(fā)的跨語言的rpc服框架
通過Thrift的編譯環(huán)境生成各種語言類型的接口文件
序列化和編碼都是JAVA對象封裝二進制過程,兩者的聯(lián)系和區(qū)別
總結起來就是一句話:序列化是目標,編碼是方法。網上有一張圖非常直觀的展示了兩者的區(qū)別。

兩者的聯(lián)系和區(qū)別
編碼:信息從一種形式或格式轉換為另一種形式的過程,目的是方便傳輸協(xié)議通信。
序列化:“序列化”其實本身也是“信息從一種形式或格式轉換為另一種形式的過程”,只不過這個表現形式直觀具體,序列化也常常用于表達一個對象的狀態(tài)。
7聊天系統(tǒng)的Netty細節(jié)優(yōu)化
優(yōu)化部分是聊天系統(tǒng)的精髓,也是使用Netty實踐非常有價值的指導和參考。
81. 使用共享Handler
問題分析
在舊版本代碼中,每個新連接每次通過 ChannelInitializer 調用,會造成9個指令對象都被new一遍操作,連接過多會造成大量對象創(chuàng)建影響系統(tǒng)性能。
我們仔細觀察可以發(fā)現,其實很多處理器內部是沒有任何 "狀態(tài)"的,對于無狀態(tài)的業(yè)務處理器就可以使用單例模式封裝。
serverBootstrap
????????????????.childHandler(new?ChannelInitializer<NioSocketChannel>()?{
????????????????????protected?void?initChannel(NioSocketChannel?ch)?{
????????????????????????ch.pipeline().addLast(new?Spliter());
????????????????????????ch.pipeline().addLast(new?PacketDecoder());
????????????????????????ch.pipeline().addLast(new?LoginRequestHandler());
????????????????????????ch.pipeline().addLast(new?AuthHandler());
????????????????????????ch.pipeline().addLast(new?MessageRequestHandler());
????????????????????????ch.pipeline().addLast(new?CreateGroupRequestHandler());
????????????????????????ch.pipeline().addLast(new?JoinGroupRequestHandler());
????????????????????????ch.pipeline().addLast(new?QuitGroupRequestHandler());
????????????????????????ch.pipeline().addLast(new?ListGroupMembersRequestHandler());
????????????????????????ch.pipeline().addLast(new?GroupMessageRequestHandler());
????????????????????????ch.pipeline().addLast(new?LogoutRequestHandler());
????????????????????????ch.pipeline().addLast(new?PacketEncoder());
????????????????????}
????????????????});
優(yōu)化手段
通過加入注解
@ChannelHandler.Shareble
,表示這個 handler 是支持多個 channel 共享的,否則會報錯。發(fā)布靜態(tài) final 的不可變對象來實現單例,編譯器優(yōu)化。
最后還可以壓縮Handler,把編碼和解碼過程放到一個公用的Handler處理(比如請求指令分發(fā)解析處理)。
注意事項
并不是所有的Handler都可以單例
Spliter 不是單例的,因為它需要對每個數據做拆包處理。
92. 縮短事件傳播路徑
問題分析
首先,指令的decode必須要在最前面,因為涉及后面的命令解析。
如果把每個命令decode之后再傳播到每個命令事件,但是對應的事件又不做任何處理,那么會浪費很多次多余的命令判斷。
優(yōu)化手段
根本目的是縮短事件傳播鏈條,讓事件傳播鏈盡可能短,優(yōu)化手段實際上也很簡單,那就是 使用統(tǒng)一Handler。
通常的做法如下:
該Handler只做判斷,不做任何狀態(tài)存儲,使用單例優(yōu)化。
public?static?final?IMHandler?INSTANCE?=?new?IMHandler();
聊天系統(tǒng)中利用HashMap存儲所有的命令處理Handler。
這里個人順帶指定下HashMap初始化容量,小小優(yōu)化一下。
private?IMHandler()?{??
????handlerMap?=?new?HashMap<>(7);??
??
????handlerMap.put(MESSAGE_REQUEST,?MessageRequestHandler.INSTANCE);??
????handlerMap.put(CREATE_GROUP_REQUEST,?CreateGroupRequestHandler.INSTANCE);??
????handlerMap.put(JOIN_GROUP_REQUEST,?JoinGroupRequestHandler.INSTANCE);??
????handlerMap.put(QUIT_GROUP_REQUEST,?QuitGroupRequestHandler.INSTANCE);??
????handlerMap.put(LIST_GROUP_MEMBERS_REQUEST,?ListGroupMembersRequestHandler.INSTANCE);??
????handlerMap.put(GROUP_MESSAGE_REQUEST,?GroupMessageRequestHandler.INSTANCE);??
????handlerMap.put(LOGOUT_REQUEST,?LogoutRequestHandler.INSTANCE);??
}
回調
channelRead0
實際上就是委托給map中的元素對應的指令處理器處理。
??
protected?void?channelRead0(ChannelHandlerContext?ctx,?Packet?packet)?throws?Exception?{??
????handlerMap.get(packet.getCommand()).channelRead(ctx,?packet);??
}
通過一個統(tǒng)一的處理器包括多個靜態(tài)單例處理器,有效減少JVM內存開銷,單例也可以減少對象實例化的開銷。
103. 事件傳播源調整
關鍵點
如果你的 outBound
類型的 handler
較多,在寫數據的時候能用?ctx.writeAndFlush()
?就用這個方法, 不要用 ctx.channel().writeAndFlush()
。
原因
究其原因是ctx.writeAndFlush() 會繞過所有不需要處理的其他Outbound
類型。
ctx.writeAndFlush()
?是從 pipeline
鏈中的當前節(jié)點開始往前找到第一個 outBound 類型向前傳播的,如果這個對象不需要其他outBound
的handler
處理就可以用這個方法。
我們可以通過下面這個圖理解:

而ctx.channel().writeAndFlush() 表現則不同,它是從 pipeline
鏈中的最后一個 outBound
類型的 handler
開始,把對象往前進行傳播,從圖中就可以看到, outBound 的處理器越多,調用鏈路就越長,可能產生越多“無用”操作。
當然如果確定后面的 outBound
都需要進行處理,那么就可以用這個方法。

相關問題
writeAndFlush
為什么可以縮短事件傳播路徑?它是如何實現
OutBound
類型的事件傳播縮短的?
114. 減少阻塞主線程的操作【重要】
Netty中容易被忽視的一點,卻是非常重要的概念,那就是 一個Channel的其中一個Handler阻塞,會導致所有其他綁定的Channel一起被拖慢。
只要有一個 channel
的一個 handler
中的?channelRead0()
?方法阻塞了 NIO 線程,最終都會拖慢綁定在該 NIO 線程上的其他所有的 channel,而不是只影響當前的Channel。
為什么會這樣?源碼之下無問題,但是本文不涉及源碼解讀,讀者可以自己留下問題仔細思考一下。
為了更好理解,這里再舉個代碼的例子。
List<Channel>?channelList?=?已有數據可讀的?channel
for?(Channel?channel?in?channelist)?{
???for?(ChannelHandler?handler?in?channel.pipeline())?{
???????handler.channelRead0();
???}???
}
比如,上面的操作中,如果for循環(huán)某次出現卡頓,這不僅僅拖慢一個客戶端,而是拖慢所有客戶端。
解決這個問題的方式是,由于Netty進行客戶端處理的時候本身已經被設計為非阻塞模式了,大部分情況需要開發(fā)者自行使用 業(yè)務線程池 開啟新的線程防止”卡頓“。
需要注意,引入業(yè)務線程池會增加系統(tǒng)復雜度,也會增加線上調試難度。
125. 如何準確統(tǒng)計時長?
錯誤做法:在線程的頭尾加入時間差計算得出執(zhí)行時長結果。
正確做法:使用writeAndFlush+addListener 的方式判斷 futrue.isDone
之后才計算 。
原因:writeAndFlush 在非NIO線程中它是一個異步操作,其他操作由第一個任務隊列異步執(zhí)行。
關鍵點:writeAndFlush 真正執(zhí)行完成才算是完成處理,監(jiān)聽它完成處理的回調動作才能算出較為準確執(zhí)行時長。
13優(yōu)化小結
如果Handler多例但是無狀態(tài),完全可以改為單例模式 。
盡可能減少Handler的臃腫,防止調用鏈路過長。
一個耗時操作不只影響單個Channel,所以建議Handler的耗時操作要交給業(yè)務線程池開啟新線程處理防止”卡頓“,但是需要注意和線程綁定的相關參數處理問題。
耗時統(tǒng)計,
writeAndFlush
屬于異步任務,使用JDK的Future.isDone()
方法判斷真正的結束時間才是正解。
Part5實現登錄
14處理流程圖

15實現思路
目標客戶端和服務端分別啟動Netty服務。
客戶端發(fā)送登錄請求指令,服務端解碼之后根據傳輸結果校驗,根據校驗結果構建登錄請求響應指令
LoginResponsePacket
。通過
ctx.writeAndFlush(loginResponsePacket);
回送響應結果給客戶端。登錄校驗成功,通過
SessionUtil
添加sessio
n信息客戶端登錄成功之后,構建請求指令對象,設置參數,通過Netty發(fā)送到服務端 。
服務端收到請求進行驗證,并且構建相對應的響應指令結果對象。
16實現步驟
下面是大致的實現步驟:
添加 LoginRequestHandler 登錄邏輯處理器在Server端。
ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
??
.Sharable??
public?class?LoginRequestHandler?extends?SimpleChannelInboundHandler<LoginRequestPacket>?{??
??
????public?static?final?LoginRequestHandler?INSTANCE?=?new?LoginRequestHandler();??
??
????protected?LoginRequestHandler()?{??
????}??
??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?LoginRequestPacket?loginRequestPacket)?{??
????????LoginResponsePacket?loginResponsePacket?=?new?LoginResponsePacket();??
????????loginResponsePacket.setVersion(loginRequestPacket.getVersion());??
????????loginResponsePacket.setUserName(loginRequestPacket.getUserName());??
??
????????if?(valid(loginRequestPacket))?{??
????????????loginResponsePacket.setSuccess(true);??
????????????String?userId?=?IDUtil.randomId();??
????????????loginResponsePacket.setUserId(userId);??
????????????System.out.println("["?+?loginRequestPacket.getUserName()?+?"]登錄成功");??
????????????SessionUtil.bindSession(new?Session(userId,?loginRequestPacket.getUserName()),?ctx.channel());??
????????}?else?{??
????????????loginResponsePacket.setReason("賬號密碼校驗失敗");??
????????????loginResponsePacket.setSuccess(false);??
????????????System.out.println(new?Date()?+?":?登錄失敗!");??
????????}??
??
????????//?登錄響應??
????????ctx.writeAndFlush(loginResponsePacket);??
????}??
??
????private?boolean?valid(LoginRequestPacket?loginRequestPacket)?{??
????????return?true;??
????}??
??
???? ??
????public?void?channelInactive(ChannelHandlerContext?ctx)?{??
????????SessionUtil.unBindSession(ctx.channel());??
????}??
}
在客戶端同樣添加Handler也就是
LoginResponseHandler
,LoginResponseHandler
的處理邏輯如下。
ch.pipeline().addLast(LoginResponseHandler.INSTANCE);
public?class?LoginResponseHandler?extends?SimpleChannelInboundHandler<LoginResponsePacket>?{??
??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?LoginResponsePacket?loginResponsePacket)?{??
????????String?userId?=?loginResponsePacket.getUserId();??
????????String?userName?=?loginResponsePacket.getUserName();??
??
????????if?(loginResponsePacket.isSuccess())?{??
????????????System.out.println("["?+?userName?+?"]登錄成功,userId?為:?"?+?loginResponsePacket.getUserId());??
????????????SessionUtil.bindSession(new?Session(userId,?userName),?ctx.channel());??
????????}?else?{??
????????????System.out.println("["?+?userName?+?"]登錄失敗,原因:"?+?loginResponsePacket.getReason());??
????????}??
????}??
??
???? ??
????public?void?channelInactive(ChannelHandlerContext?ctx)?{??
????????System.out.println("客戶端連接被關閉!");??
????}??
}
如何把失敗或者成功標識綁定在客戶端連接? 服務端如何高效判定客戶端重新登錄?
在聊天系統(tǒng)中實現比較簡單,服務端高效判斷的方法是利用ConcurrentHashMap
,Map當中存儲用戶的ID,如果登錄成功則存儲到此Map中,服務端也只需要判斷Map元素確認是否登錄。
private?static?final?Map<String,?Channel>?userIdChannelMap?=?new?ConcurrentHashMap<>();
17熱插拔客戶端是否登錄驗證
校驗是否登錄的邏輯封裝到工具類當中,實現比較簡單。
SessionUtil
public?static?boolean?hasLogin(Channel?channel)?{??
??
????return?getSession(channel)?!=?null;??
}??
??
public?static?Session?getSession(Channel?channel)?{??
??
????return?channel.attr(Attributes.SESSION).get();??
}
//?AttributeKey<Session>?SESSION?=?AttributeKey.newInstance("session");
AuthHandler
實現熱插拔的思路是判斷是否登錄,統(tǒng)一通過該調用鏈條完成,AuthHandler
本身作為單獨處理器封裝判斷登錄校驗邏輯。
void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
?
if?(!SessionUtil.hasLogin(ctx.channel()))?{
????????ctx.channel().close();????
?}?else?{
????????ctx.pipeline().remove(this);????????
????????super.channelRead(ctx,?msg);????
?}
}
Part6實現雙端收發(fā)消息
18客戶端處理
客戶端成功登錄之后,下一步是實現客戶端和服務端互相發(fā)送數據??蛻舳耸障⑻幚砥魅缦拢?/p>
//?收消息處理器??
ch.pipeline().addLast(new?MessageResponseHandler());
MessageResponseHandler
public?class?MessageResponseHandler?extends?SimpleChannelInboundHandler<MessageResponsePacket>?{??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?MessageResponsePacket?messageResponsePacket)?{??
????????String?fromUserId?=?messageResponsePacket.getFromUserId();??
????????String?fromUserName?=?messageResponsePacket.getFromUserName();??
????????System.out.println(fromUserId?+?":"?+?fromUserName?+?"?->?"?+?messageResponsePacket??
????????????????.getMessage());??
????}??
}
19服務端處理
因為是通用組件,服務端這里封裝到 IMHandler 通用組件當中。
handlerMap.put(MESSAGE_REQUEST,?MessageRequestHandler.INSTANCE);
MessageRequestHandler
.Sharable??
public?class?MessageRequestHandler?extends?SimpleChannelInboundHandler<MessageRequestPacket>?{??
????public?static?final?MessageRequestHandler?INSTANCE?=?new?MessageRequestHandler();??
??
????private?MessageRequestHandler()?{??
??
????}??
??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?MessageRequestPacket?messageRequestPacket)?{??
????????long?begin?=?System.currentTimeMillis();??
??
??
????????//?1.拿到消息發(fā)送方的會話信息??
????????Session?session?=?SessionUtil.getSession(ctx.channel());??
??
????????//?2.通過消息發(fā)送方的會話信息構造要發(fā)送的消息??
????????MessageResponsePacket?messageResponsePacket?=?new?MessageResponsePacket();??
????????messageResponsePacket.setFromUserId(session.getUserId());??
????????messageResponsePacket.setFromUserName(session.getUserName());??
????????messageResponsePacket.setMessage(messageRequestPacket.getMessage());??
??
????????//?3.拿到消息接收方的?channel????????Channel?toUserChannel?=?SessionUtil.getChannel(messageRequestPacket.getToUserId());??
??
????????//?4.將消息發(fā)送給消息接收方??
????????if?(toUserChannel?!=?null?&&?SessionUtil.hasLogin(toUserChannel))?{??
????????????toUserChannel.writeAndFlush(messageResponsePacket).addListener(future?->?{??
????????????????if?(future.isDone())?{??
??
????????????????}??
??
????????????});??
????????}?else?{??
????????????System.err.println("["?+?session.getUserId()?+?"]?不在線,發(fā)送失敗!");??
??
????????}??
????}??
}
20小結
實現雙端收發(fā)消息小結:
定義收發(fā)消息Java對象,對于消息進行收發(fā)。
Channel
的attr
的方法可以給Channel綁定屬性并設置某些狀態(tài),內部實際也是通過Map維護的,不需要用戶外部自己在自定義去維護。如何在控制臺當中獲取消息并且發(fā)送到服務端。
服務端回傳消息給客戶端。
Part7ChannelPipleline 和 ChannelHandler 概念
本部分是補充部分。主要介紹 Pipeline
和ChannelHanlder
構成和一些基礎概念。
21ChannelPipleline 和 ChannelHandler 構成圖

理解這幅圖之前,需要先理解Channel
這個概念。
22Channel 概念理解
一個客戶端連接對應一個Channel,這個Channel可以類比BIO當中的傳統(tǒng)概念Socket套接字。
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.
一個網絡套接字的節(jié)點或一個能夠進行(網絡)I/O操作的組件,如讀、寫、連接和綁定。
23ChannelPipeline
源碼對于 ChannelPipeline 的定義如下:
A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel. ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the ChannelHandlers in a pipeline interact with each other.
源碼中還有一個直觀的設計圖。
下圖描述了I/O事件在ChannelPipeline中是如何被ChannelHandlers處理的。
一個I/O事件由ChannelInboundHandler
或 ChannelOutboundHandler
處理,并通過調用 ChannelHandlerContext
中定義的事件傳播方法,比如ChannelHandlerContext.fireChannelRead(Object)
和ChannelHandlerContext.write(Object)
,轉發(fā)給其最接近的處理程序。

ChannelPipeline
的核心如下:
處理或攔截一個
Channel
的入站事件和出站操作的鏈表。通過責任鏈模式的設計,可以完全自定義處理邏輯和
ChannelHandler
之間互相通信的邏輯。
24ChannelContext
ChannelHandler與Channel和ChannelPipeline之間的映射關系,由ChannelHandlerContext
進?維護。
Enables a ChannelHandler to interact with its ChannelPipeline and other handlers.
(ChannelContext)使得ChannelHandler
能夠與它的ChannelPipeline
和其他處理程序互動。
ChannelContext
可以獲取整個Channel
的信息。獲取所有的上下文。
邏輯處理器
ChannelHandler
定義處理邏輯。
25ChannelHanlder
ChannelHanlder
包含兩種理解。
第一種理解:可以理解為Socket
連接,客戶端和服務端連接的時候會創(chuàng)建一個channel
。 負責基本的IO操作,例如:bind()
、connect()
、read()
、write()
。
第二種理解:Netty的Channel
接口所提供的API,大大減少了Socket
類復雜性。
因為Channel
連接過程中存在雙端 input/output
,所以 ChannelHandler
也分類為 ChannelInboundHandler
和 ChannelOutboundHandler
。
ChannelInboundHandler
讀取的邏輯抽象 。
channelRead
是最重要的方法 。配合
ByteBuf
使用進行buf.read
推進讀指針移動 。
ChannelOutboundHandler
對應寫出的邏輯抽象 。
核心方法是
write
,writeAndFlush
。
適配器
在使用過程中還存在對應的適配器。
ChannelOutboundHandlerAdapter
(注意處理順序和添加addLast
的順序相反)ChannelInboundHandlerAdapter
Part8客戶端和服務端的 SimpleChannelInboundHandler/ChannelInboundHandlerAdapter 簡化
整個聊天系統(tǒng)大部分的指令判斷邏輯是重復的,下面介紹如何通過 SImpleChannelInboundHandler/ChannelInboundHandlerAdapter 簡化指令的處理邏輯。
`ChannelInboundHandlerAdapter`[1]?which allows to explicit only handle a specific type of messages. For example here is an implementation which only handle?
String
?messages.
ChannelInboundHandlerAdapter ?允許明確地只處理特定類型的消息。而SimpleChannelInboundHandler
提供了一個模板,作用是把處理邏輯不變的內容寫好在 channelRead(ctx,msg)
中,并且在里面調用 channelRead0
,這樣處理之后就可以通過抽象方法實現傳遞到子類中去進行傳播。
26區(qū)別
SimpleChannelInboundHandler
和ChannelInboundHandlerAdapter
這兩個類使用上不太好區(qū)分,下面再補充介紹一下如何正確對待使用兩者。
ChannelInboundHandlerAdapter
需要覆蓋的方法是channelRead,特點是不會自動釋放消息,需要調用ctx.fireChannelRead(msg) 向后續(xù)鏈條處理器傳遞消息,也就是需要手動通過責任鏈的方式傳遞給下位處理器。SimpleChannelInboundHandler
是 ChannelInboundHandlerAdapter
的子類,做了額外的處理,會自動釋放消息。如果還需要繼續(xù)傳遞消息,此時需調用一次 **ReferenceCountUtil.retain(msg)**。
需注意SimpleChannelInboundHandler
也要調用ctx.fireChannelRead(msg)
來觸發(fā)鏈條中下一處理器處理。
ChannelInboundHandlerAdapter
通常用于處于鏈條中間的某些環(huán)節(jié)對數據進行處理,如數據驗證,需要將消息繼續(xù)傳遞。SimpleChannelInboundHandler
則比較適合鏈條最后一個環(huán)節(jié),該環(huán)節(jié)處理完后,后續(xù)不再需要該消息,因此可以自動釋放。
27應用
在聊天系統(tǒng)中統(tǒng)一處理的Handler繼承了SimpleChannelInboundHandler,重寫channelRead0
方法,主要對于解碼之后的操作指令和通用Map進行匹配,如果匹配則分發(fā)到具體的邏輯處理器。
IMHandler 的公用Handler實現非常簡單直觀。
.Sharable??
public?class?IMHandler?extends?SimpleChannelInboundHandler<Packet>?{??
????public?static?final?IMHandler?INSTANCE?=?new?IMHandler();??
??
????private?Map<Byte,?SimpleChannelInboundHandler<??extends?Packet>>?handlerMap;??
??
????private?IMHandler()?{??
????????handlerMap?=?new?HashMap<>(7);??
??
????????handlerMap.put(MESSAGE_REQUEST,?MessageRequestHandler.INSTANCE);??
????????handlerMap.put(CREATE_GROUP_REQUEST,?CreateGroupRequestHandler.INSTANCE);??
????????handlerMap.put(JOIN_GROUP_REQUEST,?JoinGroupRequestHandler.INSTANCE);??
????????handlerMap.put(QUIT_GROUP_REQUEST,?QuitGroupRequestHandler.INSTANCE);??
????????handlerMap.put(LIST_GROUP_MEMBERS_REQUEST,?ListGroupMembersRequestHandler.INSTANCE);??
????????handlerMap.put(GROUP_MESSAGE_REQUEST,?GroupMessageRequestHandler.INSTANCE);??
????????handlerMap.put(LOGOUT_REQUEST,?LogoutRequestHandler.INSTANCE);??
????}??
??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?Packet?packet)?throws?Exception?{??
????????handlerMap.get(packet.getCommand()).channelRead(ctx,?packet);??
????}??
}
Part9客戶端和服務端單聊
28目標
輸入用戶名,服務端隨機分配ID,這里省去通過賬號和密碼注冊過程 。
多個客戶端登錄,用 userId 空格 消息的方式單聊。
29實現過程
使用工具類把
UserId
和Channe
l綁定為Session。Session
的信息包含用戶ID以及名稱 ,后續(xù)可以擴展更多的字段。使用
SessionUtil
工具類操作Session,通過Session貯存當前會話信息。注意建議用ConcurrentHashMap
ConcurrentHashMap
為userId -> Channel的映射Map。用戶登錄,需要把Session塞入Map。
當用戶斷開
Channel
連接退出,需要移除Session信息。服務端接受消息并且轉發(fā)(這里Netty類似轉發(fā)手機信號的基站)
獲取會話信息。
構造發(fā)給客戶端的對象
MessageResponse
。消息接收方標識獲取對應
Channel
。如果目標用戶登錄則發(fā)送消息,如果對方不在線,則控制臺打印警告信息。
部分實現代碼如下:
MessageResponseHandler
public?class?MessageResponseHandler?extends?SimpleChannelInboundHandler<MessageResponsePacket>?{??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?MessageResponsePacket?messageResponsePacket)?{??
????????String?fromUserId?=?messageResponsePacket.getFromUserId();??
????????String?fromUserName?=?messageResponsePacket.getFromUserName();??
????????System.out.println(fromUserId?+?":"?+?fromUserName?+?"?->?"?+?messageResponsePacket??
????????????????.getMessage());??
????}??
}
MessageRequestHandler
.Sharable??
public?class?MessageRequestHandler?extends?SimpleChannelInboundHandler<MessageRequestPacket>?{??
????public?static?final?MessageRequestHandler?INSTANCE?=?new?MessageRequestHandler();??
??
????private?MessageRequestHandler()?{??
??
????}??
??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?MessageRequestPacket?messageRequestPacket)?{??
????????long?begin?=?System.currentTimeMillis();??
??
??
????????//?1.拿到消息發(fā)送方的會話信息??
????????Session?session?=?SessionUtil.getSession(ctx.channel());??
??
????????//?2.通過消息發(fā)送方的會話信息構造要發(fā)送的消息??
????????MessageResponsePacket?messageResponsePacket?=?new?MessageResponsePacket();??
????????messageResponsePacket.setFromUserId(session.getUserId());??
????????messageResponsePacket.setFromUserName(session.getUserName());??
????????messageResponsePacket.setMessage(messageRequestPacket.getMessage());??
??
????????//?3.拿到消息接收方的?channel????????Channel?toUserChannel?=?SessionUtil.getChannel(messageRequestPacket.getToUserId());??
??
????????//?4.將消息發(fā)送給消息接收方??
????????if?(toUserChannel?!=?null?&&?SessionUtil.hasLogin(toUserChannel))?{??
????????????toUserChannel.writeAndFlush(messageResponsePacket).addListener(future?->?{??
????????????????if?(future.isDone())?{??
??
????????????????}??
??
????????????});??
????????}?else?{??
????????????System.err.println("["?+?session.getUserId()?+?"]?不在線,發(fā)送失敗!");??
??
????????}??
????}??
}
Part10群聊發(fā)起和通知
下面兩個小節(jié)圍繞群聊實現介紹。
群聊和單聊實現類似,都是通過標識獲取Channel,為了方面多個成員管理,設計 ChannelGroup
完成Channel
的批量操作。
30預期效果
三位用戶依次登錄。
控制臺輸入 createGroup 指令,提示創(chuàng)建群聊需要 userId 列表,之后以英文逗號分隔userId。
群聊創(chuàng)建成功之后,所有群聊成員收到加入成功消息。
31創(chuàng)建群聊實現
主要邏輯如下:
創(chuàng)建一個
channel
分組。篩選出待加入群聊的用戶的
channel
和userName
。創(chuàng)建群聊創(chuàng)建結果的響應。
給每個客戶端發(fā)送拉群通知。
保存群組相關的信息。
存儲群的相關信息利用了ConcurrentHashMap
實現,和Session
的會話信息存儲方式類似。ChannelGroup對象負責封裝多個Channel
的信息,模擬群聊中的“群”。
??
.Sharable??
public?class?CreateGroupRequestHandler?extends?SimpleChannelInboundHandler<CreateGroupRequestPacket>?{??
????public?static?final?CreateGroupRequestHandler?INSTANCE?=?new?CreateGroupRequestHandler();??
??
????private?CreateGroupRequestHandler()?{??
??
????}??
??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?CreateGroupRequestPacket?createGroupRequestPacket)?{??
????????List<String>?userIdList?=?createGroupRequestPacket.getUserIdList();??
??
????????List<String>?userNameList?=?new?ArrayList<>();??
????????//?1.?創(chuàng)建一個?channel?分組??
????????ChannelGroup?channelGroup?=?new?DefaultChannelGroup(ctx.executor());??
??
????????//?2.?篩選出待加入群聊的用戶的?channel?和?userName????????for?(String?userId?:?userIdList)?{??
????????????Channel?channel?=?SessionUtil.getChannel(userId);??
????????????if?(channel?!=?null)?{??
????????????????channelGroup.add(channel);??
????????????????userNameList.add(SessionUtil.getSession(channel).getUserName());??
????????????}??
????????}??
??
????????//?3.?創(chuàng)建群聊創(chuàng)建結果的響應??
????????String?groupId?=?IDUtil.randomId();??
????????CreateGroupResponsePacket?createGroupResponsePacket?=?new?CreateGroupResponsePacket();??
????????createGroupResponsePacket.setSuccess(true);??
????????createGroupResponsePacket.setGroupId(groupId);??
????????createGroupResponsePacket.setUserNameList(userNameList);??
??
????????//?4.?給每個客戶端發(fā)送拉群通知??
????????channelGroup.writeAndFlush(createGroupResponsePacket);??
??
????????System.out.print("群創(chuàng)建成功,id?為?"?+?createGroupResponsePacket.getGroupId()?+?",?");??
????????System.out.println("群里面有:"?+?createGroupResponsePacket.getUserNameList());??
??
????????//?5.?保存群組相關的信息??
????????SessionUtil.bindChannelGroup(groupId,?channelGroup);??
????}??
}
客戶端收到消息處理邏輯為簡單打印創(chuàng)建群聊成功的信息,代碼很簡單這里不再貼代碼。
Part11群聊成員管理實現
32設計流程和實現思路
設計流程
加入群聊,控制臺輸出創(chuàng)建成功消息。
控制臺輸入
joinGroup
之后輸入群ID,加入群聊,控制臺顯示加入群成功。控制臺輸入
listGroupMembers
然后輸入群ID,展示群成員。quitGroup
輸入群ID,進行退群控制臺輸入
joinGroup
之后輸入群ID顯示對應成員不在,則退群成功。
實現思路
在控制臺中加入群加入的命令處理器。
服務端處理群聊請求。
客戶端處理加群響應.
群聊退出實現。
33在控制臺中加入群加入的命令處理器
JoinGroupConsoleCommand
public?class?JoinGroupConsoleCommand?implements?ConsoleCommand?{??
???? ??
????public?void?exec(Scanner?scanner,?Channel?channel)?{??
????????JoinGroupRequestPacket?joinGroupRequestPacket?=?new?JoinGroupRequestPacket();??
??
????????System.out.print("輸入?groupId,加入群聊:");??
????????String?groupId?=?scanner.next();??
??
????????joinGroupRequestPacket.setGroupId(groupId);??
????????channel.writeAndFlush(joinGroupRequestPacket);??
????}??
}
34服務端處理群聊請求
服務端處理群聊請求:
構建
Channel
,把處在同一個分組的Channel
放到一個List
當中存儲 。如果群聊構建成功,則構建創(chuàng)建成功響應結果 。
.Sharable??
public?class?JoinGroupRequestHandler?extends?SimpleChannelInboundHandler<JoinGroupRequestPacket>?{??
????public?static?final?JoinGroupRequestHandler?INSTANCE?=?new?JoinGroupRequestHandler();??
??
????private?JoinGroupRequestHandler()?{??
??
????}??
??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?JoinGroupRequestPacket?requestPacket)?{??
????????//?1.?獲取群對應的?channelGroup,然后將當前用戶的?channel?添加進去??
????????String?groupId?=?requestPacket.getGroupId();??
????????ChannelGroup?channelGroup?=?SessionUtil.getChannelGroup(groupId);??
????????channelGroup.add(ctx.channel());??
??
????????//?2.?構造加群響應發(fā)送給客戶端??
????????JoinGroupResponsePacket?responsePacket?=?new?JoinGroupResponsePacket();??
??
????????responsePacket.setSuccess(true);??
????????responsePacket.setGroupId(groupId);??
????????ctx.writeAndFlush(responsePacket);??
????}??
}
35客戶端處理加群響應
簡單打印加群的響應消息。
public?class?JoinGroupResponseHandler?extends?SimpleChannelInboundHandler<JoinGroupResponsePacket>?{??
??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?JoinGroupResponsePacket?responsePacket)?{??
????????if?(responsePacket.isSuccess())?{??
????????????System.out.println("加入群["?+?responsePacket.getGroupId()?+?"]成功!");??
????????}?else?{??
????????????System.err.println("加入群["?+?responsePacket.getGroupId()?+?"]失敗,原因為:"?+?responsePacket.getReason());??
????????}??
????}??
}
36群聊退出實現
群聊退出主要是獲取群對應的 channelGroup
,然后將當前用戶的 channel
移除,之后構建退群的響應信息回傳客戶端即可。
QuitGroupRequestHandler
.Sharable??
public?class?QuitGroupRequestHandler?extends?SimpleChannelInboundHandler<QuitGroupRequestPacket>?{??
????public?static?final?QuitGroupRequestHandler?INSTANCE?=?new?QuitGroupRequestHandler();??
??
????private?QuitGroupRequestHandler()?{??
??
????}??
??
???? ??
????protected?void?channelRead0(ChannelHandlerContext?ctx,?QuitGroupRequestPacket?requestPacket)?{??
????????//?1.?獲取群對應的?channelGroup,然后將當前用戶的?channel?移除??
????????String?groupId?=?requestPacket.getGroupId();??
????????ChannelGroup?channelGroup?=?SessionUtil.getChannelGroup(groupId);??
????????channelGroup.remove(ctx.channel());??
??
????????//?2.?構造退群響應發(fā)送給客戶端??
????????QuitGroupResponsePacket?responsePacket?=?new?QuitGroupResponsePacket();??
??
????????responsePacket.setGroupId(requestPacket.getGroupId());??
????????responsePacket.setSuccess(true);??
????????ctx.writeAndFlush(responsePacket);??
??
????}??
}
Part12心跳檢測
37網絡問題
假死
從TCP層面來看,服務端斷開連接,需要收到4次握手包或者RST包才算真正斷開連接,如果中途應用程序并沒有捕獲到,此時依然會認為這條連接存在的。
假死引發(fā)問題
客戶端發(fā)送數據超時無響應,影響體驗。
浪費CPU和內存資源,性能下滑。
假死原因
公網丟包,網絡抖動 。
應用程序阻塞無法讀寫 。
客戶端或者服務端設別故障,網卡,機房故障。
為了解決上面的問題,通常會使用心跳檢測機制,定期檢測每個Channel
連接是否存活。
38服務端心跳檢測實現
通過
IdleStateHandler
自帶Handler
實現繼承類,然后開啟定時任務。
如果發(fā)現假死,則
Handler
回調channelIdle
方法判斷,根據最后一個收到心跳的間隔判定是否需要移除Handler。
39客戶端預判和防御假死
新建
Handler
。開啟定時線程。
組裝心跳包。
發(fā)送心跳。
服務端接受和識別傳來Handler心跳包,刷新內部維護的心跳信息,之后回送收到心跳包消息即可。
40注意事項
心跳檢測Handler插入到整個Pipeline最前面,因為如果連接本已經斷開,那么此時再進行后續(xù)處理均無意義。
假死不一定“死”,防止服務端誤判,客戶端也需要措施防止假死和預判假死,這就是客戶端預判的含義。
41思考
IdleHandler
可否單例?斷開鏈接之后重新連接登錄。
下面是答案:
IdleHandler 可否單例?
答案是不能。因為它并不是無狀態(tài)的,并且每個Channel都有各自的連接狀態(tài)。
斷開鏈接之后重新連接登錄
通過額外的線程定時輪循所有的連接的活躍性,如果發(fā)現其中有規(guī)定時間內假死連接,則嘗試執(zhí)行重連。
Part13寫在最后
熟悉聊天系統(tǒng)對于后續(xù)的源碼分析十分有意義,建議讀者掌握吃透各種Netty的基礎用法,項目的整體構建比較簡單入門,個人在筆記中僅僅將重點部分挑選并做了梳理。
Part14文章參考
https://juejin.cn/book/m/6844733738119593991/section/6844733738291576840?suid=2040300414187416
參考資料
[1]
class in io.netty.channel: https://netty.io/4.0/api/io/netty/channel/ChannelInboundHandlerAdapter.html