diff --git a/src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java b/src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java index 762dd32..5e9858b 100644 --- a/src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java +++ b/src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java @@ -3,6 +3,7 @@ package com.inspect.tcpserver.tcp; import io.netty.channel.ChannelHandlerContext; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; public class ChannelCache { @@ -47,4 +48,18 @@ public class ChannelCache { } } } + + public void replace(String newKey, ChannelHandlerContext ctx) { + for (Map.Entry entry : channelPool.entrySet()) { + if (entry.getValue() == ctx) { + channelPool.remove(entry.getKey()); + channelPool.put(newKey, ctx); + break; + } + } + } + + public Set getClients() { + return channelPool.keySet(); + } } diff --git a/src/main/java/com/inspect/tcpserver/tcp/NettyServer.java b/src/main/java/com/inspect/tcpserver/tcp/NettyServer.java index b1ad77b..35623b0 100644 --- a/src/main/java/com/inspect/tcpserver/tcp/NettyServer.java +++ b/src/main/java/com/inspect/tcpserver/tcp/NettyServer.java @@ -38,6 +38,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.*; @Component @@ -55,7 +56,6 @@ public class NettyServer { ClientController clientController; private EventLoopGroup bossGroup; private EventLoopGroup workGroup; - private Map tcpClientMap = new HashMap<>(); //机器人id,通道id private long sendIndex = 0; //若重启系统后还要延续之前的序列号则需要把序列号存入redis中 private long receiveIndex = 0; private DownXml2Json downXml2Json = new DownXml2Json(aliasHost); @@ -138,7 +138,7 @@ public class NettyServer { //发送消息 public void flushMsgToDevice(String uuid, String clientKey, boolean request, String xml) { - if (tcpClientMap.containsKey(clientKey)) { + if (ChannelCache.getInstance().get(clientKey) != null) { if(clientKey.startsWith("areaPatrolServer")) { xml = xml.replace("", ""); xml = xml.replace("", ""); @@ -159,15 +159,15 @@ public class NettyServer { allBuf.writeByte(0xEB); allBuf.writeByte(0x90); redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); - nettyServerHandler.sendMsg(uuid, clientKey, tcpClientMap.get(clientKey), allBuf, compact(xml)); + nettyServerHandler.sendMsg(uuid, clientKey, allBuf, compact(xml)); sendIndex++; } else { - logger.warn(Color.RED + "###### 客户端[{}/{}]离线! ######" + Color.END, clientKey, tcpClientMap.get(clientKey)); + logger.warn(Color.RED + "###### 客户端[{}]离线! ######" + Color.END, clientKey); } } public void flushMsgToDeviceBroadcast(String uuid, String clientKey, boolean request, String xml) { - for (Map.Entry entry : tcpClientMap.entrySet()) { + for (String client : ChannelCache.getInstance().getClients()) { if(clientKey.startsWith("areaPatrolServer")) { xml = xml.replace("", ""); xml = xml.replace("", ""); @@ -188,7 +188,7 @@ public class NettyServer { allBuf.writeByte(0xEB); allBuf.writeByte(0x90); redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); - nettyServerHandler.sendMsg(uuid, entry.getKey(), tcpClientMap.get(entry.getKey()), allBuf, compact(xml)); + nettyServerHandler.sendMsg(uuid, client, allBuf, compact(xml)); sendIndex++; try { Thread.sleep(1); @@ -197,10 +197,10 @@ public class NettyServer { } //开启线程处理消息 - public void receiveMsg(BinaryModel binaryModel) { + public void receiveMsg(BinaryModel binaryModel, ChannelHandlerContext context) { executorService.execute(() -> { try { - dealMsgInThreadPool(binaryModel); + dealMsgInThreadPool(binaryModel, context); } catch (Exception e) { logger.error(ExceptionUtils.getStackTrace(e)); } @@ -218,7 +218,7 @@ public class NettyServer { } //处理接收消息 - private void dealMsgInThreadPool(BinaryModel binaryModel) throws DocumentException { + private void dealMsgInThreadPool(BinaryModel binaryModel, ChannelHandlerContext context) throws DocumentException { String xml = binaryModel.dataBuf.toString(CharsetUtil.UTF_8); receiveIndex = binaryModel.sendIndex; SAXReader saxReader = new SAXReader(); @@ -226,7 +226,7 @@ public class NettyServer { Element root = document.getRootElement(); String sendCode = root.element("SendCode").getText(); String receiveCode = root.element("ReceiveCode").getText(); - tcpClientMap.put(sendCode, binaryModel.id); + ChannelCache.getInstance().replace(sendCode, context); int type = 0; if (null != root.element("Type") && !StringUtil.isNullOrEmpty(root.element("Type").getText())) { type = Integer.parseInt(root.element("Type").getText()); @@ -237,7 +237,7 @@ public class NettyServer { } String compactXml = compact(xml); -// logger.info(Color.YELLOW + "###### 会话:{}, 客户:[{}/{}], 消息类型:{}, 命令:{}, 消息体: ######\n{}" + Color.END, binaryModel.uuid, sendCode, binaryModel.id, type, command, compactXml); +// logger.info(Color.YELLOW + "###### 会话:{}, 客户:[{}], 消息类型:{}, 命令:{}, 消息体: ######\n{}" + Color.END, binaryModel.uuid, sendCode, type, command, compactXml); //判断是否重发 if (type == SystemType.system) { if (command == SystemType.has_response || command == SystemType.no_response) { @@ -247,15 +247,15 @@ public class NettyServer { resetSendMsg(binaryModel.receiveIndex, sendCode); return; } else if (code.equals(ResponseType.succeed)) { - logger.info(Color.YELLOW + "###### 客户端[{}/{}]响应结果为成功 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]响应结果为成功 ######" + Color.END, sendCode); if(command == SystemType.no_response) { return; } } else if (code.equals(ResponseType.fault)) { - logger.warn(Color.RED + "###### 客户端[{}/{}]响应结果为失败 ######" + Color.END, sendCode, binaryModel.id); + logger.warn(Color.RED + "###### 客户端[{}]响应结果为失败 ######" + Color.END, sendCode); return; } else if (code.equals(ResponseType.reject)) { - logger.warn(Color.RED + "###### 客户端[{}/{}]响应结果为拒绝 ######" + Color.END, sendCode, binaryModel.id); + logger.warn(Color.RED + "###### 客户端[{}]响应结果为拒绝 ######" + Color.END, sendCode); return; } } @@ -267,12 +267,12 @@ public class NettyServer { switch (command) { case SystemType.register_request: // 收到接入侧注册信息 - logger.info(Color.YELLOW + "###### 客户端[{}/{}]注册 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]注册 ######" + Color.END, sendCode); dealRegister(binaryModel.uuid, compactXml); break; case SystemType.heart_request: // 处理心跳请求响应 - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报心跳 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报心跳 ######" + Color.END, sendCode); sendHeartBeat(binaryModel.uuid, compactXml); break; case SystemType.has_response: @@ -281,7 +281,7 @@ public class NettyServer { // 处理设备上报的模型同步响应 if (null != root.element("Items").element("Item").attribute("device_file_path")) { // 收到接入侧模型同步数据 - logger.info(Color.YELLOW + "###### 模型同步收到客户端[{}/{}]响应数据 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 模型同步收到客户端[{}]响应数据 ######" + Color.END, sendCode); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class); JSONObject jsonObject = JSONObject.parseObject(json); jsonObject.put("uuid", binaryModel.uuid); @@ -291,66 +291,66 @@ public class NettyServer { // 任务控制响应任务执行ID if (null != root.element("Items").element("Item").attribute("task_patrolled_id")) { // 收到接入侧任务下发或控制回复数据 - logger.info(Color.YELLOW + "###### 任务下发收到客户端[{}/{}]响应数据 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 任务下发收到客户端[{}]响应数据 ######" + Color.END, sendCode); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class); JSONObject jsonObject = JSONObject.parseObject(json); jsonObject.put("SendCode", ""); clientController.sendMsg(binaryModel.uuid, jsonObject.toJSONString()); } } else { - logger.warn(Color.RED + "###### 客户端[{}/{}]响应数据没有items ######" + Color.END, sendCode, binaryModel.id); + logger.warn(Color.RED + "###### 客户端[{}]响应数据没有items ######" + Color.END, sendCode); } break; default: - logger.warn(Color.RED + "###### 客户端[{}/{}]非法的消息不予处理 ######" + Color.END, sendCode, binaryModel.id); + logger.warn(Color.RED + "###### 客户端[{}]非法的消息不予处理 ######" + Color.END, sendCode); } break; case PushType.patrolDeviceState:// insert into basedata_mont_patdevstadata json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceStateControl.class); - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备状态数据 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报设备状态数据 ######" + Color.END, sendCode); break; case PushType.patrolDeviceRunning:// insert into basedata_mont_patdevrundata json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceRunningControl.class); - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备运行数据 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报设备运行数据 ######" + Color.END, sendCode); break; case PushType.nestState:// insert into basedata_mont_neststadata json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestStateControl.class); - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报机巢状态数据 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报机巢状态数据 ######" + Color.END, sendCode); break; case PushType.nestRunning:// insert into basedata_mont_nestrundata json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestRunningControl.class); - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报机巢运行数据 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报机巢运行数据 ######" + Color.END, sendCode); break; case PushType.location:// insert into basedata_mont_patdevcoord json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, LocationControl.class); - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备坐标 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报设备坐标 ######" + Color.END, sendCode); break; case PushType.route:// insert into basedata_mont_patdevpatroute json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, RouteControl.class); - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备路线 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报设备路线 ######" + Color.END, sendCode); break; case PushType.alarm:// insert into basedata_mont_patdevalmabn json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, AlarmControl.class); - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备异常告警 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报设备异常告警 ######" + Color.END, sendCode); break; case PushType.environment:// insert into basedata_mont_evndata json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, EnvironmentControl.class); - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备环境数据 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报设备环境数据 ######" + Color.END, sendCode); break; case PushType.taskState:// insert into basedata_mont_taskstadata and patrol_task_status json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskStateControl.class); - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备任务状态 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报设备任务状态 ######" + Color.END, sendCode); break; case PushType.result:// insert into basedata_mont_taskresult and patrol_task_result_main json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskResultControl.class); - logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报巡视结果 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.YELLOW + "###### 客户端[{}]上报巡视结果 ######" + Color.END, sendCode); break; default: - logger.info(Color.RED + "###### 客户端[{}/{}]上报的非法消息不予处理 ######" + Color.END, sendCode, binaryModel.id); + logger.info(Color.RED + "###### 客户端[{}]上报的非法消息不予处理 ######" + Color.END, sendCode); } if (type != SystemType.system && !StringUtil.isNullOrEmpty(json)) { if ((type == NestCtlType.courseReversal && command == 3)) { // 处理用SSCOM模拟的数据, 向无人机发送控制指令 - logger.info("###### 向客户端[{}/{}]透传200001控制指令 ######", sendCode, binaryModel.id); + logger.info("###### 向客户端[{}]透传200001控制指令 ######", sendCode); flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml); } else { JSONObject jsonObject = JSONObject.parseObject(json); @@ -366,7 +366,7 @@ public class NettyServer { || (type == NestCtlType.ptzPitch && command == 6) || (type == NestCtlType.picModelSet && command == 1) || (type == NestCtlType.nestSuddenStop && command == 2)) {// 处理用SSCOM模拟的数据, 向无人机发送控制指令 - logger.info("###### 向客户端[{}/{}]透传200002~20005控制指令 ######", sendCode, binaryModel.id); + logger.info("###### 向客户端[{}]透传200002~20005控制指令 ######", sendCode); flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml); } } diff --git a/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java b/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java index aadcaf8..85b2181 100644 --- a/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java +++ b/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java @@ -18,19 +18,19 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { this.nettyServer = nettyServer; } - public void sendMsg(String uuid, String clientKey, String clientValue, ByteBuf byteBuf, String xml) { - ChannelHandlerContext ctx = ChannelCache.getInstance().get(clientValue); + public void sendMsg(String uuid, String clientKey, ByteBuf byteBuf, String xml) { + ChannelHandlerContext ctx = ChannelCache.getInstance().get(clientKey); if(ctx != null) { ctx.writeAndFlush(Unpooled.wrappedBuffer(byteBuf)).addListener( (ChannelFuture future) -> { if (future.isSuccess()) { - logger.info(Color.CYAN + "###### 当前连接数:{},向客户:[{}/{}]下发消息成功:{}######" + Color.END, ChannelCache.getInstance().size(), clientKey, clientValue, xml); + logger.info(Color.CYAN + "###### 活动连接:{},向客户端[{}]下发消息成功:{}######" + Color.END, ChannelCache.getInstance().getClients(), clientKey, xml); } else { - logger.error(Color.RED + "###### 当前连接数:{},向客户:[{}/{}]下发消息失败:{}######" + Color.END, ChannelCache.getInstance().size(), clientKey, clientValue, xml); + logger.error(Color.RED + "###### 活动连接:{},向客户端[{}]下发消息失败:{}######" + Color.END, ChannelCache.getInstance().getClients(), clientKey, xml); } }); } else { - logger.error(Color.RED + "###### 当前连接数:{},无法向客户:[{}/{}]下发消息,ctx==null######" + Color.END, ChannelCache.getInstance().size(), clientKey, clientValue); + logger.error(Color.RED + "###### 活动连接:{},无法向客户端[{}]下发消息,ctx==null######" + Color.END, ChannelCache.getInstance().getClients(), clientKey); } } @@ -62,7 +62,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { String id = ctx.channel().id().asShortText(); BinaryModel binaryModel = (BinaryModel) msg; binaryModel.id = id; - nettyServer.receiveMsg((BinaryModel) msg); + nettyServer.receiveMsg((BinaryModel) msg, ctx); } @Override