package com.inspect.tcpserver.tcp; import com.alibaba.fastjson.JSONObject; import com.inspect.tcpserver.constant.Constant; import com.inspect.tcpserver.controller.ClientController; import com.inspect.tcpserver.domain.DeviceServerProperties; import com.inspect.tcpserver.util.Color; import com.thoughtworks.xstream.XStream; import com.thoughtworks.xstream.io.naming.NoNameCoder; import com.thoughtworks.xstream.io.xml.Xpp3Driver; import com.thoughtworks.xstream.security.AnyTypePermission; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.CharsetUtil; import io.netty.util.internal.StringUtil; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import javax.annotation.Resource; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.*; @Component public class NettyServer { private final Logger logger = LoggerFactory.getLogger(NettyServer.class); /** * 接收/发送报文xml外层别名 */ private final String aliasHost = "PatrolHost"; private final String aliasDevice = "PatrolDevice"; @Resource ClientController clientController; private EventLoopGroup bossGroup; private EventLoopGroup workGroup; private Map tcpClientMap = new HashMap<>(); //机器人id,通道id private long sendIndex = 0; //若重启系统后还要延续之前的序列号则需要把序列号存入redis中 private long receiveIndex = 0; private DownXml2Json downXml2Json = new DownXml2Json(aliasHost); private UpJson2Xml upJson2Xml = new UpJson2Xml(aliasHost); private NettyServerHandler nettyServerHandler; private ExecutorService executorService = new ThreadPoolExecutor(1, 10, 5L, TimeUnit.SECONDS, new ArrayBlockingQueue(4), Executors.defaultThreadFactory()); @Resource DeviceServerProperties deviceServerProperties; @Resource(name = "stringRedisTemplate") private RedisTemplate redisTemplate; @Resource private RestTemplate restTemplate; @Resource private RabbitTemplate rabbitTemplate; @Value("${iip_server.authDevice.url}") String iipAuthDeviceUrl; private String serverIP; private int serverPort; private int num = 410; public void init() { this.serverIP = deviceServerProperties.ip; this.serverPort = deviceServerProperties.port; upJson2Xml = new UpJson2Xml(aliasHost); downXml2Json = new DownXml2Json(aliasHost); } @Async public void startServer() { // 初始化 init(); //new 一个主线程组 bossGroup = new NioEventLoopGroup(1); //new 一个工作线程组 workGroup = new NioEventLoopGroup(10); ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MyDecoder()); nettyServerHandler = new NettyServerHandler(NettyServer.this); ch.pipeline().addLast(nettyServerHandler); } }) .localAddress(serverPort) //设置队列大小 .option(ChannelOption.SO_BACKLOG, 1024) // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .childOption(ChannelOption.SO_KEEPALIVE, true); //绑定端口,开始接收进来的连接 try { ChannelFuture future = bootstrap.bind(serverPort).sync(); logger.info("################ TCP服务器启动 ################"); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { close(); } } //释放资源 public void close() { if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workGroup != null) { workGroup.shutdownGracefully(); } } //发送消息 public void flushMsgToDevice(String uuid, String clientKey, boolean request, String xml) { logger.info("clientKey: " + clientKey + ", tcpClientMap.size(): " + tcpClientMap.size() + ", tcpClientMap: " + tcpClientMap); if (tcpClientMap.containsKey(clientKey) && !StringUtil.isNullOrEmpty(xml)) { ByteBuf byteBuf = Unpooled.copiedBuffer(xml, CharsetUtil.UTF_8); int length = byteBuf.readableBytes(); ByteBuf allBuf = Unpooled.buffer(length + ConfigType.dataLength); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); allBuf.writeLongLE(sendIndex); allBuf.writeLongLE(receiveIndex); allBuf.writeByte(request ? 0x00 : 0x01); allBuf.writeIntLE(length); allBuf.writeBytes(byteBuf); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); logger.info(Color.MAGENTA + "######## => 会话:{}, 客户:{}, 向设备回送序列:{}, 接收端序列:{}, 消息########\n{}" + Color.END, uuid, tcpClientMap.get(clientKey), sendIndex, receiveIndex, xml); nettyServerHandler.sendMsg(uuid, clientKey, tcpClientMap.get(clientKey), allBuf); sendIndex++; } else { logger.warn(Color.RED + "######## 会话: {}, 客户: [{}/{}] 离线!!! ########" + Color.END, uuid, tcpClientMap.get(clientKey), clientKey); } } public void flushMsgToDeviceBroadcast(String uuid, String clientKey, boolean request, String xml) { for (Map.Entry entry : tcpClientMap.entrySet()) { logger.info("######## flushMsgToDeviceBroadcast key: {}, value: {} ########", entry.getKey(), entry.getValue()); ByteBuf byteBuf = Unpooled.copiedBuffer(xml, CharsetUtil.UTF_8); int length = byteBuf.readableBytes(); ByteBuf allBuf = Unpooled.buffer(length + ConfigType.dataLength); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); allBuf.writeLongLE(sendIndex); allBuf.writeLongLE(receiveIndex); allBuf.writeByte(request ? 0x00 : 0x01); allBuf.writeIntLE(length); allBuf.writeBytes(byteBuf); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); logger.info(Color.MAGENTA + "######## => 会话:{}, 客户:{}, 向设备回送序列:{}, 接收端序列:{}, 消息########\n{}" + Color.END, uuid, tcpClientMap.get(clientKey), sendIndex, receiveIndex, xml); nettyServerHandler.sendMsg(uuid, entry.getKey(), tcpClientMap.get(entry.getKey()), allBuf); sendIndex++; try { Thread.sleep(1); } catch (InterruptedException e) {} } } //开启线程处理消息 public void receiveMsg(BinaryModel binaryModel) { executorService.execute(() -> { try { dealMsgInThreadPool(binaryModel); } catch (Exception e) { logger.error(ExceptionUtils.getStackTrace(e)); } }); } //重新发送 public void resetSendMsg(long sendIndex, String sendCode) { // 获取缓存的中的值 String msg = redisTemplate.opsForValue().get(String.valueOf(sendIndex)); if (!StringUtil.isNullOrEmpty(msg)) { ByteBuf allBuf = Unpooled.copiedBuffer(msg, CharsetUtil.US_ASCII); flushMsgToDevice("", sendCode, true, msg); } } //处理接收消息 private void dealMsgInThreadPool(BinaryModel binaryModel) throws DocumentException { String xml = binaryModel.dataBuf.toString(CharsetUtil.UTF_8); receiveIndex = binaryModel.sendIndex; SAXReader saxReader = new SAXReader(); Document document = saxReader.read(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))); Element root = document.getRootElement(); String sendCode = root.element("SendCode").getText(); String receiveCode = root.element("ReceiveCode").getText(); tcpClientMap.put(sendCode, binaryModel.id); int type = 0; if (null != root.element("Type") && !StringUtil.isNullOrEmpty(root.element("Type").getText())) { type = Integer.parseInt(root.element("Type").getText()); } int command = 0; if (null != root.element("Command") && !StringUtil.isNullOrEmpty(root.element("Command").getText())) { command = Integer.parseInt(root.element("Command").getText()); } logger.info(Color.MAGENTA + "######## <= 会话: {}, 客户: [{}/{}], 消息类型: {}, 命令:{}, 消息体: ########\n{}" + Color.END, binaryModel.uuid, sendCode, binaryModel.id, type, command, xml); //判断是否重发 if (type == SystemType.system) { if (command == SystemType.has_response || command == SystemType.no_response) { if (null != root.element("Code")) { String code = root.element("Code").getText(); if (code.equals(ResponseType.retry)) { resetSendMsg(binaryModel.receiveIndex, sendCode); } else if (code.equals(ResponseType.succeed)) { logger.info("######## 响应结果为成功, 客户: {}, 命令: {}, 消息体 ########\n{}", sendCode, command, xml); } else if (code.equals(ResponseType.fault)) { logger.warn("######## 响应结果为失败, 客户: {}, 命令: {}, 消息体 ########\n{}", sendCode, command, xml); } else if (code.equals(ResponseType.reject)) { logger.warn("######## 响应结果为拒绝, 客户: {}, 命令: {}, 消息体 ########\n{}", sendCode, command, xml); } return; } } } String json = null; switch (type) { case SystemType.system: switch (command) { case SystemType.register_request: // 收到接入侧注册信息 logger.info("######## 会话: {}, 客户: {}, 客户端注册信息 ########", binaryModel.uuid, sendCode); dealRegister(binaryModel.uuid, xml); break; case SystemType.heart_request: // 处理心跳请求响应 logger.info("######## 会话: {}, 客户: {}, 客户端心跳 ########", binaryModel.uuid, sendCode); sendHeartBeat(binaryModel.uuid, xml); break; case SystemType.has_response: // 处理有返回值的消息响应 if (null != root.element("Items") && null != root.element("Items").element("Item")) { // 处理设备上报的模型同步响应 if (null != root.element("Items").element("Item").attribute("device_file_path")) { // 收到接入侧模型同步数据 logger.info(Color.YELLOW + "######## 会话: {}, 客户: {}, 客户端模型同步数据 ########" + Color.END, binaryModel.uuid, sendCode); // json = downXml2Json.ModelControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, ModelControl.class); JSONObject jsonObject = JSONObject.parseObject(json); jsonObject.put("uuid", binaryModel.uuid); jsonObject.put("Type", Constant.MODEL_UP_TYPE); rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, jsonObject.toJSONString()); } // 任务控制响应任务执行ID if (null != root.element("Items").element("Item").attribute("task_patrolled_id")) { // 收到接入侧任务下发或控制回复数据 logger.info("######## 会话: {}, 客户: {}, 设备端任务下发或控制回复数据 ########", binaryModel.uuid, sendCode); // json = downXml2Json.ModelControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, ModelControl.class); JSONObject jsonObject = JSONObject.parseObject(json); jsonObject.put("SendCode", ""); clientController.sendMsg(binaryModel.uuid, jsonObject.toJSONString()); } } else { // 接收到的系统类信息报文中,root:{},中不包含items或items中没有item,不予处理 logger.warn("[NETTY] IN RECEIVING MESSAGE, no items or item , OMIT IT, ROOT: {}", root); } break; default: // 接收到的系统类信息报文中,command:{},不在处理范围内,不予处理 logger.warn("######## 非法的消息不予处理, 客户: {}, 命令: {}, 消息体 ########\n{}", sendCode, command, xml); } break; case PushType.patrolDeviceState:// insert into basedata_mont_patdevstadata // json = downXml2Json.PatrolDeviceStateControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, PatrolDeviceStateControl.class); logger.info("######## 会话: {}, 客户: {}, 客户端设备状态数据 ########", binaryModel.uuid, sendCode); break; case PushType.patrolDeviceRunning:// insert into basedata_mont_patdevrundata // json = downXml2Json.PatrolDeviceRunningControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, PatrolDeviceRunningControl.class); logger.info("######## 会话: {}, 客户: {}, 客户端设备运行数据 ########", binaryModel.uuid, sendCode); break; case PushType.nestState:// insert into basedata_mont_neststadata // json = downXml2Json.NestStateControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, NestStateControl.class); logger.info("######## 会话: {}, 客户: {}, 客户端机巢状态数据 ########", binaryModel.uuid, sendCode); break; case PushType.nestRunning:// insert into basedata_mont_nestrundata // json = downXml2Json.NestRunningControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, NestRunningControl.class); logger.info("######## 会话: {}, 客户: {}, 客户端机巢运行数据 ########", binaryModel.uuid, sendCode); break; case PushType.location:// insert into basedata_mont_patdevcoord // json = downXml2Json.LocationControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, LocationControl.class); logger.info("######## 会话: {}, 客户: {}, 客户端设备坐标 ########", binaryModel.uuid, sendCode); break; case PushType.route:// insert into basedata_mont_patdevpatroute // json = downXml2Json.RouteControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, RouteControl.class); logger.info("######## 会话: {}, 客户: {}, 客户端设备路线 ########", binaryModel.uuid, sendCode); break; case PushType.alarm:// insert into basedata_mont_patdevalmabn // json = down.AlarmControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, AlarmControl.class); logger.info("######## 会话: {}, 客户: {}, 客户端设备异常告警 ########", binaryModel.uuid, sendCode); break; case PushType.environment:// insert into basedata_mont_evndata // json = downXml2Json.EnvironmentControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, EnvironmentControl.class); logger.info("######## 会话: {}, 客户: {}, 客户端设备上报环境数据 ########", binaryModel.uuid, sendCode); break; case PushType.taskState:// insert into basedata_mont_taskstadata and patrol_task_status // json = downXml2Json.TaskStateControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, TaskStateControl.class); logger.info("######## 会话: {}, 客户: {}, 客户端设备任务状态 ########", binaryModel.uuid, sendCode); break; case PushType.result:// insert into basedata_mont_taskresult and patrol_task_result_main // json = downXml2Json.TaskResultControlXml2Json(xml); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, TaskResultControl.class); logger.info("######## 会话: {}, 客户: {}, 客户端巡视结果 ########", binaryModel.uuid, sendCode); break; default: // server-handle-接收到的type:{},不在处理范围内,不予处理 logger.info("######## 会话: {}, 客户: {}, 非法的消息不予处理, 类型{}, 消息体 ########\n{}", binaryModel.uuid, sendCode, type, xml); } if (type != SystemType.system && !StringUtil.isNullOrEmpty(json)) { if ((type == NestCtlType.courseReversal && command == 3)) { // 处理用SSCOM模拟的数据, 向无人机发送控制指令 logger.info("######## 会话: {}, 客户: {}, 向设备透传200001控制指令", binaryModel.uuid, sendCode); flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, xml); } else { //rabbitmq推送到消息队列中基于springboot_xggd JSONObject jsonObject = JSONObject.parseObject(json); jsonObject.put("uuid", binaryModel.uuid); json = jsonObject.toJSONString(); // send to BasedataMontDataMqAcceptHandle rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, json); boolean isHost = json.contains(aliasHost); logger.info("######## 会话: {}, 客户: {}, isHost: {}", binaryModel.uuid, sendCode, isHost); sendResponseToDevice(binaryModel.uuid, receiveCode, sendCode, isHost); } } else { if ((type == NestCtlType.suddenStop && command == 7) || (type == NestCtlType.ptzPitch && command == 6) || (type == NestCtlType.picModelSet && command == 1) || (type == NestCtlType.nestSuddenStop && command == 2)) {// 处理用SSCOM模拟的数据, 向无人机发送控制指令 logger.info("######## 会话: {}, 客户: {}, 向设备透传200002~20005控制指令", binaryModel.uuid, sendCode); flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, xml); } } } public void sendResponseToDevice(String uuid, String sendCode, String receiveCode, boolean isHost) { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(isHost ? aliasHost : aliasDevice, ResponseControl.class); xStream.autodetectAnnotations(true); ResponseControl responseControl = new ResponseControl(); responseControl.SendCode = sendCode; responseControl.ReceiveCode = receiveCode; responseControl.Type = String.valueOf(SystemType.system); responseControl.Code = ResponseType.succeed; responseControl.Command = String.valueOf(SystemType.no_response); responseControl.Time = CommonUtils.GetNowDateString(); responseControl.Items = ""; String xml = xStream.toXML(responseControl); flushMsgToDevice(uuid, receiveCode, false, xml); } //处理注册应答 public void dealRegister(String uuid, String xml) { BaseControl obj = new BaseControl(); try { XStream xStream = getXmlStreamInstance(); xStream.alias(aliasHost, BaseControl.class); xStream.autodetectAnnotations(true); xStream.ignoreUnknownElements(); xStream.addPermission(AnyTypePermission.ANY); obj = (BaseControl) xStream.fromXML(xml); } catch (com.thoughtworks.xstream.XStreamException e) { logger.error("######## 注册异常堆栈[PatrolHost] 会话: {}, {} ########", uuid, e.getMessage()); try { XStream xStreamEx = getXmlStreamInstance(); xStreamEx.alias(aliasDevice, BaseControl.class); xStreamEx.autodetectAnnotations(true); xStreamEx.ignoreUnknownElements(); xStreamEx.addPermission(AnyTypePermission.ANY); obj = (BaseControl) xStreamEx.fromXML(xml); } catch (com.thoughtworks.xstream.XStreamException e2) { logger.error("######## 解析失败[PatrolDevice] {} ########", e2.getMessage()); } } RegisterResponseControl responseControl = new RegisterResponseControl(); responseControl.SendCode = obj.ReceiveCode; responseControl.ReceiveCode = obj.SendCode; responseControl.Type = String.valueOf(SystemType.system); responseControl.Command = String.valueOf(SystemType.has_response); responseControl.Time = CommonUtils.GetNowDateString(); // 调用基础服务鉴权设备 if (authDevice(obj.SendCode)) { //鉴权通过 responseControl.Code = ResponseType.succeed; responseControl.Items = new ArrayList<>(); RegisterResponseModel model = new RegisterResponseModel(); model.patroldevice_run_interval = ConfigType.patroldevice_run_interval; model.heart_beat_interval = ConfigType.heart_beat_interval; model.env_interval = ConfigType.env_interval; //model.weather_interval= ConfigType.weather_interval; // 当连接客户端为无人机机巢时,报文中增加机巢运行数据上报间隔 if (obj.SendCode.equals(deviceServerProperties.nestCode)) { model.nest_run_interval = ConfigType.nest_run_interval; } responseControl.Items.add(model); // 推送消息到mq JSONObject jsonObject = new JSONObject(); jsonObject.put("uuid", uuid); jsonObject.put("patroldevice_code", obj.SendCode); jsonObject.put("SendCode", obj.SendCode); jsonObject.put("Type", "heartbeat"); jsonObject.put("eventType", "connect"); jsonObject.put("HeartBeatInterval", ConfigType.heart_beat_interval); rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, jsonObject.toJSONString()); } else { // 鉴权不通过 responseControl.Code = ResponseType.fault; } sendRegisterResponse(uuid, responseControl, obj.SendCode); } private XStream getXmlStreamInstance() { return new XStream(new Xpp3Driver(new NoNameCoder())); } /** * 鉴权巡视设备 * * @param sendCode 巡视设备(机器人、无人机)唯一标识 * @return */ public boolean authDevice(String sendCode) { // // 调用基础服务的鉴权巡视设备接口 // String url = String.format(iipAuthDeviceUrl,sendCode); // // String resultString = restTemplate.getForObject(url,String.class); // // Result result = JSONObject.parseObject(resultString, Result.class); // // // 判断鉴权结果 // if(StringUtils.equals(result.getCode(),"200")){ // logger.info("设备鉴权成功,result:{}",result); // return true; // } // // logger.warn("设备鉴权失败 sendCode:{},resutl:{}",sendCode,result); // return false; return true; } public void sendRegisterResponse(String uuid, RegisterResponseControl responseControl, String sendCode) { String xml = ""; try { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(aliasHost, RegisterResponseControl.class); xStream.autodetectAnnotations(true); xml = xStream.toXML(responseControl); } catch (com.thoughtworks.xstream.XStreamException e) { logger.error("######## DOWN REGISTER 解析失败, 异常堆栈 [PatrolHost]: {} ########", e.getMessage()); try { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(aliasDevice, RegisterResponseControl.class); xStream.autodetectAnnotations(true); xml = xStream.toXML(responseControl); } catch (com.thoughtworks.xstream.XStreamException e2) { logger.error("################ DOWN REGISTER 解析失败[PatrolDevice] {} ################", e2.getMessage()); } } flushMsgToDevice(uuid, sendCode, false, xml); } public void sendHeartBeat(final String uuid, String xml) { //logger.info("################ 设备端机器人系统心跳消息 ################\n{}", xml); boolean isHost = true; BaseControl obj = new BaseControl(); try { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(aliasHost, BaseControl.class); xStream.autodetectAnnotations(true); xStream.ignoreUnknownElements(); xStream.addPermission(AnyTypePermission.ANY); obj = (BaseControl) xStream.fromXML(xml); } catch (com.thoughtworks.xstream.XStreamException e) { logger.error("######## HEARTBEAT TO MQ 解析失败, 异常堆栈[PatrolHost]: {} ########", e.getMessage()); try { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(aliasDevice, BaseControl.class); xStream.autodetectAnnotations(true); xStream.ignoreUnknownElements(); xStream.addPermission(AnyTypePermission.ANY); obj = (BaseControl) xStream.fromXML(xml); isHost = false; } catch (com.thoughtworks.xstream.XStreamException e2) { logger.error("######## HEARTBEAT TO MQ 解析失败[PatrolDevice] {} ########", e2.getMessage()); } } sendResponseToDevice(uuid, obj.ReceiveCode, obj.SendCode, isHost); // 推送消息到mq JSONObject jsonObject = new JSONObject(); jsonObject.put("uuid", uuid); jsonObject.put("SendCode", obj.SendCode); jsonObject.put("Type", "heartbeat"); jsonObject.put("eventType", "heart"); jsonObject.put("HeartBeatInterval", ConfigType.heart_beat_interval); rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, jsonObject.toJSONString()); } /** * 判断机器人是否在线 */ public boolean isOnline(String code) { return tcpClientMap.containsKey(code); } public void sendXmlMessage(JSONObject obj) { int type = obj.getInteger("Type"); String receiveCode = obj.getString("ReceiveCode"); String json = obj.toJSONString(); String xml = null; switch (type) { case RobotType.robotVl: case RobotType.robot: case RobotType.robotCar: case RobotType.robotFz: case RobotType.robotIr: case RobotType.robotPtz: // xml = upJson2Xml.RobotJson2Xml(json); xml = upJson2Xml.UpStreamJson2Xml(json, RobotControl.class); break; case UAVType.uav: case UAVType.uavXj: case UAVType.uavKz: case UAVType.uavYt: case UAVType.nest: // xml = upJson2Xml.UavControlJson2Xml(json); xml = upJson2Xml.UpStreamJson2Xml(json, RobotControl.class); break; case TaskType.taskControl: // xml = upJson2Xml.ResponseJson2Xml(json); xml = upJson2Xml.UpStreamJson2Xml(json, ResponseControl.class); break; case TaskType.taskSend: // xml = upJson2Xml.TaskSendJson2Xml(json); xml = upJson2Xml.UpStreamJson2Xml(json, TaskSendControl.class); break; case TaskType.taskArea: // xml = upJson2Xml.AreaJson2Xml(json); xml = upJson2Xml.UpStreamJson2Xml(json, AreaControl.class); break; case ModelType.modelSync: // xml = upJson2Xml.ResponseJson2Xml(json); xml = upJson2Xml.UpStreamJson2Xml(json, ResponseControl.class); break; case TaskType.lendonTask: // xml = upJson2Xml.LinkageTaskJson2Xml(json); xml = upJson2Xml.UpStreamJson2Xml(json, LinkageTaskControl.class); break; default: logger.warn("################ 向设备端下发命令, 类型:{}错误, 不予处理 ################", type); } if (!StringUtils.isEmpty(xml)) { //logger.info("################ 向设备端下发命令 ################\n{}", xml); flushMsgToDevice("", receiveCode, true, xml); } } }