From bc1298941b2b64cb0df1cabb3ed6f16b0dcfddca Mon Sep 17 00:00:00 2001 From: lijw Date: Fri, 21 Mar 2025 18:05:28 +0800 Subject: [PATCH] =?UTF-8?q?netty=E5=8F=91=E9=80=81=E9=97=AE=E9=A2=98?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../inspect/tcpserver/tcp/ChannelCache.java | 50 +++++++++++++++++ .../tcpserver/tcp/NettyServerHandler.java | 53 +++++++++++-------- 2 files changed, 80 insertions(+), 23 deletions(-) create mode 100644 src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java diff --git a/src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java b/src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java new file mode 100644 index 0000000..762dd32 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java @@ -0,0 +1,50 @@ +package com.inspect.tcpserver.tcp; + +import io.netty.channel.ChannelHandlerContext; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ChannelCache { + private static Map channelPool = new ConcurrentHashMap<>(); + + public int size() { + return channelPool.size(); + } + + public static ChannelCache getInstance() { + return ChannelCacheHolder.instance; + } + + public static class ChannelCacheHolder { + public static ChannelCache instance = new ChannelCache(); + } + + public ChannelHandlerContext get(String handle) { + return channelPool.get(handle); + } + + public void add(String handle, ChannelHandlerContext ctx) { + channelPool.put(handle, ctx); + } + + public String addIfAbsent(String devId, ChannelHandlerContext ctx) { + for (Map.Entry entry : channelPool.entrySet()) { + if (entry.getValue() == ctx) { + String key = (String) entry.getKey(); + return devId.equals(key) ? null : key; + } + } + channelPool.put(devId, ctx); + return null; + } + + public void remove(ChannelHandlerContext ctx) { + for (Map.Entry entry : channelPool.entrySet()) { + if (entry.getValue() == ctx) { + channelPool.remove(entry.getKey()); + break; + } + } + } +} diff --git a/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java b/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java index a3cc180..842744f 100644 --- a/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java +++ b/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java @@ -1,30 +1,38 @@ package com.inspect.tcpserver.tcp; +import com.inspect.tcpserver.util.Color; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; - public class NettyServerHandler extends ChannelInboundHandlerAdapter { public Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private NettyServer nettyServer; - private Map ids = new HashMap<>(); public NettyServerHandler(NettyServer nettyServer) { this.nettyServer = nettyServer; } public void sendMsg(String uuid, String clientKey, String clientValue, ByteBuf byteBuf) { - if (ids.containsKey(clientValue)) { -// ByteBuf forPrint = byteBuf.copy(); -// logger.info("###### 会话:{}, 客户键值:{}, 客户号:{}, 下行原始报文 ######\n [{}]", uuid, clientKey, clientValue, ByteBufUtil.hexDump(forPrint)); - ids.get(clientValue).writeAndFlush(byteBuf); + logger.info(Color.MAGENTA + "###### clientKey:" + clientKey + ", clientValue:" + clientValue + ", channelPool:" + ChannelCache.getInstance().size() + " ######" + Color.END); + ChannelHandlerContext ctx = ChannelCache.getInstance().get(clientValue); + if(ctx != null) { + logger.info(Color.MAGENTA + "###### 执行下发 ######" + Color.END); + ctx.writeAndFlush(Unpooled.wrappedBuffer(byteBuf)).addListener( + (ChannelFuture future) -> { + if (future.isSuccess()) { + logger.info(Color.MAGENTA + "send command success" + Color.END); + } else { + logger.error(Color.MAGENTA + "send command fail" + Color.END); + } + }); + } else { + logger.info(Color.MAGENTA + "###### 无法执行下发,ctx==null ######" + Color.END); } } @@ -34,7 +42,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String id = ctx.channel().id().asShortText(); - ids.put(id, ctx); + ChannelCache.getInstance().addIfAbsent(id, ctx); logger.info("###### 设备上线:{} ######", id); } @@ -43,13 +51,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - ctx.close(); String id = ctx.channel().id().asShortText(); logger.warn("###### 设备断开:{} ######", id); - if (ids.containsKey(id)) { - ids.remove(id); - } - super.channelInactive(ctx); + ChannelCache.getInstance().remove(ctx); } /** @@ -69,15 +73,18 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { nettyServer.receiveMsg((BinaryModel) msg); } - /** - * 发生异常触发 - */ + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + super.userEventTriggered(ctx, evt); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ctx.close(); - String id = ctx.channel().id().asShortText(); - if (ids.containsKey(id)) { - ids.remove(id); - } + logger.error( "channel ctx: " + ctx.channel() + " exception", cause); } }