《跟閃電俠學(xué)Netty》閱讀筆記 - 開篇入門Netty
引言
《跟閃電俠學(xué)Netty》 并不是個人接觸的第一本Netty書籍,但個人更推薦讀者把它作為作為第一本Netty入門的書籍。
和 《Netty In Action》 不同,這本書直接從Netty入門程序代碼開始引入Netty框架,前半部分教你如何用Netty搭建簡易的通訊系統(tǒng),整體難度比較低,后半部分直接從服務(wù)端源碼、客戶端源碼、ChannelPipeline開始介紹,和前半部分割裂較為嚴(yán)重。
相較于入門的程序,源碼分析毫無疑問是比較有干貨的部分,但是和前面入門程序相比有點學(xué)完了99乘法表就讓你去做微積分的卷子一樣,如果Netty使用生疏源碼部分講解肯定是十分難懂的,所以更建議只看前半截。
個人比較推薦這本書吃透Netty編寫的簡單通訊“項目”之后,直接去看《Netty In Action》做一個更為系統(tǒng)的深入和基礎(chǔ)鞏固。等《Netty In Action》看明白之后,再回過頭來看《跟閃電俠學(xué)Netty》的源碼分析部分。
拋開源碼分析部分,這本書是“我奶奶都能學(xué)會”的優(yōu)秀入門書籍,用代碼實戰(zhàn)加講解方式學(xué)起來輕松印象深刻。
開篇入門部分先不引入項目,這里先對于過去JDK的網(wǎng)絡(luò)IO模型作為引子介紹為什么我們需要用Netty,學(xué)習(xí)Netty帶來的好處等。
思維導(dǎo)圖
https://www.mubucm.com/doc/3eZpDZQHKMB

Netty 依賴版本(4.1.6.Final)
本書使用的Netty版本為 4.1.6,為了避免后面閱讀源碼的時候產(chǎn)生誤解,建議以此版本為基準(zhǔn)。
????<dependency>
????????<groupId>io.netty</groupId>
????????<artifactId>netty-all</artifactId>
????????<version>4.1.6.Final</version>
????</dependency>
JDK 原生編程模型
到目前為止,JDK一共實現(xiàn)了三種網(wǎng)絡(luò)IO編程模型:BIO、NIO和AIO。三種模型不僅產(chǎn)生的間隔時間跨度大,并且由三組完全不同編程風(fēng)格的開發(fā)人員設(shè)計API,不同編程模型和設(shè)計思路之間的切換十分復(fù)雜,開發(fā)者的學(xué)習(xí)成本也比較大。
針對這些問題,我們直接了解Netty如何統(tǒng)一這些模型以及如何降低并發(fā)編程的開發(fā)難度,這里先對過去的JDK網(wǎng)絡(luò)IO編程模型做一個了解。
洗衣機(jī)案例理解阻塞非阻塞,同步異步概念
在了解JDK的網(wǎng)絡(luò)IO模型之前,必須先了解的阻塞非阻塞,同步異步的概念。
同步和異步指的是任務(wù)之間是否需要等待其它任務(wù)完成或者等待某個事件的發(fā)生。如果一個任務(wù)必須等待另一個任務(wù)完成才能繼續(xù)執(zhí)行,那么這兩個任務(wù)就是同步的;如果一個任務(wù)可以直接繼續(xù)執(zhí)行而無需等待另一個任務(wù)的完成,那么這兩個任務(wù)就是異步的。
阻塞和非阻塞指的是任務(wù)在等待結(jié)果時是否會一直占用CPU資源。如果一個任務(wù)在等待結(jié)果時會一直占用CPU資源,那么這個任務(wù)就是阻塞的;如果一個任務(wù)在等待結(jié)果時不會占用CPU資源,那么這個任務(wù)就是非阻塞的。
這里給一個生活中洗衣服的例子幫助完全沒有了解過這些概念的讀者加深印象,這個例子來源于某個網(wǎng)課,個人覺得十分貼切和易懂就拿過來用了。
同步阻塞
理解:
洗衣服丟到洗衣機(jī),全程看著洗衣機(jī)洗完,洗好之后晾衣服。
類比 :
請求接口
等待接口返回結(jié)果,中間不能做其他事情。
拿到結(jié)果處理數(shù)據(jù)
分析: 同步:全程看著洗衣機(jī)洗完。 阻塞:等待洗衣機(jī)洗好衣服之后跑過去晾衣服。
同步非阻塞
理解:
把衣服丟到洗衣機(jī)洗,然后回客廳做其他事情,定時看看洗衣機(jī)是不是洗完了,洗好后再去晾衣服。(等待期間你可以做其他事情,比如用電腦刷劇看視頻)。
這種模式類似日常生活洗衣機(jī)洗衣服。
類比:
請求接口。
等待期間切換到其他任務(wù),但是需要定期觀察接口是否有回送數(shù)據(jù)。
拿到結(jié)果處理數(shù)據(jù)。
分析:
和阻塞方式的最大區(qū)別是不需要一直盯著洗衣機(jī),期間可以抽空干其他的事情。
同步:等待洗衣機(jī)洗完這個事情沒有本質(zhì)變化,洗好衣服之后還是要跑過去晾衣服。 非阻塞:拿到衣服之前可以干別的事情,只不過需要每次隔一段時間查看能不能拿到洗好的衣服。
異步阻塞
理解:
把衣服丟到洗衣機(jī)洗,然后看著洗衣機(jī)洗完,洗好后再去晾衣服(沒這個情況,幾乎沒這個說法,可以忽略)。
類比:
請求接口,不需要關(guān)心結(jié)果。
客戶端可以抽空干其他事情,但是非得等待接口返回結(jié)果
拿到服務(wù)端的處理結(jié)果
分析:
難以描述,幾乎不存在這種說法。
異步非阻塞
理解:
把衣服丟到洗衣機(jī)洗,然后回客廳做其他事情,洗衣機(jī)洗好后會自動去晾衣服,晾完成后放個音樂告訴你洗好衣服并晾好了。
類比 :
請求接口,此時客戶端可以繼續(xù)執(zhí)行代碼。
服務(wù)端準(zhǔn)備并且處理數(shù)據(jù),在處理完成之后在合適的時間通知客戶端
客戶端收到服務(wù)端處理完成的結(jié)果。
分析: 異步:洗衣機(jī)自己不僅把衣服洗好了還幫我們把衣服晾好了。 非阻塞:拿到“衣服”結(jié)果之前可以干別的事情。
注意異步非阻塞情況下,“我們”對待洗衣服這件事情的“態(tài)度”完全變了。
BIO 編程模型
BIO叫做阻塞IO模型,在阻塞IO模型中兩個任務(wù)之間需要等待響應(yīng)結(jié)果,應(yīng)用進(jìn)程需要等待內(nèi)核把整個數(shù)據(jù)準(zhǔn)備好之后才能開始進(jìn)行處理。
BIO是入門網(wǎng)絡(luò)編程的第一個程序,從JDK1.0開始便存在了,存在于
java.net
包當(dāng)中。下面的程序也是入門Tomcat源碼的基礎(chǔ)程序。

Java實現(xiàn)代碼
在BIO的實現(xiàn)代碼中,服務(wù)端通過accept
一直阻塞等待直到有客戶端連接。首先是服務(wù)端代碼。
public?static?void?main(String[]?args)?throws?IOException?{??
????ServerSocket?serverSocket?=?new?ServerSocket(8000);??
??
????//?接受連接??
????new?Thread(()?->?{??
????????while?(true)?{??
????????????//?1.?阻塞獲取連接??
????????????try?{??
????????????????Socket?socket?=?serverSocket.accept();??
??
????????????????//?2.?為每一個新連接使用一個新線程??
????????????????new?Thread(()?->?{??
????????????????????try?{??
????????????????????????int?len;??
????????????????????????byte[]?data?=?new?byte[1024];??
????????????????????????InputStream?inputStream?=?socket.getInputStream();??
????????????????????????//?字節(jié)流讀取數(shù)據(jù)??
????????????????????????while?((-1?!=?(len?=?inputStream.read())))?{??
????????????????????????????System.err.println(new?String(data,?0,?len));??
????????????????????????}??
????????????????????}?catch?(IOException?ioException)?{??
????????????????????????ioException.printStackTrace();??
????????????????????}??
????????????????}).start();??
????????????}?catch?(IOException?e)?{??
????????????????e.printStackTrace();??
??
????????????}??
????????}??
????}).start();??
}
較為核心的部分是serverSocket.accept()
這一串代碼,會導(dǎo)致服務(wù)端阻塞等待客戶端的連接請求,即使沒有連接也會一直阻塞。
服務(wù)端啟動之后會監(jiān)聽8000端口,等待客戶端連接,此時需要一直占用CPU資源,獲取到客戶端連接之將會開辟一個新的線程單獨為客戶端提供服務(wù)。

然后是客戶端代碼。
public?static?void?main(String[]?args)?{??
????new?Thread(()->{??
????????try?{??
????????????Socket?socket?=?new?Socket("127.0.0.1",?8000);??
????????????while?(true){??
????????????????socket.getOutputStream().write((new?Date()?+?":"+?"hellow?world").getBytes(StandardCharsets.ISO_8859_1));??
????????????????try?{??
????????????????????Thread.sleep(2000);??
????????????????}?catch?(InterruptedException?e)?{??
????????????????????e.printStackTrace();??
????????????????}??
????????????}??
????????}?catch?(IOException?ioException)?{??
????????????ioException.printStackTrace();??
????????}??
????}).start();??
}
客戶端的核心代碼如下,通過建立Socket和服務(wù)端建立連接。
Socket?socket?=?new?Socket("127.0.0.1",?8000);
Connected?to?the?target?VM,?address:?'127.0.0.1:5540',?transport:?'socket'
客戶端啟動之后會間隔兩秒發(fā)送數(shù)據(jù)給服務(wù)端,服務(wù)端收到請求之后打印客戶端傳遞的內(nèi)容。

Connected?to?the?target?VM,?address:?'127.0.0.1:5548',?transport:?'socket'
Disconnected?from?the?target?VM,?address:?'127.0.0.1:5548',?transport:?'socket'
Process?finished?with?exit?code?130
優(yōu)缺點分析
傳統(tǒng)的IO模型有如下優(yōu)缺點:
優(yōu)點
實現(xiàn)簡單 。
客戶端較少情況下運行良好。
缺點
每次連接都需要一個單獨的線程。
單機(jī)單核心線程上下文切換代價巨大 。
數(shù)據(jù)讀寫只能以字節(jié)流為單位。
while(true) 死循環(huán)非常浪費CPU資源 。
API 晦澀難懂,對于編程人員需要考慮非常多的內(nèi)容。
結(jié)論:
在傳統(tǒng)的IO模型中,每個連接創(chuàng)建成功之后都需要一個線程來維護(hù),每個線程包含一個while死循環(huán),那么1w個連接對應(yīng)1w個線程,繼而1w個while死循環(huán)。
單機(jī)是不可能完成同時支撐1W個線程的,但是在客戶端連接數(shù)量較少的時候,這種方式效率很高并且實現(xiàn)非常簡單。
NIO 編程模型
NIO 編程模型是 JDK1.4 出現(xiàn)的全新API,它實現(xiàn)的是同步非阻塞IO編程模型。以下面的模型為例,第二階段依然需要等待結(jié)果之后主動處理數(shù)據(jù),主要的區(qū)別在第一階段(紅線部分)輪詢的時候可以干別的事情,只需多次調(diào)用檢查是否有數(shù)據(jù)可以開始讀取。

Java 實現(xiàn)代碼
NIO編程模型中,新來一個連接不再創(chuàng)建一個新的線程,而是可以把這條連接直接綁定到某個指定線程。
概念上理解NIO并不難,但是要寫出JDK的NIO編程模板代碼卻不容易。
public?static?void?main(String[]?args)?throws?IOException?{??
?Selector?serverSelector?=?Selector.open();
????????Selector?clientSelector?=?Selector.open();
????????new?Thread(()?->?{
????????????try?{
????????????????//?對應(yīng)IO編程中服務(wù)端啟動
????????????????ServerSocketChannel?listenerChannel?=?ServerSocketChannel.open();
????????????????listenerChannel.socket().bind(new?InetSocketAddress(8000));
????????????????listenerChannel.configureBlocking(false);
????????????????listenerChannel.register(serverSelector,?SelectionKey.OP_ACCEPT);
????????????????while?(true)?{
????????????????????//?監(jiān)測是否有新的連接,這里的1指的是阻塞的時間為1ms
????????????????????if?(serverSelector.select(1)?>?0)?{
????????????????????????Set<SelectionKey>?set?=?serverSelector.selectedKeys();
????????????????????????Iterator<SelectionKey>?keyIterator?=?set.iterator();
????????????????????????while?(keyIterator.hasNext())?{
????????????????????????????SelectionKey?key?=?keyIterator.next();
????????????????????????????if?(key.isAcceptable())?{
????????????????????????????????try?{
????????????????????????????????????//?(1)?每來一個新連接,不需要創(chuàng)建一個線程,而是直接注冊到clientSelector
????????????????????????????????????SocketChannel?clientChannel?=?((ServerSocketChannel)?key.channel()).accept();
????????????????????????????????????clientChannel.configureBlocking(false);
????????????????????????????????????clientChannel.register(clientSelector,?SelectionKey.OP_READ);
????????????????????????????????}?finally?{
????????????????????????????????????keyIterator.remove();
????????????????????????????????}
????????????????????????????}
????????????????????????}
????????????????????}
????????????????}
????????????}?catch?(IOException?ignored)?{
????????????}
????????}).start();
????????new?Thread(()?->?{
????????????try?{
????????????????while?(true)?{
????????????????????//?(2)?批量輪詢是否有哪些連接有數(shù)據(jù)可讀,這里的1指的是阻塞的時間為1ms
????????????????????if?(clientSelector.select(1)?>?0)?{
????????????????????????Set<SelectionKey>?set?=?clientSelector.selectedKeys();
????????????????????????Iterator<SelectionKey>?keyIterator?=?set.iterator();
????????????????????????while?(keyIterator.hasNext())?{
????????????????????????????SelectionKey?key?=?keyIterator.next();
????????????????????????????if?(key.isReadable())?{
????????????????????????????????try?{
????????????????????????????????????SocketChannel?clientChannel?=?(SocketChannel)?key.channel();
????????????????????????????????????ByteBuffer?byteBuffer?=?ByteBuffer.allocate(1024);
????????????????????????????????????//?(3)?讀取數(shù)據(jù)以塊為單位批量讀取
????????????????????????????????????clientChannel.read(byteBuffer);
????????????????????????????????????byteBuffer.flip();
????????????????????????????????????System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
????????????????????????????????????????????.toString());
????????????????????????????????}?finally?{
????????????????????????????????????keyIterator.remove();
????????????????????????????????????key.interestOps(SelectionKey.OP_READ);
????????????????????????????????}
????????????????????????????}
????????????????????????}
????????????????????}
????????????????}
????????????}?catch?(IOException?ignored)?{
????????????}
????????}).start();
}
上面的代碼不需要過多糾結(jié),NIO的代碼模板確實非常復(fù)雜,我們可以把上面的兩個線程看作是兩個傳送帶,第一條傳送帶只負(fù)責(zé)接收外部的連接請求,收到請求數(shù)據(jù)之后直接丟給第二條傳送帶處理。第二條傳送帶收到任務(wù)之后進(jìn)行解析和處理,最后把結(jié)果返回即可。
書中并沒有給NIO的客戶端案例,但是有意思的是Netty的客戶端啟動連接代碼可以完美銜接JDK的NIO Server服務(wù)端,從這一點上可以發(fā)現(xiàn)Netty的NIO編程模型實際上就是對于JDK NIO模型的改良和優(yōu)化。
PS:后續(xù)篇章的源碼閱讀可以看到Netty和JDK的API的關(guān)系密不可分。
public?static?void?main(String[]?args)?throws?InterruptedException?{??
????Bootstrap?bootstrap?=?new?Bootstrap();??
????NioEventLoopGroup?eventExecutors?=?new?NioEventLoopGroup();??
????//?引導(dǎo)器引導(dǎo)啟動??
????bootstrap.group(eventExecutors)??
????????????.channel(NioSocketChannel.class)??
????????????.handler(new?ChannelInitializer<Channel>()?{??
???????????????? ??
????????????????protected?void?initChannel(Channel?channel)?throws?Exception?{??
????????????????????channel.pipeline().addLast(new?StringEncoder());??
????????????????}??
????????????});??
??
????//?建立通道??
????Channel?channel?=?bootstrap.connect("127.0.0.1",?8000).channel();??
??
????while?(true){??
????????channel.writeAndFlush(new?Date()?+?"?Hello?world");??
????????Thread.sleep(2000);??
????}??
}
Netty無論是客戶端啟動還是服務(wù)端啟動都會打印一堆日志,下面是客戶端啟動日志。
14:42:24.020?[main]?DEBUG?i.n.buffer.PooledByteBufAllocator?-?-Dio.netty.allocator.cacheTrimIntervalMillis:?0
14:42:24.020?[main]?DEBUG?i.n.buffer.PooledByteBufAllocator?-?-Dio.netty.allocator.useCacheForAllThreads:?false
14:42:24.020?[main]?DEBUG?i.n.buffer.PooledByteBufAllocator?-?-Dio.netty.allocator.maxCachedByteBuffersPerChunk:?1023
14:42:24.027?[main]?DEBUG?io.netty.buffer.ByteBufUtil?-?-Dio.netty.allocator.type:?pooled
14:42:24.027?[main]?DEBUG?io.netty.buffer.ByteBufUtil?-?-Dio.netty.threadLocalDirectBufferSize:?0
14:42:24.027?[main]?DEBUG?io.netty.buffer.ByteBufUtil?-?-Dio.netty.maxThreadLocalCharBufferSize:?16384
14:42:24.052?[main]?DEBUG?io.netty.util.Recycler?-?-Dio.netty.recycler.maxCapacityPerThread:?4096
14:42:24.052?[main]?DEBUG?io.netty.util.Recycler?-?-Dio.netty.recycler.ratio:?8
14:42:24.052?[main]?DEBUG?io.netty.util.Recycler?-?-Dio.netty.recycler.chunkSize:?32
14:42:24.052?[main]?DEBUG?io.netty.util.Recycler?-?-Dio.netty.recycler.blocking:?false
14:42:24.060?[nioEventLoopGroup-2-1]?DEBUG?io.netty.buffer.AbstractByteBuf?-?-Dio.netty.buffer.checkAccessible:?true
14:42:24.060?[nioEventLoopGroup-2-1]?DEBUG?io.netty.buffer.AbstractByteBuf?-?-Dio.netty.buffer.checkBounds:?true
14:42:24.060?[nioEventLoopGroup-2-1]?DEBUG?i.n.util.ResourceLeakDetectorFactory?-?Loaded?default?ResourceLeakDetector:?io.netty.util.ResourceLeakDetector@310af49
Disconnected?from?the?target?VM,?address:?'127.0.0.1:13875',?transport:?'socket'
Process?finished?with?exit?code?130
客戶端連接之后會間隔2S向服務(wù)端推送當(dāng)前時間。
Connected?to?the?target?VM,?address:?'127.0.0.1:13714',?transport:?'socket'
Tue?Apr?11?14:42:24?CST?2023?Hello?world
Tue?Apr?11?14:42:26?CST?2023?Hello?world
Tue?Apr?11?14:42:28?CST?2023?Hello?world
JDK的NIO針對BIO的改良點
NIO模型工作上有了“分工”的細(xì)節(jié),即兩個Selector,一個負(fù)責(zé)接受新連接,另一個負(fù)責(zé)處理連接傳遞的數(shù)據(jù)。
對比BIO模型一個連接就分配一個線程的策略,NIO模型的策略是讓所有的連接注冊過程變?yōu)橛?strong>一個Selector完成,Selector會定期輪詢檢查哪個客戶端連接可以接入,如果可以接入就注冊到當(dāng)前的Selector,后續(xù)遇到數(shù)據(jù)讀取只需要輪詢一個Selector就行了。
線程資源受限問題通過Selector將每個客戶端的while(true) 轉(zhuǎn)為只有一個 while(true) 死循環(huán)得以解決,它的“副作線程用”是線程的減少直接帶來了切換效率的提升。不僅如此NIO還提供了面向Buffer的緩存 ? ByteBuffer,提高讀寫效率,移動指針任意讀寫。
JDK的NIO編程模型缺點
看起來無非就是代碼復(fù)雜了一點,其實NIO模型看起來也“還不錯”?
NO!NO!NO!JDK的NIO實際上還有很多其他問題:
API復(fù)雜難用,需要理解非常多的底層概念 。(尤其是臭名昭著的 ByteBuffer)
JDK沒有線程模型,用戶需要自己設(shè)計底層NIO模型。
自定義協(xié)議也要拆包 。
JDK的NIO是由于Epoll實現(xiàn)的,底層存在空輪詢的BUG 。
自行實現(xiàn)NIO模型會存在很多問題。
編程人員的編程水平層次不齊,個人定制的NIO模型難以通用,替換性也很差。
基于以上種種問題,Netty 統(tǒng)統(tǒng)都有解決方案。
簡單介紹AIO
JDK的AIO不是很成熟,AIO底層依然因為Epoll的遺留問題存在臭名昭著的空輪詢BUG,這里并不推薦讀者使用JDK的AIO進(jìn)行編程。

Java AIO 的核心在于兩個關(guān)鍵類:AsynchronousSocketChannel 和 AsynchronousServerSocketChannel。
AsynchronousSocketChannel 實現(xiàn)異步套接字通信,可以讓我們在不同的客戶端連接之間切換,而無需創(chuàng)建新的線程或線程池。
AsynchronousServerSocketChannel 則用于異步地監(jiān)聽客戶端的連接請求。
Java 實現(xiàn)代碼
這里用ChatGPT生成了一段JDK的AIO代碼,為了更好理解順帶讓它把注釋一塊生成了。
public?class?AIOServer?{??
??
??
????public?static?void?main(String[]?args)?throws?IOException?{??
????????//?創(chuàng)建一個?ExecutorService,用于處理異步操作的線程池??
????????ExecutorService?executor?=?Executors.newFixedThreadPool(10);??
????????//?創(chuàng)建一個?AsynchronousChannelGroup,將線程池與該?Channel?組關(guān)聯(lián)??
????????AsynchronousChannelGroup?channelGroup?=?AsynchronousChannelGroup.withThreadPool(executor);??
??
????????//?創(chuàng)建?AsynchronousServerSocketChannel,并綁定到指定地址和端口??
????????final?AsynchronousServerSocketChannel?serverSocketChannel?=?AsynchronousServerSocketChannel.open(channelGroup);??
????????InetSocketAddress?address?=?new?InetSocketAddress("localhost",?12345);??
????????serverSocketChannel.bind(address);??
??
????????System.out.println("Server?started?on?port?"?+?address.getPort());??
??
????????//?調(diào)用?accept?方法接收客戶端連接,同時傳入一個?CompletionHandler?處理連接結(jié)果??
????????serverSocketChannel.accept(null,?new?CompletionHandler<AsynchronousSocketChannel,?Object>()?{??
????????????//?當(dāng)連接成功時會調(diào)用?completed?方法,傳入客戶端的?SocketChannel?實例作為參數(shù)??
???????????? ??
????????????public?void?completed(AsynchronousSocketChannel?clientSocketChannel,?Object?attachment)?{??
????????????????//?繼續(xù)接受下一個客戶端連接,并處理當(dāng)前客戶端的請求??
????????????????serverSocketChannel.accept(null,?this);??
????????????????handleClient(clientSocketChannel);??
????????????}??
??
????????????//?當(dāng)連接失敗時會調(diào)用?failed?方法,傳入異常信息作為參數(shù)??
???????????? ??
????????????public?void?failed(Throwable?exc,?Object?attachment)?{??
????????????????System.out.println("Error?accepting?connection:?"?+?exc.getMessage());??
????????????}??
????????});??
??
????????//?在主線程中等待,防止程序退出??
????????while?(true)?{??
????????????try?{??
????????????????Thread.sleep(Long.MAX_VALUE);??
????????????}?catch?(InterruptedException?e)?{??
????????????????break;??
????????????}??
????????}??
????}??
??
????private?static?void?handleClient(AsynchronousSocketChannel?clientSocketChannel)?{??
????????ByteBuffer?buffer?=?ByteBuffer.allocate(1024);??
????????//?讀取客戶端發(fā)送的數(shù)據(jù),同時傳入一個?CompletionHandler?處理讀取結(jié)果??
????????clientSocketChannel.read(buffer,?null,?new?CompletionHandler<Integer,?Object>()?{??
????????????//?當(dāng)讀取成功時會調(diào)用?completed?方法,傳入讀取到的字節(jié)數(shù)和附件對象(此處不需要)??
???????????? ??
????????????public?void?completed(Integer?bytesRead,?Object?attachment)?{??
????????????????if?(bytesRead?>?0)?{??
????????????????????//?將?Buffer?翻轉(zhuǎn),以便進(jìn)行讀取操作??
????????????????????buffer.flip();??
????????????????????byte[]?data?=?new?byte[bytesRead];??
????????????????????buffer.get(data,?0,?bytesRead);??
????????????????????String?message?=?new?String(data);??
????????????????????System.out.println("Received?message:?"?+?message);??
????????????????????//?向客戶端發(fā)送數(shù)據(jù)??
????????????????????clientSocketChannel.write(ByteBuffer.wrap(("Hello,?"?+?message).getBytes()));??
????????????????????buffer.clear();??
????????????????????//?繼續(xù)讀取下一批數(shù)據(jù),并傳入當(dāng)前的?CompletionHandler?以處理讀取結(jié)果??
????????????????????clientSocketChannel.read(buffer,?null,?this);??
????????????????}?else?{??
????????????????????try?{??
????????????????????????//?當(dāng)客戶端關(guān)閉連接時,關(guān)閉該?SocketChannel????????????????????????clientSocketChannel.close();??
????????????????????}?catch?(IOException?e)?{??
????????????????????????System.out.println("Error?closing?client?socket?channel:?"?+?e.getMessage());??
????????????????????}??
????????????????}??
????????????}??
??
????????????//?當(dāng)讀取失敗時會調(diào)用?failed?方法,傳入異常信息和附件對象(此處不需要)??
???????????? ??
????????????public?void?failed(Throwable?exc,?Object?attachment)?{??
????????????????System.out.println("Error?reading?from?client?socket?channel:?"?+?exc.getMessage());??
????????????}??
????????});??
????}??
??
??
}
AIO 編程模型優(yōu)缺點
優(yōu)點:
并發(fā)性高、CPU利用率高、線程利用率高 。
缺點:
不適合輕量級數(shù)據(jù)傳輸,因為進(jìn)程之間頻繁的通信在追錯、管理,資源消耗上不是很可觀。
適用場景:
對并發(fā)有需求的重量級數(shù)據(jù)傳輸。
從上面的代碼也可以看出,AIO的API和NIO又是截然不同的寫法,為了不繼續(xù)增加學(xué)習(xí)成本,這里點到為止,不再深入AIO編程模型的部分了,讓我們繼續(xù)回到Netty,了解Netty的編程模型。
使用Netty 帶來的好處
Netty不需要了解過多概念
底層IO模型隨意切換
自帶粘包拆包的問題處理
解決了空輪詢問題
自帶協(xié)議棧,支持通用協(xié)議切換
社區(qū)活躍,各種問題都有解決方案
RPC、消息中間件實踐,健壯性極強(qiáng)
網(wǎng)絡(luò)IO通信框架過程
一個網(wǎng)絡(luò)IO通信框架從客戶端發(fā)出請求到接受到結(jié)果,基本包含了下面這8個操作:
解析指令
構(gòu)建指令對象
編碼
等待響應(yīng)
解碼
翻譯指令對象
解析指令
執(zhí)行
下面來看看Netty的編程模型。
Netty 啟動模板代碼(重要)
經(jīng)過上面一長串的鋪墊,現(xiàn)在來到整體Netty的代碼部分:
服務(wù)端
首先是服務(wù)端代碼:
?public?static?void?main(String[]?args)?{
????????ServerBootstrap?serverBootstrap?=?new?ServerBootstrap();
????????NioEventLoopGroup?boos?=?new?NioEventLoopGroup();
????????NioEventLoopGroup?worker?=?new?NioEventLoopGroup();
????????serverBootstrap
????????????????.group(boos,?worker)
????????????????.channel(NioServerSocketChannel.class)
????????????????.childHandler(new?ChannelInitializer<NioSocketChannel>()?{
????????????????????protected?void?initChannel(NioSocketChannel?ch)?{
????????????????????????ch.pipeline().addLast(new?StringDecoder());
????????????????????????ch.pipeline().addLast(new?SimpleChannelInboundHandler<String>()?{
????????????????????????????
????????????????????????????protected?void?channelRead0(ChannelHandlerContext?ctx,?String?msg)?{
????????????????????????????????System.out.println(msg);
????????????????????????????}
????????????????????????});
????????????????????}
????????????????})
????????????????.bind(8000);
????}
初學(xué)Netty的時候可能沒有NIO的經(jīng)驗,所以我們簡單做個類比:
NioEventLoopGroup?boos?=?new?NioEventLoopGroup();
NioEventLoopGroup?worker?=?new?NioEventLoopGroup();
可以直接看作
Selector?serverSelector?=?Selector.open();
Selector?clientSelector?=?Selector.open();
其中boss負(fù)責(zé)處理連接,worker負(fù)責(zé)讀取請求和處理數(shù)據(jù)。兩者的工作模式也是類似的,boss就像是老板負(fù)責(zé)“接單”,worker 打工仔負(fù)責(zé)接收單子的內(nèi)容然后開始打工干活。
客戶端
客戶端的啟動代碼如下。
public?static?void?main(String[]?args)?throws?InterruptedException?{??
????Bootstrap?bootstrap?=?new?Bootstrap();??
????NioEventLoopGroup?eventExecutors?=?new?NioEventLoopGroup();??
????//?引導(dǎo)器引導(dǎo)啟動??
????bootstrap.group(eventExecutors)??
????????????.channel(NioSocketChannel.class)??
????????????.handler(new?ChannelInitializer<Channel>()?{??
???????????????? ??
????????????????protected?void?initChannel(Channel?channel)?throws?Exception?{??
????????????????????channel.pipeline().addLast(new?StringEncoder());??
????????????????}??
????????????});??
??
????//?建立通道??
????Channel?channel?=?bootstrap.connect("127.0.0.1",?8000).channel();??
??
????while?(true){??
????????channel.writeAndFlush(new?Date()?+?"?Hello?world");??
????????Thread.sleep(2000);??
????}??
??
}
客戶端的代碼中的NioEventLoopGroup
實際對應(yīng)了main函數(shù)單獨開啟的線程。上面的代碼可以完美的替代調(diào)JDK的NIO、AIO、BIO 的API,學(xué)習(xí)成本大大降低,Netty為使用者做了大量的“準(zhǔn)備”工作,提供了很多"開箱即用"的功能,非常方便。
Netty的服務(wù)端和客戶端的入門程序代碼是分析源碼的開始,這部分代碼需要有較深的印象。
問題
摘錄部分Netty入門級別的八股。
Linux網(wǎng)絡(luò)編程中的五種I/O模型
關(guān)鍵點:
不同的角度理解IO模型的概念會有變化。注意本部分站在用戶程序和內(nèi)核的網(wǎng)絡(luò)IO交互的角度理解的。
權(quán)威:
RFC標(biāo)準(zhǔn)
書籍 《UNIX Network Programming》(中文名《UNIX網(wǎng)絡(luò)編程-卷一》)第六章。
下面部分總結(jié)自:《UNIX Network Programming》(中文名《UNIX網(wǎng)絡(luò)編程-卷一》)
1)阻塞式I/O
注意原書中阻塞式I/O給出的例子是UDP而不是TCP的例子。recvfrom 函數(shù)可以看作是系統(tǒng)調(diào)用,在阻塞I/O模型中,recvfrom 的系統(tǒng)調(diào)用要等待內(nèi)核把數(shù)據(jù)從內(nèi)核態(tài)拷貝到用戶的緩沖池或者發(fā)生錯誤的時候(比如信號中斷)才進(jìn)行返回。recvfrom 收到數(shù)據(jù)之后再執(zhí)行數(shù)據(jù)處理。

2)非阻塞式I/O
recvfrom 的系統(tǒng)調(diào)用會在設(shè)置非阻塞的時候,會要求內(nèi)核在無數(shù)據(jù)的時候返回錯誤,所以前面三次都是錯誤調(diào)用,在第四次調(diào)用之后此時recvfrom輪詢到數(shù)據(jù),于是開始正常的等待內(nèi)核把數(shù)據(jù)復(fù)制到用戶進(jìn)程緩存。
此處輪詢的定義為:對于描述符進(jìn)行recvfrom循環(huán)調(diào)用,會增加CPU的開銷。注意非阻塞的輪詢不一定要比阻塞等待要強(qiáng),有時候甚至?xí)袩o意義的開銷反而不如阻塞。

3)I/O復(fù)用(select,poll,epoll...)
I/O多路復(fù)用是阻塞在select,epoll這樣的系統(tǒng)調(diào)用,沒有阻塞在真正的I/O系統(tǒng)調(diào)用如recvfrom。進(jìn)程受阻于select,等待可能多個套接口中的任一個變?yōu)榭勺x。
IO多路復(fù)用最大的區(qū)別是使用兩個系統(tǒng)調(diào)用(select和recvfrom)。Blocking IO(BIO)只調(diào)用了一個系統(tǒng)調(diào)用(recvfrom)。
select/epoll 核心是可以同時處理多個 connection,但是這并不一定提升效率,連接數(shù)不高的話性能不一定比多線程+阻塞IO好。但是連接數(shù)比較龐大之后會有顯著的差距。
多路復(fù)用模型中,每一個socket都需要設(shè)置為non-blocking,否則是無法進(jìn)行elect的。
listenerChannel.configureBlocking(false);
這個設(shè)置的意義就在于此。

4)信號驅(qū)動式I/O(SIGIO)
信號驅(qū)動的優(yōu)勢是等待數(shù)據(jù)報到之前進(jìn)程不被阻塞,主循環(huán)可以繼續(xù)執(zhí)行,等待信號到來即可,注意這里有可能是數(shù)據(jù)已經(jīng)準(zhǔn)備好被處理,或者數(shù)據(jù)復(fù)制完成可以準(zhǔn)備讀取。
信號驅(qū)動IO 也是同步模型,雖然可以通過信號的方式減少交互,但是系統(tǒng)調(diào)用過程當(dāng)中依然需要進(jìn)行等待,內(nèi)核也依然是通知何時開啟一個IO操作,和前面介紹的IO模型對比發(fā)現(xiàn)優(yōu)勢并不明顯。

5)異步I/O(POSIX的aio_系列函數(shù))

核心: Future-Listener機(jī)制
IO操作分為兩步
發(fā)起IO請求,等待數(shù)據(jù)準(zhǔn)備(Waiting for the data to be ready)
實際的IO操作,將數(shù)據(jù)從內(nèi)核拷貝到進(jìn)程中(Copying the data from the kernel to the process)
前四種IO模型都是同步IO操作,主要的區(qū)別在于第一階段處理方式,而他們的第二階段是一樣的:在數(shù)據(jù)從內(nèi)核復(fù)制到應(yīng)用緩沖區(qū)期間(用戶空間),進(jìn)程阻塞于recvfrom調(diào)用或者select() 函數(shù)。 異步I/O模型內(nèi)在這兩個階段都要(自行)處理。
阻塞IO和非阻塞IO的區(qū)別在于第一步,發(fā)起IO請求是否會被阻塞,如果阻塞直到完成那么就是傳統(tǒng)的阻塞IO,如果不阻塞,那么就是非阻塞IO。
同步IO和異步IO的區(qū)別就在于第二個步驟是否阻塞,如果實際的IO讀寫阻塞請求進(jìn)程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO復(fù)用、信號驅(qū)動IO都是同步IO,如果不阻塞,而是操作系統(tǒng)幫你做完IO操作再將結(jié)果返回給你,那么就是異步IO。
異步IO模型非常像是我們?nèi)粘|c外賣,我們時不時看看配送進(jìn)度就是在“輪詢”,當(dāng)外賣員把外賣送到指定位置打電話通知我們?nèi)ツ眉纯伞?/p>
交互幾個核心點
再次強(qiáng)調(diào)是針對用戶程序和內(nèi)核的網(wǎng)絡(luò)IO交互角度理解的。
阻塞非阻塞說的是線程的狀態(tài)(重要)
同步和異步說的是消息的通知機(jī)制(重要) ?- 同步需要主動讀寫數(shù)據(jù),異步是不需要主動讀寫數(shù)據(jù) ?- 同步IO和異步IO是針對用戶應(yīng)用程序和內(nèi)核的交互
為什么Netty使用NIO而不是AIO?
Netty不看重Windows上的使用,在Linux系統(tǒng)上,AIO的底層實現(xiàn)仍使用EPOLL,沒有很好實現(xiàn)AIO,因此在性能上沒有明顯的優(yōu)勢,而且被JDK封裝了一層不容易深度優(yōu)化。
Netty整體架構(gòu)是reactor模型, 而AIO是proactor模型, 混合在一起會非常混亂,把AIO也改造成reactor模型看起來是把epoll繞個彎又繞回來
AIO還有個缺點是接收數(shù)據(jù)需要預(yù)先分配緩存, 而不是NIO那種需要接收時才需要分配緩存, 所以對連接數(shù)量非常大但流量小的情況, 內(nèi)存浪費很多
Linux上AIO不夠成熟,處理回調(diào)結(jié)果速度跟不到處理需求,比如外賣員太少,顧客太多,供不應(yīng)求,造成處理速度有瓶頸(待驗證)。
結(jié)論
Netty整體架構(gòu)是reactor模型,采用epoll機(jī)制,所以往深的說,還是IO多路復(fù)用模式,所以也可說netty是同步非阻塞模型(看的層次不一樣),只不過實際是異步IO。
Netty 應(yīng)用場景了解么?
作為 RPC 框架的網(wǎng)絡(luò)通信工具。分布式系統(tǒng)之間的服務(wù)器通信可以使用Netty完成,雖然是Java編寫的框架,但是性能非常接近 C 和C++ 執(zhí)行效率。
消息隊列:比如大名鼎鼎的RocketMq底層完全依賴Netty,編程人員不需要很強(qiáng)的并發(fā)編程功底也可以快速上手和維護(hù)代碼。
實現(xiàn)一個即時通訊系統(tǒng):正好和本書應(yīng)用場景重合了。
介紹Netty
簡短介紹
Netty是一個高性能、異步、NIO編程模型的網(wǎng)絡(luò)編程框架。它提供了簡單易用的API,可以快速地開發(fā)各種網(wǎng)絡(luò)應(yīng)用程序,如客戶端、服務(wù)器和協(xié)議實現(xiàn)等。同時,Netty還具有良好的可擴(kuò)展性和靈活性,支持多種傳輸協(xié)議和編解碼器。
稍微復(fù)雜一點
Netty是由JBOSS提供的一個java開源框架, 是業(yè)界最流行的NIO框架,整合了多種協(xié)議( 包括FTP、SMTP、HTTP等各種二進(jìn)制文本協(xié)議)的實現(xiàn)經(jīng)驗,精心設(shè)計的框架,在多個大型商業(yè)項目中得到充分驗證。 1)API使用簡單 2)成熟、穩(wěn)定 3)社區(qū)活躍 有很多種NIO框架 如mina 4)經(jīng)過大規(guī)模的驗證(互聯(lián)網(wǎng)、大數(shù)據(jù)、網(wǎng)絡(luò)游戲、電信通信行業(yè))。
總結(jié)
開篇簡單介紹了JDK的BIO、NIO和AIO,三者不僅出現(xiàn)時間跨度大,三個團(tuán)隊編寫,和JDK的IO編程一樣晦澀難懂和不好用,開發(fā)人員需要花大量事件學(xué)習(xí)底層細(xì)節(jié)。
用洗衣機(jī)的例子,理解網(wǎng)絡(luò)編程模型的重要概念:同步、非同步、阻塞、非阻塞。從入門的角度來看,同步和異步可以認(rèn)為是否是由客戶端主動獲取數(shù)據(jù),而阻塞和非阻塞則是客戶端是否需要拿到結(jié)果進(jìn)行處理,兩者是相輔相成的。
Netty 編程模型統(tǒng)一了JDK的編程模型,降低了學(xué)習(xí)成本,同時效率比原生JDK更高,并且解決了NIO 中的空輪詢問題。
Netty 底層實際上和JDK的網(wǎng)絡(luò)編程模型密切相關(guān),從案例代碼可以看到Netty的客戶端API代碼可以直接往NIO的Server發(fā)送數(shù)據(jù)。
補(bǔ)充書中沒有介紹的AIO編程模型,用ChatGPT 生成的代碼簡單易懂。
最后補(bǔ)充有關(guān)Netty的問題。
寫在最后
開篇部分補(bǔ)充了書中沒介紹的一些網(wǎng)絡(luò)編程模型的基本概念,以及在最后關(guān)聯(lián)了些相關(guān)書籍的知識點和,最后順帶歸納了一些八股問題,當(dāng)然最為重要的部分是熟悉Netty的入門程序代碼。
開篇入門篇到此就結(jié)束了,如果內(nèi)容描述有誤,歡迎評論或者私信留言。
參考
《跟閃電俠學(xué)Netty》開篇:Netty是什么? - 簡書 (jianshu.com)
網(wǎng)課專欄:《高并發(fā)系列之百萬連接Netty實戰(zhàn)課程》
Netty 書籍推薦
《Netty權(quán)威指南》
《Netty進(jìn)階之路》