Browse Source

netty发送问题修改

master
lijw 9 months ago
parent
commit
bc1298941b
2 changed files with 80 additions and 23 deletions
  1. +50
    -0
      src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java
  2. +30
    -23
      src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java

+ 50
- 0
src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java View File

@ -0,0 +1,50 @@
package com.inspect.tcpserver.tcp;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ChannelCache {
private static Map<String, ChannelHandlerContext> channelPool = new ConcurrentHashMap<>();
public int size() {
return channelPool.size();
}
public static ChannelCache getInstance() {
return ChannelCacheHolder.instance;
}
public static class ChannelCacheHolder {
public static ChannelCache instance = new ChannelCache();
}
public ChannelHandlerContext get(String handle) {
return channelPool.get(handle);
}
public void add(String handle, ChannelHandlerContext ctx) {
channelPool.put(handle, ctx);
}
public String addIfAbsent(String devId, ChannelHandlerContext ctx) {
for (Map.Entry entry : channelPool.entrySet()) {
if (entry.getValue() == ctx) {
String key = (String) entry.getKey();
return devId.equals(key) ? null : key;
}
}
channelPool.put(devId, ctx);
return null;
}
public void remove(ChannelHandlerContext ctx) {
for (Map.Entry entry : channelPool.entrySet()) {
if (entry.getValue() == ctx) {
channelPool.remove(entry.getKey());
break;
}
}
}
}

+ 30
- 23
src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java View File

@ -1,30 +1,38 @@
package com.inspect.tcpserver.tcp;
import com.inspect.tcpserver.util.Color;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private NettyServer nettyServer;
private Map<String, ChannelHandlerContext> ids = new HashMap<>();
public NettyServerHandler(NettyServer nettyServer) {
this.nettyServer = nettyServer;
}
public void sendMsg(String uuid, String clientKey, String clientValue, ByteBuf byteBuf) {
if (ids.containsKey(clientValue)) {
// ByteBuf forPrint = byteBuf.copy();
// logger.info("###### 会话:{}, 客户键值:{}, 客户号:{}, 下行原始报文 ######\n [{}]", uuid, clientKey, clientValue, ByteBufUtil.hexDump(forPrint));
ids.get(clientValue).writeAndFlush(byteBuf);
logger.info(Color.MAGENTA + "###### clientKey:" + clientKey + ", clientValue:" + clientValue + ", channelPool:" + ChannelCache.getInstance().size() + " ######" + Color.END);
ChannelHandlerContext ctx = ChannelCache.getInstance().get(clientValue);
if(ctx != null) {
logger.info(Color.MAGENTA + "###### 执行下发 ######" + Color.END);
ctx.writeAndFlush(Unpooled.wrappedBuffer(byteBuf)).addListener(
(ChannelFuture future) -> {
if (future.isSuccess()) {
logger.info(Color.MAGENTA + "send command success" + Color.END);
} else {
logger.error(Color.MAGENTA + "send command fail" + Color.END);
}
});
} else {
logger.info(Color.MAGENTA + "###### 无法执行下发,ctx==null ######" + Color.END);
}
}
@ -34,7 +42,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String id = ctx.channel().id().asShortText();
ids.put(id, ctx);
ChannelCache.getInstance().addIfAbsent(id, ctx);
logger.info("###### 设备上线:{} ######", id);
}
@ -43,13 +51,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.close();
String id = ctx.channel().id().asShortText();
logger.warn("###### 设备断开:{} ######", id);
if (ids.containsKey(id)) {
ids.remove(id);
}
super.channelInactive(ctx);
ChannelCache.getInstance().remove(ctx);
}
/**
@ -69,15 +73,18 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
nettyServer.receiveMsg((BinaryModel) msg);
}
/**
* 发生异常触发
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
String id = ctx.channel().id().asShortText();
if (ids.containsKey(id)) {
ids.remove(id);
}
logger.error( "channel ctx: " + ctx.channel() + " exception", cause);
}
}

Loading…
Cancel
Save