跟著源碼學IM(八):萬字長文,手把手教你用Netty打造IM聊天

本文作者芋艿,原題“使用 Netty 實現(xiàn) IM 聊天賊簡單”,本次有修訂和改動。
一、本文引言
上篇《跟著源碼學IM(七):手把手教你用WebSocket打造Web端IM聊天》中,我們使用 WebSocket 實現(xiàn)了一個簡單的 IM 功能,支持身份認證、私聊消息、群聊消息。
然后就有人發(fā)私信,希望使用純 Netty 實現(xiàn)一個類似的功能,因此就有了本文。

注:源碼請從同步鏈接附件中下載,http://www.52im.net/thread-3489-1-1.html。
學習交流:
- 移動端IM開發(fā)入門文章:《新手入門一篇就夠:從零開發(fā)移動端IM》
- 開源IM框架源碼:https://github.com/JackJiang2011/MobileIMSDK
(本文同步發(fā)布于:http://www.52im.net/thread-3489-1-1.html)
二、知識準備
可能有人不知道 Netty 是什么,這里簡單介紹下:
Netty 是一個 Java 開源框架。Netty 提供異步的、事件驅動的網(wǎng)絡應用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡服務器和客戶端程序。
也就是說,Netty 是一個基于 NIO 的客戶、服務器端編程框架,使用Netty 可以確保你快速和簡單的開發(fā)出一個網(wǎng)絡應用,例如實現(xiàn)了某種協(xié)議的客戶,服務端應用。
Netty 相當簡化和流線化了網(wǎng)絡應用的編程開發(fā)過程,例如,TCP 和 UDP 的 Socket 服務開發(fā)。
以下是幾篇有關Netty的入門文章,值得一讀:
《新手入門:目前為止最透徹的的Netty高性能原理和框架架構解析》
《寫給初學者:Java高性能NIO框架Netty的學習方法和進階策略》
《史上最通俗Netty框架入門長文:基本介紹、環(huán)境搭建、動手實戰(zhàn)》
如果你連Java的NIO都不知道是什么,下面的文章建議優(yōu)先讀一下:
《少啰嗦!一分鐘帶你讀懂Java的NIO和經(jīng)典IO的區(qū)別》
《史上最強Java NIO入門:擔心從入門到放棄的,請讀這篇!》
《Java的BIO和NIO很難懂?用代碼實踐給你看,再不懂我轉行!》
Netty源碼和API的在線閱讀地址:
1)Netty-4.1.x 完整源碼(在線閱讀版)(* 推薦)
2)Netty-4.0.x 完整源碼(在線閱讀版)
3)Netty-4.1.x API文檔(在線版)(* 推薦)
4)Netty-4.0.x API文檔(在線版)
三、本文源碼
本文完整代碼附件下載:請從同步鏈接附件中下載,http://www.52im.net/thread-3489-1-1.html。
源碼的目錄結構,如下圖所示:?

如上圖所示:
1)lab-67-netty-demo-server 項目:搭建 Netty 服務端;
2)lab-67-netty-demo-client 項目:搭建 Netty 客戶端;
3)lab-67-netty-demo-common 項目:提供 Netty 的基礎封裝,提供消息的編解碼、分發(fā)的功能。
另外,源碼中也會提供 Netty 常用功能的示例:
1)心跳機制,實現(xiàn)服務端對客戶端的存活檢測;
2)斷線重連,實現(xiàn)客戶端對服務端的重新連接。
不嗶嗶,直接開干。
五、通信協(xié)議
在上一章中,我們實現(xiàn)了客戶端和服務端的連接功能。而本小節(jié),我們要讓它們兩能夠說上話,即進行數(shù)據(jù)的讀寫。
在日常項目的開發(fā)中,前端和后端之間采用 HTTP 作為通信協(xié)議,使用文本內容進行交互,數(shù)據(jù)格式一般是 JSON。但是在 TCP 的世界里,我們需要自己基于二進制構建,構建客戶端和服務端的通信協(xié)議。
我們以客戶端向服務端發(fā)送消息來舉個例子,假設客戶端要發(fā)送一個登錄請求。
對應的類如下:
public class AuthRequest {
????/** 用戶名 **/
????private String username;
????/** 密碼 **/
????private String password;
}
顯然:我們無法將一個 Java 對象直接丟到 TCP Socket 當中,而是需要將其轉換成 byte 字節(jié)數(shù)組,才能寫入到 TCP Socket 中去。即,需要將消息對象通過序列化,轉換成 byte 字節(jié)數(shù)組。
同時:在服務端收到 byte 字節(jié)數(shù)組時,需要將其又轉換成 Java 對象,即反序列化。不然,服務端對著一串 byte 字節(jié)處理個毛線?!
友情提示:服務端向客戶端發(fā)消息,也是一樣的過程哈!
序列化的工具非常多,例如說 Google 提供的?Protobuf,性能高效,且序列化出來的二進制數(shù)據(jù)較小。Netty 對?Protobuf?進行集成,提供了相應的編解碼器。
如下圖所示:?

但是考慮到很多可能對 Protobuf 并不了解,因為它實現(xiàn)序列化又增加額外學習成本。因此,仔細一個捉摸,還是采用 JSON 方式進行序列化。可能有人會疑惑,JSON 不是將對象轉換成字符串嗎?嘿嘿,我們再把字符串轉換成 byte 字節(jié)數(shù)組就可以啦~
下面,我們新建?lab-67-netty-demo-common?項目,并在 codec 包下,實現(xiàn)我們自定義的通信協(xié)議。
如下圖所示:

5.1、Invocation
創(chuàng)建 Invocation 類,通信協(xié)議的消息體。
代碼如下:
/**
?* 通信協(xié)議的消息體
?*/
public class Invocation {
????/**
?????* 類型
?????*/
????private String type;
????/**
?????* 消息,JSON 格式
?????*/
????private String message;
?
????// 空構造方法
????public Invocation() {
????}
?
????public Invocation(String type, String message) {
????????this.type = type;
????????this.message = message;
????}
?
????public Invocation(String type, Message message) {
????????this.type = type;
????????this.message = JSON.toJSONString(message);
????}
?
????// ... 省略 setter、getter、toString 方法
}
①?type 屬性,類型,用于匹配對應的消息處理器。如果類比 HTTP 協(xié)議,type 屬性相當于請求地址。
②?message 屬性,消息內容,使用 JSON 格式。
另外,Message 是我們定義的消息接口,代碼如下:
public interface Message {
????// ... 空,作為標記接口
}
5.2、粘包與拆包
在開始看 Invocation 的編解碼處理器之前,我們先了解下粘包與拆包的概念。
5.2.1 產(chǎn)生原因
產(chǎn)生粘包和拆包問題的主要原因是,操作系統(tǒng)在發(fā)送 TCP 數(shù)據(jù)的時候,底層會有一個緩沖區(qū),例如 1024 個字節(jié)大小。
如果一次請求發(fā)送的數(shù)據(jù)量比較小,沒達到緩沖區(qū)大小,TCP 則會將多個請求合并為同一個請求進行發(fā)送,這就形成了粘包問題。
例如說:在《詳解 Socket 編程 --- TCP_NODELAY 選項》文章中我們可以看到,在關閉 Nagle 算法時,請求不會等待滿足緩沖區(qū)大小,而是盡快發(fā)出,降低延遲。
如果一次請求發(fā)送的數(shù)據(jù)量比較大,超過了緩沖區(qū)大小,TCP 就會將其拆分為多次發(fā)送,這就是拆包,也就是將一個大的包拆分為多個小包進行發(fā)送。
如下圖展示了粘包和拆包的一個示意圖,演示了粘包和拆包的三種情況:?

如上圖所示:
1)A 和 B 兩個包都剛好滿足 TCP 緩沖區(qū)的大小,或者說其等待時間已經(jīng)達到 TCP 等待時長,從而還是使用兩個獨立的包進行發(fā)送;
2)A 和 B 兩次請求間隔時間內較短,并且數(shù)據(jù)包較小,因而合并為同一個包發(fā)送給服務端;
3)B 包比較大,因而將其拆分為兩個包 B_1 和 B_2 進行發(fā)送,而這里由于拆分后的 B_2 比較小,其又與 A 包合并在一起發(fā)送。
5.2.2 解決方案
對于粘包和拆包問題,常見的解決方案有三種。
①?客戶端在發(fā)送數(shù)據(jù)包的時候,每個包都固定長度。比如 1024 個字節(jié)大小,如果客戶端發(fā)送的數(shù)據(jù)長度不足 1024 個字節(jié),則通過補充空格的方式補全到指定長度。
這種方式,暫時沒有找到采用這種方式的案例。
②?客戶端在每個包的末尾使用固定的分隔符。例如 \r\n,如果一個包被拆分了,則等待下一個包發(fā)送過來之后找到其中的 \r\n,然后對其拆分后的頭部部分與前一個包的剩余部分進行合并,這樣就得到了一個完整的包。具體的案例,有 HTTP、WebSocket、Redis。
③?將消息分為頭部和消息體,在頭部中保存有當前整個消息的長度,只有在讀取到足夠長度的消息之后才算是讀到了一個完整的消息。
友情提示:方案 ③ 是 ① 的升級版,動態(tài)長度。
本文將采用這種方式,在每次 Invocation 序列化成字節(jié)數(shù)組寫入 TCP Socket 之前,先將字節(jié)數(shù)組的長度寫到其中。
如下圖所示:?

5.3、InvocationEncoder
創(chuàng)建 InvocationEncoder 類,實現(xiàn)將 Invocation 序列化,并寫入到 TCP Socket 中。
代碼如下:
public class InvocationEncoder extends MessageToByteEncoder<Invocation> {
?
????private Logger logger = LoggerFactory.getLogger(getClass());
?
????@Override
????protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {
????????// <2.1> 將 Invocation 轉換成 byte[] 數(shù)組
????????byte[] content = JSON.toJSONBytes(invocation);
????????// <2.2> 寫入 length
????????out.writeInt(content.length);
????????// <2.3> 寫入內容
????????out.writeBytes(content);
????????logger.info("[encode][連接({}) 編碼了一條消息({})]", ctx.channel().id(), invocation.toString());
????}
}
①?MessageToByteEncoder?是 Netty 定義的編碼 ChannelHandler 抽象類,將泛型?消息轉換成字節(jié)數(shù)組。
②?#encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out)?方法,進行編碼的邏輯。
<2.1>?處,調用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features) 方法,將 Invocation 轉換成 字節(jié)數(shù)組。
<2.2>?處,將字節(jié)數(shù)組的長度,寫入到 TCP Socket 當中。這樣,后續(xù)「5.4 InvocationDecoder」可以根據(jù)該長度,解析到消息,解決粘包和拆包的問題。
友情提示:MessageToByteEncoder 會最終將 ByteBuf out 寫到 TCP Socket 中。
<2.3>?處,將字節(jié)數(shù)組,寫入到 TCP Socket 當中。
5.4、InvocationDecoder
創(chuàng)建 InvocationDecoder 類,實現(xiàn)從 TCP Socket 讀取字節(jié)數(shù)組,反序列化成 Invocation。
代碼如下:?

①?ByteToMessageDecoder?是 Netty 定義的解碼 ChannelHandler 抽象類,在 TCP Socket 讀取到新數(shù)據(jù)時,觸發(fā)進行解碼。
②?在?<2.1>、<2.2>、<2.3>?處,從 TCP Socket 中讀取長度。
③?在?<3.1>、<3.2>、<3.3>?處,從 TCP Socket 中讀取字節(jié)數(shù)組,并反序列化成 Invocation 對象。
最終,添加 List<Object> out 中,交給后續(xù)的 ChannelHandler 進行處理。稍后,我們將在「6. 消息分發(fā)」小結中,會看到 MessageDispatcher 將 Invocation 分發(fā)到其對應的 MessageHandler 中,進行業(yè)務邏輯的執(zhí)行。
5.5、引入依賴
創(chuàng)建 pom.xml 文件,引入 Netty、FastJSON 等等依賴。

5.6、本章小結
至此,我們已經(jīng)完成通信協(xié)議的定義、編解碼的邏輯,是不是蠻有趣的?!
另外,我們在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代碼中,將編解碼器添加到其中。
如下圖所示:?

六、消息分發(fā)
在 SpringMVC 中,DispatcherServlet 會根據(jù)請求地址、方法等,將請求分發(fā)到匹配的 Controller 的 Method 方法上。
在?lab-67-netty-demo-client?項目的 dispatcher 包中,我們創(chuàng)建了 MessageDispatcher 類,實現(xiàn)和 DispatcherServlet 類似的功能,將 Invocation 分發(fā)到其對應的 MessageHandler 中,進行業(yè)務邏輯的執(zhí)行。?

下面,我們來看看具體的代碼實現(xiàn)。
6.1、Message
創(chuàng)建 Message 接口,定義消息的標記接口。
代碼如下:
public interface Message {
}
下圖,是我們涉及到的 Message 實現(xiàn)類。
如下圖所示:

6.2、MessageHandler
創(chuàng)建 MessageHandler 接口,消息處理器接口。
代碼如下:
public interface MessageHandler<T extendsMessage> {
????/**
?????* 執(zhí)行處理消息
?????*
?????* @param channel 通道
?????* @param message 消息
?????*/
????voide xecute(Channel channel, T message);
?
????/**
?????* @return 消息類型,即每個 Message 實現(xiàn)類上的 TYPE 靜態(tài)字段
?????*/
????String getType();
}
如上述代碼所示:
1)定義了泛型 <T> ,需要是 Message 的實現(xiàn)類;
2)定義的兩個接口方法,自己看下注釋哈。
下圖,是我們涉及到的 MessageHandler 實現(xiàn)類。
如下圖所示:?

6.3、MessageHandlerContainer
創(chuàng)建 MessageHandlerContainer 類,作為 MessageHandler 的容器。
代碼如下:?

①?實現(xiàn) InitializingBean 接口,在 #afterPropertiesSet() 方法中,掃描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。
②?在?#getMessageHandler(String type)?方法中,獲得類型對應的 MessageHandler 對象。稍后,我們會在 MessageDispatcher 調用該方法。
③?在?#getMessageClass(MessageHandler handler)?方法中,通過 MessageHandler 中,通過解析其類上的泛型,獲得消息類型對應的 Class 類。這是參考 Rocketmq-spring 項目的?DefaultRocketMQListenerContainer#getMessageType()?方法,進行略微修改。
6.4、MessageDispatcher
創(chuàng)建 MessageDispatcher 類,將 Invocation 分發(fā)到其對應的 MessageHandler 中,進行業(yè)務邏輯的執(zhí)行。
代碼如下:
@ChannelHandler.Sharable
public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {
?
????@Autowired
????private MessageHandlerContainer messageHandlerContainer;
?
????private final ExecutorService executor =? Executors.newFixedThreadPool(200);
?
????@Override
????protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {
????????// <3.1> 獲得 type 對應的 MessageHandler 處理器
????????MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());
????????// 獲得? MessageHandler 處理器的消息類
????????Class<? extendsMessage> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);
????????// <3.2> 解析消息
????????Message message = JSON.parseObject(invocation.getMessage(), messageClass);
????????// <3.3> 執(zhí)行邏輯
????????executor.submit(newRunnable() {
?
????????????@Override
????????????public void run() {
????????????????// noinspection unchecked
????????????????messageHandler.execute(ctx.channel(), message);
????????????}
????????});
????}
}
① 在類上添加 @ChannelHandler.Sharable 注解,標記這個 ChannelHandler 可以被多個 Channel 使用。
② SimpleChannelInboundHandler 是 Netty 定義的消息處理 ChannelHandler 抽象類,處理消息的類型是 <I> 泛型時。
③ #channelRead0(ChannelHandlerContext ctx, Invocation invocation) 方法,處理消息,進行分發(fā)。

<3.1> 處,調用 MessageHandlerContainer 的 #getMessageHandler(String type) 方法,獲得 Invocation 的 type 對應的 MessageHandler 處理器。
然后,調用 MessageHandlerContainer 的 #getMessageClass(messageHandler) 方法,獲得??MessageHandler 處理器的消息類。
<3.2> 處,調用 JSON 的 ## parseObject(String text, Class<T> clazz) 方法,將 Invocation 的 message 解析成 MessageHandler 對應的消息對象。
<3.3> 處,丟到線程池中,然后調用 MessageHandler 的 #execute(Channel channel, T message) 方法,執(zhí)行業(yè)務邏輯。
注意:為什么要丟到 executor 線程池中呢?我們先來了解下 EventGroup 的線程模型。
友情提示:在我們啟動 Netty 服務端或者客戶端時,都會設置其 EventGroup。
EventGroup 我們可以先簡單理解成一個線程池,并且線程池的大小僅僅是 CPU 數(shù)量 * 2。每個 Channel 僅僅會被分配到其中的一個線程上,進行數(shù)據(jù)的讀寫。并且,多個 Channel 會共享一個線程,即使用同一個線程進行數(shù)據(jù)的讀寫。
那么試著思考下,MessageHandler 的具體邏輯視線中,往往會涉及到 IO 處理,例如說進行數(shù)據(jù)庫的讀取。這樣,就會導致一個 Channel 在執(zhí)行 MessageHandler 的過程中,阻塞了共享當前線程的其它 Channel 的數(shù)據(jù)讀取。
因此,我們在這里創(chuàng)建了 executor 線程池,進行 MessageHandler 的邏輯執(zhí)行,避免阻塞 Channel 的數(shù)據(jù)讀取。
可能會有人說,我們是不是能夠把 EventGroup 的線程池設置大一點,例如說 200 呢?對于長連接的 Netty 服務端,往往會有 1000 ~ 100000 的 Netty 客戶端連接上來,這樣無論設置多大的線程池,都會出現(xiàn)阻塞數(shù)據(jù)讀取的情況。
友情提示:executor 線程池,我們一般稱之為業(yè)務線程池或者邏輯線程池,顧名思義,就是執(zhí)行業(yè)務邏輯的。這樣的設計方式,目前 Dubbo 等等 RPC 框架,都采用這種方式。后續(xù),可以認真閱讀下《【NIO 系列】——之 Reactor 模型》文章,進一步理解。
6.5、NettyServerConfig
創(chuàng)建 NettyServerConfig 配置類,創(chuàng)建 MessageDispatcher 和 MessageHandlerContainer Bean。
代碼如下:
@Configuration
public class NettyServerConfig {
?
????@Bean
????public MessageDispatcher messageDispatcher() {
????????return new MessageDispatcher();
????}
?
????@Bean
????public MessageHandlerContainer messageHandlerContainer() {
????????return new MessageHandlerContainer();
????}
}
6.6、NettyClientConfig
創(chuàng)建 NettyClientConfig 配置類,創(chuàng)建 MessageDispatcher 和 MessageHandlerContainer Bean。
代碼如下:
@Configuration
public class NettyClientConfig {
????@Bean
????public MessageDispatcher messageDispatcher() {
????????return new MessageDispatcher();
????}
????@Bean
????public MessageHandlerContainer messageHandlerContainer() {
????????return new MessageHandlerContainer();
????}
}
6.7、本章小結
后續(xù),我們將在如下小節(jié),具體演示消息分發(fā)的使用。
七、斷開重連
Netty 客戶端需要實現(xiàn)斷開重連機制,解決各種情況下的斷開情況。
例如說:
1)Netty 客戶端啟動時,Netty 服務端處于掛掉,導致無法連接上;
2)在運行過程中,Netty 服務端掛掉,導致連接被斷開;
3)任一一端網(wǎng)絡抖動,導致連接異常斷開。
具體的代碼實現(xiàn)比較簡單,只需要在兩個地方增加重連機制:
1)Netty 客戶端啟動時,無法連接 Netty 服務端時,發(fā)起重連;
2)Netty 客戶端運行時,和 Netty 斷開連接時,發(fā)起重連。
考慮到重連會存在失敗的情況,我們采用定時重連的方式,避免占用過多資源。
7.1、具體代碼
①?在 NettyClient 中,提供 #reconnect() 方法,實現(xiàn)定時重連的邏輯。
代碼如下:
// NettyClient.java
public void reconnect() {
????eventGroup.schedule(new Runnable() {
????????@Override
????????publicvoidrun() {
????????????logger.info("[reconnect][開始重連]");
????????????try{
????????????????start();
????????????} catch(InterruptedException e) {
????????????????logger.error("[reconnect][重連失敗]", e);
????????????}
????????}
????}, RECONNECT_SECONDS, TimeUnit.SECONDS);
????logger.info("[reconnect][{} 秒后將發(fā)起重連]", RECONNECT_SECONDS);
}
通過調用 EventLoop 提供的?#schedule(Runnable command, long delay, TimeUnit unit)?方法,實現(xiàn)定時邏輯。而在內部的具體邏輯,調用 NettyClient 的 #start() 方法,發(fā)起連接 Netty 服務端。
又因為 NettyClient 在 #start() 方法在連接 Netty 服務端失敗時,又會調用 #reconnect() 方法,從而再次發(fā)起定時重連。如此循環(huán)反復,知道 Netty 客戶端連接上 Netty 服務端。
如下圖所示:?

②?在 NettyClientHandler 中,實現(xiàn)?#channelInactive(ChannelHandlerContext ctx)?方法,在發(fā)現(xiàn)和 Netty 服務端斷開時,調用 Netty Client 的?#reconnect()?方法,發(fā)起重連。
代碼如下:
// NettyClientHandler.java
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
????// 發(fā)起重連
????nettyClient.reconnect();
????// 繼續(xù)觸發(fā)事件
????super.channelInactive(ctx);
}
7.2、簡單測試
①?啟動 Netty Client,不要啟動 Netty Server,控制臺打印日志如下圖:?

可以看到 Netty Client 在連接失敗時,不斷發(fā)起定時重連。
②?啟動 Netty Server,控制臺打印如下圖:?

可以看到 Netty Client 成功重連上 Netty Server。
八、心跳機制與空閑檢測
我們可以了解到 TCP 自帶的空閑檢測機制,默認是 2 小時。這樣的檢測機制,從系統(tǒng)資源層面上來說是可以接受的。
但是在業(yè)務層面,如果 2 小時才發(fā)現(xiàn)客戶端與服務端的連接實際已經(jīng)斷開,會導致中間非常多的消息丟失,影響客戶的使用體驗。
因此,我們需要在業(yè)務層面,自己實現(xiàn)空閑檢測,保證盡快發(fā)現(xiàn)客戶端與服務端實際已經(jīng)斷開的情況。
實現(xiàn)邏輯如下:
1)服務端發(fā)現(xiàn) 180 秒未從客戶端讀取到消息,主動斷開連接;
2)客戶端發(fā)現(xiàn) 180 秒未從服務端讀取到消息,主動斷開連接。
考慮到客戶端和服務端之間并不是一直有消息的交互,所以我們需要增加心跳機制。
邏輯如下:
1)客戶端每 60 秒向服務端發(fā)起一次心跳消息,保證服務端可以讀取到消息;
2)服務端在收到心跳消息時,回復客戶端一條確認消息,保證客戶端可以讀取到消息。
友情提示:
為什么是 180 秒?可以加大或者減小,看自己希望多快檢測到連接異常。過短的時間,會導致心跳過于頻繁,占用過多資源。
為什么是 60 秒?三次機會,確認是否心跳超時。
雖然聽起來有點復雜,但是實現(xiàn)起來并不復雜哈。
8.1、服務端的空閑檢測
在 NettyServerHandlerInitializer 中,我們添加了一個?ReadTimeoutHandler?處理器,它在超過指定時間未從對端讀取到數(shù)據(jù),會拋出?ReadTimeoutException?異常。
如下圖所示:

通過這樣的方式,實現(xiàn)服務端發(fā)現(xiàn) 180 秒未從客戶端讀取到消息,主動斷開連接。
8.2、客戶端的空閑檢測
在 NettyClientHandlerInitializer 中,我們添加了一個?ReadTimeoutHandler?處理器,它在超過指定時間未從對端讀取到數(shù)據(jù),會拋出?ReadTimeoutException?異常。
如下圖所示:?

通過這樣的方式,實現(xiàn)客戶端發(fā)現(xiàn) 180 秒未從服務端讀取到消息,主動斷開連接。
8.3、心跳機制
Netty 提供了?IdleStateHandler?處理器,提供空閑檢測的功能,在 Channel 的讀或者寫空閑時間太長時,將會觸發(fā)一個 IdleStateEvent 事件。
這樣,我們只需要在 NettyClientHandler 處理器中,在接收到 IdleStateEvent 事件時,客戶端向客戶端發(fā)送一次心跳消息。
如下圖所示:

其中,HeartbeatRequest 是心跳請求。
同時,我們在服務端項目中,創(chuàng)建了一個 HeartbeatRequestHandler 消息處理器,在收到客戶端的心跳請求時,回復客戶端一條確認消息。
代碼如下:
@Component
public class HeartbeatRequestHandler implementsMessageHandler<HeartbeatRequest> {
????private Logger logger = LoggerFactory.getLogger(getClass());
?
????@Override
????public void execute(Channel channel, HeartbeatRequest message) {
????????logger.info("[execute][收到連接({}) 的心跳請求]", channel.id());
????????// 響應心跳
????????HeartbeatResponse response = newHeartbeatResponse();
????????channel.writeAndFlush(newInvocation(HeartbeatResponse.TYPE, response));
????}
?
????@Override
????public String getType() {
????????return HeartbeatRequest.TYPE;
????}
}
其中,HeartbeatResponse 是心跳確認響應。
8.4、簡單測試
啟動 Netty Server 服務端,再啟動 Netty Client 客戶端,耐心等待 60 秒后,可以看到心跳日志如下:?
九、認證邏輯
從本小節(jié)開始,我們就具體看看業(yè)務邏輯的處理示例。
認證的過程,如下圖所示:
9.1、AuthRequest
創(chuàng)建 AuthRequest 類,定義用戶認證請求。
代碼如下:
public class AuthRequest implements Message {
????public static final String TYPE = "AUTH_REQUEST";
? ?/**
?????* 認證 Token
?????*/
????private String accessToken;
????// ... 省略 setter、getter、toString 方法
}
這里我們使用 accessToken 認證令牌進行認證。
因為一般情況下,我們使用 HTTP 進行登錄系統(tǒng),然后使用登錄后的身份標識(例如說 accessToken 認證令牌),將客戶端和當前用戶進行認證綁定。
9.2、AuthResponse
創(chuàng)建 AuthResponse 類,定義用戶認證響應。
代碼如下:
public class AuthResponse implements Message {
????public static final String TYPE = "AUTH_RESPONSE";
?
????/**
?????* 響應狀態(tài)碼
?????*/
????private Integer code;
????/**
?????* 響應提示
?????*/
????private String message;
?
????// ... 省略 setter、getter、toString 方法
}
9.3、AuthRequestHandler
服務端...
創(chuàng)建 AuthRequestHandler 類,為服務端處理客戶端的認證請求。
代碼如下:?
代碼比較簡單,看看?<1>、<2>、<3>、<4>?上的注釋。
9.4、AuthResponseHandler
客戶端...
創(chuàng)建 AuthResponseHandler 類,為客戶端處理服務端的認證響應。
代碼如下:
@Component
public class AuthResponseHandler implements MessageHandler<AuthResponse> {
?
????private Logger logger = LoggerFactory.getLogger(getClass());
?
????@Override
????public void execute(Channel channel, AuthResponse message) {
????????logger.info("[execute][認證結果:{}]", message);
????}
?
????@Override
????public String getType() {
????????return AuthResponse.TYPE;
????}
}
打印個認證結果,方便調試。
9.5、TestController
客戶端...
創(chuàng)建 TestController 類,提供 /test/mock 接口,模擬客戶端向服務端發(fā)送請求。
代碼如下:
@RestController
@RequestMapping("/test")
public class TestController {
?
????@Autowired
????private NettyClient nettyClient;
?
????@PostMapping("/mock")
????public String mock(String type, String message) {
????????// 創(chuàng)建 Invocation 對象
????????Invocation invocation = new Invocation(type, message);
????????// 發(fā)送消息
????????nettyClient.send(invocation);
????????return "success";
????}
}
9.6、簡單測試
啟動 Netty Server 服務端,再啟動 Netty Client 客戶端,然后使用 Postman 模擬一次認證請求。
如下圖所示:?
同時,可以看到認證成功的日志如下:
十一、群聊邏輯
群聊的過程,如下圖所示:?
服務端負責將客戶端 A 發(fā)送的群聊消息,轉發(fā)給客戶端 A、B、C。
友情提示:考慮到邏輯簡潔,提供的本小節(jié)的示例并不是一個一個群,而是所有人在一個大的群聊中哈~
11.1、ChatSendToAllRequest
創(chuàng)建 ChatSendToOneRequest 類,發(fā)送給所有人的群聊消息的請求。
代碼如下:
public class ChatSendToAllRequest implements Message {
????public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";
????/**
?????* 消息編號
?????*/
????private String msgId;
????/**
?????* 內容
?????*/
????private String content;
?
????// ... 省略 setter、getter、toString 方法
}
PS:如果是正經(jīng)的群聊,會有一個 groupId 字段,表示群編號。
11.2、ChatSendToAllHandler
服務端...
創(chuàng)建 ChatSendToAllHandler 類,為服務端處理客戶端的群聊請求。
代碼如下:?
代碼比較簡單,看看?<1>、<2>?上的注釋。
11.3、簡單測試
①?啟動 Netty Server 服務端。
②?啟動 Netty Client 客戶端 A。然后使用 Postman 模擬一次認證請求(用戶為 yunai)。
如下圖所示:
③?啟動 Netty Client 客戶端 B。注意,需要設置 --server.port 端口為 8081,避免沖突。
④?啟動 Netty Client 客戶端 C。注意,需要設置 --server.port 端口為 8082,避免沖突。?
⑤?最后使用 Postman 模擬一次發(fā)送群聊消息。
如下圖所示:?
同時,可以看到客戶端 A 群發(fā)給所有客戶端的日志如下:
最后,要想系統(tǒng)地學習IM開發(fā)的方方面面,請繼續(xù)閱讀:《新手入門一篇就夠:從零開發(fā)移動端IM》
附錄、系列文章
《跟著源碼學IM(一):手把手教你用Netty實現(xiàn)心跳機制、斷線重連機制》
《跟著源碼學IM(二):自已開發(fā)IM很難?手把手教你擼一個Andriod版IM》
《跟著源碼學IM(三):基于Netty,從零開發(fā)一個IM服務端》
《跟著源碼學IM(四):拿起鍵盤就是干,教你徒手開發(fā)一套分布式IM系統(tǒng)》
《跟著源碼學IM(五):正確理解IM長連接、心跳及重連機制,并動手實現(xiàn)》
《跟著源碼學IM(六):手把手教你用Go快速搭建高性能、可擴展的IM系統(tǒng)》
《跟著源碼學IM(七):手把手教你用WebSocket打造Web端IM聊天》
《跟著源碼學IM(八):萬字長文,手把手教你用Netty打造IM聊天》(* 本文)
?本文同步發(fā)布鏈接是:http://www.52im.net/thread-3489-1-1.html