基本功能:与客户端建立连接后立刻发送当前时间
首先建立一个表示时间的类 UnixTime(以秒为单位保存自 Unix 纪元以来的时间):
package timeExample;import java.sql.Date;public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis()/1000L); } public UnixTime(long value) { this.value = value; } public long value() { return value; } @Override public String toString() { return new Date((value())*1000L ).toString(); } public static void main(String[] arg) { System.out.println(new UnixTime()); }}
服务端代码:
package timeExample;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;public class TimeEncoder extends MessageToByteEncoder<UnixTime> { @Override protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception { // TODO Auto-generated method stub out.writeInt((int)msg.value()); } }/** * * 这个类同样可以这样实现: * public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt((int)m.value()); ctx.write(encoded, promise); // (1) }} * 可以看到MessageToByteEncoder这个类封装了一些必要且固定的代码 * * * * * * * * * * * * **/
package timeExample;import java.beans.EventHandler;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelOutboundHandlerAdapter;public class TImeServerHandler extends ChannelInboundHandlerAdapter{ //之所以用ChannelActive是因为这个timeServer的作用是在连接建立后立刻给客户端发送时间,而不接收客户端发送的消息 //channelActive() method will be invoked when a connection is established and ready to generate traffic @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ChannelFuture future=ctx.writeAndFlush(new UnixTime()); //使用的是JavaNio,因此它是非阻塞的,writeAndFlush可能还未完成就返回结果,ChannelFuture是标志这个操作的状态 //因此可以添加监听器监听这个操作状态 future.addListener(ChannelFutureListener.CLOSE);//一旦监听听监听到操作完成就关闭连接。这个是下面那段代码的简写 /* future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); */ } }
package timeExample;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.util.concurrent.Future;//without receiving any requests and closes the connection once the message is sent//close the connection on completion.public class TimeServer { public int port; public TimeServer(int port) { this.port=port; } public void run() throws InterruptedException { ServerBootstrap b=new ServerBootstrap(); EventLoopGroup boss=new NioEventLoopGroup(); EventLoopGroup worker= new NioEventLoopGroup(); try { b.group(boss,worker).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new TimeEncoder(),new TImeServerHandler());// TODO Auto-generated method stub } }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture future=b.bind(port).sync(); future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); boss.shutdownGracefully(); }} public static void main(String arg[]) throws InterruptedException { new TimeServer(10111).run(); }}
客户端代码:
package timeExample;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * Netty accumulates the bytes it reads in a buffer, and a single complete
 * message can end up fragmented across several reads.
 * ByteToMessageDecoder is used to deal with that problem.
 */
public class TimeDecoder extends ByteToMessageDecoder {

    /** Number of bytes that make up one encoded timestamp. */
    private static final int FRAME_LENGTH = 4;

    /**
     * Emits one {@link UnixTime} once at least four bytes are available;
     * otherwise does nothing. decode() is invoked whenever new data arrives.
     *
     * @param ctx the handler context (unused)
     * @param in  the accumulated inbound bytes
     * @param out the list decoded messages are added to
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= FRAME_LENGTH) {
            long seconds = in.readUnsignedInt();
            out.add(new UnixTime(seconds));
        }
    }
}
package timeExample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * Client-side handler that prints the {@link UnixTime} received from the
 * server and then closes the connection.
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the received timestamp to standard output and closes the channel.
     *
     * @param ctx the handler context
     * @param msg the inbound message; cast to {@link UnixTime}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UnixTime t = (UnixTime) msg;
        System.out.println(t);
        ctx.close();
    }

    /**
     * Reports the failure and closes the connection.
     *
     * <p>Added so that an error raised in this pipeline (for example a failed
     * cast in {@link #channelRead}) is surfaced and the channel is closed by
     * the client itself instead of being left open.
     *
     * @param ctx   the handler context
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
package timeExample;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class TimeClient { private String address; private int port; TimeClient(String address,int port){ this.address=address; this.port=port; } public void run() throws InterruptedException { EventLoopGroup worker=new NioEventLoopGroup(); Bootstrap b=new Bootstrap(); try { b.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { // TODO Auto-generated method stub ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler()); } }).option(ChannelOption.SO_KEEPALIVE, true); ChannelFuture future =b.connect(address, port).sync(); future.channel().closeFuture().sync(); }finally { worker.shutdownGracefully();//当EventLoopGroup shutdown以后所有的Channel才会shutdown。 } } public static void main(String[] arg) throws InterruptedException { new TimeClient("localhost",10111).run(); }}
参考:Netty 官方文档 —— User guide for 4.x