package com.inspect.tcpserver.tcp; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.inspect.tcpserver.domain.Result; import com.inspect.tcpserver.domain.UpSystemServerProperties; import com.thoughtworks.xstream.XStream; import com.thoughtworks.xstream.io.naming.NoNameCoder; import com.thoughtworks.xstream.io.xml.Xpp3Driver; import com.thoughtworks.xstream.security.AnyTypePermission; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; import io.netty.util.internal.StringUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; import org.dom4j.io.SAXReader; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import javax.annotation.Resource; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.TimerTask; import java.util.concurrent.*; @Slf4j @Component public class NettyClient { // 客户端只需要一个 时间循环组 , 即 NioEventLoopGroup 线程池 private static final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); private String serverIP; private int serverPort; private long sendIndex = 0; //若重启系统后还要延续之前的序列号则需要把序列号存入redis中 private long receiveIndex = 0; private String sendCode; private String receiveCode; private UpJson2Xml upJson2Xml; private DownXml2Json downXml2Json; private NettyClientHandler client; private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4); // private ExecutorService executorService = new ThreadPoolExecutor(20, 50, 100, TimeUnit.SECONDS, new LinkedBlockingQueue(400)); @Resource UpSystemServerProperties upSystemServerProperties; @Resource private RestTemplate restTemplate; @Resource(name = "stringRedisTemplate") private RedisTemplate redisTemplate; @Value("${iip_server.send.url}") String iipSendUrl; @Value("${up_time_interval_setting}") String upTimeIntervalSetting; @Value("${print_recv_data:1}") Integer printRecvData; /** * 接收/发送报文xml外层别名 */ private String alias = "PatrolHost"; /** * 设备层需要的编码,上报或下发的时候转 */ private String deviceAlias = "PatrolDevice"; public NettyClient() { upJson2Xml = new UpJson2Xml(alias); downXml2Json = new DownXml2Json(alias); } //释放资源 public void Close() { eventLoopGroup.shutdownGracefully(); scheduledExecutor.shutdown(); } //连接服务器 @Async public void connectUpperSystemServer() { this.serverIP = upSystemServerProperties.ip; this.serverPort = upSystemServerProperties.port; this.sendCode = upSystemServerProperties.iipCode; this.receiveCode = upSystemServerProperties.upCode; log.info("upperServer config serverIP:{}, serverPort:{}, sendCode:{}, receiveCode:{}", this.serverIP, this.serverPort, this.sendCode, this.receiveCode); try { // 客户端启动对象 Bootstrap bootstrap = new Bootstrap(); // 设置相关参数 bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { client = new NettyClientHandler(NettyClient.this); ch.pipeline().addLast(new MyDecoder(printRecvData)); ch.pipeline().addLast(client); } }); // 开始连接服务器, 并进行同步操作 // ChannelFuture 类分析 , Netty 异步模型 // sync 作用是该方法不会再次阻塞 ChannelFuture channelFuture = bootstrap.connect(serverIP, serverPort).addListener(new ConnectionListener(this)).sync(); log.info("connected to upperServer"); // 关闭通道, 开始监听 channelFuture.channel().closeFuture().sync(); } catch(Exception e) { log.error("error", e); } } //发送消息 public void sendMsgToUpper(boolean request, String uuid, String xml) { log.info("###### 会话:{}, 向上级系统发送消息:{} ######\n{}", uuid, xml); if (client != null) { ByteBuf byteBuf = Unpooled.copiedBuffer(xml, CharsetUtil.UTF_8); int length = byteBuf.readableBytes(); ByteBuf allBuf = Unpooled.buffer(length + ConfigType.dataLength); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); allBuf.writeLongLE(sendIndex); allBuf.writeLongLE(receiveIndex); allBuf.writeByte(request ? 0x00 : 0x01); allBuf.writeIntLE(length); allBuf.writeBytes(byteBuf); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); // 存入缓存 redisTemplate.opsForValue().set(String.valueOf(this.sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); client.sendProtoBuffer(allBuf); this.sendIndex++; } else { log.info("###### 会话:{}, 与上级系统连接失败 ######", uuid); } } //重新发送 public void resetSendMsg(long sendIndex) { if (client != null) { // 获取缓存的中的值 String msg = redisTemplate.opsForValue().get(String.valueOf(sendIndex)); if (!StringUtil.isNullOrEmpty(msg)) { ByteBuf allBuf = Unpooled.copiedBuffer(msg, CharsetUtil.US_ASCII); client.sendProtoBuffer(allBuf); } } } //线程处理接收函数 public void receiveUpstreamData(BinaryModel binaryModel) { // executorService.execute(() -> // { try { parseUpstreamData(binaryModel); } catch (Exception e) { log.error("error", e); } // }); } //处理接收消息 private void parseUpstreamData(BinaryModel binaryModel) throws DocumentException { String xml = binaryModel.dataBuf.toString(CharsetUtil.UTF_8); this.receiveIndex = binaryModel.sendIndex; SAXReader saxReader = new SAXReader(); Document document = saxReader.read(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))); Element root = document.getRootElement(); int type = 0; if (null != root.element("Type") && !StringUtil.isNullOrEmpty(root.element("Type").getText())) { type = Integer.parseInt(root.element("Type").getText()); } int command = 0; if (null != root.element("Command") && !StringUtil.isNullOrEmpty(root.element("Command").getText())) { command = Integer.parseInt(root.element("Command").getText()); } if (type == SystemType.system) { if (command == SystemType.no_response || command == SystemType.has_response) { if (null != root.element("Code")) { if (Objects.equals(root.element("Code").getText(), ResponseType.retry)) { resetSendMsg(binaryModel.receiveIndex); } } } } // 发送给上级的响应 String response = ""; String json = null; log.info("###### 解析上级系统消息, 通道: {}, 类型:{} ######\n{}", binaryModel.id, type, xml); switch (type) { case SystemType.system: switch (command) { case SystemType.has_response: dealRegister(xml); break; case SystemType.no_response: //心跳处理 break; } break; case RobotType.robotVl: case RobotType.robot: case RobotType.robotCar: case RobotType.robotFz: case RobotType.robotIr: case RobotType.robotPtz: json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, RobotControl.class); break; case UAVType.uav: case UAVType.uavXj: case UAVType.uavKz: case UAVType.uavYt: case UAVType.nest: json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, RobotControl.class); break; case TaskType.taskControl: json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, BaseControl.class); break; case TaskType.taskSend: json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, TaskSendControl.class); break; case TaskType.lendonTask: json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, LinkageTaskControl.class); break; case TaskType.taskArea: json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, AreaControl.class); break; case ModelType.modelSync: json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, BaseControl.class); break; case QueryType.queryResult: json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, ResultControl.class); break; default: log.info("ClienHandler接收到的type:{}不在处理范围内, 不予处理", type); } log.info("###### 消息处理返回json: {} ######", json); // 将上级下发的指令,转发到业务端处理,接收业务端处理后的结果,上报给上级系统 if (type != SystemType.system && !StringUtil.isNullOrEmpty(json)) { log.info("###### 调用业务端处理json: {} ######", json); //调用业务端处理 ResponseEntity ajaxResultResponseEntity = restTemplate.postForEntity(iipSendUrl, json, com.inspect.tcpserver.domain.Result.class); HttpStatus statusCode = ajaxResultResponseEntity.getStatusCode(); if (statusCode.equals(HttpStatus.OK)) { // 调用业务端处理成功 Result body = ajaxResultResponseEntity.getBody(); if (null == body) { log.info("接收上级系统下发的指令,转发到应用业务端处理后,返回的响应体为空"); return; } String bodyCode = body.getCode(); String msg = body.getMsg(); String data = body.getData(); if (StringUtils.isBlank(data)) { log.info("接收上级系统下发的指令,转发到应用业务端处理后,返回的内容为空"); return; } log.info("接收到上级系统下发指令,转发到巡视主机,成功,返回code:{},msg:{},data:{}", bodyCode, msg, data); // 响应巡视主机 JSONObject item = JSONObject.parseObject(data); response = createDownResponse(item); } else { // 调用业务端处理失败 log.info("下发指令,失败,httpCode:{}", statusCode); response = createDownFailResponse(); } // 将xml消息转为json格式字符串 //String msg = upJson2Xml.ModelJson2Xml(response); String msg = upJson2Xml.UpStreamJson2Xml(response, ModelControl.class); log.info("###### 消息响应: {} ######", msg); // 上报上级系统,会话类型为响应 sendMsgToUpper(false, binaryModel.uuid, msg); } } private XStream getXmlStreamInstance() { return new XStream(new Xpp3Driver(new NoNameCoder())); } //处理注册应答 public void dealRegister(String xml) { RegisterResponseControl obj = new RegisterResponseControl(); try { XStream xStream = getXmlStreamInstance(); xStream.alias(alias, RegisterResponseControl.class); xStream.autodetectAnnotations(true); xStream.ignoreUnknownElements(); xStream.addPermission(AnyTypePermission.ANY); obj = (RegisterResponseControl) xStream.fromXML(xml); } catch (com.thoughtworks.xstream.XStreamException e) { try { XStream xStream = getXmlStreamInstance(); xStream.alias(deviceAlias, RegisterResponseControl.class); xStream.autodetectAnnotations(true); xStream.ignoreUnknownElements(); xStream.addPermission(AnyTypePermission.ANY); obj = (RegisterResponseControl) xStream.fromXML(xml); } catch (com.thoughtworks.xstream.XStreamException e2) { log.error("###### dealRegister解析失败:{} ######", e2.getMessage()); } } timerSendControl(obj); log.info("###### 客户端接收到服务端注册回馈, 服务注册完成 ######"); } //处理心跳 public void timerSendControl(RegisterResponseControl response) { try { if (response.Code.equals(ResponseType.succeed)) { int heart = StringUtils.isEmpty(response.Items.get(0).heart_beat_interval) ? 0 : Integer.parseInt(response.Items.get(0).heart_beat_interval); int patrolDevice = StringUtils.isEmpty(response.Items.get(0).patroldevice_run_interval) ? 0 : Integer.parseInt(response.Items.get(0).patroldevice_run_interval); int nest = StringUtils.isEmpty(response.Items.get(0).nest_run_interval) ? 0 : Integer.parseInt(response.Items.get(0).nest_run_interval); int weather = StringUtils.isEmpty(response.Items.get(0).weather_interval) ? 0 : Integer.parseInt(response.Items.get(0).weather_interval); sendHeartbeatToUpper(); // 定时心跳报活 scheduledExecutor.scheduleWithFixedDelay(new TimerTask() { @Override public void run() { sendHeartbeatToUpper(); } }, 0, heart, TimeUnit.SECONDS); // 上级系统返回的定时信息存入redis cacheTimeInterval(heart, patrolDevice, nest, weather); } } catch (Exception e) { log.error("error", e); } } /** * 缓存上级系统返回的定时任务间隔 * * @param heart * @param patrolDevice * @param nest * @param weather */ private void cacheTimeInterval(int heart, int patrolDevice, int nest, int weather) { JSONObject json = new JSONObject(); json.put("heart", heart); json.put("patroldevice", patrolDevice); json.put("nest", nest); json.put("weather", weather); redisTemplate.opsForValue().set(upTimeIntervalSetting, json.toJSONString()); } private String createRegHeart(boolean isHeart) { ResponseControl obj = new ResponseControl(); XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.autodetectAnnotations(true); xStream.alias(alias, ResponseControl.class); obj.SendCode = sendCode; obj.ReceiveCode = receiveCode; obj.Type = String.valueOf(SystemType.system); obj.Code = ""; obj.Command = String.valueOf(isHeart ? SystemType.heart_request : SystemType.register_request); obj.Time = CommonUtils.GetNowDateString(); obj.Items = ""; String resultXML = xStream.toXML(obj); return resultXML; } /** * 创建下发失败指令返回 * * @param * @return */ private String createDownFailResponse() { JSONObject object = new JSONObject(); object.put("SendCode", sendCode); object.put("ReceiveCode", receiveCode); object.put("Type", SystemType.system); object.put("Code", ResponseType.fault); object.put("Command", SystemType.no_response); JSONArray jsonArray = new JSONArray(); object.put("Items", jsonArray); object.put("Time", CommonUtils.GetNowDateString()); return object.toString(); } /** * 创建下发成功指令返回 * * @param * @return */ private String createDownResponse(JSONObject item) { JSONObject object = new JSONObject(); object.put("SendCode", sendCode); object.put("ReceiveCode", receiveCode); object.put("Type", SystemType.system); object.put("Code", ResponseType.succeed); if (null == item) { object.put("Command", SystemType.no_response); JSONArray jsonArray = new JSONArray(); object.put("Items", jsonArray); } else { object.put("Command", SystemType.has_response); JSONArray jsonArray = new JSONArray(); jsonArray.add(item); object.put("Items", jsonArray); } object.put("Time", CommonUtils.GetNowDateString()); return object.toString(); } public void sendRegisterToUpper() { String xml = createRegHeart(false); log.info("###### 向上级系统发送注册消息 ######"); sendMsgToUpper(true, "", xml); } public void sendHeartbeatToUpper() { String xml = createRegHeart(true); log.info("###### 向上级系统发送心跳消息 ######"); sendMsgToUpper(true, "", xml); } /** * 处理身份 * 处理sendcode 为本机 * receiveCode 为无人机或机器人处理系统 * * @return */ public JSONObject handleIdentity(JSONObject obj) { // 从服务端发出的请求,sendCode 应为服务端 obj.put("SendCode", sendCode); obj.put("ReceiveCode", receiveCode); return obj; } /** * 发送消息 * * @param json */ public void sendJsonMessage(String uuid, String json) { JSONObject obj = JSONObject.parseObject(json); if (obj != null) { if(obj.get("robot_file_path") != null || obj.get("video_file_path") != null || obj.get("device_file_path") != null || obj.get("drone_file_path") != null || obj.get("task_file_path") != null) { String response = createDownResponse(obj); String xml = upJson2Xml.UpStreamJson2Xml(response, ModelControl.class); xml = xml.replaceAll(deviceAlias, alias); log.info("###### 会话:{}, 向上级系统发送模型上报指令 ######", uuid); sendMsgToUpper(true, uuid, xml); return; } // 处理身份 obj = handleIdentity(obj); json = obj.toJSONString(); int type = Integer.parseInt(obj.get("Type").toString()); String xml = null; switch (type) { case PushType.environment: xml = upJson2Xml.UpStreamJson2Xml(json, EnvironmentControl.class); log.info("###### 会话:{}, 向上级系统发送环境数据 ######", uuid); break; case PushType.alarm: xml = upJson2Xml.UpStreamJson2Xml(json, AlarmControl.class); log.info("###### 会话:{}, 向上级系统发送巡视设备异常告警数据 ######", uuid); break; case PushType.analysisAlarm: xml = upJson2Xml.UpStreamJson2Xml(json, AnalysisControl.class); log.info("###### 会话:{}, 向上级系统发送告警数据 ######", uuid); break; case PushType.location: xml = upJson2Xml.UpStreamJson2Xml(json, LocationControl.class); log.info("###### 会话:{}, 向上级系统发送巡视设备坐标 ######", uuid); break; case PushType.monitor: xml = upJson2Xml.UpStreamJson2Xml(json, MonitorControl.class); log.info("###### 会话:{}, 向上级系统发送静默监视告警数据 ######", uuid); break; case PushType.nestRunning: xml = upJson2Xml.UpStreamJson2Xml(json, NestRunningControl.class); log.info("###### 会话:{}, 向上级系统发送无人机机巢运行数据 ######", uuid); break; case PushType.nestState: xml = upJson2Xml.UpStreamJson2Xml(json, NestStateControl.class); log.info("###### 会话:{}, 向上级系统发送无人机机巢状态数据 ######", uuid); break; case PushType.patrolDeviceState: xml = upJson2Xml.UpStreamJson2Xml(json, PatrolDeviceStateControl.class); log.info("###### 会话:{}, 向上级系统发送巡视设备状态数据 ######", uuid); break; case PushType.patrolDeviceRunning: xml = upJson2Xml.UpStreamJson2Xml(json, PatrolDeviceRunningControl.class); log.info("###### 会话:{}, 向上级系统发送巡视设备运行数据 ######", uuid); break; case PushType.result: xml = upJson2Xml.UpStreamJson2Xml(json, TaskResultControl.class); log.info("###### 会话:{}, 向上级系统发送巡视结果 ######", uuid); break; case PushType.taskState: xml = upJson2Xml.UpStreamJson2Xml(json, TaskStateControl.class); log.info("###### 会话:{}, 向上级系统发送任务状态数据 ######", uuid); break; case PushType.total: xml = upJson2Xml.UpStreamJson2Xml(json, ReportControl.class); log.info("###### 会话:{}, 向上级系统发送巡视设备统计信息上报 ######", uuid); break; case PushType.route: xml = upJson2Xml.UpStreamJson2Xml(json, RouteControl.class); log.info("###### 会话:{}, 向上级系统发送巡视路线 ######", uuid); break; case SystemType.system: xml = upJson2Xml.UpStreamJson2Xml(json, ModelControl.class); log.info("###### 会话:{}, 向上级系统发送系统数据 ######", uuid); break; case ModelType.modelUpdate: xml = upJson2Xml.UpStreamJson2Xml(json, UpdateModelControl.class); log.info("###### 会话:{}, 向上级系统发送模型更新上报指令 ######", uuid); break; default: log.info("###### 会话:{}, 应用向上级系统发送消息, 类型: [{}] 不在处理范围内, 不予处理 ######", uuid, type); } if (!StringUtils.isEmpty(xml)) { // 将设备别名转换为上级别名 xml = xml.replaceAll(deviceAlias, alias); sendMsgToUpper(true, uuid, xml); } } } }