package com.inspect.tcpserver.tcp; import com.alibaba.fastjson.JSONObject; import com.inspect.tcpserver.constant.Constant; import com.inspect.tcpserver.controller.ClientController; import com.inspect.tcpserver.domain.DeviceServerProperties; import com.inspect.tcpserver.util.Color; import com.thoughtworks.xstream.XStream; import com.thoughtworks.xstream.io.naming.NoNameCoder; import com.thoughtworks.xstream.io.xml.Xpp3Driver; import com.thoughtworks.xstream.security.AnyTypePermission; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.CharsetUtil; import io.netty.util.internal.StringUtil; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.*; @Component public class NettyServer { private final Logger logger = LoggerFactory.getLogger(NettyServer.class); /** * 接收/发送报文xml外层别名 */ private final String aliasHost = "PatrolHost"; private final String aliasDevice = "PatrolDevice"; @Resource ClientController clientController; private EventLoopGroup bossGroup; private EventLoopGroup workGroup; private long sendIndex = 0; //若重启系统后还要延续之前的序列号则需要把序列号存入redis中 private long receiveIndex = 0; private DownXml2Json downXml2Json = new DownXml2Json(aliasHost); private UpJson2Xml upJson2Xml = new UpJson2Xml(aliasHost); private NettyServerHandler nettyServerHandler; // private ExecutorService executorService = new ThreadPoolExecutor(20, 50, 100, TimeUnit.SECONDS, new LinkedBlockingQueue(400)); @Resource DeviceServerProperties deviceServerProperties; @Resource(name = "stringRedisTemplate") private RedisTemplate redisTemplate; @Resource private RabbitTemplate rabbitTemplate; @Value("${iip_server.authDevice.url}") String iipAuthDeviceUrl; private int serverPort; public void init() { this.serverPort = deviceServerProperties.port; upJson2Xml = new UpJson2Xml(aliasHost); downXml2Json = new DownXml2Json(aliasHost); } @Async public void startServer() { // 初始化 init(); //new 一个主线程组 bossGroup = new NioEventLoopGroup(); //new 一个工作线程组 workGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new MyDecoder()); nettyServerHandler = new NettyServerHandler(NettyServer.this); ch.pipeline().addLast(nettyServerHandler); } }) .localAddress(serverPort) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_LINGER, 10); try { ChannelFuture future = bootstrap.bind(serverPort).sync(); logger.info("###### TCP服务器启动 ######"); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { close(); } } //释放资源 public void close() { if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workGroup != null) { workGroup.shutdownGracefully(); } } private String compact(String xml) { String compactXml = xml .replaceAll(">\\s+<", "><") // 处理标签间的空白 .replaceAll("\\s+", " "); // 压缩连续空格(可选) return compactXml; } //发送消息 public void flushMsgToDevice(String uuid, String clientKey, boolean request, String xml) { if (ChannelCache.getInstance().get(clientKey) != null) { if(clientKey.startsWith("areaPatrolServer")) { xml = xml.replace("", ""); xml = xml.replace("", ""); } else { xml = xml.replace("", ""); xml = xml.replace("", ""); } ByteBuf byteBuf = Unpooled.copiedBuffer(xml, CharsetUtil.UTF_8); int length = byteBuf.readableBytes(); ByteBuf allBuf = Unpooled.buffer(length + ConfigType.dataLength); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); allBuf.writeLongLE(sendIndex); allBuf.writeLongLE(receiveIndex); allBuf.writeByte(request ? 0x00 : 0x01); allBuf.writeIntLE(length); allBuf.writeBytes(byteBuf); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); nettyServerHandler.sendMsg(uuid, clientKey, allBuf, compact(xml), request); sendIndex++; } else { logger.warn(Color.RED + "###### 客户端[{}]离线! ######" + Color.END, clientKey); } } public void flushMsgToDeviceBroadcast(String uuid, String clientKey, boolean request, String xml) { for (String client : ChannelCache.getInstance().getClients()) { if(clientKey.startsWith("areaPatrolServer")) { xml = xml.replace("", ""); xml = xml.replace("", ""); } else { xml = xml.replace("", ""); xml = xml.replace("", ""); } ByteBuf byteBuf = Unpooled.copiedBuffer(xml, CharsetUtil.UTF_8); int length = byteBuf.readableBytes(); ByteBuf allBuf = Unpooled.buffer(length + ConfigType.dataLength); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); allBuf.writeLongLE(sendIndex); allBuf.writeLongLE(receiveIndex); allBuf.writeByte(request ? 0x00 : 0x01); allBuf.writeIntLE(length); allBuf.writeBytes(byteBuf); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); nettyServerHandler.sendMsg(uuid, client, allBuf, compact(xml), request); sendIndex++; try { Thread.sleep(1); } catch (InterruptedException e) {} } } //开启线程处理消息 public void receiveMsg(BinaryModel binaryModel, ChannelHandlerContext context) { // executorService.execute(() -> { try { dealMsgInThreadPool(binaryModel, context); } catch (Exception e) { logger.error("error", e); } // }); } //重新发送 public void resetSendMsg(long sendIndex, String sendCode) { // 获取缓存的中的值 String msg = redisTemplate.opsForValue().get(String.valueOf(sendIndex)); if (!StringUtil.isNullOrEmpty(msg)) { ByteBuf allBuf = Unpooled.copiedBuffer(msg, CharsetUtil.US_ASCII); flushMsgToDevice("", sendCode, true, msg); } } //处理接收消息 private void dealMsgInThreadPool(BinaryModel binaryModel, ChannelHandlerContext context) throws DocumentException { String xml = binaryModel.dataBuf.toString(CharsetUtil.UTF_8); receiveIndex = binaryModel.sendIndex; SAXReader saxReader = new SAXReader(); Document document = saxReader.read(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))); Element root = document.getRootElement(); String sendCode = root.element("SendCode").getText(); String receiveCode = root.element("ReceiveCode").getText(); ChannelCache.getInstance().replace(sendCode, context); int type = 0; if (null != root.element("Type") && !StringUtil.isNullOrEmpty(root.element("Type").getText())) { type = Integer.parseInt(root.element("Type").getText()); } int command = 0; if (null != root.element("Command") && !StringUtil.isNullOrEmpty(root.element("Command").getText())) { command = Integer.parseInt(root.element("Command").getText()); } String compactXml = compact(xml); // logger.info(Color.YELLOW + "###### 会话:{}, 客户:[{}], 消息类型:{}, 命令:{}, 消息体: ######\n{}" + Color.END, binaryModel.uuid, sendCode, type, command, compactXml); //判断是否重发 if (type == SystemType.system) { if (command == SystemType.has_response || command == SystemType.no_response) { if (null != root.element("Code")) { String code = root.element("Code").getText(); if (code.equals(ResponseType.retry)) { resetSendMsg(binaryModel.receiveIndex, sendCode); return; } else if (code.equals(ResponseType.succeed)) { logger.info(Color.YELLOW + "###### 客户端[{}]响应结果为成功 ######" + Color.END, sendCode); if(command == SystemType.no_response) { return; } } else if (code.equals(ResponseType.fault)) { logger.warn(Color.RED + "###### 客户端[{}]响应结果为失败 ######" + Color.END, sendCode); return; } else if (code.equals(ResponseType.reject)) { logger.warn(Color.RED + "###### 客户端[{}]响应结果为拒绝 ######" + Color.END, sendCode); return; } } } } String json = null; switch (type) { case SystemType.system: switch (command) { case SystemType.register_request: // 收到接入侧注册信息 logger.info(Color.YELLOW + "###### 客户端[{}]注册 ######" + Color.END, sendCode); dealRegister(binaryModel.uuid, compactXml); break; case SystemType.heart_request: // 处理心跳请求响应 logger.info(Color.YELLOW + "###### 客户端[{}]上报心跳 ######" + Color.END, sendCode); sendHeartBeat(binaryModel.uuid, compactXml); break; case SystemType.has_response: // 处理有返回值的消息响应 if (null != root.element("Items") && null != root.element("Items").element("Item")) { // 处理设备上报的模型同步响应 if (null != root.element("Items").element("Item").attribute("device_file_path")) { // 收到接入侧模型同步数据 logger.info(Color.YELLOW + "###### 模型同步收到客户端[{}]响应数据 ######" + Color.END, sendCode); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class); JSONObject jsonObject = JSONObject.parseObject(json); jsonObject.put("uuid", binaryModel.uuid); jsonObject.put("Type", Constant.MODEL_UP_TYPE); rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, jsonObject.toJSONString()); } // 任务控制响应任务执行ID if (null != root.element("Items").element("Item").attribute("task_patrolled_id")) { // 收到接入侧任务下发或控制回复数据 logger.info(Color.YELLOW + "###### 任务下发收到客户端[{}]响应数据 ######" + Color.END, sendCode); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class); JSONObject jsonObject = JSONObject.parseObject(json); jsonObject.put("SendCode", ""); clientController.sendMsg(binaryModel.uuid, jsonObject.toJSONString()); } } else { logger.warn(Color.RED + "###### 客户端[{}]响应数据没有items ######" + Color.END, sendCode); } break; default: logger.warn(Color.RED + "###### 客户端[{}]非法的消息不予处理 ######" + Color.END, sendCode); } break; case PushType.patrolDeviceState:// insert into basedata_mont_patdevstadata json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceStateControl.class); logger.info(Color.YELLOW + "###### 客户端[{}]上报设备状态数据 ######" + Color.END, sendCode); break; case PushType.patrolDeviceRunning:// insert into basedata_mont_patdevrundata json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceRunningControl.class); logger.info(Color.YELLOW + "###### 客户端[{}]上报设备运行数据 ######" + Color.END, sendCode); break; case PushType.nestState:// insert into basedata_mont_neststadata json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestStateControl.class); logger.info(Color.YELLOW + "###### 客户端[{}]上报机巢状态数据 ######" + Color.END, sendCode); break; case PushType.nestRunning:// insert into basedata_mont_nestrundata json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestRunningControl.class); logger.info(Color.YELLOW + "###### 客户端[{}]上报机巢运行数据 ######" + Color.END, sendCode); break; case PushType.location:// insert into basedata_mont_patdevcoord json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, LocationControl.class); logger.info(Color.YELLOW + "###### 客户端[{}]上报设备坐标 ######" + Color.END, sendCode); break; case PushType.route:// insert into basedata_mont_patdevpatroute json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, RouteControl.class); logger.info(Color.YELLOW + "###### 客户端[{}]上报设备路线 ######" + Color.END, sendCode); break; case PushType.alarm:// insert into basedata_mont_patdevalmabn json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, AlarmControl.class); logger.info(Color.YELLOW + "###### 客户端[{}]上报设备异常告警 ######" + Color.END, sendCode); break; case PushType.environment:// insert into basedata_mont_evndata json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, EnvironmentControl.class); logger.info(Color.YELLOW + "###### 客户端[{}]上报设备环境数据 ######" + Color.END, sendCode); break; case PushType.taskState:// insert into basedata_mont_taskstadata and patrol_task_status json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskStateControl.class); //root@Linx:/home/atia/data/log/inspect-main# grep -rn "type:41, messageBody:" info.log logger.info(Color.YELLOW + "###### 客户端[{}]上报设备任务状态 ######" + Color.END, sendCode); break; case PushType.result:// insert into basedata_mont_taskresult and patrol_task_result_main json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskResultControl.class); //root@Linx:/home/atia/data/log/inspect-main# grep -rn "type:61, messageBody:" info.log logger.info(Color.YELLOW + "###### 客户端[{}]上报巡视结果 ######" + Color.END, sendCode); break; default: logger.info(Color.RED + "###### 客户端[{}]上报的非法消息不予处理 ######" + Color.END, sendCode); } if (type != SystemType.system && !StringUtil.isNullOrEmpty(json)) { if ((type == NestCtlType.courseReversal && command == 3)) { // 处理用SSCOM模拟的数据, 向无人机发送控制指令 logger.info("###### 向客户端[{}]透传200001控制指令 ######", sendCode); flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml); } else { JSONObject jsonObject = JSONObject.parseObject(json); jsonObject.put("uuid", binaryModel.uuid); json = jsonObject.toJSONString(); // send to BasedataMontDataMqAcceptHandle rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, json); boolean isHost = json.contains(aliasHost); sendResponseToDevice(binaryModel.uuid, receiveCode, sendCode, isHost); } } else { if ((type == NestCtlType.suddenStop && command == 7) || (type == NestCtlType.ptzPitch && command == 6) || (type == NestCtlType.picModelSet && command == 1) || (type == NestCtlType.nestSuddenStop && command == 2)) {// 处理用SSCOM模拟的数据, 向无人机发送控制指令 logger.info("###### 向客户端[{}]透传200002~20005控制指令 ######", sendCode); flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml); } } } public void sendResponseToDevice(String uuid, String sendCode, String receiveCode, boolean isHost) { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(isHost ? aliasHost : aliasDevice, ResponseControl.class); xStream.autodetectAnnotations(true); ResponseControl responseControl = new ResponseControl(); responseControl.SendCode = sendCode; responseControl.ReceiveCode = receiveCode; responseControl.Type = String.valueOf(SystemType.system); responseControl.Code = ResponseType.succeed; responseControl.Command = String.valueOf(SystemType.no_response); responseControl.Time = CommonUtils.GetNowDateString(); responseControl.Items = ""; String xml = xStream.toXML(responseControl); flushMsgToDevice(uuid, receiveCode, false, xml); } //处理注册应答 public void dealRegister(String uuid, String xml) { BaseControl obj = new BaseControl(); try { XStream xStream = getXmlStreamInstance(); xStream.alias(aliasHost, BaseControl.class); xStream.autodetectAnnotations(true); xStream.ignoreUnknownElements(); xStream.addPermission(AnyTypePermission.ANY); obj = (BaseControl) xStream.fromXML(xml); } catch (com.thoughtworks.xstream.XStreamException e) { try { XStream xStreamEx = getXmlStreamInstance(); xStreamEx.alias(aliasDevice, BaseControl.class); xStreamEx.autodetectAnnotations(true); xStreamEx.ignoreUnknownElements(); xStreamEx.addPermission(AnyTypePermission.ANY); obj = (BaseControl) xStreamEx.fromXML(xml); } catch (com.thoughtworks.xstream.XStreamException e2) { logger.error(Color.RED + "###### dealRegister解析失败:{} ######" + Color.END, e2.getMessage()); } } RegisterResponseControl responseControl = new RegisterResponseControl(); responseControl.SendCode = obj.ReceiveCode; responseControl.ReceiveCode = obj.SendCode; responseControl.Type = String.valueOf(SystemType.system); responseControl.Command = String.valueOf(SystemType.has_response); responseControl.Time = CommonUtils.GetNowDateString(); // 调用基础服务鉴权设备 if (authDevice(obj.SendCode)) { //鉴权通过 responseControl.Code = ResponseType.succeed; responseControl.Items = new ArrayList<>(); RegisterResponseModel model = new RegisterResponseModel(); model.patroldevice_run_interval = ConfigType.patroldevice_run_interval; model.heart_beat_interval = ConfigType.heart_beat_interval; model.env_interval = ConfigType.env_interval; //model.weather_interval= ConfigType.weather_interval; // 当连接客户端为无人机机巢时,报文中增加机巢运行数据上报间隔 if (obj.SendCode.equals(deviceServerProperties.nestCode)) { model.nest_run_interval = ConfigType.nest_run_interval; } responseControl.Items.add(model); // 推送消息到mq JSONObject jsonObject = new JSONObject(); jsonObject.put("uuid", uuid); jsonObject.put("patroldevice_code", obj.SendCode); jsonObject.put("SendCode", obj.SendCode); jsonObject.put("Type", "heartbeat"); jsonObject.put("eventType", "connect"); jsonObject.put("HeartBeatInterval", ConfigType.heart_beat_interval); rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, jsonObject.toJSONString()); } else { // 鉴权不通过 responseControl.Code = ResponseType.fault; } sendRegisterResponse(uuid, responseControl, obj.SendCode); } private XStream getXmlStreamInstance() { return new XStream(new Xpp3Driver(new NoNameCoder())); } /** * 鉴权巡视设备 * * @param sendCode 巡视设备(机器人、无人机)唯一标识 * @return */ public boolean authDevice(String sendCode) { return true; } public void sendRegisterResponse(String uuid, RegisterResponseControl responseControl, String sendCode) { String xml = ""; try { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(aliasHost, RegisterResponseControl.class); xStream.autodetectAnnotations(true); xml = xStream.toXML(responseControl); } catch (com.thoughtworks.xstream.XStreamException e) { try { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(aliasDevice, RegisterResponseControl.class); xStream.autodetectAnnotations(true); xml = xStream.toXML(responseControl); } catch (com.thoughtworks.xstream.XStreamException e2) { logger.error(Color.RED + "###### sendRegisterResponse解析失败:{} ######" + Color.END, e2.getMessage()); } } flushMsgToDevice(uuid, sendCode, false, xml); } public void sendHeartBeat(final String uuid, String xml) { boolean isHost = true; BaseControl obj = new BaseControl(); try { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(aliasHost, BaseControl.class); xStream.autodetectAnnotations(true); xStream.ignoreUnknownElements(); xStream.addPermission(AnyTypePermission.ANY); obj = (BaseControl) xStream.fromXML(xml); } catch (com.thoughtworks.xstream.XStreamException e) { try { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(aliasDevice, BaseControl.class); xStream.autodetectAnnotations(true); xStream.ignoreUnknownElements(); xStream.addPermission(AnyTypePermission.ANY); obj = (BaseControl) xStream.fromXML(xml); isHost = false; } catch (com.thoughtworks.xstream.XStreamException e2) { logger.error(Color.RED + "###### sendHeartBeat解析失败:{} ######" + Color.END, e2.getMessage()); } } sendResponseToDevice(uuid, obj.ReceiveCode, obj.SendCode, isHost); // 推送消息到mq JSONObject jsonObject = new JSONObject(); jsonObject.put("uuid", uuid); jsonObject.put("SendCode", obj.SendCode); jsonObject.put("Type", "heartbeat"); jsonObject.put("eventType", "heart"); jsonObject.put("HeartBeatInterval", ConfigType.heart_beat_interval); rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, jsonObject.toJSONString()); } public void sendXmlMessage(JSONObject obj) { int type = obj.getInteger("Type"); String receiveCode = obj.getString("ReceiveCode"); String json = obj.toJSONString(); String xml = null; switch (type) { case RobotType.robotVl: case RobotType.robot: case RobotType.robotCar: case RobotType.robotFz: case RobotType.robotIr: case RobotType.robotPtz: xml = upJson2Xml.UpStreamJson2Xml(json, RobotControl.class); break; case UAVType.uav: case UAVType.uavXj: case UAVType.uavKz: case UAVType.uavYt: case UAVType.nest: xml = upJson2Xml.UpStreamJson2Xml(json, RobotControl.class); break; case TaskType.taskControl: xml = upJson2Xml.UpStreamJson2Xml(json, ResponseControl.class); break; case TaskType.taskSend: xml = upJson2Xml.UpStreamJson2Xml(json, TaskSendControl.class); break; case TaskType.taskArea: xml = upJson2Xml.UpStreamJson2Xml(json, AreaControl.class); break; case ModelType.modelSync: xml = upJson2Xml.UpStreamJson2Xml(json, ResponseControl.class); break; case TaskType.lendonTask: xml = upJson2Xml.UpStreamJson2Xml(json, LinkageTaskControl.class); break; default: logger.error(Color.RED + "###### 向设备端下发命令, 类型:{}错误, 不予处理 ######" + Color.END, type); } if (!StringUtils.isEmpty(xml)) { flushMsgToDevice("", receiveCode, true, xml); } else { logger.error(Color.RED + "###### xml is empty ######" + Color.END, type); } } }