Browse Source

netty修改

master
lijw 8 months ago
parent
commit
d515f010d5
3 changed files with 54 additions and 39 deletions
  1. +15
    -0
      src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java
  2. +33
    -33
      src/main/java/com/inspect/tcpserver/tcp/NettyServer.java
  3. +6
    -6
      src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java

+ 15
- 0
src/main/java/com/inspect/tcpserver/tcp/ChannelCache.java View File

@ -3,6 +3,7 @@ package com.inspect.tcpserver.tcp;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class ChannelCache {
@ -47,4 +48,18 @@ public class ChannelCache {
}
}
}
public void replace(String newKey, ChannelHandlerContext ctx) {
for (Map.Entry entry : channelPool.entrySet()) {
if (entry.getValue() == ctx) {
channelPool.remove(entry.getKey());
channelPool.put(newKey, ctx);
break;
}
}
}
public Set<String> getClients() {
return channelPool.keySet();
}
}

+ 33
- 33
src/main/java/com/inspect/tcpserver/tcp/NettyServer.java View File

@ -38,6 +38,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
@Component
@ -55,7 +56,6 @@ public class NettyServer {
ClientController clientController;
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
private Map<String, String> tcpClientMap = new HashMap<>(); //机器人id,通道id
private long sendIndex = 0; //若重启系统后还要延续之前的序列号则需要把序列号存入redis中
private long receiveIndex = 0;
private DownXml2Json downXml2Json = new DownXml2Json(aliasHost);
@ -138,7 +138,7 @@ public class NettyServer {
//发送消息
public void flushMsgToDevice(String uuid, String clientKey, boolean request, String xml) {
if (tcpClientMap.containsKey(clientKey)) {
if (ChannelCache.getInstance().get(clientKey) != null) {
if(clientKey.startsWith("areaPatrolServer")) {
xml = xml.replace("<PatrolDevice>", "<PatrolHost>");
xml = xml.replace("</PatrolDevice>", "</PatrolHost>");
@ -159,15 +159,15 @@ public class NettyServer {
allBuf.writeByte(0xEB);
allBuf.writeByte(0x90);
redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS);
nettyServerHandler.sendMsg(uuid, clientKey, tcpClientMap.get(clientKey), allBuf, compact(xml));
nettyServerHandler.sendMsg(uuid, clientKey, allBuf, compact(xml));
sendIndex++;
} else {
logger.warn(Color.RED + "###### 客户端[{}/{}]离线! ######" + Color.END, clientKey, tcpClientMap.get(clientKey));
logger.warn(Color.RED + "###### 客户端[{}]离线! ######" + Color.END, clientKey);
}
}
public void flushMsgToDeviceBroadcast(String uuid, String clientKey, boolean request, String xml) {
for (Map.Entry<String, String> entry : tcpClientMap.entrySet()) {
for (String client : ChannelCache.getInstance().getClients()) {
if(clientKey.startsWith("areaPatrolServer")) {
xml = xml.replace("<PatrolDevice>", "<PatrolHost>");
xml = xml.replace("</PatrolDevice>", "</PatrolHost>");
@ -188,7 +188,7 @@ public class NettyServer {
allBuf.writeByte(0xEB);
allBuf.writeByte(0x90);
redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS);
nettyServerHandler.sendMsg(uuid, entry.getKey(), tcpClientMap.get(entry.getKey()), allBuf, compact(xml));
nettyServerHandler.sendMsg(uuid, client, allBuf, compact(xml));
sendIndex++;
try {
Thread.sleep(1);
@ -197,10 +197,10 @@ public class NettyServer {
}
//开启线程处理消息
public void receiveMsg(BinaryModel binaryModel) {
public void receiveMsg(BinaryModel binaryModel, ChannelHandlerContext context) {
executorService.execute(() -> {
try {
dealMsgInThreadPool(binaryModel);
dealMsgInThreadPool(binaryModel, context);
} catch (Exception e) {
logger.error(ExceptionUtils.getStackTrace(e));
}
@ -218,7 +218,7 @@ public class NettyServer {
}
//处理接收消息
private void dealMsgInThreadPool(BinaryModel binaryModel) throws DocumentException {
private void dealMsgInThreadPool(BinaryModel binaryModel, ChannelHandlerContext context) throws DocumentException {
String xml = binaryModel.dataBuf.toString(CharsetUtil.UTF_8);
receiveIndex = binaryModel.sendIndex;
SAXReader saxReader = new SAXReader();
@ -226,7 +226,7 @@ public class NettyServer {
Element root = document.getRootElement();
String sendCode = root.element("SendCode").getText();
String receiveCode = root.element("ReceiveCode").getText();
tcpClientMap.put(sendCode, binaryModel.id);
ChannelCache.getInstance().replace(sendCode, context);
int type = 0;
if (null != root.element("Type") && !StringUtil.isNullOrEmpty(root.element("Type").getText())) {
type = Integer.parseInt(root.element("Type").getText());
@ -237,7 +237,7 @@ public class NettyServer {
}
String compactXml = compact(xml);
// logger.info(Color.YELLOW + "###### 会话:{}, 客户:[{}/{}], 消息类型:{}, 命令:{}, 消息体: ######\n{}" + Color.END, binaryModel.uuid, sendCode, binaryModel.id, type, command, compactXml);
// logger.info(Color.YELLOW + "###### 会话:{}, 客户:[{}], 消息类型:{}, 命令:{}, 消息体: ######\n{}" + Color.END, binaryModel.uuid, sendCode, type, command, compactXml);
//判断是否重发
if (type == SystemType.system) {
if (command == SystemType.has_response || command == SystemType.no_response) {
@ -247,15 +247,15 @@ public class NettyServer {
resetSendMsg(binaryModel.receiveIndex, sendCode);
return;
} else if (code.equals(ResponseType.succeed)) {
logger.info(Color.YELLOW + "###### 客户端[{}/{}]响应结果为成功 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]响应结果为成功 ######" + Color.END, sendCode);
if(command == SystemType.no_response) {
return;
}
} else if (code.equals(ResponseType.fault)) {
logger.warn(Color.RED + "###### 客户端[{}/{}]响应结果为失败 ######" + Color.END, sendCode, binaryModel.id);
logger.warn(Color.RED + "###### 客户端[{}]响应结果为失败 ######" + Color.END, sendCode);
return;
} else if (code.equals(ResponseType.reject)) {
logger.warn(Color.RED + "###### 客户端[{}/{}]响应结果为拒绝 ######" + Color.END, sendCode, binaryModel.id);
logger.warn(Color.RED + "###### 客户端[{}]响应结果为拒绝 ######" + Color.END, sendCode);
return;
}
}
@ -267,12 +267,12 @@ public class NettyServer {
switch (command) {
case SystemType.register_request:
// 收到接入侧注册信息
logger.info(Color.YELLOW + "###### 客户端[{}/{}]注册 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]注册 ######" + Color.END, sendCode);
dealRegister(binaryModel.uuid, compactXml);
break;
case SystemType.heart_request:
// 处理心跳请求响应
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报心跳 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报心跳 ######" + Color.END, sendCode);
sendHeartBeat(binaryModel.uuid, compactXml);
break;
case SystemType.has_response:
@ -281,7 +281,7 @@ public class NettyServer {
// 处理设备上报的模型同步响应
if (null != root.element("Items").element("Item").attribute("device_file_path")) {
// 收到接入侧模型同步数据
logger.info(Color.YELLOW + "###### 模型同步收到客户端[{}/{}]响应数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 模型同步收到客户端[{}]响应数据 ######" + Color.END, sendCode);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class);
JSONObject jsonObject = JSONObject.parseObject(json);
jsonObject.put("uuid", binaryModel.uuid);
@ -291,66 +291,66 @@ public class NettyServer {
// 任务控制响应任务执行ID
if (null != root.element("Items").element("Item").attribute("task_patrolled_id")) {
// 收到接入侧任务下发或控制回复数据
logger.info(Color.YELLOW + "###### 任务下发收到客户端[{}/{}]响应数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 任务下发收到客户端[{}]响应数据 ######" + Color.END, sendCode);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class);
JSONObject jsonObject = JSONObject.parseObject(json);
jsonObject.put("SendCode", "");
clientController.sendMsg(binaryModel.uuid, jsonObject.toJSONString());
}
} else {
logger.warn(Color.RED + "###### 客户端[{}/{}]响应数据没有items ######" + Color.END, sendCode, binaryModel.id);
logger.warn(Color.RED + "###### 客户端[{}]响应数据没有items ######" + Color.END, sendCode);
}
break;
default:
logger.warn(Color.RED + "###### 客户端[{}/{}]非法的消息不予处理 ######" + Color.END, sendCode, binaryModel.id);
logger.warn(Color.RED + "###### 客户端[{}]非法的消息不予处理 ######" + Color.END, sendCode);
}
break;
case PushType.patrolDeviceState:// insert into basedata_mont_patdevstadata
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceStateControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备状态数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报设备状态数据 ######" + Color.END, sendCode);
break;
case PushType.patrolDeviceRunning:// insert into basedata_mont_patdevrundata
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceRunningControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备运行数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报设备运行数据 ######" + Color.END, sendCode);
break;
case PushType.nestState:// insert into basedata_mont_neststadata
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestStateControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报机巢状态数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报机巢状态数据 ######" + Color.END, sendCode);
break;
case PushType.nestRunning:// insert into basedata_mont_nestrundata
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestRunningControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报机巢运行数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报机巢运行数据 ######" + Color.END, sendCode);
break;
case PushType.location:// insert into basedata_mont_patdevcoord
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, LocationControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备坐标 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报设备坐标 ######" + Color.END, sendCode);
break;
case PushType.route:// insert into basedata_mont_patdevpatroute
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, RouteControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备路线 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报设备路线 ######" + Color.END, sendCode);
break;
case PushType.alarm:// insert into basedata_mont_patdevalmabn
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, AlarmControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备异常告警 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报设备异常告警 ######" + Color.END, sendCode);
break;
case PushType.environment:// insert into basedata_mont_evndata
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, EnvironmentControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备环境数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报设备环境数据 ######" + Color.END, sendCode);
break;
case PushType.taskState:// insert into basedata_mont_taskstadata and patrol_task_status
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskStateControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备任务状态 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报设备任务状态 ######" + Color.END, sendCode);
break;
case PushType.result:// insert into basedata_mont_taskresult and patrol_task_result_main
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskResultControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报巡视结果 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}]上报巡视结果 ######" + Color.END, sendCode);
break;
default:
logger.info(Color.RED + "###### 客户端[{}/{}]上报的非法消息不予处理 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.RED + "###### 客户端[{}]上报的非法消息不予处理 ######" + Color.END, sendCode);
}
if (type != SystemType.system && !StringUtil.isNullOrEmpty(json)) {
if ((type == NestCtlType.courseReversal && command == 3)) { // 处理用SSCOM模拟的数据, 向无人机发送控制指令
logger.info("###### 向客户端[{}/{}]透传200001控制指令 ######", sendCode, binaryModel.id);
logger.info("###### 向客户端[{}]透传200001控制指令 ######", sendCode);
flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml);
} else {
JSONObject jsonObject = JSONObject.parseObject(json);
@ -366,7 +366,7 @@ public class NettyServer {
|| (type == NestCtlType.ptzPitch && command == 6)
|| (type == NestCtlType.picModelSet && command == 1)
|| (type == NestCtlType.nestSuddenStop && command == 2)) {// 处理用SSCOM模拟的数据, 向无人机发送控制指令
logger.info("###### 向客户端[{}/{}]透传200002~20005控制指令 ######", sendCode, binaryModel.id);
logger.info("###### 向客户端[{}]透传200002~20005控制指令 ######", sendCode);
flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml);
}
}


+ 6
- 6
src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java View File

@ -18,19 +18,19 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
this.nettyServer = nettyServer;
}
public void sendMsg(String uuid, String clientKey, String clientValue, ByteBuf byteBuf, String xml) {
ChannelHandlerContext ctx = ChannelCache.getInstance().get(clientValue);
public void sendMsg(String uuid, String clientKey, ByteBuf byteBuf, String xml) {
ChannelHandlerContext ctx = ChannelCache.getInstance().get(clientKey);
if(ctx != null) {
ctx.writeAndFlush(Unpooled.wrappedBuffer(byteBuf)).addListener(
(ChannelFuture future) -> {
if (future.isSuccess()) {
logger.info(Color.CYAN + "###### 当前连接数:{},向客户:[{}/{}]下发消息成功:{}######" + Color.END, ChannelCache.getInstance().size(), clientKey, clientValue, xml);
logger.info(Color.CYAN + "###### 活动连接:{},向客户端[{}]下发消息成功:{}######" + Color.END, ChannelCache.getInstance().getClients(), clientKey, xml);
} else {
logger.error(Color.RED + "###### 当前连接数:{},向客户:[{}/{}]下发消息失败:{}######" + Color.END, ChannelCache.getInstance().size(), clientKey, clientValue, xml);
logger.error(Color.RED + "###### 活动连接:{},向客户端[{}]下发消息失败:{}######" + Color.END, ChannelCache.getInstance().getClients(), clientKey, xml);
}
});
} else {
logger.error(Color.RED + "###### 当前连接数:{},无法向客户:[{}/{}]下发消息,ctx==null######" + Color.END, ChannelCache.getInstance().size(), clientKey, clientValue);
logger.error(Color.RED + "###### 活动连接:{},无法向客户端[{}]下发消息,ctx==null######" + Color.END, ChannelCache.getInstance().getClients(), clientKey);
}
}
@ -62,7 +62,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
String id = ctx.channel().id().asShortText();
BinaryModel binaryModel = (BinaryModel) msg;
binaryModel.id = id;
nettyServer.receiveMsg((BinaryModel) msg);
nettyServer.receiveMsg((BinaryModel) msg, ctx);
}
@Override


Loading…
Cancel
Save