package com.inspect.tcpserver.tcp; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.inspect.tcpserver.domain.Result; import com.inspect.tcpserver.domain.UpSystemServerProperties; import com.thoughtworks.xstream.XStream; import com.thoughtworks.xstream.io.naming.NoNameCoder; import com.thoughtworks.xstream.io.xml.Xpp3Driver; import com.thoughtworks.xstream.security.AnyTypePermission; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; import io.netty.util.internal.StringUtil; import org.apache.commons.lang3.StringUtils; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import javax.annotation.Resource; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.TimerTask; import java.util.concurrent.*; @Component public class NettyClient { private Logger logger = LoggerFactory.getLogger(NettyClient.class); // 客户端只需要一个 时间循环组 , 即 NioEventLoopGroup 线程池 private static final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); ; private String serverIP; private int serverPort; private long sendIndex = 0; //若重启系统后还要延续之前的序列号则需要把序列号存入redis中 private long receiveIndex = 0; private String sendCode; private String receiveCode; private UpJson2Xml up; private DownXml2Json down; private NettyClientHandler client; private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4); private ExecutorService executorService = new ThreadPoolExecutor(1, 5, 5L, TimeUnit.SECONDS, new ArrayBlockingQueue(4), Executors.defaultThreadFactory()); @Resource UpSystemServerProperties upSystemServerProperties; @Resource private RestTemplate restTemplate; @Resource private RedisTemplate redisTemplate; @Value("${iip_server.send.url}") String iipSendUrl; @Value("${up_time_interval_setting}") String upTimeIntervalSetting; /** * 接收/发送报文xml外层别名 */ private String alias = "PatrolHost"; /** * 设备层需要的编码,上报或下发的时候转 */ private String deviceAlias = "PatrolDevice"; public NettyClient() { up = new UpJson2Xml(alias); down = new DownXml2Json(alias); } //释放资源 public void Close() { if (eventLoopGroup != null) { eventLoopGroup.shutdownGracefully(); } scheduledExecutor.shutdown(); } //连接服务器 @Async public void ConnectServer() { this.serverIP = upSystemServerProperties.ip; this.serverPort = upSystemServerProperties.port; this.sendCode = upSystemServerProperties.iipCode; this.receiveCode = upSystemServerProperties.upCode; try { // 客户端启动对象 Bootstrap bootstrap = new Bootstrap(); // 设置相关参数 bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { client = new NettyClientHandler(NettyClient.this); ch.pipeline().addLast(new MyDecoder()); ch.pipeline().addLast(client); } }); // 开始连接服务器, 并进行同步操作 // ChannelFuture 类分析 , Netty 异步模型 // sync 作用是该方法不会再次阻塞 ChannelFuture channelFuture = bootstrap.connect(serverIP, serverPort).addListener(new ConnectionListener(this)).sync(); logger.info("nettyClient连接服务器成功"); // 关闭通道, 开始监听 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } //发送消息 public void SendMsg(boolean request, String xml) { if (client != null && !StringUtil.isNullOrEmpty(xml)) { ByteBuf byteBuf = Unpooled.copiedBuffer(xml, CharsetUtil.UTF_8); int length = byteBuf.readableBytes(); ByteBuf allBuf = Unpooled.buffer(length + ConfigType.dataLength); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); allBuf.writeLongLE(sendIndex); allBuf.writeLongLE(receiveIndex); allBuf.writeByte(request ? 0x00 : 0x01); allBuf.writeIntLE(length); allBuf.writeBytes(byteBuf); allBuf.writeByte(0xEB); allBuf.writeByte(0x90); // 存入缓存 redisTemplate.opsForValue().set(String.valueOf(this.sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); client.sendMsg(allBuf); this.sendIndex++; } else { logger.warn("与上级系统连接失败"); } } //重新发送 public void resetSendMsg(long sendIndex) { if (client != null) { // 获取缓存的中的值 String msg = redisTemplate.opsForValue().get(String.valueOf(sendIndex)); if (!StringUtil.isNullOrEmpty(msg)) { ByteBuf allBuf = Unpooled.copiedBuffer(msg, CharsetUtil.US_ASCII); client.sendMsg(allBuf); } } } //线程处理接收函数 public void ReceiveMsg(BinaryModel binaryModel) { executorService.execute(() -> { try { ThreadDealMsg(binaryModel); } catch (Exception e) { e.printStackTrace(); } }); } //处理接收消息 private void ThreadDealMsg(BinaryModel binaryModel) throws DocumentException { String xml = binaryModel.dataBuf.toString(CharsetUtil.UTF_8); logger.info("收到上级系统消息:{}", xml); this.receiveIndex = binaryModel.sendIndex; SAXReader saxReader = new SAXReader(); Document document = saxReader.read(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))); Element root = document.getRootElement(); int type = 0; if (null != root.element("Type") && !StringUtil.isNullOrEmpty(root.element("Type").getText())) { type = Integer.parseInt(root.element("Type").getText()); } int command = 0; if (null != root.element("Command") && !StringUtil.isNullOrEmpty(root.element("Command").getText())) { command = Integer.parseInt(root.element("Command").getText()); } if (type == SystemType.system) { if (command == SystemType.no_response || command == SystemType.has_response) { if (null != root.element("Code")) { if (root.element("Code").getText() == ResponseType.retry) { resetSendMsg(binaryModel.receiveIndex); } } } } // 发送给上级的响应 String response = ""; String json = null; switch (type) { case SystemType.system: switch (command) { case SystemType.has_response: dealRegister(xml); break; case SystemType.no_response: //心跳处理 break; } break; case RobotType.robotVl: case RobotType.robot: case RobotType.robotCar: case RobotType.robotFz: case RobotType.robotIr: case RobotType.robotPtz: json = down.RobotControlXml2Json(xml); break; case UAVType.uav: case UAVType.uavXj: case UAVType.uavKz: case UAVType.uavYt: case UAVType.nest: json = down.UavControlXml2Json(xml); break; case TaskType.taskControl: json = down.BaseControlXml2Json(xml); break; case TaskType.taskSend: json = down.TaskSendControlXml2Json(xml); break; case TaskType.lendonTask: json = down.LendonTaskControlXml2Json(xml); break; case TaskType.taskArea: json = down.AreaControlXml2Json(xml); break; case ModelType.modelSync: json = down.BaseControlXml2Json(xml); break; case QueryType.queryResult: json = down.ResultControlXml2Json(xml); break; default: logger.warn("client-handle-接收到的type:{},不在处理范围内,不予处理", type); } // 将上级下发的指令,转发到业务端处理,接收业务端处理后的结果,上报给上级系统 if (type != SystemType.system && !StringUtil.isNullOrEmpty(json)) { //调用业务端处理 ResponseEntity ajaxResultResponseEntity = restTemplate.postForEntity(iipSendUrl, json, com.inspect.tcpserver.domain.Result.class); HttpStatus statusCode = ajaxResultResponseEntity.getStatusCode(); if (statusCode.equals(HttpStatus.OK)) { // 调用业务端处理成功 Result body = ajaxResultResponseEntity.getBody(); if (null == body) { logger.error("接收上级系统下发的指令,转发到应用业务端处理后,返回的响应体为空"); return; } String bodyCode = body.getCode(); String msg = body.getMsg(); String data = body.getData(); logger.info("接收到上级系统下发指令,转发到巡视主机,成功,返回code:{},msg:{},data:{}", bodyCode, msg, data); // 响应巡视主机 JSONObject item = JSONObject.parseObject(data); response = createDownResponse(item); } else { // 调用业务端处理失败 logger.warn("下发指令,失败,httpCode:{}", statusCode); response = createDownFailResponse(); } // 将xml消息转为json格式字符串 String msg = up.ModelJson2Xml(response); // 上报上级系统,会话类型为响应 SendMsg(false, msg); } } //处理注册应答 public void dealRegister(String xml) { XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.alias(alias, RegisterResponseControl.class); xStream.autodetectAnnotations(true); xStream.ignoreUnknownElements(); xStream.addPermission(AnyTypePermission.ANY); RegisterResponseControl obj = (RegisterResponseControl) xStream.fromXML(xml); TimerSendControl(obj); logger.info("客户端 接收到服务端注册回馈,服务注册完成"); } //处理心跳 public void TimerSendControl(RegisterResponseControl response) { try { if (response.Code.equals(ResponseType.succeed)) { int heart = Integer.parseInt(response.Items.get(0).heart_beat_interval); int patroldevice = Integer.parseInt(response.Items.get(0).patroldevice_run_interval); int nest = Integer.parseInt(response.Items.get(0).nest_run_interval); int weather = Integer.parseInt(response.Items.get(0).weather_interval); SendHeart(); // 定时心跳报活 scheduledExecutor.scheduleWithFixedDelay(new TimerTask() { @Override public void run() { SendHeart(); } }, 0, heart, TimeUnit.SECONDS); // 上级系统返回的定时信息存入redis cacheTimeInterval(heart, patroldevice, nest, weather); } } catch (Exception e) { e.printStackTrace(); } } /** * 缓存上级系统返回的定时任务间隔 * * @param heart * @param patroldevice * @param nest * @param weather */ private void cacheTimeInterval(int heart, int patroldevice, int nest, int weather) { JSONObject json = new JSONObject(); json.put("heart", heart); json.put("patroldevice", patroldevice); json.put("nest", nest); json.put("weather", weather); redisTemplate.opsForValue().set(upTimeIntervalSetting, json.toJSONString()); } private String createRegHeart(boolean isheart) { ResponseControl obj = new ResponseControl(); XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); xStream.autodetectAnnotations(true); xStream.alias(alias, ResponseControl.class); obj.SendCode = sendCode; obj.ReceiveCode = receiveCode; obj.Type = String.valueOf(SystemType.system); obj.Code = ""; obj.Command = String.valueOf(isheart ? SystemType.heart_request : SystemType.register_request); obj.Time = CommonUtils.GetNowDateString(); obj.Items = ""; String resultXML = xStream.toXML(obj); return resultXML; } /** * 创建下发失败指令返回 * * @param * @return */ private String createDownFailResponse() { JSONObject object = new JSONObject(); object.put("SendCode", sendCode); object.put("ReceiveCode", receiveCode); object.put("Type", SystemType.system); object.put("Code", ResponseType.fault); object.put("Command", SystemType.no_response); JSONArray jsonArray = new JSONArray(); object.put("Items", jsonArray); object.put("Time", CommonUtils.GetNowDateString()); return object.toString(); } /** * 创建下发成功指令返回 * * @param * @return */ private String createDownResponse(JSONObject item) { JSONObject object = new JSONObject(); object.put("SendCode", sendCode); object.put("ReceiveCode", receiveCode); object.put("Type", SystemType.system); object.put("Code", ResponseType.succeed); if (null == item) { object.put("Command", SystemType.no_response); JSONArray jsonArray = new JSONArray(); object.put("Items", jsonArray); } else { object.put("Command", SystemType.has_response); JSONArray jsonArray = new JSONArray(); jsonArray.add(item); object.put("Items", jsonArray); } object.put("Time", CommonUtils.GetNowDateString()); return object.toString(); } public void SendRegister() { String xml = createRegHeart(false); SendMsg(true, xml); } public void SendHeart() { String xml = createRegHeart(true); SendMsg(true, xml); } /** * 处理身份 * 处理sendcode 为本机 * receiveCode 为无人机或机器人处理系统 * * @return */ public JSONObject handleIdentity(JSONObject obj) { // 从服务端发出的请求,sendcode 应为服务端 obj.put("SendCode", sendCode); obj.put("ReceiveCode", receiveCode); return obj; } /** * 发送消息 * * @param json */ public void sendJsonMessage(String json) { JSONObject obj = JSONObject.parseObject(json); if (obj != null) { // 处理身份 obj = handleIdentity(obj); json = obj.toJSONString(); int type = Integer.parseInt(obj.get("Type").toString()); String xml = null; switch (type) { case PushType.environment: xml = up.EnvironmentControlJson2Xml(json); logger.info("向上级系统发送环境数据。{}", xml); break; case PushType.alarm: xml = up.AlarmControlJson2Xml(json); logger.info("向上级系统发送巡视设备异常告警数据。{}", xml); break; case PushType.analysisAlarm: xml = up.AnalysisControlJson2Xml(json); logger.info("向上级系统发送告警数据。{}", xml); break; case PushType.location: xml = up.LocationControlJson2Xml(json); logger.info("向上级系统发送巡视设备坐标。{}", xml); break; case PushType.monitor: xml = up.MonitorControlJson2Xml(json); logger.info("向上级系统发送静默监视告警数据。{}", xml); break; case PushType.nestRunning: xml = up.NestRuningJson2Xml(json); logger.info("向上级系统发送无人机机巢运行数据。{}", xml); break; case PushType.nestState: xml = up.NestStateJson2Xml(json); logger.info("向上级系统发送无人机机巢状态数据。{}", xml); break; case PushType.patrolDeviceState: xml = up.PatrolDeviceStateControlJson2Xml(json); logger.info("向上级系统发送巡视设备状态数据。{}", xml); break; case PushType.patrolDeviceRunning: xml = up.PatrolDeviceRuningControlJson2Xml(json); logger.info("向上级系统发送巡视设备运行数据。{}", xml); break; case PushType.result: xml = up.TaskResultControlJson2Xml(json); logger.info("向上级系统发送巡视结果。{}", xml); break; case PushType.taskState: xml = up.TaskStateControlJson2Xml(json); logger.info("向上级系统发送任务状态数据。{}", xml); break; case PushType.total: xml = up.ReportControlJson2Xml(json); logger.info("向上级系统发送巡视设备统计信息上报。{}", xml); break; case PushType.route: xml = up.RouteControlJson2Xml(json); logger.info("向上级系统发送巡视路线。{}", xml); break; case SystemType.system: xml = up.ModelJson2Xml(json); logger.info("向上级系统发送系统数据。{}", xml); break; case ModelType.modelUpdate: xml = up.UpdateModelJson2Xml(json); //xml = up.ModelJson2Xml(json, UpdateModelControl.class); logger.info("向上级系统发送模型更新上报指令。{}", xml); break; default: logger.warn("应用向上级系统发送消息,type:{}不在处理范围内,不予处理", type); } if (!StringUtils.isEmpty(xml)) { // 将设备别名转换为上级别名 xml = xml.replaceAll(deviceAlias, alias); SendMsg(true, xml); } } } }