diff --git a/pom.xml b/pom.xml index ef55b7d..d048729 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,13 @@ logstash-gelf 1.13.0 + + + commons-codec + commons-codec + 1.15 + + diff --git a/src/main/java/com/inspect/tcpserver/controller/DeviceServerController.java b/src/main/java/com/inspect/tcpserver/controller/DeviceServerController.java index a9b1e89..c740a44 100644 --- a/src/main/java/com/inspect/tcpserver/controller/DeviceServerController.java +++ b/src/main/java/com/inspect/tcpserver/controller/DeviceServerController.java @@ -46,7 +46,7 @@ public class DeviceServerController { logger.error("SendCoded或ReceiveCode为空"); return AjaxResult.fail(500, "SendCoded或ReceiveCode为空"); } else { - server.SendXmlMessage(jsonObject);// to device + server.sendXmlMessage(jsonObject);// to device return AjaxResult.success(); } diff --git a/src/main/java/com/inspect/tcpserver/tcp/MyDecoder.java b/src/main/java/com/inspect/tcpserver/tcp/MyDecoder.java index 2e2fb29..9df8e95 100644 --- a/src/main/java/com/inspect/tcpserver/tcp/MyDecoder.java +++ b/src/main/java/com/inspect/tcpserver/tcp/MyDecoder.java @@ -1,14 +1,19 @@ package com.inspect.tcpserver.tcp; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.tomcat.util.buf.HexUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; public class MyDecoder extends ByteToMessageDecoder { + private static final Logger log = LoggerFactory.getLogger(MyDecoder.class); private final String PACKET_FLAG = "EB90"; private final int BASE_LENGTH = 2 + 8 + 8 + 1 + 4 + 2; @@ -16,46 +21,52 @@ public class MyDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { if (in.readableBytes() >= BASE_LENGTH) { - //skip - if (in.readableBytes() > 512 * 1024) { - in.skipBytes(in.readableBytes()); - } - int index; - String flag; - while (true) { - index = in.readerIndex(); - in.markReaderIndex(); - byte[] dst = new byte[2]; - in.readBytes(dst, 0, 2); - flag = ByteUtils.byte2Hex(dst); - if (PACKET_FLAG.equalsIgnoreCase(flag)) { - break; + try { + //skip + if (in.readableBytes() > 512 * 1024) { + in.skipBytes(in.readableBytes()); } - in.resetReaderIndex(); - if (in.readableBytes() < BASE_LENGTH) { + + ByteBuf forPrint = in.copy(); + log.info("[NETTY] FULL MSG: [{}]", ByteBufUtil.hexDump(forPrint)); + + int index; + String flag; + while (true) { + index = in.readerIndex(); + in.markReaderIndex(); + byte[] dst = new byte[2]; + in.readBytes(dst, 0, 2); + flag = ByteUtils.byte2Hex(dst); + if (PACKET_FLAG.equalsIgnoreCase(flag)) { + break; + } + in.resetReaderIndex(); + if (in.readableBytes() < BASE_LENGTH) { + return; + } + } + long sendIndex = in.readLongLE(); + long receiveIndex = in.readLongLE(); + byte sourceFlag = in.readByte(); + int xmlLength = in.readIntLE(); + if (in.readableBytes() < xmlLength) { + in.readerIndex(index); return; } + byte[] payload = new byte[xmlLength]; + in.readBytes(payload); + in.readShortLE(); + BinaryModel binaryModel = new BinaryModel(); + binaryModel.receiveIndex = receiveIndex; + binaryModel.sendIndex = sendIndex; + binaryModel.sourceFlag = sourceFlag; + binaryModel.dataLength = xmlLength; + binaryModel.dataBuf = Unpooled.copiedBuffer(payload); + out.add(binaryModel); + } catch (Exception e) { + e.printStackTrace(); } - long sendIndex = in.readLongLE(); - long receiveIndex = in.readLongLE(); - byte sourceFlag = in.readByte(); - int xmlLength = in.readIntLE(); -// int readableBytes = in.readableBytes(); -// if(readableBytes - * 服务启动监听器 - **/ @Component public class NettyServer { private Logger logger = LoggerFactory.getLogger(NettyServer.class); @@ -143,12 +139,12 @@ public class NettyServer { } //发送消息 - public void SendMsg(String client, boolean request, String xml) { - logger.info("==========client===========" + client + "是否在线=" + clients.containsKey(client)); + public void sendMsg(String client, boolean request, String xml) { + logger.info("[NETTY] Client: {} online: {}", client, clients.containsKey(client)); if (clients.containsKey(client) && !StringUtil.isNullOrEmpty(xml)) { ByteBuf byteBuf = Unpooled.copiedBuffer(xml, CharsetUtil.UTF_8); int length = byteBuf.readableBytes(); - ByteBuf allBuf = Unpooled.buffer(length + ConfigType.dataLegth); + ByteBuf allBuf = Unpooled.buffer(length + ConfigType.dataLength); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); allBuf.writeLongLE(sendIndex); @@ -159,22 +155,20 @@ public class NettyServer { allBuf.writeByte(0xEB); allBuf.writeByte(0x90); - // 存入缓存 - redisTemplate.opsForValue().set(String.valueOf(this.sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); + redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); nettyServerHandler.SendMsg(clients.get(client), allBuf); - logger.info("发送到机器人发送会话序列号:{},接收会话序列号:{}", sendIndex, receiveIndex); - this.sendIndex++; + logger.info("[NETTY] 向设备端机器人发送消息会话序列号: [{}], 接收端序列号: [{}]", sendIndex, receiveIndex); + sendIndex++; } else { - logger.warn("设备接入层下发消息时,设备不在线"); + logger.warn("[NETTY] 设备端机器人: [{}] 离线!!!", client); } } //开启线程处理消息 - public void ReceiveMsg(BinaryModel binaryModel) { - executorService.execute(() -> - { + public void receiveMsg(BinaryModel binaryModel) { + executorService.execute(() -> { try { - ThreadDealMsg(binaryModel); + dealMsgInThreadPool(binaryModel); } catch (Exception e) { e.printStackTrace(); } @@ -187,20 +181,20 @@ public class NettyServer { String msg = redisTemplate.opsForValue().get(String.valueOf(sendIndex)); if (!StringUtil.isNullOrEmpty(msg)) { ByteBuf allBuf = Unpooled.copiedBuffer(msg, CharsetUtil.US_ASCII); - SendMsg(sendCode, true, msg); + sendMsg(sendCode, true, msg); } } //处理接收消息 - private void ThreadDealMsg(BinaryModel binaryModel) throws DocumentException { + private void dealMsgInThreadPool(BinaryModel binaryModel) throws DocumentException { String xml = binaryModel.dataBuf.toString(CharsetUtil.UTF_8); - logger.info("接收到机器人发送会话序列号:{},接收会话序列号:{}", binaryModel.sendIndex, binaryModel.receiveIndex); - this.receiveIndex = binaryModel.sendIndex; + logger.info("[NETTY] 设备端机器人会话序列号: [{}], 接收端序列号: [{}]", binaryModel.sendIndex, binaryModel.receiveIndex); + receiveIndex = binaryModel.sendIndex; SAXReader saxReader = new SAXReader(); Document document = saxReader.read(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))); Element root = document.getRootElement(); String sendCode = root.element("SendCode").getText(); - clients.put(sendCode, binaryModel.id);//按照正常得逻辑是注册成功后在添加进关系中 + clients.put(sendCode, binaryModel.id); int type = 0; if (null != root.element("Type") && !StringUtil.isNullOrEmpty(root.element("Type").getText())) { type = Integer.parseInt(root.element("Type").getText()); @@ -209,6 +203,9 @@ public class NettyServer { if (null != root.element("Command") && !StringUtil.isNullOrEmpty(root.element("Command").getText())) { command = Integer.parseInt(root.element("Command").getText()); } + + logger.info("[NETTY] MESSAGE Type: {}, Data: [{}], Hex: [{}]", type, xml, HexUtils.ascii2hex(xml)); + //判断是否重发 if (type == SystemType.system) { if (command == SystemType.has_response || command == SystemType.no_response) { @@ -224,22 +221,23 @@ public class NettyServer { case SystemType.system: switch (command) { case SystemType.register_request: - // 处理注册请求响应 - logger.info("收到接入侧注册信息:{}", xml); + // 收到接入侧注册信息 + logger.info("[NETTY] 设备端注册信息: {}", xml); dealRegister(xml); - logger.info("收到接入侧注册信息:{}", xml); + logger.info("[NETTY] 设备端注册信息2: {}", xml); break; case SystemType.heart_request: // 处理心跳请求响应 - logger.info("收到接入侧心跳信息:{}", xml); - SendHeart(xml); + logger.info("[NETTY] 设备端心跳: {}", xml); + sendHeartBeat(xml); break; case SystemType.has_response: // 处理有返回值的消息响应 if (null != root.element("Items") && null != root.element("Items").element("Item")) { // 处理设备上报的模型同步响应 if (null != root.element("Items").element("Item").attribute("device_file_path")) { - logger.info("收到接入侧模型同步数据:{}", xml); + // 收到接入侧模型同步数据 + logger.info("[NETTY] 设备端模型同步数据: {}", xml); json = down.ModelControlXml2Json(xml); JSONObject jsonObject = JSONObject.parseObject(json); jsonObject.put("Type", Constant.MODEL_UP_TYPE); @@ -247,75 +245,77 @@ public class NettyServer { } // 任务控制响应任务执行ID if (null != root.element("Items").element("Item").attribute("task_patrolled_id")) { - logger.info("收到接入侧任务下发或控制回复数据:{}", xml); + // 收到接入侧任务下发或控制回复数据 + logger.info("[NETTY] 设备端任务下发或控制回复数据: {}", xml); json = down.ModelControlXml2Json(xml); JSONObject jsonObject = JSONObject.parseObject(json); jsonObject.put("SendCode", ""); - jsonObject.put("SendCode", ""); clientController.sendMsg(jsonObject.toJSONString()); } } else { - logger.warn("接收到的系统类信息报文中,root:{},中不包含items或items中没有item,不予处理", root); + // 接收到的系统类信息报文中,root:{},中不包含items或items中没有item,不予处理 + logger.warn("[NETTY] IN RECEIVING MESSAGE, no items or item , OMIT IT, ROOT: {}", root); } break; default: - logger.warn("不予处理的消息体,{}", xml); - logger.warn("接收到的系统类信息报文中,command:{},不在处理范围内,不予处理", command); + // 接收到的系统类信息报文中,command:{},不在处理范围内,不予处理 + logger.warn("[NETTY] 非法的消息不予处理 CMD: {}, BODY: {}", command, xml); } break; case PushType.patrolDeviceState: json = down.PatrolDeviceStateControlXml2Json(xml); - logger.info("收到接入侧设备状态数据:{}", xml); + + logger.info("[NETTY] 客户端设备状态数据: {}", xml); break; - case PushType.patrolDeviceRuning: + case PushType.patrolDeviceRunning: json = down.PatrolDeviceRuningControlXml2Json(xml); - logger.info("收到接入侧设备运行数据:{}", xml); + logger.info("[NETTY] 客户端设备运行数据: {}", xml); break; case PushType.nestState: json = down.NestStateControlXml2Json(xml); - logger.info("收到接入侧机巢状态数据{}", xml); + logger.info("[NETTY] 客户端机巢状态数据: {}", xml); break; - case PushType.nestRuning: + case PushType.nestRunning: json = down.NestRuningControlXml2Json(xml); - logger.info("收到接入侧机巢运行数据{}", xml); + logger.info("[NETTY] 客户端机巢运行数据: {}", xml); break; case PushType.location: json = down.LocationControlXml2Json(xml); - logger.info("收到接入侧设备坐标{}", xml); + logger.info("[NETTY] 客户端设备坐标: {}", xml); break; case PushType.route: json = down.RouteControlXml2Json(xml); - logger.info("收到接入侧设备路线{}", xml); + logger.info("[NETTY] 客户端设备路线: {}", xml); break; case PushType.alarm: json = down.AlarmControlXml2Json(xml); - logger.info("收到接入侧设备异常告警{}", xml); + logger.info("[NETTY] 客户端设备异常告警: {}", xml); break; case PushType.environment: json = down.EnvironmentControlXml2Json(xml); - logger.info("收到接入侧设备上报环境数据{}", xml); + logger.info("[NETTY] 客户端设备上报环境数据: {}", xml); break; case PushType.taskState: json = down.TaskStateControlXml2Json(xml); - logger.info("收到接入侧设备任务状态{}", xml); + logger.info("[NETTY] 客户端设备任务状态: {}", xml); break; case PushType.result: json = down.TaskResultControlXml2Json(xml); - logger.info("收到接入侧巡视结果{}", xml); + logger.info("[NETTY] 客户端巡视结果: {}", xml); break; default: - logger.warn("不予处理的消息体,{}", xml); - logger.warn("server-handle-接收到的type:{},不在处理范围内,不予处理", type); + // server-handle-接收到的type:{},不在处理范围内,不予处理 + logger.warn("[NETTY] 非法的消息不予处理: {}, TYPE: {}", xml, type); } if (type != SystemType.system && !StringUtil.isNullOrEmpty(json)) { //rabbitmq推送到消息队列中基于springboot_xggd rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, json); String receiveCode = root.element("ReceiveCode").getText(); - ResponseMsg(receiveCode, sendCode); + responseMsg(receiveCode, sendCode); } } - public void ResponseMsg(String sendCode, String receiveCode) { + public void responseMsg(String sendCode, String receiveCode) { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(alias, ResponseControl.class); xStream.autodetectAnnotations(true); @@ -328,7 +328,7 @@ public class NettyServer { responseControl.Time = CommonUtils.GetNowDateString(); responseControl.Items = ""; String xml = xStream.toXML(responseControl); - SendMsg(receiveCode, false, xml); + sendMsg(receiveCode, false, xml); } //处理注册应答 @@ -349,7 +349,7 @@ public class NettyServer { // 调用基础服务鉴权设备 if (authDevice(obj.SendCode)) { - //鉴权通过_xggd + //鉴权通过 responseControl.Code = ResponseType.succeed; responseControl.Items = new ArrayList<>(); RegisterResponseModel model = new RegisterResponseModel(); @@ -377,7 +377,7 @@ public class NettyServer { // 鉴权不通过 responseControl.Code = ResponseType.fault; } - SendRegisterResponse(responseControl, obj.SendCode); + sendRegisterResponse(responseControl, obj.SendCode); } /** @@ -407,23 +407,23 @@ public class NettyServer { } - public void SendRegisterResponse(RegisterResponseControl responseControl, String sendCode) { + public void sendRegisterResponse(RegisterResponseControl responseControl, String sendCode) { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(alias, RegisterResponseControl.class); xStream.autodetectAnnotations(true); String xml = xStream.toXML(responseControl); - SendMsg(sendCode, false, xml); + sendMsg(sendCode, false, xml); } - public void SendHeart(String xml) { - logger.info("接收到机器人巡视系统心跳消息:{}", xml); + public void sendHeartBeat(String xml) { + logger.info("[NETTY] 设备端机器人系统心跳消息: {}", xml); XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(alias, BaseControl.class); xStream.autodetectAnnotations(true); xStream.ignoreUnknownElements(); xStream.addPermission(AnyTypePermission.ANY); BaseControl obj = (BaseControl) xStream.fromXML(xml); - ResponseMsg(obj.ReceiveCode, obj.SendCode); + responseMsg(obj.ReceiveCode, obj.SendCode); // 推送消息到mq JSONObject jsonObject = new JSONObject(); @@ -442,7 +442,7 @@ public class NettyServer { return clients.containsKey(code); } - public void SendXmlMessage(JSONObject obj) { + public void sendXmlMessage(JSONObject obj) { int type = obj.getInteger("Type"); String receiveCode = obj.getString("ReceiveCode"); String json = obj.toJSONString(); @@ -480,12 +480,12 @@ public class NettyServer { xml = up.LendonTaskJson2Xml(json); break; default: - logger.warn("应用向设备接入发送消息,type:{}不在处理范围内,不予处理", type); + logger.warn("[NETTY] 向设备端下发命令, 类型[{}]不正确, 不予处理", type); } if (!StringUtils.isEmpty(xml)) { - logger.info("向设备接入侧下发命令:{}", xml); - SendMsg(receiveCode, true, xml); + logger.info("[NETTY] 向设备端下发命令: {}", xml); + sendMsg(receiveCode, true, xml); } } } diff --git a/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java b/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java index 7c85884..cb2569c 100644 --- a/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java +++ b/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java @@ -3,18 +3,13 @@ package com.inspect.tcpserver.tcp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.tomcat.util.buf.HexUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; -/** - * @author yww - *

- * netty服务端处理器 - **/ - public class NettyServerHandler extends ChannelInboundHandlerAdapter { public Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); @@ -38,7 +33,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { public void channelActive(ChannelHandlerContext ctx) throws Exception { String id = ctx.channel().id().asShortText(); ids.put(id, ctx); - logger.warn("设备连接,id:{}", id); + logger.warn("[NETTY] 设备上线: {}", id); } /** @@ -62,11 +57,11 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - logger.warn("channelRead!!!"); + logger.warn("[NETTY] channelRead"); String id = ctx.channel().id().asShortText(); BinaryModel binaryModel = (BinaryModel) msg; binaryModel.id = id; - nettyServer.ReceiveMsg((BinaryModel) msg); + nettyServer.receiveMsg((BinaryModel) msg); } /** diff --git a/src/main/java/com/inspect/tcpserver/tcp/SystemType.java b/src/main/java/com/inspect/tcpserver/tcp/SystemType.java index 2f48e2b..62a16c2 100644 --- a/src/main/java/com/inspect/tcpserver/tcp/SystemType.java +++ b/src/main/java/com/inspect/tcpserver/tcp/SystemType.java @@ -50,9 +50,9 @@ class ResponseType { class PushType { public static final int patrolDeviceState = 1; - public static final int patrolDeviceRuning = 2; + public static final int patrolDeviceRunning = 2; public static final int nestState = 20001; - public static final int nestRuning = 10004; + public static final int nestRunning = 10004; public static final int location = 3; public static final int route = 4; public static final int alarm = 5; @@ -65,7 +65,7 @@ class PushType { } class ConfigType { - public static final int dataLegth = 25; + public static final int dataLength = 25; public static final String heart_beat_interval = "300"; public static final String patroldevice_run_interval = "300"; public static final String nest_run_interval = "300"; diff --git a/src/main/java/com/inspect/tcpserver/util/HexUtils.java b/src/main/java/com/inspect/tcpserver/util/HexUtils.java new file mode 100644 index 0000000..79a41a4 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/util/HexUtils.java @@ -0,0 +1,16 @@ +package com.inspect.tcpserver.util; + +import org.apache.commons.codec.binary.Hex; + +import java.nio.charset.Charset; + +public class HexUtils { + public static String ascii2hex(String str) { + return convertStringToHex(str, "UTF8"); + } + + public static String convertStringToHex(String str, String charsetName) { + char[] chars = Hex.encodeHex(str.getBytes(Charset.forName(charsetName))); + return String.valueOf(chars); + } +}