以前对接过一个tcp协议的接口,实现对类似于手机的pdt设备发送文本文字的功能,对接协议其实是文本形式的,很简单的一种协议。当初一路坎坷的对接完成,那时候实现方式也比较复杂,没有支持断连重连功能,本想着能优化一下,但是直到我从那家公司离职,也没有优化:)
想在回想起来当初实现功能的过程,比较曲折,这其中,当然是因为不熟悉多线程编程,对netty实现方式不熟悉。后续继续看netty的时候,就写下了这个demo程序,希望以后能够用到,这个demo搭建起来后,研究netty就比较方便了,如果以后有人也用netty对接什么协议接口,也是给其他人一些参考。
log4j.rootCategory=DEBUG,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%p %t %d{yyyy-MM-dd HH:mm:ss} %C %m%n
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.29.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency>
public class Client { private final Random random = new Random(); private final static Logger log = LoggerFactory.getLogger(Client.class); private final AtomicInteger requestSequence = new AtomicInteger(0); private final Timer timer = new Timer("NettyClient_scanneResponseTable", true); private Bootstrap bootstrap = new Bootstrap(); private EventLoopGroup eventLoopGroupWorker = new NioEventLoopGroup(2, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(-1); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); } }); private final int connectTimeout = 5000; private final String hostname; private final int port; private final String charsetName = "UTF-8"; private Channel channel; private volatile boolean inited = false; private final int sendTimeout = 5; private final int waitResponseTimeout = 10; protected final ConcurrentMap<Integer, ReponseWrapper> responseTable = new ConcurrentHashMap<Integer, ReponseWrapper>( 256); public Client(String hostname, int port) { super(); this.hostname = hostname; this.port = port; } /** * 初始化 */ private void init() { log.info("初始化"); final Charset charset = Charset.forName(this.charsetName); bootstrap.group(eventLoopGroupWorker)// .channel(NioSocketChannel.class)// .option(ChannelOption.TCP_NODELAY, true)// .option(ChannelOption.SO_KEEPALIVE, false)// .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout)// .handler(new ChannelInitializer<SocketChannel>() {// @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes()); pipeline.addLast(// new StringDecoder(charset), // new DelimiterBasedFrameDecoder(1024, delimiter), // new StringEncoder(charset), // new NettyClinetHandler()// ); } }); } /** * 连接服务端 */ public void connect() { if(!inited) { this.init(); } log.info("开始连接"); final ChannelFuture cf = bootstrap.connect(this.hostname, this.port); try { cf.await(this.connectTimeout, TimeUnit.SECONDS); if (cf.isSuccess()) { log.info("连接[{}]成功", cf.channel()); this.channel = cf.channel(); this.inited = true; this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { Client.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); } else { if(!inited) { //是首次连接 this.eventLoopGroupWorker.shutdownGracefully(); }else { log.info("继续重连"); this.eventLoopGroupWorker.schedule(new ReconnectTask(), nextReconnectDelayTime(), TimeUnit.SECONDS); } } } catch (InterruptedException e) { log.error("connect[{}] cause exception", cf.channel(), e); } } /** * 重连随机时间 * @return */ protected int nextReconnectDelayTime() { return this.random.nextInt(5); } /** * 断开连接 */ public void disconnect() { this.timer.cancel(); Future<?> future = this.eventLoopGroupWorker.shutdownGracefully(); try { future.await(5, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("断开连接异常",e); } this.channel.close(); } /** * 发送消息,true成功,false失败 * * @param msg * @return */ public boolean send(String msg) { final Integer seq = requestSequence.incrementAndGet(); JSONObject jsonObject = new JSONObject(); jsonObject.put("seq", seq); jsonObject.put("msg", msg); final ChannelFuture channelFuture = this.channel.writeAndFlush(jsonObject.toJSONString() + "\n"); final int timeoutMillis = (this.sendTimeout + this.waitResponseTimeout) * 1000; final ReponseWrapper rep = new ReponseWrapper(channelFuture, timeoutMillis); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { responseTable.put(seq, rep); rep.setSendSuccess(true); rep.releaseSendMessage(); } }); try { rep.awaitSendMessage(this.sendTimeout,TimeUnit.SECONDS); rep.setSendSuccess(true); } catch (InterruptedException e) { log.error("awaitSendMessage[{}] cause exception", channelFuture.channel(), e); return false; } if (responseTable.containsKey(seq)) { try { rep.awaitResponse(this.waitResponseTimeout,TimeUnit.SECONDS); return rep.isResponseSuccess(); } catch (InterruptedException e) { log.error("awaitResponse[{}] cause exception", channelFuture.channel(), e); return false; } } else { return false; } } /** * 检测请求和响应 */ protected void scanResponseTable() { Iterator<Entry<Integer, ReponseWrapper>> it = this.responseTable.entrySet().iterator(); while (it.hasNext()) { Entry<Integer, ReponseWrapper> next = it.next(); ReponseWrapper rep = next.getValue(); long time = rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000; if (time <= System.currentTimeMillis()) { rep.releaseAll(); it.remove(); log.warn("remove timeout request, " + rep); } } } /** * 业务处理 * @param ctx * @param msg */ protected void processMessageReceived(ChannelHandlerContext ctx, String msg) { log.trace("接收消息[{}]",msg); JSONObject jsonObject = JSONObject.parseObject(msg); Integer seq = jsonObject.getInteger("seq"); final ReponseWrapper responseChannelFutureWrapper = this.responseTable.get(seq); if (responseChannelFutureWrapper != null) { responseChannelFutureWrapper.setResponseSuccess(true); responseChannelFutureWrapper.releaseResponse(); } else { log.warn("不存的请求号[{}]的消息[{}]",seq,msg); } } /** * 业务处理 */ class NettyClinetHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { processMessageReceived(ctx, msg); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("失去连接,开始准备重连[{}]",ctx.channel()); ctx.executor().schedule(new ReconnectTask(), nextReconnectDelayTime(), TimeUnit.SECONDS); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("发生异常[{}]",ctx.channel(),cause); } } /** * 重连任务 */ class ReconnectTask implements Runnable{ @Override public void run() { Client.this.connect(); } } /** * 响应 */ class ReponseWrapper { private final long beginTimestamp = System.currentTimeMillis(); private final long timeoutMillis; private final ChannelFuture channelFuture; private final CountDownLatch sendCountDownLatch = new CountDownLatch(1); private final CountDownLatch waitResponseCountDownLatch = new CountDownLatch(1); private volatile boolean sendSuccess = false; private volatile boolean responseSuccess = false; public ReponseWrapper(ChannelFuture channelFuture, long timeoutMillis) { super(); this.timeoutMillis = timeoutMillis; this.channelFuture = channelFuture; } public void awaitSendMessage(int sendTimeout,TimeUnit unit) throws InterruptedException { this.sendCountDownLatch.await(sendTimeout, unit); } public void releaseSendMessage() { this.sendCountDownLatch.countDown(); } public void awaitResponse(int waitResponseTimeout,TimeUnit unit) throws InterruptedException { this.waitResponseCountDownLatch.await(waitResponseTimeout, unit); } public void releaseResponse() { this.waitResponseCountDownLatch.countDown(); } public void releaseAll() { this.sendCountDownLatch.countDown(); this.waitResponseCountDownLatch.countDown(); } public long getBeginTimestamp() { return beginTimestamp; } public long getTimeoutMillis() { return timeoutMillis; } public boolean isSendSuccess() { return sendSuccess; } public void setSendSuccess(boolean sendSuccess) { this.sendSuccess = sendSuccess; } public boolean isResponseSuccess() { return responseSuccess; } public void setResponseSuccess(boolean responseSuccess) { this.responseSuccess = responseSuccess; } @Override public String toString() { return "ReponseWrapper [beginTimestamp=" + beginTimestamp + ", timeoutMillis=" + timeoutMillis + ", channelFuture=" + channelFuture + ", sendCountDownLatch=" + sendCountDownLatch + ", waitResponseCountDownLatch=" + waitResponseCountDownLatch + ", sendSuccess=" + sendSuccess + ", responseSuccess=" + responseSuccess + "]"; } }}
public class BootstrapClientDemo {
public static void main(String[] args) { Client client = new Client("127.0.0.1", 9000); client.connect(); try { System.out.println("开始发送:"+System.currentTimeMillis()); client.send("hello,world"); System.out.println("结束发送:"+System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); }}
}
public class Server { private AtomicInteger requestSequence = new AtomicInteger(0); private final ServerBootstrap serverBootstrap; private final EventLoopGroup eventLoopGroupBoss = new NioEventLoopGroup(); private final EventLoopGroup eventLoopGroupWorker = new NioEventLoopGroup(); private String charsetName = "UTF-8"; public Server() { this.serverBootstrap = new ServerBootstrap(); } public void startup() { final Charset charset = Charset.forName(this.charsetName); this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker) .channel(NioServerSocketChannel.class)// .option(ChannelOption.SO_BACKLOG, 1024)// .option(ChannelOption.SO_REUSEADDR, true)// .childOption(ChannelOption.TCP_NODELAY, true)// .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes()); pipeline.addLast(// new StringDecoder(charset), // new DelimiterBasedFrameDecoder(1024,delimiter),// new StringEncoder(charset), // new NettyServerHandler()// ); } }); try { ChannelFuture sync = this.serverBootstrap.bind(9000).sync(); sync.get(); System.out.println("绑定结果:"+sync.isSuccess()); } catch (Exception e) { e.printStackTrace(); } } class NettyServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { processMessageReceived(ctx,msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } } public void shutdown() { } protected void processMessageReceived(final ChannelHandlerContext ctx, final String msg) { System.out.println("接收消息:"+msg); ctx.executor().schedule(new Runnable() { @Override public void run() { //System.out.println("回显消息:"+msg); //ctx.channel().writeAndFlush(msg+"\n"); } }, 10, TimeUnit.SECONDS); }}
public class BootstrapServerDemo { public static void main(String[] args) { Server s = new Server(); s.startup(); try { TimeUnit.SECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }}