在安卓中使用netty進行通信

共有四個文件 兩個服務器的 兩個客戶端的
1/2服務器 NettyServerBootstrap
package com.example.c2534.myapplication2.netty_from8391.server;
import com.example.c2534.myapplication2.Const.Const;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Starts a Netty TCP server on the given port and installs a
 * {@link NettyServerHandler} on every accepted connection.
 */
public class NettyServerBootstrap {
    /** Port the server listens on. */
    private final int port;

    /**
     * Creates the server and immediately binds it to {@code port}.
     *
     * @param port TCP port to listen on
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public NettyServerBootstrap(int port) throws InterruptedException {
        this.port = port;
        bind();
    }

    /**
     * Configures the server bootstrap and blocks until the bind completes.
     * If the bind fails or is interrupted, both event loop groups are shut
     * down so their threads do not leak.
     */
    private void bind() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            // TCP_NODELAY disables Nagle so messages are sent immediately. It is a
            // per-connection option, so it must be set with childOption(); setting it
            // with option() targets the listening channel, which does not support it.
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            // Keep accepted connections alive (long-lived connections).
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline p = socketChannel.pipeline();
                    p.addLast(new NettyServerHandler());
                }
            });
            // sync() rethrows the failure cause if the bind does not succeed.
            ChannelFuture f = bootstrap.bind(port).sync();
            started = f.isSuccess();
            if (started) {
                System.out.println("服務(wù)器成功啟動(dòng)---------------");
            }
        } finally {
            if (!started) {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }

    /** Starts the server listening on {@code Const.TCP_PORT}. */
    public static void main(String[] args) throws InterruptedException {
        new NettyServerBootstrap(Const.TCP_PORT);
    }
}
2/2服務器 NettyServerHandler
package com.example.c2534.myapplication2.netty_from8391.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//channel失效,從Map中移除
//NettyChannelMap.remove((SocketChannel)ctx.channel());
System.out.println("我是服務(wù)器 偵測(cè)到一個(gè)不活躍的頻道");
}
//這里是從客戶(hù)端過(guò)來(lái)的消息
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object baseMsg) {
String msg1=((ByteBuf)baseMsg).toString(CharsetUtil.UTF_8).trim();
System.out.println("我是服務(wù)器 收到= "+msg1);
ByteBuf bb = Unpooled.wrappedBuffer("pong".getBytes(CharsetUtil.UTF_8));
System.out.println("我是服務(wù)器 發(fā)送= pong");
channelHandlerContext.writeAndFlush(bb);
//ReferenceCountUtil.release(baseMsg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
super.exceptionCaught(ctx, cause);
System.out.println("我是服務(wù)器 出現(xiàn)異常了?。。?quot;);
}
}
1/2客戶端 NettyClientBootstrap
package com.example.c2534.myapplication2.netty_from8391;
import com.example.c2534.myapplication2.Const.Const;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
/**
 * Netty TCP client that connects to {@code Const.HOST}:{@code Const.TCP_PORT},
 * retrying every few seconds until the connection is established.
 */
public class NettyClientBootstrap {
    /** Seconds to wait between two connection attempts. */
    private static final long RETRY_DELAY_SECONDS = 5;

    private final int port = Const.TCP_PORT;
    private final String host = Const.HOST;

    /** The connected channel; {@code null} until a connection attempt has succeeded. */
    public SocketChannel socketChannel;

    /** Event loop group shared by all connection attempts of this client. */
    private EventLoopGroup eventLoopGroup;

    /**
     * Connects to the server, blocking and retrying until it succeeds, then sends
     * one greeting message.
     *
     * <p>Retries run in a loop rather than by recursion, so a long outage cannot
     * overflow the stack, and every attempt reuses the same event loop group.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         connecting or while waiting between attempts
     */
    public void startNetty() throws InterruptedException {
        System.out.println("長(zhǎng)鏈接開(kāi)始");
        try {
            while (!start()) {
                System.out.println("無(wú)法連接----------------5秒后重試");
                // Pause between attempts; otherwise retries fire within milliseconds.
                TimeUnit.SECONDS.sleep(RETRY_DELAY_SECONDS);
            }
        } catch (InterruptedException e) {
            // Giving up: release the event loop threads before propagating.
            shutdownEventLoop();
            throw e;
        }
        System.out.println("長(zhǎng)鏈接成功");
        ByteBuf bb = Unpooled.wrappedBuffer(("我是客戶(hù)端,連接成功后我發(fā)送這一條信息".getBytes(CharsetUtil.UTF_8)));
        socketChannel.writeAndFlush(bb);
    }

    /**
     * Sends {@code msg} to the server, encoded as UTF-8.
     *
     * @param msg text to send
     * @throws IllegalStateException if no connection has been established yet
     */
    public void send2server(String msg) {
        SocketChannel channel = socketChannel;
        if (channel == null) {
            throw new IllegalStateException("not connected: call startNetty() before send2server()");
        }
        ByteBuf bb = Unpooled.wrappedBuffer((msg.getBytes(CharsetUtil.UTF_8)));
        channel.writeAndFlush(bb);
    }

    /**
     * Makes a single connection attempt.
     *
     * @return {@code true} if the connection was established and
     *         {@link #socketChannel} was set, {@code false} otherwise
     */
    private boolean start() throws InterruptedException {
        if (eventLoopGroup == null) {
            eventLoopGroup = new NioEventLoopGroup();
        }
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                // Reader idle after 20 s, writer idle after 10 s; all-idle disabled.
                socketChannel.pipeline().addLast(new IdleStateHandler(20, 10, 0));
                socketChannel.pipeline().addLast(new NettyClientHandler());
            }
        });
        // await() waits for completion without throwing on failure, so the
        // outcome is checked explicitly below.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)).await();
        if (future.isSuccess()) {
            socketChannel = (SocketChannel) future.channel();
            System.out.println("連接服務(wù)器 成功---------");
            return true;
        }
        System.out.println("連接服務(wù)器 失敗---------");
        return false;
    }

    /** Shuts down the shared event loop group, if one was created. */
    private void shutdownEventLoop() {
        if (eventLoopGroup != null) {
            eventLoopGroup.shutdownGracefully();
            eventLoopGroup = null;
        }
    }
}
2/2客戶端 NettyClientHandler
package com.example.c2534.myapplication2.netty_from8391;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
/**
 * Client-side inbound handler: sends {@code ping} heartbeats when the connection
 * is write-idle, logs messages from the server, and triggers a reconnect when
 * the connection is lost.
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {
    /** Minimum interval between two heartbeats, in milliseconds. */
    public static final int MIN_CLICK_DELAY_TIME = 1000 * 30;

    /** Seconds to wait after a disconnect before reconnecting. */
    private static final long RECONNECT_DELAY_SECONDS = 5;

    /** Time the last heartbeat was sent, in milliseconds since the epoch. */
    private long lastClickTime = 0;

    // NOTE(review): this is a fresh client created per handler, not the client that
    // installed this handler, so the channel opened by a reconnect is stored on this
    // instance only — confirm whether callers need to see the new channel.
    NettyClientBootstrap nettyClient = new NettyClientBootstrap();

    /**
     * Sends a {@code ping} heartbeat on a writer-idle event, at most once every
     * {@link #MIN_CLICK_DELAY_TIME} milliseconds. Other events are ignored.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            return;
        }
        IdleStateEvent idleEvent = (IdleStateEvent) evt;
        switch (idleEvent.state()) {
            case WRITER_IDLE:
                long currentTime = System.currentTimeMillis();
                if (currentTime - lastClickTime > MIN_CLICK_DELAY_TIME) {
                    lastClickTime = currentTime;
                    ByteBuf bb = Unpooled.wrappedBuffer("ping".getBytes(CharsetUtil.UTF_8));
                    ctx.writeAndFlush(bb);
                    System.out.println("向服務(wù)器發(fā)送 ping ");
                }
                break;
            default:
                break;
        }
    }

    /**
     * Handles a message sent by the server.
     *
     * <p>The buffer is released by {@link SimpleChannelInboundHandler} after this
     * method returns, so no manual release is needed (the original released the
     * decoded {@code String}, which has no effect).
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object baseMsg) throws Exception {
        String text = ((ByteBuf) baseMsg).toString(CharsetUtil.UTF_8).trim();
        System.out.println("接受服務(wù)端發(fā)送過(guò)來(lái)的消息= " + text);
    }

    /**
     * Called when the connection is lost; starts a reconnect after a short delay.
     *
     * <p>The delay and the blocking reconnect run on a separate thread so the
     * Netty event loop thread is never blocked.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.out.println("重連了。---------");
        Thread reconnect = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Pause first; otherwise reconnects fire within milliseconds of each other.
                    TimeUnit.SECONDS.sleep(RECONNECT_DELAY_SECONDS);
                    nettyClient.startNetty();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "netty-client-reconnect");
        reconnect.setDaemon(true);
        reconnect.start();
    }

    /**
     * Logs the failure and closes the channel. Closing fires
     * {@link #channelInactive}, which performs the reconnect.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("出現(xiàn)異常了。。。。。。。。。。。。。");
        cause.printStackTrace();
        ctx.close();
    }
}