SpringBoot+Netty+WebSocket 實現(xiàn)消息推送
關(guān)于Netty
Netty 是一個利用 Java 的高級網(wǎng)絡(luò)的能力,隱藏其背后的復(fù)雜性而提供一個易于使用的 API 的客戶端/服務(wù)器框架。
Maven依賴
<dependencies>
?<!--?https://mvnrepository.com/artifact/io.netty/netty-all?-->
?<dependency>
??<groupId>io.netty</groupId>
??<artifactId>netty-all</artifactId>
??<version>4.1.36.Final</version>
?</dependency>
</dependencies>
SpringBootApplication
啟動器中需要new一個NettyServer,并顯式調(diào)用啟動netty。
@SpringBootApplication
public?class?SpringCloudStudyDemoApplication?{
?public?static?void?main(String[]?args)?{
??SpringApplication.run(SpringCloudStudyDemoApplication.class,args);
??try?{
???new?NettyServer(12345).start();
???System.out.println("https://blog.csdn.net/moshowgame");
???System.out.println("http://127.0.0.1:6688/netty-websocket/index");
??}catch(Exception?e)?{
???System.out.println("NettyServerError:"+e.getMessage());
??}
?}
}
NettyServer
啟動的NettyServer,這里進行配置
/**
?*?NettyServer?Netty服務(wù)器配置
?*?@author?zhengkai.blog.csdn.net
?*?@date?2019-06-12
?*/
public?class?NettyServer?{
????private?final?int?port;
?
????public?NettyServer(int?port)?{
????????this.port?=?port;
????}
?
????public?void?start()?throws?Exception?{
????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
?
????????EventLoopGroup?group?=?new?NioEventLoopGroup();
????????try?{
????????????ServerBootstrap?sb?=?new?ServerBootstrap();
????????????sb.option(ChannelOption.SO_BACKLOG,?1024);
????????????sb.group(group,?bossGroup)?//?綁定線程池
????????????????????.channel(NioServerSocketChannel.class)?//?指定使用的channel
????????????????????.localAddress(this.port)//?綁定監(jiān)聽端口
????????????????????.childHandler(new?ChannelInitializer<SocketChannel>()?{?//?綁定客戶端連接時候觸發(fā)操作
?
????????????????????????@Override
????????????????????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
????????????????????????????System.out.println("收到新連接");
????????????????????????????//websocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http解編碼器
????????????????????????????ch.pipeline().addLast(new?HttpServerCodec());
????????????????????????????//以塊的方式來寫的處理器
????????????????????????????ch.pipeline().addLast(new?ChunkedWriteHandler());
????????????????????????????ch.pipeline().addLast(new?HttpObjectAggregator(8192));
????????????????????????????ch.pipeline().addLast(new?WebSocketServerProtocolHandler("/ws",?null,?true,?65536?*?10));
????????????????????????????ch.pipeline().addLast(new?MyWebSocketHandler());
????????????????????????}
????????????????????});
????????????ChannelFuture?cf?=?sb.bind().sync();?//?服務(wù)器異步創(chuàng)建綁定
????????????System.out.println(NettyServer.class?+?"?啟動正在監(jiān)聽:?"?+?cf.channel().localAddress());
????????????cf.channel().closeFuture().sync();?//?關(guān)閉服務(wù)器通道
????????}?finally?{
????????????group.shutdownGracefully().sync();?//?釋放線程池資源
????????????bossGroup.shutdownGracefully().sync();
????????}
????}
}
MyChannelHandlerPool
通道組池,管理所有websocket連接
/**
?*?MyChannelHandlerPool
?*?通道組池,管理所有websocket連接
?*?@author?zhengkai.blog.csdn.net
?*?@date?2019-06-12
?*/
public?class?MyChannelHandlerPool?{
????public?MyChannelHandlerPool(){}
????public?static?ChannelGroup?channelGroup?=?new?DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
MyWebSocketHandler
處理ws一下幾種情況:
channelActive與客戶端建立連接
channelInactive與客戶端斷開連接
channelRead0客戶端發(fā)送消息處理
/**
?*?NettyServer?Netty服務(wù)器配置
?*?@author?zhengkai.blog.csdn.net
?*?@date?2019-06-12
?*/
public?class?NettyServer?{
????private?final?int?port;
?
????public?NettyServer(int?port)?{
????????this.port?=?port;
????}
?
????public?void?start()?throws?Exception?{
????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
?
????????EventLoopGroup?group?=?new?NioEventLoopGroup();
????????try?{
????????????ServerBootstrap?sb?=?new?ServerBootstrap();
????????????sb.option(ChannelOption.SO_BACKLOG,?1024);
????????????sb.group(group,?bossGroup)?//?綁定線程池
????????????????????.channel(NioServerSocketChannel.class)?//?指定使用的channel
????????????????????.localAddress(this.port)//?綁定監(jiān)聽端口
????????????????????.childHandler(new?ChannelInitializer<SocketChannel>()?{?//?綁定客戶端連接時候觸發(fā)操作
?
????????????????????????@Override
????????????????????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
????????????????????????????System.out.println("收到新連接");
????????????????????????????//websocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http解編碼器
????????????????????????????ch.pipeline().addLast(new?HttpServerCodec());
????????????????????????????//以塊的方式來寫的處理器
????????????????????????????ch.pipeline().addLast(new?ChunkedWriteHandler());
????????????????????????????ch.pipeline().addLast(new?HttpObjectAggregator(8192));
????????????????????????????ch.pipeline().addLast(new?WebSocketServerProtocolHandler("/ws",?"WebSocket",?true,?65536?*?10));
????????????????????????????ch.pipeline().addLast(new?MyWebSocketHandler());
????????????????????????}
????????????????????});
????????????ChannelFuture?cf?=?sb.bind().sync();?//?服務(wù)器異步創(chuàng)建綁定
????????????System.out.println(NettyServer.class?+?"?啟動正在監(jiān)聽:?"?+?cf.channel().localAddress());
????????????cf.channel().closeFuture().sync();?//?關(guān)閉服務(wù)器通道
????????}?finally?{
????????????group.shutdownGracefully().sync();?//?釋放線程池資源
????????????bossGroup.shutdownGracefully().sync();
????????}
????}
}
socket.html
主要是連接ws,發(fā)送消息,以及消息反饋
<!DOCTYPE?html?PUBLIC?"-//W3C//DTD?XHTML?1.0?Transitional//EN"?"http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html?xmlns="http://www.w3.org/1999/xhtml">
<head>
????<meta?http-equiv="Content-Type"?content="text/html;?charset=utf-8"?/>
????<title>Netty-Websocket</title>
????<script?type="text/javascript">
????????//?by?zhengkai.blog.csdn.net
????????var?socket;
????????if(!window.WebSocket){
????????????window.WebSocket?=?window.MozWebSocket;
????????}
????????if(window.WebSocket){
????????????socket?=?new?WebSocket("ws://127.0.0.1:12345/ws");
????????????socket.onmessage?=?function(event){
????????????????var?ta?=?document.getElementById('responseText');
????????????????ta.value?+=?event.data+"\r\n";
????????????};
????????????socket.onopen?=?function(event){
????????????????var?ta?=?document.getElementById('responseText');
????????????????ta.value?=?"Netty-WebSocket服務(wù)器。。。。。。連接??\r\n";
????????????};
????????????socket.onclose?=?function(event){
????????????????var?ta?=?document.getElementById('responseText');
????????????????ta.value?=?"Netty-WebSocket服務(wù)器。。。。。。關(guān)閉?\r\n";
????????????};
????????}else{
????????????alert("您的瀏覽器不支持WebSocket協(xié)議!");
????????}
????????function?send(message){
????????????if(!window.WebSocket){return;}
????????????if(socket.readyState?==?WebSocket.OPEN){
????????????????socket.send(message);
????????????}else{
????????????????alert("WebSocket 連接沒有建立成功!");
????????????}
?
????????}
?
????</script>
</head>
<body>
<form?onSubmit="return?false;">
????<label>ID</label><input?type="text"?name="uid"?value="${uid!!}"?/>?<br?/>
????<label>TEXT</label><input?type="text"?name="message"?value="這里輸入消息"?/>?<br?/>
????<br?/>?<input?type="button"?value="發(fā)送ws消息"
??????????????????onClick="send(this.form.uid.value+':'+this.form.message.value)"?/>
????<hr?color="black"?/>
????<h3>服務(wù)端返回的應(yīng)答消息</h3>
????<textarea?id="responseText"?style="width:?1024px;height:?300px;"></textarea>
</form>
</body>
</html>
Controller
寫好了html當然還需要一個controller來引導(dǎo)頁面。
@RestController
public?class?IndexController?{
?
?@GetMapping("/index")
?public?ModelAndView??index(){
??ModelAndView?mav=new?ModelAndView("socket");
??mav.addObject("uid",?RandomUtil.randomNumbers(6));
??return?mav;
?}
?
}
效果演示



改造netty支持url參數(shù)
1.首先,調(diào)整一下加載handler的順序,優(yōu)先MyWebSocketHandler
在WebSocketServerProtocolHandler
之上。
ch.pipeline().addLast(new?MyWebSocketHandler());
ch.pipeline().addLast(new?WebSocketServerProtocolHandler("/ws",?null,?true,?65536?*?10));
2.其次,改造MyWebSocketHandler
?的channelRead
方法,首次連接會是一個FullHttpRequest
類型,可以通過FullHttpRequest.uri()
獲取完整ws的URL地址,之后接受信息的話,會是一個TextWebSocketFrame
類型。
public?class?MyWebSocketHandler?extends?SimpleChannelInboundHandler<TextWebSocketFrame>?{
????@Override
????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
????????System.out.println("與客戶端建立連接,通道開啟!");
????????//添加到channelGroup通道組
????????MyChannelHandlerPool.channelGroup.add(ctx.channel());
????}
????@Override
????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
????????System.out.println("與客戶端斷開連接,通道關(guān)閉!");
????????//添加到channelGroup?通道組
????????MyChannelHandlerPool.channelGroup.remove(ctx.channel());
????}
????@Override
????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
????????//首次連接是FullHttpRequest,處理參數(shù)?by?zhengkai.blog.csdn.net
????????if?(null?!=?msg?&&?msg?instanceof?FullHttpRequest)?{
????????????FullHttpRequest?request?=?(FullHttpRequest)?msg;
????????????String?uri?=?request.uri();
????????????Map?paramMap=getUrlParams(uri);
????????????System.out.println("接收到的參數(shù)是:"+JSON.toJSONString(paramMap));
????????????//如果url包含參數(shù),需要處理
????????????if(uri.contains("?")){
????????????????String?newUri=uri.substring(0,uri.indexOf("?"));
????????????????System.out.println(newUri);
????????????????request.setUri(newUri);
????????????}
????????}else?if(msg?instanceof?TextWebSocketFrame){
????????????//正常的TEXT消息類型
????????????TextWebSocketFrame?frame=(TextWebSocketFrame)msg;
????????????System.out.println("客戶端收到服務(wù)器數(shù)據(jù):"?+frame.text());
????????????sendAllMessage(frame.text());
????????}
????????super.channelRead(ctx,?msg);
????}
????@Override
????protected?void?channelRead0(ChannelHandlerContext?channelHandlerContext,?TextWebSocketFrame?textWebSocketFrame)?throws?Exception?{
????}
????private?void?sendAllMessage(String?message){
????????//收到信息后,群發(fā)給所有channel
????????MyChannelHandlerPool.channelGroup.writeAndFlush(?new?TextWebSocketFrame(message));
????}
????private?static?Map?getUrlParams(String?url){
????????Map<String,String>?map?=?new?HashMap<>();
????????url?=?url.replace("?",";");
????????if?(!url.contains(";")){
????????????return?map;
????????}
????????if?(url.split(";").length?>?0){
????????????String[]?arr?=?url.split(";")[1].split("&");
????????????for?(String?s?:?arr){
????????????????String?key?=?s.split("=")[0];
????????????????String?value?=?s.split("=")[1];
????????????????map.put(key,value);
????????????}
????????????return??map;
????????}else{
????????????return?map;
????????}
????}
}
3.html中的ws地址也進行改造
socket?=?new?WebSocket("ws://127.0.0.1:12345/ws?uid=666&gid=777");
4.改造后控制臺輸出情況
收到新連接
與客戶端建立連接,通道開啟!
接收到的參數(shù)是:{"uid":"666","gid":"777"}
/ws
客戶端收到服務(wù)器數(shù)據(jù):142531:這里輸入消息
客戶端收到服務(wù)器數(shù)據(jù):142531:這里輸入消息
客戶端收到服務(wù)器數(shù)據(jù):142531:這里輸入消息
failed: WebSocket opening handshake timed out
聽說是ssl wss的情況下才會出現(xiàn),來自 @around-gao 的解決方法:
把MyWebSocketHandler
和WebSocketServerProtocolHandler
調(diào)下順序就好了。