|
|
|
@ -17,13 +17,12 @@ import io.netty.channel.socket.SocketChannel; |
|
|
|
import io.netty.channel.socket.nio.NioSocketChannel; |
|
|
|
import io.netty.util.CharsetUtil; |
|
|
|
import io.netty.util.internal.StringUtil; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.dom4j.Document; |
|
|
|
import org.dom4j.DocumentException; |
|
|
|
import org.dom4j.Element; |
|
|
|
import org.dom4j.io.SAXReader; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.springframework.data.redis.core.RedisTemplate; |
|
|
|
import org.springframework.http.HttpStatus; |
|
|
|
@ -35,15 +34,14 @@ import org.springframework.web.client.RestTemplate; |
|
|
|
import javax.annotation.Resource; |
|
|
|
import java.io.ByteArrayInputStream; |
|
|
|
import java.nio.charset.StandardCharsets; |
|
|
|
import java.util.Objects; |
|
|
|
import java.util.TimerTask; |
|
|
|
import java.util.concurrent.*; |
|
|
|
|
|
|
|
|
|
|
|
@Slf4j |
|
|
|
@Component |
|
|
|
public class NettyClient { |
|
|
|
|
|
|
|
private Logger logger = LoggerFactory.getLogger(NettyClient.class); |
|
|
|
|
|
|
|
// 客户端只需要一个 时间循环组 , 即 NioEventLoopGroup 线程池 |
|
|
|
private static final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); |
|
|
|
|
|
|
|
@ -74,7 +72,7 @@ public class NettyClient { |
|
|
|
@Value("${up_time_interval_setting}") |
|
|
|
String upTimeIntervalSetting; |
|
|
|
|
|
|
|
@Value("${print_recv_data:0}") |
|
|
|
@Value("${print_recv_data:1}") |
|
|
|
Integer printRecvData; |
|
|
|
|
|
|
|
/** |
|
|
|
@ -93,15 +91,13 @@ public class NettyClient { |
|
|
|
|
|
|
|
//释放资源 |
|
|
|
public void Close() { |
|
|
|
if (eventLoopGroup != null) { |
|
|
|
eventLoopGroup.shutdownGracefully(); |
|
|
|
} |
|
|
|
eventLoopGroup.shutdownGracefully(); |
|
|
|
scheduledExecutor.shutdown(); |
|
|
|
} |
|
|
|
|
|
|
|
//连接服务器 |
|
|
|
@Async |
|
|
|
public void ConnectServer() { |
|
|
|
public void connectUpperSystemServer() { |
|
|
|
this.serverIP = upSystemServerProperties.ip; |
|
|
|
this.serverPort = upSystemServerProperties.port; |
|
|
|
this.sendCode = upSystemServerProperties.iipCode; |
|
|
|
@ -129,7 +125,7 @@ public class NettyClient { |
|
|
|
// ChannelFuture 类分析 , Netty 异步模型 |
|
|
|
// sync 作用是该方法不会再次阻塞 |
|
|
|
ChannelFuture channelFuture = bootstrap.connect(serverIP, serverPort).addListener(new ConnectionListener(this)).sync(); |
|
|
|
logger.info("nettyClient连接服务器成功"); |
|
|
|
log.info("nettyClient连接服务器成功"); |
|
|
|
// 关闭通道, 开始监听 |
|
|
|
channelFuture.channel().closeFuture().sync(); |
|
|
|
} catch (InterruptedException e) { |
|
|
|
@ -156,10 +152,10 @@ public class NettyClient { |
|
|
|
// 存入缓存 |
|
|
|
redisTemplate.opsForValue().set(String.valueOf(this.sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); |
|
|
|
|
|
|
|
client.sendMsg(allBuf); |
|
|
|
client.sendProtoBuffer(allBuf); |
|
|
|
this.sendIndex++; |
|
|
|
} else { |
|
|
|
logger.warn("###### 会话:{}, 与上级系统连接失败 ######", uuid); |
|
|
|
log.warn("###### 会话:{}, 与上级系统连接失败 ######", uuid); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -171,25 +167,25 @@ public class NettyClient { |
|
|
|
|
|
|
|
if (!StringUtil.isNullOrEmpty(msg)) { |
|
|
|
ByteBuf allBuf = Unpooled.copiedBuffer(msg, CharsetUtil.US_ASCII); |
|
|
|
client.sendMsg(allBuf); |
|
|
|
client.sendProtoBuffer(allBuf); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
//线程处理接收函数 |
|
|
|
public void ReceiveMsg(BinaryModel binaryModel) { |
|
|
|
public void receiveUpstreamData(BinaryModel binaryModel) { |
|
|
|
// executorService.execute(() -> |
|
|
|
// { |
|
|
|
try { |
|
|
|
threadDealMsg(binaryModel); |
|
|
|
parseUpstreamData(binaryModel); |
|
|
|
} catch (Exception e) { |
|
|
|
logger.error("error", e); |
|
|
|
log.error("error", e); |
|
|
|
} |
|
|
|
// }); |
|
|
|
} |
|
|
|
|
|
|
|
//处理接收消息 |
|
|
|
private void threadDealMsg(BinaryModel binaryModel) throws DocumentException { |
|
|
|
private void parseUpstreamData(BinaryModel binaryModel) throws DocumentException { |
|
|
|
String xml = binaryModel.dataBuf.toString(CharsetUtil.UTF_8); |
|
|
|
this.receiveIndex = binaryModel.sendIndex; |
|
|
|
SAXReader saxReader = new SAXReader(); |
|
|
|
@ -206,7 +202,7 @@ public class NettyClient { |
|
|
|
if (type == SystemType.system) { |
|
|
|
if (command == SystemType.no_response || command == SystemType.has_response) { |
|
|
|
if (null != root.element("Code")) { |
|
|
|
if (root.element("Code").getText() == ResponseType.retry) { |
|
|
|
if (Objects.equals(root.element("Code").getText(), ResponseType.retry)) { |
|
|
|
resetSendMsg(binaryModel.receiveIndex); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -216,7 +212,7 @@ public class NettyClient { |
|
|
|
String response = ""; |
|
|
|
String json = null; |
|
|
|
|
|
|
|
logger.info("###### 收到上级系统消息:{}, 类型:{} ######\n{}", binaryModel.id, type, xml); |
|
|
|
log.info("###### 解析上级系统消息, 通道: {}, 类型:{} ######\n{}", binaryModel.id, type, xml); |
|
|
|
switch (type) { |
|
|
|
case SystemType.system: |
|
|
|
switch (command) { |
|
|
|
@ -270,7 +266,7 @@ public class NettyClient { |
|
|
|
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, ResultControl.class); |
|
|
|
break; |
|
|
|
default: |
|
|
|
logger.warn("ClienHandler接收到的type:{}不在处理范围内, 不予处理", type); |
|
|
|
log.warn("ClienHandler接收到的type:{}不在处理范围内, 不予处理", type); |
|
|
|
} |
|
|
|
|
|
|
|
// 将上级下发的指令,转发到业务端处理,接收业务端处理后的结果,上报给上级系统 |
|
|
|
@ -285,7 +281,7 @@ public class NettyClient { |
|
|
|
Result body = ajaxResultResponseEntity.getBody(); |
|
|
|
|
|
|
|
if (null == body) { |
|
|
|
logger.error("接收上级系统下发的指令,转发到应用业务端处理后,返回的响应体为空"); |
|
|
|
log.error("接收上级系统下发的指令,转发到应用业务端处理后,返回的响应体为空"); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
@ -293,7 +289,7 @@ public class NettyClient { |
|
|
|
String msg = body.getMsg(); |
|
|
|
String data = body.getData(); |
|
|
|
|
|
|
|
logger.info("接收到上级系统下发指令,转发到巡视主机,成功,返回code:{},msg:{},data:{}", bodyCode, msg, data); |
|
|
|
log.info("接收到上级系统下发指令,转发到巡视主机,成功,返回code:{},msg:{},data:{}", bodyCode, msg, data); |
|
|
|
|
|
|
|
// 响应巡视主机 |
|
|
|
JSONObject item = JSONObject.parseObject(data); |
|
|
|
@ -301,7 +297,7 @@ public class NettyClient { |
|
|
|
|
|
|
|
} else { |
|
|
|
// 调用业务端处理失败 |
|
|
|
logger.warn("下发指令,失败,httpCode:{}", statusCode); |
|
|
|
log.warn("下发指令,失败,httpCode:{}", statusCode); |
|
|
|
response = createDownFailResponse(); |
|
|
|
|
|
|
|
} |
|
|
|
@ -338,34 +334,38 @@ public class NettyClient { |
|
|
|
xStream.addPermission(AnyTypePermission.ANY); |
|
|
|
obj = (RegisterResponseControl) xStream.fromXML(xml); |
|
|
|
} catch (com.thoughtworks.xstream.XStreamException e2) { |
|
|
|
logger.error("###### dealRegister解析失败:{} ######", e2.getMessage()); |
|
|
|
log.error("###### dealRegister解析失败:{} ######", e2.getMessage()); |
|
|
|
} |
|
|
|
} |
|
|
|
TimerSendControl(obj); |
|
|
|
timerSendControl(obj); |
|
|
|
|
|
|
|
logger.info("###### 客户端接收到服务端注册回馈, 服务注册完成 ######"); |
|
|
|
log.info("###### 客户端接收到服务端注册回馈, 服务注册完成 ######"); |
|
|
|
} |
|
|
|
|
|
|
|
//处理心跳 |
|
|
|
public void TimerSendControl(RegisterResponseControl response) { |
|
|
|
public void timerSendControl(RegisterResponseControl response) { |
|
|
|
try { |
|
|
|
if (response.Code.equals(ResponseType.succeed)) { |
|
|
|
int heart = Integer.parseInt(response.Items.get(0).heart_beat_interval); |
|
|
|
int patroldevice = Integer.parseInt(response.Items.get(0).patroldevice_run_interval); |
|
|
|
int nest = Integer.parseInt(response.Items.get(0).nest_run_interval); |
|
|
|
int weather = Integer.parseInt(response.Items.get(0).weather_interval); |
|
|
|
sendHeartToUpper(); |
|
|
|
int heart = StringUtils.isEmpty(response.Items.get(0).heart_beat_interval) ? 0 : |
|
|
|
Integer.parseInt(response.Items.get(0).heart_beat_interval); |
|
|
|
int patrolDevice = StringUtils.isEmpty(response.Items.get(0).patroldevice_run_interval) ? 0 : |
|
|
|
Integer.parseInt(response.Items.get(0).patroldevice_run_interval); |
|
|
|
int nest = StringUtils.isEmpty(response.Items.get(0).nest_run_interval) ? 0 : |
|
|
|
Integer.parseInt(response.Items.get(0).nest_run_interval); |
|
|
|
int weather = StringUtils.isEmpty(response.Items.get(0).weather_interval) ? 0 : |
|
|
|
Integer.parseInt(response.Items.get(0).weather_interval); |
|
|
|
sendHeartbeatToUpper(); |
|
|
|
// 定时心跳报活 |
|
|
|
scheduledExecutor.scheduleWithFixedDelay(new TimerTask() { |
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
sendHeartToUpper(); |
|
|
|
sendHeartbeatToUpper(); |
|
|
|
} |
|
|
|
}, 0, heart, TimeUnit.SECONDS); |
|
|
|
|
|
|
|
|
|
|
|
// 上级系统返回的定时信息存入redis |
|
|
|
cacheTimeInterval(heart, patroldevice, nest, weather); |
|
|
|
cacheTimeInterval(heart, patrolDevice, nest, weather); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
e.printStackTrace(); |
|
|
|
@ -376,21 +376,21 @@ public class NettyClient { |
|
|
|
* 缓存上级系统返回的定时任务间隔 |
|
|
|
* |
|
|
|
* @param heart |
|
|
|
* @param patroldevice |
|
|
|
* @param patrolDevice |
|
|
|
* @param nest |
|
|
|
* @param weather |
|
|
|
*/ |
|
|
|
private void cacheTimeInterval(int heart, int patroldevice, int nest, int weather) { |
|
|
|
private void cacheTimeInterval(int heart, int patrolDevice, int nest, int weather) { |
|
|
|
JSONObject json = new JSONObject(); |
|
|
|
json.put("heart", heart); |
|
|
|
json.put("patroldevice", patroldevice); |
|
|
|
json.put("patroldevice", patrolDevice); |
|
|
|
json.put("nest", nest); |
|
|
|
json.put("weather", weather); |
|
|
|
|
|
|
|
redisTemplate.opsForValue().set(upTimeIntervalSetting, json.toJSONString()); |
|
|
|
} |
|
|
|
|
|
|
|
private String createRegHeart(boolean isheart) { |
|
|
|
private String createRegHeart(boolean isHeart) { |
|
|
|
ResponseControl obj = new ResponseControl(); |
|
|
|
XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder())); |
|
|
|
xStream.autodetectAnnotations(true); |
|
|
|
@ -399,7 +399,7 @@ public class NettyClient { |
|
|
|
obj.ReceiveCode = receiveCode; |
|
|
|
obj.Type = String.valueOf(SystemType.system); |
|
|
|
obj.Code = ""; |
|
|
|
obj.Command = String.valueOf(isheart ? SystemType.heart_request : SystemType.register_request); |
|
|
|
obj.Command = String.valueOf(isHeart ? SystemType.heart_request : SystemType.register_request); |
|
|
|
obj.Time = CommonUtils.GetNowDateString(); |
|
|
|
obj.Items = ""; |
|
|
|
String resultXML = xStream.toXML(obj); |
|
|
|
@ -457,7 +457,7 @@ public class NettyClient { |
|
|
|
sendMsgToUpper(true, "", xml); |
|
|
|
} |
|
|
|
|
|
|
|
public void sendHeartToUpper() { |
|
|
|
public void sendHeartbeatToUpper() { |
|
|
|
String xml = createRegHeart(true); |
|
|
|
sendMsgToUpper(true, "", xml); |
|
|
|
} |
|
|
|
@ -495,81 +495,81 @@ public class NettyClient { |
|
|
|
case PushType.environment: |
|
|
|
// xml = upJson2Xml.EnvironmentControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, EnvironmentControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送环境数据 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送环境数据 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.alarm: |
|
|
|
// xml = upJson2Xml.AlarmControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, AlarmControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送巡视设备异常告警数据 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送巡视设备异常告警数据 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.analysisAlarm: |
|
|
|
// xml = upJson2Xml.AnalysisControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, AnalysisControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送告警数据 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送告警数据 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.location: |
|
|
|
// xml = upJson2Xml.LocationControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, LocationControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送巡视设备坐标 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送巡视设备坐标 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.monitor: |
|
|
|
// xml = upJson2Xml.MonitorControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, MonitorControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送静默监视告警数据 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送静默监视告警数据 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.nestRunning: |
|
|
|
// xml = upJson2Xml.NestRunningJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, NestRunningControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送无人机机巢运行数据 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送无人机机巢运行数据 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.nestState: |
|
|
|
// xml = upJson2Xml.NestStateJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, NestStateControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送无人机机巢状态数据 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送无人机机巢状态数据 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.patrolDeviceState: |
|
|
|
// xml = upJson2Xml.PatrolDeviceStateControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, PatrolDeviceStateControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送巡视设备状态数据 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送巡视设备状态数据 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.patrolDeviceRunning: |
|
|
|
// xml = upJson2Xml.PatrolDeviceRunningControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, PatrolDeviceRunningControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送巡视设备运行数据 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送巡视设备运行数据 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.result: |
|
|
|
// xml = upJson2Xml.TaskResultControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, TaskResultControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送巡视结果 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送巡视结果 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.taskState: |
|
|
|
// xml = upJson2Xml.TaskStateControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, TaskStateControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送任务状态数据 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送任务状态数据 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.total: |
|
|
|
// xml = upJson2Xml.ReportControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, ReportControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送巡视设备统计信息上报 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送巡视设备统计信息上报 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case PushType.route: |
|
|
|
// xml = upJson2Xml.RouteControlJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, RouteControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送巡视路线 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送巡视路线 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case SystemType.system: |
|
|
|
// xml = upJson2Xml.ModelJson2Xml(json); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, ModelControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送系统数据 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送系统数据 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
case ModelType.modelUpdate: |
|
|
|
// xml = upJson2Xml.UpdateModelJson2Xml(json); |
|
|
|
//xml = up.ModelJson2Xml(json, UpdateModelControl.class); |
|
|
|
xml = upJson2Xml.UpStreamJson2Xml(json, UpdateModelControl.class); |
|
|
|
logger.info("###### 会话:{}, 向上级系统发送模型更新上报指令 ######\n{}", uuid, xml); |
|
|
|
log.info("###### 会话:{}, 向上级系统发送模型更新上报指令 ######\n{}", uuid, xml); |
|
|
|
break; |
|
|
|
default: |
|
|
|
logger.warn("###### 会话:{}, 应用向上级系统发送消息, 类型: [{}] 不在处理范围内, 不予处理 ######", uuid, type); |
|
|
|
log.warn("###### 会话:{}, 应用向上级系统发送消息, 类型: [{}] 不在处理范围内, 不予处理 ######", uuid, type); |
|
|
|
} |
|
|
|
if (!StringUtils.isEmpty(xml)) { |
|
|
|
// 将设备别名转换为上级别名 |
|
|
|
|