最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

SpringBoot+Netty+WebSocket 實現(xiàn)消息推送

2022-12-24 14:22 作者:網(wǎng)星軟件  | 我要投稿

關(guān)于Netty

Netty 是一個利用 Java 的高級網(wǎng)絡(luò)的能力,隱藏其背后的復(fù)雜性而提供一個易于使用的 API 的客戶端/服務(wù)器框架。

Maven依賴

<dependencies>
?<!--?https://mvnrepository.com/artifact/io.netty/netty-all?-->
?<dependency>
??<groupId>io.netty</groupId>
??<artifactId>netty-all</artifactId>
??<version>4.1.36.Final</version>
?</dependency>

</dependencies>


SpringBootApplication

啟動器中需要new一個NettyServer,并顯式調(diào)用啟動netty。

@SpringBootApplication
public?class?SpringCloudStudyDemoApplication?{
?public?static?void?main(String[]?args)?{
??SpringApplication.run(SpringCloudStudyDemoApplication.class,args);
??try?{
???new?NettyServer(12345).start();
???System.out.println("https://blog.csdn.net/moshowgame");
???System.out.println("http://127.0.0.1:6688/netty-websocket/index");
??}catch(Exception?e)?{
???System.out.println("NettyServerError:"+e.getMessage());
??}
?}
}


NettyServer

啟動的NettyServer,這里進行配置

/**
?*?NettyServer?Netty服務(wù)器配置
?*?@author?zhengkai.blog.csdn.net
?*?@date?2019-06-12
?*/
public?class?NettyServer?{
????private?final?int?port;
?
????public?NettyServer(int?port)?{
????????this.port?=?port;
????}
?
????public?void?start()?throws?Exception?{
????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
?
????????EventLoopGroup?group?=?new?NioEventLoopGroup();
????????try?{
????????????ServerBootstrap?sb?=?new?ServerBootstrap();
????????????sb.option(ChannelOption.SO_BACKLOG,?1024);
????????????sb.group(group,?bossGroup)?//?綁定線程池
????????????????????.channel(NioServerSocketChannel.class)?//?指定使用的channel
????????????????????.localAddress(this.port)//?綁定監(jiān)聽端口
????????????????????.childHandler(new?ChannelInitializer<SocketChannel>()?{?//?綁定客戶端連接時候觸發(fā)操作
?
????????????????????????@Override
????????????????????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
????????????????????????????System.out.println("收到新連接");
????????????????????????????//websocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http解編碼器
????????????????????????????ch.pipeline().addLast(new?HttpServerCodec());
????????????????????????????//以塊的方式來寫的處理器
????????????????????????????ch.pipeline().addLast(new?ChunkedWriteHandler());
????????????????????????????ch.pipeline().addLast(new?HttpObjectAggregator(8192));
????????????????????????????ch.pipeline().addLast(new?WebSocketServerProtocolHandler("/ws",?null,?true,?65536?*?10));
????????????????????????????ch.pipeline().addLast(new?MyWebSocketHandler());
????????????????????????}
????????????????????});
????????????ChannelFuture?cf?=?sb.bind().sync();?//?服務(wù)器異步創(chuàng)建綁定
????????????System.out.println(NettyServer.class?+?"?啟動正在監(jiān)聽:?"?+?cf.channel().localAddress());
????????????cf.channel().closeFuture().sync();?//?關(guān)閉服務(wù)器通道
????????}?finally?{
????????????group.shutdownGracefully().sync();?//?釋放線程池資源
????????????bossGroup.shutdownGracefully().sync();
????????}
????}
}


MyChannelHandlerPool

通道組池,管理所有websocket連接

/**
?*?MyChannelHandlerPool
?*?通道組池,管理所有websocket連接
?*?@author?zhengkai.blog.csdn.net
?*?@date?2019-06-12
?*/
public?class?MyChannelHandlerPool?{

????public?MyChannelHandlerPool(){}

????public?static?ChannelGroup?channelGroup?=?new?DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

}


MyWebSocketHandler

處理ws一下幾種情況:

  • channelActive與客戶端建立連接

  • channelInactive與客戶端斷開連接

  • channelRead0客戶端發(fā)送消息處理

/**
?*?NettyServer?Netty服務(wù)器配置
?*?@author?zhengkai.blog.csdn.net
?*?@date?2019-06-12
?*/
public?class?NettyServer?{
????private?final?int?port;
?
????public?NettyServer(int?port)?{
????????this.port?=?port;
????}
?
????public?void?start()?throws?Exception?{
????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();
?
????????EventLoopGroup?group?=?new?NioEventLoopGroup();
????????try?{
????????????ServerBootstrap?sb?=?new?ServerBootstrap();
????????????sb.option(ChannelOption.SO_BACKLOG,?1024);
????????????sb.group(group,?bossGroup)?//?綁定線程池
????????????????????.channel(NioServerSocketChannel.class)?//?指定使用的channel
????????????????????.localAddress(this.port)//?綁定監(jiān)聽端口
????????????????????.childHandler(new?ChannelInitializer<SocketChannel>()?{?//?綁定客戶端連接時候觸發(fā)操作
?
????????????????????????@Override
????????????????????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
????????????????????????????System.out.println("收到新連接");
????????????????????????????//websocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http解編碼器
????????????????????????????ch.pipeline().addLast(new?HttpServerCodec());
????????????????????????????//以塊的方式來寫的處理器
????????????????????????????ch.pipeline().addLast(new?ChunkedWriteHandler());
????????????????????????????ch.pipeline().addLast(new?HttpObjectAggregator(8192));
????????????????????????????ch.pipeline().addLast(new?WebSocketServerProtocolHandler("/ws",?"WebSocket",?true,?65536?*?10));
????????????????????????????ch.pipeline().addLast(new?MyWebSocketHandler());
????????????????????????}
????????????????????});
????????????ChannelFuture?cf?=?sb.bind().sync();?//?服務(wù)器異步創(chuàng)建綁定
????????????System.out.println(NettyServer.class?+?"?啟動正在監(jiān)聽:?"?+?cf.channel().localAddress());
????????????cf.channel().closeFuture().sync();?//?關(guān)閉服務(wù)器通道
????????}?finally?{
????????????group.shutdownGracefully().sync();?//?釋放線程池資源
????????????bossGroup.shutdownGracefully().sync();
????????}
????}
}


socket.html

主要是連接ws,發(fā)送消息,以及消息反饋

<!DOCTYPE?html?PUBLIC?"-//W3C//DTD?XHTML?1.0?Transitional//EN"?"http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html?xmlns="http://www.w3.org/1999/xhtml">
<head>
????<meta?http-equiv="Content-Type"?content="text/html;?charset=utf-8"?/>
????<title>Netty-Websocket</title>
????<script?type="text/javascript">
????????//?by?zhengkai.blog.csdn.net
????????var?socket;
????????if(!window.WebSocket){
????????????window.WebSocket?=?window.MozWebSocket;
????????}
????????if(window.WebSocket){
????????????socket?=?new?WebSocket("ws://127.0.0.1:12345/ws");
????????????socket.onmessage?=?function(event){
????????????????var?ta?=?document.getElementById('responseText');
????????????????ta.value?+=?event.data+"\r\n";
????????????};
????????????socket.onopen?=?function(event){
????????????????var?ta?=?document.getElementById('responseText');
????????????????ta.value?=?"Netty-WebSocket服務(wù)器。。。。。。連接??\r\n";
????????????};
????????????socket.onclose?=?function(event){
????????????????var?ta?=?document.getElementById('responseText');
????????????????ta.value?=?"Netty-WebSocket服務(wù)器。。。。。。關(guān)閉?\r\n";
????????????};
????????}else{
????????????alert("您的瀏覽器不支持WebSocket協(xié)議!");
????????}
????????function?send(message){
????????????if(!window.WebSocket){return;}
????????????if(socket.readyState?==?WebSocket.OPEN){
????????????????socket.send(message);
????????????}else{
????????????????alert("WebSocket 連接沒有建立成功!");
????????????}
?
????????}
?
????</script>
</head>
<body>
<form?onSubmit="return?false;">
????<label>ID</label><input?type="text"?name="uid"?value="${uid!!}"?/>?<br?/>
????<label>TEXT</label><input?type="text"?name="message"?value="這里輸入消息"?/>?<br?/>
????<br?/>?<input?type="button"?value="發(fā)送ws消息"
??????????????????onClick="send(this.form.uid.value+':'+this.form.message.value)"?/>
????<hr?color="black"?/>
????<h3>服務(wù)端返回的應(yīng)答消息</h3>
????<textarea?id="responseText"?style="width:?1024px;height:?300px;"></textarea>
</form>
</body>
</html>


Controller

寫好了html當然還需要一個controller來引導(dǎo)頁面。

@RestController
public?class?IndexController?{
?
?@GetMapping("/index")
?public?ModelAndView??index(){
??ModelAndView?mav=new?ModelAndView("socket");
??mav.addObject("uid",?RandomUtil.randomNumbers(6));
??return?mav;
?}
?
}


效果演示

圖片
圖片
圖片


改造netty支持url參數(shù)

1.首先,調(diào)整一下加載handler的順序,優(yōu)先MyWebSocketHandlerWebSocketServerProtocolHandler之上。

ch.pipeline().addLast(new?MyWebSocketHandler());
ch.pipeline().addLast(new?WebSocketServerProtocolHandler("/ws",?null,?true,?65536?*?10));

2.其次,改造MyWebSocketHandler?的channelRead方法,首次連接會是一個FullHttpRequest類型,可以通過FullHttpRequest.uri()獲取完整ws的URL地址,之后接受信息的話,會是一個TextWebSocketFrame類型。

public?class?MyWebSocketHandler?extends?SimpleChannelInboundHandler<TextWebSocketFrame>?{

????@Override
????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
????????System.out.println("與客戶端建立連接,通道開啟!");

????????//添加到channelGroup通道組
????????MyChannelHandlerPool.channelGroup.add(ctx.channel());
????}

????@Override
????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
????????System.out.println("與客戶端斷開連接,通道關(guān)閉!");
????????//添加到channelGroup?通道組
????????MyChannelHandlerPool.channelGroup.remove(ctx.channel());
????}

????@Override
????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
????????//首次連接是FullHttpRequest,處理參數(shù)?by?zhengkai.blog.csdn.net
????????if?(null?!=?msg?&&?msg?instanceof?FullHttpRequest)?{
????????????FullHttpRequest?request?=?(FullHttpRequest)?msg;
????????????String?uri?=?request.uri();

????????????Map?paramMap=getUrlParams(uri);
????????????System.out.println("接收到的參數(shù)是:"+JSON.toJSONString(paramMap));
????????????//如果url包含參數(shù),需要處理
????????????if(uri.contains("?")){
????????????????String?newUri=uri.substring(0,uri.indexOf("?"));
????????????????System.out.println(newUri);
????????????????request.setUri(newUri);
????????????}

????????}else?if(msg?instanceof?TextWebSocketFrame){
????????????//正常的TEXT消息類型
????????????TextWebSocketFrame?frame=(TextWebSocketFrame)msg;
????????????System.out.println("客戶端收到服務(wù)器數(shù)據(jù):"?+frame.text());
????????????sendAllMessage(frame.text());
????????}
????????super.channelRead(ctx,?msg);
????}

????@Override
????protected?void?channelRead0(ChannelHandlerContext?channelHandlerContext,?TextWebSocketFrame?textWebSocketFrame)?throws?Exception?{

????}

????private?void?sendAllMessage(String?message){
????????//收到信息后,群發(fā)給所有channel
????????MyChannelHandlerPool.channelGroup.writeAndFlush(?new?TextWebSocketFrame(message));
????}

????private?static?Map?getUrlParams(String?url){
????????Map<String,String>?map?=?new?HashMap<>();
????????url?=?url.replace("?",";");
????????if?(!url.contains(";")){
????????????return?map;
????????}
????????if?(url.split(";").length?>?0){
????????????String[]?arr?=?url.split(";")[1].split("&");
????????????for?(String?s?:?arr){
????????????????String?key?=?s.split("=")[0];
????????????????String?value?=?s.split("=")[1];
????????????????map.put(key,value);
????????????}
????????????return??map;

????????}else{
????????????return?map;
????????}
????}
}

3.html中的ws地址也進行改造

socket?=?new?WebSocket("ws://127.0.0.1:12345/ws?uid=666&gid=777");

4.改造后控制臺輸出情況

收到新連接
與客戶端建立連接,通道開啟!
接收到的參數(shù)是:{"uid":"666","gid":"777"}
/ws
客戶端收到服務(wù)器數(shù)據(jù):142531:這里輸入消息
客戶端收到服務(wù)器數(shù)據(jù):142531:這里輸入消息
客戶端收到服務(wù)器數(shù)據(jù):142531:這里輸入消息

failed: WebSocket opening handshake timed out

聽說是ssl wss的情況下才會出現(xiàn),來自 @around-gao 的解決方法:

MyWebSocketHandlerWebSocketServerProtocolHandler調(diào)下順序就好了。


SpringBoot+Netty+WebSocket 實現(xiàn)消息推送的評論 (共 條)

分享到微博請遵守國家法律
双鸭山市| 象山县| 尉犁县| 临泽县| 龙井市| 什邡市| 黄平县| 中方县| 河南省| 孟村| 富川| 台东市| 南开区| 都匀市| 永修县| 建阳市| 上栗县| 诏安县| 三门县| 新沂市| 本溪市| 八宿县| 台州市| 衡南县| 宽城| 沽源县| 武隆县| 苏尼特右旗| 林州市| 琼中| 九寨沟县| 密山市| 西和县| 南雄市| 民丰县| 翁源县| 苍山县| 泸定县| 屯门区| 政和县| 元朗区|