channelPool = new ConcurrentHashMap<>();
public int size() {
return channelPool.size();
diff --git a/src/main/java/com/inspect/tcpserver/tcp/ConnectionListener.java b/src/main/java/com/inspect/tcpserver/tcp/ConnectionListener.java
index a7b6dea..c08ecd0 100644
--- a/src/main/java/com/inspect/tcpserver/tcp/ConnectionListener.java
+++ b/src/main/java/com/inspect/tcpserver/tcp/ConnectionListener.java
@@ -21,7 +21,7 @@ public class ConnectionListener implements ChannelFutureListener {
loop.schedule(new Runnable() {
@Override
public void run() {
- nettyClient.ConnectServer();
+ nettyClient.connectUpperSystemServer();
}
}, 3L, TimeUnit.SECONDS);
}
diff --git a/src/main/java/com/inspect/tcpserver/tcp/DownXml2Json.java b/src/main/java/com/inspect/tcpserver/tcp/DownXml2Json.java
index e76825c..23d87e2 100644
--- a/src/main/java/com/inspect/tcpserver/tcp/DownXml2Json.java
+++ b/src/main/java/com/inspect/tcpserver/tcp/DownXml2Json.java
@@ -6,11 +6,12 @@ import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.naming.NoNameCoder;
import com.thoughtworks.xstream.io.xml.Xpp3Driver;
import com.thoughtworks.xstream.security.AnyTypePermission;
+import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Slf4j
public class DownXml2Json {
- private final Logger logger = LoggerFactory.getLogger(DownXml2Json.class);
/**
* 默认为对上级的客户端的别名
@@ -46,7 +47,7 @@ public class DownXml2Json {
T obj = (T) xStream.fromXML(xml);
return JSON.toJSONString(obj);
} catch (com.thoughtworks.xstream.XStreamException ex) {
- logger.error(Color.RED + "###### 客户:{}, DownStreamJson2Xml解析失败:{} ######" + Color.END, id, e.getMessage());
+ log.error(Color.RED + "###### 客户:{}, DownStreamJson2Xml解析失败:{} ######" + Color.END, id, e.getMessage());
return null;
}
}
@@ -224,7 +225,7 @@ public class DownXml2Json {
public String AlarmControlXml2Json(String xml) {
XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder()));
//xStream.alias(alias, AlarmControl.class);
- logger.info("[XML] AlarmControlXml2Json alias:{}", alias);
+ log.info("[XML] AlarmControlXml2Json alias:{}", alias);
xStream.alias(alias, AlarmControl.class);
xStream.autodetectAnnotations(true);
xStream.ignoreUnknownElements();
diff --git a/src/main/java/com/inspect/tcpserver/tcp/MyDecoder.java b/src/main/java/com/inspect/tcpserver/tcp/MyDecoder.java
index 4d5ec39..7144227 100644
--- a/src/main/java/com/inspect/tcpserver/tcp/MyDecoder.java
+++ b/src/main/java/com/inspect/tcpserver/tcp/MyDecoder.java
@@ -1,23 +1,18 @@
package com.inspect.tcpserver.tcp;
-import com.inspect.tcpserver.util.Color;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.tomcat.util.buf.HexUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
import java.util.List;
+@Slf4j
public class MyDecoder extends ByteToMessageDecoder {
- private static final Logger log = LoggerFactory.getLogger(MyDecoder.class);
-
private final int BASE_LENGTH = 2 + 8 + 8 + 1 + 4 + 2;
private Integer printRecvData = 0;
@@ -64,6 +59,7 @@ public class MyDecoder extends ByteToMessageDecoder {
binaryModel.dataLength = xmlLength;
binaryModel.dataBuf = Unpooled.copiedBuffer(payload);
binaryModel.uuid = uuid;
+ binaryModel.id = ctx.channel().id().asShortText();
out.add(binaryModel);
break;
} else {
diff --git a/src/main/java/com/inspect/tcpserver/tcp/NettyClient.java b/src/main/java/com/inspect/tcpserver/tcp/NettyClient.java
index 8837c8a..d47a0e2 100644
--- a/src/main/java/com/inspect/tcpserver/tcp/NettyClient.java
+++ b/src/main/java/com/inspect/tcpserver/tcp/NettyClient.java
@@ -17,13 +17,12 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.StringUtil;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpStatus;
@@ -35,15 +34,14 @@ import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
+import java.util.Objects;
import java.util.TimerTask;
import java.util.concurrent.*;
-
+@Slf4j
@Component
public class NettyClient {
- private Logger logger = LoggerFactory.getLogger(NettyClient.class);
-
// 客户端只需要一个 时间循环组 , 即 NioEventLoopGroup 线程池
private static final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
@@ -74,7 +72,7 @@ public class NettyClient {
@Value("${up_time_interval_setting}")
String upTimeIntervalSetting;
- @Value("${print_recv_data:0}")
+ @Value("${print_recv_data:1}")
Integer printRecvData;
/**
@@ -93,15 +91,13 @@ public class NettyClient {
//释放资源
public void Close() {
- if (eventLoopGroup != null) {
- eventLoopGroup.shutdownGracefully();
- }
+ eventLoopGroup.shutdownGracefully();
scheduledExecutor.shutdown();
}
//连接服务器
@Async
- public void ConnectServer() {
+ public void connectUpperSystemServer() {
this.serverIP = upSystemServerProperties.ip;
this.serverPort = upSystemServerProperties.port;
this.sendCode = upSystemServerProperties.iipCode;
@@ -129,7 +125,7 @@ public class NettyClient {
// ChannelFuture 类分析 , Netty 异步模型
// sync 作用是该方法不会再次阻塞
ChannelFuture channelFuture = bootstrap.connect(serverIP, serverPort).addListener(new ConnectionListener(this)).sync();
- logger.info("nettyClient连接服务器成功");
+ log.info("nettyClient连接服务器成功");
// 关闭通道, 开始监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
@@ -156,10 +152,10 @@ public class NettyClient {
// 存入缓存
redisTemplate.opsForValue().set(String.valueOf(this.sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS);
- client.sendMsg(allBuf);
+ client.sendProtoBuffer(allBuf);
this.sendIndex++;
} else {
- logger.warn("###### 会话:{}, 与上级系统连接失败 ######", uuid);
+ log.warn("###### 会话:{}, 与上级系统连接失败 ######", uuid);
}
}
@@ -171,25 +167,25 @@ public class NettyClient {
if (!StringUtil.isNullOrEmpty(msg)) {
ByteBuf allBuf = Unpooled.copiedBuffer(msg, CharsetUtil.US_ASCII);
- client.sendMsg(allBuf);
+ client.sendProtoBuffer(allBuf);
}
}
}
//线程处理接收函数
- public void ReceiveMsg(BinaryModel binaryModel) {
+ public void receiveUpstreamData(BinaryModel binaryModel) {
// executorService.execute(() ->
// {
try {
- threadDealMsg(binaryModel);
+ parseUpstreamData(binaryModel);
} catch (Exception e) {
- logger.error("error", e);
+ log.error("error", e);
}
// });
}
//处理接收消息
- private void threadDealMsg(BinaryModel binaryModel) throws DocumentException {
+ private void parseUpstreamData(BinaryModel binaryModel) throws DocumentException {
String xml = binaryModel.dataBuf.toString(CharsetUtil.UTF_8);
this.receiveIndex = binaryModel.sendIndex;
SAXReader saxReader = new SAXReader();
@@ -206,7 +202,7 @@ public class NettyClient {
if (type == SystemType.system) {
if (command == SystemType.no_response || command == SystemType.has_response) {
if (null != root.element("Code")) {
- if (root.element("Code").getText() == ResponseType.retry) {
+ if (Objects.equals(root.element("Code").getText(), ResponseType.retry)) {
resetSendMsg(binaryModel.receiveIndex);
}
}
@@ -216,7 +212,7 @@ public class NettyClient {
String response = "";
String json = null;
- logger.info("###### 收到上级系统消息:{}, 类型:{} ######\n{}", binaryModel.id, type, xml);
+ log.info("###### 解析上级系统消息, 通道: {}, 类型:{} ######\n{}", binaryModel.id, type, xml);
switch (type) {
case SystemType.system:
switch (command) {
@@ -270,7 +266,7 @@ public class NettyClient {
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, xml, ResultControl.class);
break;
default:
- logger.warn("ClienHandler接收到的type:{}不在处理范围内, 不予处理", type);
+ log.warn("ClienHandler接收到的type:{}不在处理范围内, 不予处理", type);
}
// 将上级下发的指令,转发到业务端处理,接收业务端处理后的结果,上报给上级系统
@@ -285,7 +281,7 @@ public class NettyClient {
Result body = ajaxResultResponseEntity.getBody();
if (null == body) {
- logger.error("接收上级系统下发的指令,转发到应用业务端处理后,返回的响应体为空");
+ log.error("接收上级系统下发的指令,转发到应用业务端处理后,返回的响应体为空");
return;
}
@@ -293,7 +289,7 @@ public class NettyClient {
String msg = body.getMsg();
String data = body.getData();
- logger.info("接收到上级系统下发指令,转发到巡视主机,成功,返回code:{},msg:{},data:{}", bodyCode, msg, data);
+ log.info("接收到上级系统下发指令,转发到巡视主机,成功,返回code:{},msg:{},data:{}", bodyCode, msg, data);
// 响应巡视主机
JSONObject item = JSONObject.parseObject(data);
@@ -301,7 +297,7 @@ public class NettyClient {
} else {
// 调用业务端处理失败
- logger.warn("下发指令,失败,httpCode:{}", statusCode);
+ log.warn("下发指令,失败,httpCode:{}", statusCode);
response = createDownFailResponse();
}
@@ -338,34 +334,38 @@ public class NettyClient {
xStream.addPermission(AnyTypePermission.ANY);
obj = (RegisterResponseControl) xStream.fromXML(xml);
} catch (com.thoughtworks.xstream.XStreamException e2) {
- logger.error("###### dealRegister解析失败:{} ######", e2.getMessage());
+ log.error("###### dealRegister解析失败:{} ######", e2.getMessage());
}
}
- TimerSendControl(obj);
+ timerSendControl(obj);
- logger.info("###### 客户端接收到服务端注册回馈, 服务注册完成 ######");
+ log.info("###### 客户端接收到服务端注册回馈, 服务注册完成 ######");
}
//处理心跳
- public void TimerSendControl(RegisterResponseControl response) {
+ public void timerSendControl(RegisterResponseControl response) {
try {
if (response.Code.equals(ResponseType.succeed)) {
- int heart = Integer.parseInt(response.Items.get(0).heart_beat_interval);
- int patroldevice = Integer.parseInt(response.Items.get(0).patroldevice_run_interval);
- int nest = Integer.parseInt(response.Items.get(0).nest_run_interval);
- int weather = Integer.parseInt(response.Items.get(0).weather_interval);
- sendHeartToUpper();
+ int heart = StringUtils.isEmpty(response.Items.get(0).heart_beat_interval) ? 0 :
+ Integer.parseInt(response.Items.get(0).heart_beat_interval);
+ int patrolDevice = StringUtils.isEmpty(response.Items.get(0).patroldevice_run_interval) ? 0 :
+ Integer.parseInt(response.Items.get(0).patroldevice_run_interval);
+ int nest = StringUtils.isEmpty(response.Items.get(0).nest_run_interval) ? 0 :
+ Integer.parseInt(response.Items.get(0).nest_run_interval);
+ int weather = StringUtils.isEmpty(response.Items.get(0).weather_interval) ? 0 :
+ Integer.parseInt(response.Items.get(0).weather_interval);
+ sendHeartbeatToUpper();
// 定时心跳报活
scheduledExecutor.scheduleWithFixedDelay(new TimerTask() {
@Override
public void run() {
- sendHeartToUpper();
+ sendHeartbeatToUpper();
}
}, 0, heart, TimeUnit.SECONDS);
// 上级系统返回的定时信息存入redis
- cacheTimeInterval(heart, patroldevice, nest, weather);
+ cacheTimeInterval(heart, patrolDevice, nest, weather);
}
} catch (Exception e) {
e.printStackTrace();
@@ -376,21 +376,21 @@ public class NettyClient {
* 缓存上级系统返回的定时任务间隔
*
* @param heart
- * @param patroldevice
+ * @param patrolDevice
* @param nest
* @param weather
*/
- private void cacheTimeInterval(int heart, int patroldevice, int nest, int weather) {
+ private void cacheTimeInterval(int heart, int patrolDevice, int nest, int weather) {
JSONObject json = new JSONObject();
json.put("heart", heart);
- json.put("patroldevice", patroldevice);
+ json.put("patroldevice", patrolDevice);
json.put("nest", nest);
json.put("weather", weather);
redisTemplate.opsForValue().set(upTimeIntervalSetting, json.toJSONString());
}
- private String createRegHeart(boolean isheart) {
+ private String createRegHeart(boolean isHeart) {
ResponseControl obj = new ResponseControl();
XStream xStream = new XStream(new Xpp3Driver(new NoNameCoder()));
xStream.autodetectAnnotations(true);
@@ -399,7 +399,7 @@ public class NettyClient {
obj.ReceiveCode = receiveCode;
obj.Type = String.valueOf(SystemType.system);
obj.Code = "";
- obj.Command = String.valueOf(isheart ? SystemType.heart_request : SystemType.register_request);
+ obj.Command = String.valueOf(isHeart ? SystemType.heart_request : SystemType.register_request);
obj.Time = CommonUtils.GetNowDateString();
obj.Items = "";
String resultXML = xStream.toXML(obj);
@@ -457,7 +457,7 @@ public class NettyClient {
sendMsgToUpper(true, "", xml);
}
- public void sendHeartToUpper() {
+ public void sendHeartbeatToUpper() {
String xml = createRegHeart(true);
sendMsgToUpper(true, "", xml);
}
@@ -495,81 +495,81 @@ public class NettyClient {
case PushType.environment:
// xml = upJson2Xml.EnvironmentControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, EnvironmentControl.class);
- logger.info("###### 会话:{}, 向上级系统发送环境数据 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送环境数据 ######\n{}", uuid, xml);
break;
case PushType.alarm:
// xml = upJson2Xml.AlarmControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, AlarmControl.class);
- logger.info("###### 会话:{}, 向上级系统发送巡视设备异常告警数据 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送巡视设备异常告警数据 ######\n{}", uuid, xml);
break;
case PushType.analysisAlarm:
// xml = upJson2Xml.AnalysisControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, AnalysisControl.class);
- logger.info("###### 会话:{}, 向上级系统发送告警数据 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送告警数据 ######\n{}", uuid, xml);
break;
case PushType.location:
// xml = upJson2Xml.LocationControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, LocationControl.class);
- logger.info("###### 会话:{}, 向上级系统发送巡视设备坐标 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送巡视设备坐标 ######\n{}", uuid, xml);
break;
case PushType.monitor:
// xml = upJson2Xml.MonitorControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, MonitorControl.class);
- logger.info("###### 会话:{}, 向上级系统发送静默监视告警数据 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送静默监视告警数据 ######\n{}", uuid, xml);
break;
case PushType.nestRunning:
// xml = upJson2Xml.NestRunningJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, NestRunningControl.class);
- logger.info("###### 会话:{}, 向上级系统发送无人机机巢运行数据 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送无人机机巢运行数据 ######\n{}", uuid, xml);
break;
case PushType.nestState:
// xml = upJson2Xml.NestStateJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, NestStateControl.class);
- logger.info("###### 会话:{}, 向上级系统发送无人机机巢状态数据 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送无人机机巢状态数据 ######\n{}", uuid, xml);
break;
case PushType.patrolDeviceState:
// xml = upJson2Xml.PatrolDeviceStateControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, PatrolDeviceStateControl.class);
- logger.info("###### 会话:{}, 向上级系统发送巡视设备状态数据 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送巡视设备状态数据 ######\n{}", uuid, xml);
break;
case PushType.patrolDeviceRunning:
// xml = upJson2Xml.PatrolDeviceRunningControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, PatrolDeviceRunningControl.class);
- logger.info("###### 会话:{}, 向上级系统发送巡视设备运行数据 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送巡视设备运行数据 ######\n{}", uuid, xml);
break;
case PushType.result:
// xml = upJson2Xml.TaskResultControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, TaskResultControl.class);
- logger.info("###### 会话:{}, 向上级系统发送巡视结果 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送巡视结果 ######\n{}", uuid, xml);
break;
case PushType.taskState:
// xml = upJson2Xml.TaskStateControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, TaskStateControl.class);
- logger.info("###### 会话:{}, 向上级系统发送任务状态数据 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送任务状态数据 ######\n{}", uuid, xml);
break;
case PushType.total:
// xml = upJson2Xml.ReportControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, ReportControl.class);
- logger.info("###### 会话:{}, 向上级系统发送巡视设备统计信息上报 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送巡视设备统计信息上报 ######\n{}", uuid, xml);
break;
case PushType.route:
// xml = upJson2Xml.RouteControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, RouteControl.class);
- logger.info("###### 会话:{}, 向上级系统发送巡视路线 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送巡视路线 ######\n{}", uuid, xml);
break;
case SystemType.system:
// xml = upJson2Xml.ModelJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, ModelControl.class);
- logger.info("###### 会话:{}, 向上级系统发送系统数据 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送系统数据 ######\n{}", uuid, xml);
break;
case ModelType.modelUpdate:
// xml = upJson2Xml.UpdateModelJson2Xml(json);
//xml = up.ModelJson2Xml(json, UpdateModelControl.class);
xml = upJson2Xml.UpStreamJson2Xml(json, UpdateModelControl.class);
- logger.info("###### 会话:{}, 向上级系统发送模型更新上报指令 ######\n{}", uuid, xml);
+ log.info("###### 会话:{}, 向上级系统发送模型更新上报指令 ######\n{}", uuid, xml);
break;
default:
- logger.warn("###### 会话:{}, 应用向上级系统发送消息, 类型: [{}] 不在处理范围内, 不予处理 ######", uuid, type);
+ log.warn("###### 会话:{}, 应用向上级系统发送消息, 类型: [{}] 不在处理范围内, 不予处理 ######", uuid, type);
}
if (!StringUtils.isEmpty(xml)) {
// 将设备别名转换为上级别名
diff --git a/src/main/java/com/inspect/tcpserver/tcp/NettyClientHandler.java b/src/main/java/com/inspect/tcpserver/tcp/NettyClientHandler.java
index ef12062..a953246 100644
--- a/src/main/java/com/inspect/tcpserver/tcp/NettyClientHandler.java
+++ b/src/main/java/com/inspect/tcpserver/tcp/NettyClientHandler.java
@@ -4,8 +4,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@@ -17,23 +16,23 @@ import java.util.concurrent.TimeUnit;
*
* 规范 : 该 Handler 类中需要按照业务逻辑处理规范进行开发
*/
+@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
- private Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
-
private NettyClient nettyClient;
- public ChannelHandlerContext Context;
+ public ChannelHandlerContext clientChannelHandlerContext;
public NettyClientHandler(NettyClient nettyClient) {
+ log.info("NETTY_CLIENT_HANDLER");
this.nettyClient = nettyClient;
}
- public void sendMsg(ByteBuf byteBuf) {
- if (Context != null) {
- Context.writeAndFlush(byteBuf);
+ public void sendProtoBuffer(ByteBuf byteBuf) {
+ if (clientChannelHandlerContext != null) {
+ clientChannelHandlerContext.writeAndFlush(byteBuf);
} else {
- logger.info("client发送消息时,content为空,未连接服务端,取消发送");
+ log.info("client发送消息时,content为空,未连接服务端,取消发送");
}
}
@@ -41,12 +40,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final EventLoop loop = ctx.channel().eventLoop();
- loop.schedule(new Runnable() {
- @Override
- public void run() {
- nettyClient.ConnectServer();
- }
- }, 3L, TimeUnit.SECONDS);
+ loop.schedule(() -> nettyClient.connectUpperSystemServer(), 3L, TimeUnit.SECONDS);
super.channelInactive(ctx);
}
@@ -58,9 +52,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
- Context = ctx;
-
try {
+ clientChannelHandlerContext = ctx;
nettyClient.sendRegisterToUpper();
} catch (Exception e) {
e.printStackTrace();
@@ -79,7 +72,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- nettyClient.ReceiveMsg((BinaryModel) msg);
+ nettyClient.receiveUpstreamData((BinaryModel) msg);
}
/**
diff --git a/src/main/java/com/inspect/tcpserver/tcp/NettyServer.java b/src/main/java/com/inspect/tcpserver/tcp/NettyServer.java
index 42ad799..a4b04ef 100644
--- a/src/main/java/com/inspect/tcpserver/tcp/NettyServer.java
+++ b/src/main/java/com/inspect/tcpserver/tcp/NettyServer.java
@@ -2,7 +2,7 @@ package com.inspect.tcpserver.tcp;
import com.alibaba.fastjson.JSONObject;
import com.inspect.tcpserver.constant.Constant;
-import com.inspect.tcpserver.controller.ClientController;
+import com.inspect.tcpserver.controller.UpstreamClientController;
import com.inspect.tcpserver.domain.DeviceServerProperties;
import com.inspect.tcpserver.util.Color;
import com.thoughtworks.xstream.XStream;
@@ -19,14 +19,12 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.StringUtil;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
@@ -38,15 +36,11 @@ import java.io.ByteArrayInputStream;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.*;
+@Slf4j
@Component
public class NettyServer {
- private final Logger logger = LoggerFactory.getLogger(NettyServer.class);
-
/**
* 接收/发送报文xml外层别名
*/
@@ -55,7 +49,7 @@ public class NettyServer {
private final String aliasDevice = "PatrolDevice";
@Resource
- ClientController clientController;
+ UpstreamClientController upstreamClientController;
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
private long sendIndex = 0; //若重启系统后还要延续之前的序列号则需要把序列号存入redis中
@@ -77,17 +71,17 @@ public class NettyServer {
@Value("${iip_server.authDevice.url}")
String iipAuthDeviceUrl;
- @Value("${print_recv_data:0}")
+ @Value("${print_recv_data:1}")
Integer printRecvData;
- @Value("${seperating_packages:0}")
- Integer seperatingPackages;
+ @Value("${separate_packages:0}")
+ Integer separatePackages;
private int serverPort;
public void init() {
- logger.info("print_recv_data config: {}", printRecvData);
- logger.info("seperatingPackages config: {}", seperatingPackages);
+ log.info("print_recv_data config: {}", printRecvData);
+ log.info("separate_packages config: {}", separatePackages);
this.serverPort = deviceServerProperties.port;
upJson2Xml = new UpJson2Xml(aliasHost);
@@ -109,7 +103,7 @@ public class NettyServer {
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) {
- if(seperatingPackages > 0) {
+ if(separatePackages > 0) {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE, 19, 4, 2, 0, true));
}
ch.pipeline().addLast(new MyDecoder(printRecvData));
@@ -124,7 +118,7 @@ public class NettyServer {
.childOption(ChannelOption.SO_LINGER, 10);
try {
ChannelFuture future = bootstrap.bind(serverPort).sync();
- logger.info("###### TCP服务器启动 ######");
+ log.info("###### TCPSERVER服务器启动 ######");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
@@ -176,7 +170,7 @@ public class NettyServer {
nettyServerHandler.sendMsg(uuid, clientKey, allBuf, compact(xml), request);
sendIndex++;
} else {
- logger.warn(Color.RED + "###### 客户端[{}]离线! ######" + Color.END, clientKey);
+ log.warn(Color.RED + "###### 客户端[{}]离线! ######" + Color.END, clientKey);
}
}
@@ -216,7 +210,7 @@ public class NettyServer {
try {
dealMsgInThreadPool(binaryModel, context);
} catch (Exception e) {
- logger.error("error", e);
+ log.error("error", e);
}
// });
}
@@ -251,7 +245,7 @@ public class NettyServer {
}
String compactXml = compact(xml);
-// logger.info(Color.YELLOW + "###### 会话:{}, 客户:[{}], 消息类型:{}, 命令:{}, 消息体: ######\n{}" + Color.END, binaryModel.uuid, sendCode, type, command, compactXml);
+// log.info(Color.YELLOW + "###### 会话:{}, 客户:[{}], 消息类型:{}, 命令:{}, 消息体: ######\n{}" + Color.END, binaryModel.uuid, sendCode, type, command, compactXml);
//判断是否重发
if (type == SystemType.system) {
if (command == SystemType.has_response || command == SystemType.no_response) {
@@ -261,15 +255,15 @@ public class NettyServer {
resetSendMsg(binaryModel.receiveIndex, sendCode);
return;
} else if (code.equals(ResponseType.succeed)) {
- logger.info(Color.YELLOW + "###### 客户端[{}]响应结果为成功 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]响应结果为成功 ######" + Color.END, sendCode);
if(command == SystemType.no_response) {
return;
}
} else if (code.equals(ResponseType.fault)) {
- logger.warn(Color.RED + "###### 客户端[{}]响应结果为失败 ######" + Color.END, sendCode);
+ log.warn(Color.RED + "###### 客户端[{}]响应结果为失败 ######" + Color.END, sendCode);
return;
} else if (code.equals(ResponseType.reject)) {
- logger.warn(Color.RED + "###### 客户端[{}]响应结果为拒绝 ######" + Color.END, sendCode);
+ log.warn(Color.RED + "###### 客户端[{}]响应结果为拒绝 ######" + Color.END, sendCode);
return;
}
}
@@ -281,12 +275,12 @@ public class NettyServer {
switch (command) {
case SystemType.register_request:
// 收到接入侧注册信息
- logger.info(Color.YELLOW + "###### 客户端[{}]注册 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]注册 ######" + Color.END, sendCode);
dealRegister(binaryModel.uuid, compactXml);
break;
case SystemType.heart_request:
// 处理心跳请求响应
- logger.info(Color.YELLOW + "###### 客户端[{}]上报心跳 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报心跳 ######" + Color.END, sendCode);
sendHeartBeat(binaryModel.uuid, compactXml);
break;
case SystemType.has_response:
@@ -295,7 +289,7 @@ public class NettyServer {
// 处理设备上报的模型同步响应
if (null != root.element("Items").element("Item").attribute("device_file_path")) {
// 收到接入侧模型同步数据
- logger.info(Color.YELLOW + "###### 模型同步收到客户端[{}]响应数据 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 模型同步收到客户端[{}]响应数据 ######" + Color.END, sendCode);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class);
JSONObject jsonObject = JSONObject.parseObject(json);
jsonObject.put("uuid", binaryModel.uuid);
@@ -305,68 +299,68 @@ public class NettyServer {
// 任务控制响应任务执行ID
if (null != root.element("Items").element("Item").attribute("task_patrolled_id")) {
// 收到接入侧任务下发或控制回复数据
- logger.info(Color.YELLOW + "###### 任务下发收到客户端[{}]响应数据 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 任务下发收到客户端[{}]响应数据 ######" + Color.END, sendCode);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class);
JSONObject jsonObject = JSONObject.parseObject(json);
jsonObject.put("SendCode", "");
- clientController.sendMsg(binaryModel.uuid, jsonObject.toJSONString());
+ upstreamClientController.sendMsg(binaryModel.uuid, jsonObject.toJSONString());
}
} else {
- logger.warn(Color.RED + "###### 客户端[{}]响应数据没有items ######" + Color.END, sendCode);
+ log.warn(Color.RED + "###### 客户端[{}]响应数据没有items ######" + Color.END, sendCode);
}
break;
default:
- logger.warn(Color.RED + "###### 客户端[{}]非法的消息不予处理 ######" + Color.END, sendCode);
+ log.warn(Color.RED + "###### 客户端[{}]非法的消息不予处理 ######" + Color.END, sendCode);
}
break;
case PushType.patrolDeviceState:// insert into basedata_mont_patdevstadata
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceStateControl.class);
- logger.info(Color.YELLOW + "###### 客户端[{}]上报设备状态数据 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报设备状态数据 ######" + Color.END, sendCode);
break;
case PushType.patrolDeviceRunning:// insert into basedata_mont_patdevrundata
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceRunningControl.class);
- logger.info(Color.YELLOW + "###### 客户端[{}]上报设备运行数据 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报设备运行数据 ######" + Color.END, sendCode);
break;
case PushType.nestState:// insert into basedata_mont_neststadata
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestStateControl.class);
- logger.info(Color.YELLOW + "###### 客户端[{}]上报机巢状态数据 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报机巢状态数据 ######" + Color.END, sendCode);
break;
case PushType.nestRunning:// insert into basedata_mont_nestrundata
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestRunningControl.class);
- logger.info(Color.YELLOW + "###### 客户端[{}]上报机巢运行数据 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报机巢运行数据 ######" + Color.END, sendCode);
break;
case PushType.location:// insert into basedata_mont_patdevcoord
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, LocationControl.class);
- logger.info(Color.YELLOW + "###### 客户端[{}]上报设备坐标 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报设备坐标 ######" + Color.END, sendCode);
break;
case PushType.route:// insert into basedata_mont_patdevpatroute
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, RouteControl.class);
- logger.info(Color.YELLOW + "###### 客户端[{}]上报设备路线 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报设备路线 ######" + Color.END, sendCode);
break;
case PushType.alarm:// insert into basedata_mont_patdevalmabn
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, AlarmControl.class);
- logger.info(Color.YELLOW + "###### 客户端[{}]上报设备异常告警 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报设备异常告警 ######" + Color.END, sendCode);
break;
case PushType.environment:// insert into basedata_mont_evndata
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, EnvironmentControl.class);
- logger.info(Color.YELLOW + "###### 客户端[{}]上报设备环境数据 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报设备环境数据 ######" + Color.END, sendCode);
break;
case PushType.taskState:// insert into basedata_mont_taskstadata and patrol_task_status
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskStateControl.class);
//root@Linx:/home/atia/data/log/inspect-main# grep -rn "type:41, messageBody:" info.log
- logger.info(Color.YELLOW + "###### 客户端[{}]上报设备任务状态 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报设备任务状态 ######" + Color.END, sendCode);
break;
case PushType.result:// insert into basedata_mont_taskresult and patrol_task_result_main
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskResultControl.class);
//root@Linx:/home/atia/data/log/inspect-main# grep -rn "type:61, messageBody:" info.log
- logger.info(Color.YELLOW + "###### 客户端[{}]上报巡视结果 ######" + Color.END, sendCode);
+ log.info(Color.YELLOW + "###### 客户端[{}]上报巡视结果 ######" + Color.END, sendCode);
break;
default:
- logger.info(Color.RED + "###### 客户端[{}]上报的非法消息不予处理 ######" + Color.END, sendCode);
+ log.info(Color.RED + "###### 客户端[{}]上报的非法消息不予处理 ######" + Color.END, sendCode);
}
if (type != SystemType.system && !StringUtil.isNullOrEmpty(json)) {
if ((type == NestCtlType.courseReversal && command == 3)) { // 处理用SSCOM模拟的数据, 向无人机发送控制指令
- logger.info("###### 向客户端[{}]透传200001控制指令 ######", sendCode);
+ log.info("###### 向客户端[{}]透传200001控制指令 ######", sendCode);
flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml);
} else {
JSONObject jsonObject = JSONObject.parseObject(json);
@@ -382,7 +376,7 @@ public class NettyServer {
|| (type == NestCtlType.ptzPitch && command == 6)
|| (type == NestCtlType.picModelSet && command == 1)
|| (type == NestCtlType.nestSuddenStop && command == 2)) {// 处理用SSCOM模拟的数据, 向无人机发送控制指令
- logger.info("###### 向客户端[{}]透传200002~20005控制指令 ######", sendCode);
+ log.info("###### 向客户端[{}]透传200002~20005控制指令 ######", sendCode);
flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml);
}
}
@@ -423,7 +417,7 @@ public class NettyServer {
xStreamEx.addPermission(AnyTypePermission.ANY);
obj = (BaseControl) xStreamEx.fromXML(xml);
} catch (com.thoughtworks.xstream.XStreamException e2) {
- logger.error(Color.RED + "###### dealRegister解析失败:{} ######" + Color.END, e2.getMessage());
+ log.error(Color.RED + "###### dealRegister解析失败:{} ######" + Color.END, e2.getMessage());
}
}
@@ -496,7 +490,7 @@ public class NettyServer {
xStream.autodetectAnnotations(true);
xml = xStream.toXML(responseControl);
} catch (com.thoughtworks.xstream.XStreamException e2) {
- logger.error(Color.RED + "###### sendRegisterResponse解析失败:{} ######" + Color.END, e2.getMessage());
+ log.error(Color.RED + "###### sendRegisterResponse解析失败:{} ######" + Color.END, e2.getMessage());
}
}
flushMsgToDevice(uuid, sendCode, false, xml);
@@ -522,7 +516,7 @@ public class NettyServer {
obj = (BaseControl) xStream.fromXML(xml);
isHost = false;
} catch (com.thoughtworks.xstream.XStreamException e2) {
- logger.error(Color.RED + "###### sendHeartBeat解析失败:{} ######" + Color.END, e2.getMessage());
+ log.error(Color.RED + "###### sendHeartBeat解析失败:{} ######" + Color.END, e2.getMessage());
}
}
@@ -577,13 +571,13 @@ public class NettyServer {
xml = upJson2Xml.UpStreamJson2Xml(json, LinkageTaskControl.class);
break;
default:
- logger.error(Color.RED + "###### 向设备端下发命令, 类型:{}错误, 不予处理 ######" + Color.END, type);
+ log.error(Color.RED + "###### 向设备端下发命令, 类型:{}错误, 不予处理 ######" + Color.END, type);
}
if (!StringUtils.isEmpty(xml)) {
flushMsgToDevice("", receiveCode, true, xml);
} else {
- logger.error(Color.RED + "###### xml is empty ######" + Color.END, type);
+ log.error(Color.RED + "###### xml is empty ######" + Color.END, type);
}
}
}
diff --git a/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java b/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java
index c01435a..714a1e8 100644
--- a/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java
+++ b/src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java
@@ -6,11 +6,12 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
- public Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private NettyServer nettyServer;
@@ -25,23 +26,23 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
(ChannelFuture future) -> {
if (future.isSuccess()) {
if(request) {
- logger.info("###### 活动连接:{},向客户端[{}]下发消息成功:{}######", ChannelCache.getInstance().getClients(), clientKey, xml);
+ log.info("###### 活动连接:{},向客户端[{}]下发消息成功:{}######", ChannelCache.getInstance().getClients(), clientKey, xml);
} else {
- logger.info(Color.CYAN + "###### 活动连接:{},向客户端[{}]响应成功######" + Color.END, ChannelCache.getInstance().getClients(), clientKey);
+ log.info(Color.CYAN + "###### 活动连接:{},向客户端[{}]响应成功######" + Color.END, ChannelCache.getInstance().getClients(), clientKey);
}
} else {
if(request) {
- logger.error(Color.RED + "###### 活动连接:{},向客户端[{}]下发消息失败:{}######" + Color.END, ChannelCache.getInstance().getClients(), clientKey, xml);
+ log.error(Color.RED + "###### 活动连接:{},向客户端[{}]下发消息失败:{}######" + Color.END, ChannelCache.getInstance().getClients(), clientKey, xml);
} else {
- logger.error(Color.RED + "###### 活动连接:{},向客户端[{}]响应失败######" + Color.END, ChannelCache.getInstance().getClients(), clientKey);
+ log.error(Color.RED + "###### 活动连接:{},向客户端[{}]响应失败######" + Color.END, ChannelCache.getInstance().getClients(), clientKey);
}
}
});
} else {
if(request) {
- logger.error(Color.RED + "###### 活动连接:{},无法向客户端[{}]下发消息,ctx==null######" + Color.END, ChannelCache.getInstance().getClients(), clientKey);
+ log.error(Color.RED + "###### 活动连接:{},无法向客户端[{}]下发消息,ctx==null######" + Color.END, ChannelCache.getInstance().getClients(), clientKey);
} else {
- logger.error(Color.RED + "###### 活动连接:{},无法向客户端[{}]响应,ctx==null######" + Color.END, ChannelCache.getInstance().getClients(), clientKey);
+ log.error(Color.RED + "###### 活动连接:{},无法向客户端[{}]响应,ctx==null######" + Color.END, ChannelCache.getInstance().getClients(), clientKey);
}
}
}
@@ -53,7 +54,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) {
String id = ctx.channel().id().asShortText();
ChannelCache.getInstance().addIfAbsent(id, ctx);
- logger.info("###### 设备上线:{} ######", id);
+ log.info("###### 设备上线:{} ######", id);
}
/**
@@ -62,7 +63,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
String id = ctx.channel().id().asShortText();
- logger.info("###### 设备断开:{} ######", id);
+ log.info("###### 设备断开:{} ######", id);
ChannelCache.getInstance().remove(ctx);
}
@@ -89,6 +90,6 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- logger.error( "channel ctx: " + ctx.channel() + " exception", cause);
+ log.error( "channel ctx: " + ctx.channel() + " exception", cause);
}
}
diff --git a/src/main/java/com/inspect/tcpserver/tcp/UpJson2Xml.java b/src/main/java/com/inspect/tcpserver/tcp/UpJson2Xml.java
index 554b44a..679047e 100644
--- a/src/main/java/com/inspect/tcpserver/tcp/UpJson2Xml.java
+++ b/src/main/java/com/inspect/tcpserver/tcp/UpJson2Xml.java
@@ -5,12 +5,12 @@ import com.inspect.tcpserver.util.Color;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.naming.NoNameCoder;
import com.thoughtworks.xstream.io.xml.Xpp3Driver;
+import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Slf4j
public class UpJson2Xml {
- private final Logger logger = LoggerFactory.getLogger(DownXml2Json.class);
-
private String deviceAlias = "PatrolDevice";
public String getAlias() {
@@ -46,7 +46,7 @@ public class UpJson2Xml {
T obj = JSON.parseObject(json, clazz);
return xStream.toXML(obj);
} catch (com.thoughtworks.xstream.XStreamException ex) {
- logger.error(Color.RED + "###### UpStreamJson2Xml解析失败:{} ######" + Color.END, ex.getMessage());
+ log.error(Color.RED + "###### UpStreamJson2Xml解析失败:{} ######" + Color.END, ex.getMessage());
return null;
}
}