Browse Source

netty代码优化

master
lijiuwei 8 months ago
parent
commit
ba0263d84b
5 changed files with 43 additions and 111 deletions
  1. +2
    -1
      src/main/java/com/inspect/tcpserver/tcp/MyDecoder.java
  2. +1
    -1
      src/main/java/com/inspect/tcpserver/tcp/NettyClient.java
  3. +31
    -92
      src/main/java/com/inspect/tcpserver/tcp/NettyServer.java
  4. +8
    -16
      src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java
  5. +1
    -1
      src/main/java/com/inspect/tcpserver/tcp/SystemType.java

+ 2
- 1
src/main/java/com/inspect/tcpserver/tcp/MyDecoder.java View File

@ -32,6 +32,7 @@ public class MyDecoder extends ByteToMessageDecoder {
final String uuid = RandomStringUtils.randomAlphanumeric(16); final String uuid = RandomStringUtils.randomAlphanumeric(16);
// ByteBuf forPrint = in.copy(); // ByteBuf forPrint = in.copy();
// log.info("###### 会话:{}, 客户:{}, 上行原始报文 ######\n {}", uuid, ctx.channel().id().asShortText(), ByteBufUtil.hexDump(forPrint)); // log.info("###### 会话:{}, 客户:{}, 上行原始报文 ######\n {}", uuid, ctx.channel().id().asShortText(), ByteBufUtil.hexDump(forPrint));
// forPrint.release();
int index; int index;
String flag; String flag;
@ -62,7 +63,7 @@ public class MyDecoder extends ByteToMessageDecoder {
if(in.readableBytes() > 0) { if(in.readableBytes() > 0) {
in.readShortLE(); in.readShortLE();
} else { } else {
log.warn(Color.RED + "###### readShortLE() but readableBytes = 0 ######" + Color.END);
log.error("readShortLE() but readableBytes = 0");
} }
BinaryModel binaryModel = new BinaryModel(); BinaryModel binaryModel = new BinaryModel();
binaryModel.receiveIndex = receiveIndex; binaryModel.receiveIndex = receiveIndex;


+ 1
- 1
src/main/java/com/inspect/tcpserver/tcp/NettyClient.java View File

@ -57,7 +57,7 @@ public class NettyClient {
private DownXml2Json downXml2Json; private DownXml2Json downXml2Json;
private NettyClientHandler client; private NettyClientHandler client;
private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4); private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4);
private ExecutorService executorService = new ThreadPoolExecutor(1, 5, 5L, TimeUnit.SECONDS, new ArrayBlockingQueue(4), Executors.defaultThreadFactory());
private ExecutorService executorService = new ThreadPoolExecutor(20, 50, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(400));
@Resource @Resource
UpSystemServerProperties upSystemServerProperties; UpSystemServerProperties upSystemServerProperties;


+ 31
- 92
src/main/java/com/inspect/tcpserver/tcp/NettyServer.java View File

@ -31,14 +31,12 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
@ -63,7 +61,7 @@ public class NettyServer {
private DownXml2Json downXml2Json = new DownXml2Json(aliasHost); private DownXml2Json downXml2Json = new DownXml2Json(aliasHost);
private UpJson2Xml upJson2Xml = new UpJson2Xml(aliasHost); private UpJson2Xml upJson2Xml = new UpJson2Xml(aliasHost);
private NettyServerHandler nettyServerHandler; private NettyServerHandler nettyServerHandler;
private ExecutorService executorService = new ThreadPoolExecutor(1, 10, 5L, TimeUnit.SECONDS, new ArrayBlockingQueue(4), Executors.defaultThreadFactory());
private ExecutorService executorService = new ThreadPoolExecutor(20, 50, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(400));
@Resource @Resource
DeviceServerProperties deviceServerProperties; DeviceServerProperties deviceServerProperties;
@ -71,23 +69,15 @@ public class NettyServer {
@Resource(name = "stringRedisTemplate") @Resource(name = "stringRedisTemplate")
private RedisTemplate<String, String> redisTemplate; private RedisTemplate<String, String> redisTemplate;
@Resource
private RestTemplate restTemplate;
@Resource @Resource
private RabbitTemplate rabbitTemplate; private RabbitTemplate rabbitTemplate;
@Value("${iip_server.authDevice.url}") @Value("${iip_server.authDevice.url}")
String iipAuthDeviceUrl; String iipAuthDeviceUrl;
private String serverIP;
private int serverPort; private int serverPort;
private int num = 410;
public void init() { public void init() {
this.serverIP = deviceServerProperties.ip;
this.serverPort = deviceServerProperties.port; this.serverPort = deviceServerProperties.port;
upJson2Xml = new UpJson2Xml(aliasHost); upJson2Xml = new UpJson2Xml(aliasHost);
downXml2Json = new DownXml2Json(aliasHost); downXml2Json = new DownXml2Json(aliasHost);
@ -107,18 +97,17 @@ public class NettyServer {
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new MyDecoder()); ch.pipeline().addLast(new MyDecoder());
nettyServerHandler = new NettyServerHandler(NettyServer.this); nettyServerHandler = new NettyServerHandler(NettyServer.this);
ch.pipeline().addLast(nettyServerHandler); ch.pipeline().addLast(nettyServerHandler);
} }
}) })
.localAddress(serverPort) .localAddress(serverPort)
//设置队列大小
.option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_BACKLOG, 1024)
// 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true);
//绑定端口,开始接收进来的连接
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_LINGER, 10);
try { try {
ChannelFuture future = bootstrap.bind(serverPort).sync(); ChannelFuture future = bootstrap.bind(serverPort).sync();
logger.info("###### TCP服务器启动 ######"); logger.info("###### TCP服务器启动 ######");
@ -170,11 +159,10 @@ public class NettyServer {
allBuf.writeByte(0xEB); allBuf.writeByte(0xEB);
allBuf.writeByte(0x90); allBuf.writeByte(0x90);
redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS);
logger.info(Color.MAGENTA + "###### 会话:{}, 客户:[{}/{}], 向设备回送序列:{}, 接收端序列:{}, 消息: ######\n{}" + Color.END, uuid, clientKey, tcpClientMap.get(clientKey), sendIndex, receiveIndex, compact(xml));
nettyServerHandler.sendMsg(uuid, clientKey, tcpClientMap.get(clientKey), allBuf);
nettyServerHandler.sendMsg(uuid, clientKey, tcpClientMap.get(clientKey), allBuf, compact(xml));
sendIndex++; sendIndex++;
} else { } else {
logger.warn(Color.RED + "###### 会话:{}, 客户:[{}/{}] 离线!!! ######" + Color.END, uuid, clientKey, tcpClientMap.get(clientKey));
logger.warn(Color.RED + "###### 客户端[{}/{}]离线! ######" + Color.END, clientKey, tcpClientMap.get(clientKey));
} }
} }
@ -200,8 +188,7 @@ public class NettyServer {
allBuf.writeByte(0xEB); allBuf.writeByte(0xEB);
allBuf.writeByte(0x90); allBuf.writeByte(0x90);
redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS); redisTemplate.opsForValue().set(String.valueOf(sendIndex), allBuf.toString(CharsetUtil.US_ASCII), 60L, TimeUnit.SECONDS);
logger.info(Color.MAGENTA + "###### 会话:{}, 客户:[{}/{}], 向设备回送序列:{}, 接收端序列:{}, 消息: ######\n{}" + Color.END, uuid, clientKey, tcpClientMap.get(clientKey), sendIndex, receiveIndex, compact(xml));
nettyServerHandler.sendMsg(uuid, entry.getKey(), tcpClientMap.get(entry.getKey()), allBuf);
nettyServerHandler.sendMsg(uuid, entry.getKey(), tcpClientMap.get(entry.getKey()), allBuf, compact(xml));
sendIndex++; sendIndex++;
try { try {
Thread.sleep(1); Thread.sleep(1);
@ -250,7 +237,7 @@ public class NettyServer {
} }
String compactXml = compact(xml); String compactXml = compact(xml);
logger.info(Color.YELLOW + "###### 会话:{}, 客户:[{}/{}], 消息类型:{}, 命令:{}, 消息体: ######\n{}" + Color.END, binaryModel.uuid, sendCode, binaryModel.id, type, command, compactXml);
// logger.info(Color.YELLOW + "###### 会话:{}, 客户:[{}/{}], 消息类型:{}, 命令:{}, 消息体: ######\n{}" + Color.END, binaryModel.uuid, sendCode, binaryModel.id, type, command, compactXml);
//判断是否重发 //判断是否重发
if (type == SystemType.system) { if (type == SystemType.system) {
if (command == SystemType.has_response || command == SystemType.no_response) { if (command == SystemType.has_response || command == SystemType.no_response) {
@ -260,15 +247,15 @@ public class NettyServer {
resetSendMsg(binaryModel.receiveIndex, sendCode); resetSendMsg(binaryModel.receiveIndex, sendCode);
return; return;
} else if (code.equals(ResponseType.succeed)) { } else if (code.equals(ResponseType.succeed)) {
logger.info(Color.YELLOW + "###### 响应结果为成功 ######" + Color.END);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]响应结果为成功 ######" + Color.END, sendCode, binaryModel.id);
if(command == SystemType.no_response) { if(command == SystemType.no_response) {
return; return;
} }
} else if (code.equals(ResponseType.fault)) { } else if (code.equals(ResponseType.fault)) {
logger.warn(Color.RED + "###### 响应结果为失败 ######" + Color.END);
logger.warn(Color.RED + "###### 客户端[{}/{}]响应结果为失败 ######" + Color.END, sendCode, binaryModel.id);
return; return;
} else if (code.equals(ResponseType.reject)) { } else if (code.equals(ResponseType.reject)) {
logger.warn(Color.RED + "###### 响应结果为拒绝 ######" + Color.END);
logger.warn(Color.RED + "###### 客户端[{}/{}]响应结果为拒绝 ######" + Color.END, sendCode, binaryModel.id);
return; return;
} }
} }
@ -285,7 +272,7 @@ public class NettyServer {
break; break;
case SystemType.heart_request: case SystemType.heart_request:
// 处理心跳请求响应 // 处理心跳请求响应
logger.info(Color.YELLOW + "###### 客户端[{}/{}]心跳 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报心跳 ######" + Color.END, sendCode, binaryModel.id);
sendHeartBeat(binaryModel.uuid, compactXml); sendHeartBeat(binaryModel.uuid, compactXml);
break; break;
case SystemType.has_response: case SystemType.has_response:
@ -294,8 +281,7 @@ public class NettyServer {
// 处理设备上报的模型同步响应 // 处理设备上报的模型同步响应
if (null != root.element("Items").element("Item").attribute("device_file_path")) { if (null != root.element("Items").element("Item").attribute("device_file_path")) {
// 收到接入侧模型同步数据 // 收到接入侧模型同步数据
logger.info(Color.YELLOW + "###### 模型同步响应数据 ######" + Color.END);
// json = downXml2Json.ModelControlXml2Json(xml);
logger.info(Color.YELLOW + "###### 模型同步收到客户端[{}/{}]响应数据 ######" + Color.END, sendCode, binaryModel.id);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class);
JSONObject jsonObject = JSONObject.parseObject(json); JSONObject jsonObject = JSONObject.parseObject(json);
jsonObject.put("uuid", binaryModel.uuid); jsonObject.put("uuid", binaryModel.uuid);
@ -305,83 +291,68 @@ public class NettyServer {
// 任务控制响应任务执行ID // 任务控制响应任务执行ID
if (null != root.element("Items").element("Item").attribute("task_patrolled_id")) { if (null != root.element("Items").element("Item").attribute("task_patrolled_id")) {
// 收到接入侧任务下发或控制回复数据 // 收到接入侧任务下发或控制回复数据
logger.info(Color.YELLOW + "###### 任务下发响应数据 ######" + Color.END);
// json = downXml2Json.ModelControlXml2Json(xml);
logger.info(Color.YELLOW + "###### 任务下发收到客户端[{}/{}]响应数据 ######" + Color.END, sendCode, binaryModel.id);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, ModelControl.class);
JSONObject jsonObject = JSONObject.parseObject(json); JSONObject jsonObject = JSONObject.parseObject(json);
jsonObject.put("SendCode", ""); jsonObject.put("SendCode", "");
clientController.sendMsg(binaryModel.uuid, jsonObject.toJSONString()); clientController.sendMsg(binaryModel.uuid, jsonObject.toJSONString());
} }
} else { } else {
// 接收到的系统类信息报文中root:{},中不包含items或items中没有item,不予处理
logger.warn(Color.RED + "###### 响应数据没有items ######" + Color.END);
logger.warn(Color.RED + "###### 客户端[{}/{}]响应数据没有items ######" + Color.END, sendCode, binaryModel.id);
} }
break; break;
default: default:
// 接收到的系统类信息报文中command:{},不在处理范围内不予处理
logger.warn(Color.RED + "###### 非法的消息不予处理 ######" + Color.END);
logger.warn(Color.RED + "###### 客户端[{}/{}]非法的消息不予处理 ######" + Color.END, sendCode, binaryModel.id);
} }
break; break;
case PushType.patrolDeviceState:// insert into basedata_mont_patdevstadata case PushType.patrolDeviceState:// insert into basedata_mont_patdevstadata
// json = downXml2Json.PatrolDeviceStateControlXml2Json(xml);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceStateControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceStateControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]设备状态数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备状态数据 ######" + Color.END, sendCode, binaryModel.id);
break; break;
case PushType.patrolDeviceRunning:// insert into basedata_mont_patdevrundata case PushType.patrolDeviceRunning:// insert into basedata_mont_patdevrundata
// json = downXml2Json.PatrolDeviceRunningControlXml2Json(xml);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceRunningControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, PatrolDeviceRunningControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]设备运行数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备运行数据 ######" + Color.END, sendCode, binaryModel.id);
break; break;
case PushType.nestState:// insert into basedata_mont_neststadata case PushType.nestState:// insert into basedata_mont_neststadata
// json = downXml2Json.NestStateControlXml2Json(xml);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestStateControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestStateControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]机巢状态数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报机巢状态数据 ######" + Color.END, sendCode, binaryModel.id);
break; break;
case PushType.nestRunning:// insert into basedata_mont_nestrundata case PushType.nestRunning:// insert into basedata_mont_nestrundata
// json = downXml2Json.NestRunningControlXml2Json(xml);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestRunningControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, NestRunningControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]机巢运行数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报机巢运行数据 ######" + Color.END, sendCode, binaryModel.id);
break; break;
case PushType.location:// insert into basedata_mont_patdevcoord case PushType.location:// insert into basedata_mont_patdevcoord
// json = downXml2Json.LocationControlXml2Json(xml);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, LocationControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, LocationControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]设备坐标 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备坐标 ######" + Color.END, sendCode, binaryModel.id);
break; break;
case PushType.route:// insert into basedata_mont_patdevpatroute case PushType.route:// insert into basedata_mont_patdevpatroute
// json = downXml2Json.RouteControlXml2Json(xml);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, RouteControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, RouteControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]设备路线 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备路线 ######" + Color.END, sendCode, binaryModel.id);
break; break;
case PushType.alarm:// insert into basedata_mont_patdevalmabn case PushType.alarm:// insert into basedata_mont_patdevalmabn
// json = down.AlarmControlXml2Json(xml);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, AlarmControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, AlarmControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]设备异常告警 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备异常告警 ######" + Color.END, sendCode, binaryModel.id);
break; break;
case PushType.environment:// insert into basedata_mont_evndata case PushType.environment:// insert into basedata_mont_evndata
// json = downXml2Json.EnvironmentControlXml2Json(xml);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, EnvironmentControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, EnvironmentControl.class);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]设备上报环境数据 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备环境数据 ######" + Color.END, sendCode, binaryModel.id);
break; break;
case PushType.taskState:// insert into basedata_mont_taskstadata and patrol_task_status case PushType.taskState:// insert into basedata_mont_taskstadata and patrol_task_status
// json = downXml2Json.TaskStateControlXml2Json(xml);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskStateControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskStateControl.class);
logger.info(Color.YELLOW + "###### 客户端{}/{}]设备任务状态 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报设备任务状态 ######" + Color.END, sendCode, binaryModel.id);
break; break;
case PushType.result:// insert into basedata_mont_taskresult and patrol_task_result_main case PushType.result:// insert into basedata_mont_taskresult and patrol_task_result_main
// json = downXml2Json.TaskResultControlXml2Json(xml);
json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskResultControl.class); json = downXml2Json.DownStreamJson2Xml(binaryModel.uuid, binaryModel.id, compactXml, TaskResultControl.class);
logger.info(Color.YELLOW + "###### 客户端{}/{}]巡视结果 ######" + Color.END, sendCode, binaryModel.id);
logger.info(Color.YELLOW + "###### 客户端[{}/{}]上报巡视结果 ######" + Color.END, sendCode, binaryModel.id);
break; break;
default: default:
// server-handle-接收到的type:{},不在处理范围内不予处理
logger.info(Color.RED + "###### 非法的消息不予处理 ######" + Color.END);
logger.info(Color.RED + "###### 客户端[{}/{}]上报的非法消息不予处理 ######" + Color.END, sendCode, binaryModel.id);
} }
if (type != SystemType.system && !StringUtil.isNullOrEmpty(json)) { if (type != SystemType.system && !StringUtil.isNullOrEmpty(json)) {
if ((type == NestCtlType.courseReversal && command == 3)) { // 处理用SSCOM模拟的数据, 向无人机发送控制指令 if ((type == NestCtlType.courseReversal && command == 3)) { // 处理用SSCOM模拟的数据, 向无人机发送控制指令
logger.info("###### 客户:{}, 向设备透传200001控制指令 ######", sendCode);
logger.info("###### 向客户端[{}/{}]透传200001控制指令 ######", sendCode, binaryModel.id);
flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml); flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml);
} else { } else {
//rabbitmq推送到消息队列中基于springboot_xggd
JSONObject jsonObject = JSONObject.parseObject(json); JSONObject jsonObject = JSONObject.parseObject(json);
jsonObject.put("uuid", binaryModel.uuid); jsonObject.put("uuid", binaryModel.uuid);
json = jsonObject.toJSONString(); json = jsonObject.toJSONString();
@ -395,7 +366,7 @@ public class NettyServer {
|| (type == NestCtlType.ptzPitch && command == 6) || (type == NestCtlType.ptzPitch && command == 6)
|| (type == NestCtlType.picModelSet && command == 1) || (type == NestCtlType.picModelSet && command == 1)
|| (type == NestCtlType.nestSuddenStop && command == 2)) {// 处理用SSCOM模拟的数据, 向无人机发送控制指令 || (type == NestCtlType.nestSuddenStop && command == 2)) {// 处理用SSCOM模拟的数据, 向无人机发送控制指令
logger.info("###### 客户:{}, 向设备透传200002~20005控制指令 ######", sendCode);
logger.info("###### 向客户端[{}/{}]透传200002~20005控制指令 ######", sendCode, binaryModel.id);
flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml); flushMsgToDeviceBroadcast(binaryModel.uuid, receiveCode, false, compactXml);
} }
} }
@ -492,26 +463,9 @@ public class NettyServer {
* @return * @return
*/ */
public boolean authDevice(String sendCode) { public boolean authDevice(String sendCode) {
// // 调用基础服务的鉴权巡视设备接口
// String url = String.format(iipAuthDeviceUrl,sendCode);
//
// String resultString = restTemplate.getForObject(url,String.class);
//
// Result result = JSONObject.parseObject(resultString, Result.class);
//
// // 判断鉴权结果
// if(StringUtils.equals(result.getCode(),"200")){
// logger.info("设备鉴权成功,result:{}",result);
// return true;
// }
//
// logger.warn("设备鉴权失败 sendCode:{},resutl:{}",sendCode,result);
// return false;
return true; return true;
} }
public void sendRegisterResponse(String uuid, RegisterResponseControl responseControl, String sendCode) { public void sendRegisterResponse(String uuid, RegisterResponseControl responseControl, String sendCode) {
String xml = ""; String xml = "";
try { try {
@ -569,13 +523,6 @@ public class NettyServer {
rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, jsonObject.toJSONString()); rabbitTemplate.convertAndSend(Constant.EX_CHANGE_NAME, Constant.ROUTING_KEY_NAME, jsonObject.toJSONString());
} }
/**
* 判断机器人是否在线
*/
public boolean isOnline(String code) {
return tcpClientMap.containsKey(code);
}
public void sendXmlMessage(JSONObject obj) { public void sendXmlMessage(JSONObject obj) {
int type = obj.getInteger("Type"); int type = obj.getInteger("Type");
String receiveCode = obj.getString("ReceiveCode"); String receiveCode = obj.getString("ReceiveCode");
@ -589,7 +536,6 @@ public class NettyServer {
case RobotType.robotFz: case RobotType.robotFz:
case RobotType.robotIr: case RobotType.robotIr:
case RobotType.robotPtz: case RobotType.robotPtz:
// xml = upJson2Xml.RobotJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, RobotControl.class); xml = upJson2Xml.UpStreamJson2Xml(json, RobotControl.class);
break; break;
case UAVType.uav: case UAVType.uav:
@ -597,35 +543,28 @@ public class NettyServer {
case UAVType.uavKz: case UAVType.uavKz:
case UAVType.uavYt: case UAVType.uavYt:
case UAVType.nest: case UAVType.nest:
// xml = upJson2Xml.UavControlJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, RobotControl.class); xml = upJson2Xml.UpStreamJson2Xml(json, RobotControl.class);
break; break;
case TaskType.taskControl: case TaskType.taskControl:
// xml = upJson2Xml.ResponseJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, ResponseControl.class); xml = upJson2Xml.UpStreamJson2Xml(json, ResponseControl.class);
break; break;
case TaskType.taskSend: case TaskType.taskSend:
// xml = upJson2Xml.TaskSendJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, TaskSendControl.class); xml = upJson2Xml.UpStreamJson2Xml(json, TaskSendControl.class);
break; break;
case TaskType.taskArea: case TaskType.taskArea:
// xml = upJson2Xml.AreaJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, AreaControl.class); xml = upJson2Xml.UpStreamJson2Xml(json, AreaControl.class);
break; break;
case ModelType.modelSync: case ModelType.modelSync:
// xml = upJson2Xml.ResponseJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, ResponseControl.class); xml = upJson2Xml.UpStreamJson2Xml(json, ResponseControl.class);
break; break;
case TaskType.lendonTask: case TaskType.lendonTask:
// xml = upJson2Xml.LinkageTaskJson2Xml(json);
xml = upJson2Xml.UpStreamJson2Xml(json, LinkageTaskControl.class); xml = upJson2Xml.UpStreamJson2Xml(json, LinkageTaskControl.class);
break; break;
default: default:
logger.warn(Color.RED + "###### 向设备端下发命令, 类型:{}错误, 不予处理 ######" + Color.END, type);
logger.error(Color.RED + "###### 向设备端下发命令, 类型:{}错误, 不予处理 ######" + Color.END, type);
} }
if (!StringUtils.isEmpty(xml)) { if (!StringUtils.isEmpty(xml)) {
//logger.info("###### 向设备端下发命令 ######\n{}", xml);
flushMsgToDevice("", receiveCode, true, xml); flushMsgToDevice("", receiveCode, true, xml);
} }
} }


+ 8
- 16
src/main/java/com/inspect/tcpserver/tcp/NettyServerHandler.java View File

@ -18,21 +18,19 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
this.nettyServer = nettyServer; this.nettyServer = nettyServer;
} }
public void sendMsg(String uuid, String clientKey, String clientValue, ByteBuf byteBuf) {
logger.info(Color.MAGENTA + "###### clientKey:" + clientKey + ", clientValue:" + clientValue + ", channelPool:" + ChannelCache.getInstance().size() + " ######" + Color.END);
public void sendMsg(String uuid, String clientKey, String clientValue, ByteBuf byteBuf, String xml) {
ChannelHandlerContext ctx = ChannelCache.getInstance().get(clientValue); ChannelHandlerContext ctx = ChannelCache.getInstance().get(clientValue);
if(ctx != null) { if(ctx != null) {
logger.info(Color.MAGENTA + "###### 执行下发 ######" + Color.END);
ctx.writeAndFlush(Unpooled.wrappedBuffer(byteBuf)).addListener( ctx.writeAndFlush(Unpooled.wrappedBuffer(byteBuf)).addListener(
(ChannelFuture future) -> { (ChannelFuture future) -> {
if (future.isSuccess()) { if (future.isSuccess()) {
logger.info(Color.MAGENTA + "send command success" + Color.END);
logger.info(Color.CYAN + "###### 当前连接数:{},向客户:[{}/{}]下发消息成功:{}######" + Color.END, ChannelCache.getInstance().size(), clientKey, clientValue, xml);
} else { } else {
logger.error(Color.MAGENTA + "send command fail" + Color.END);
logger.error(Color.RED + "###### 当前连接数:{},向客户:[{}/{}]下发消息失败:{}######" + Color.END, ChannelCache.getInstance().size(), clientKey, clientValue, xml);
} }
}); });
} else { } else {
logger.info(Color.MAGENTA + "###### 无法执行下发,ctx==null ######" + Color.END);
logger.error(Color.RED + "###### 当前连接数:{},无法向客户:[{}/{}]下发消息,ctx==null######" + Color.END, ChannelCache.getInstance().size(), clientKey, clientValue);
} }
} }
@ -40,7 +38,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* 客户端连接会触发 * 客户端连接会触发
*/ */
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
public void channelActive(ChannelHandlerContext ctx) {
String id = ctx.channel().id().asShortText(); String id = ctx.channel().id().asShortText();
ChannelCache.getInstance().addIfAbsent(id, ctx); ChannelCache.getInstance().addIfAbsent(id, ctx);
logger.info("###### 设备上线:{} ######", id); logger.info("###### 设备上线:{} ######", id);
@ -50,9 +48,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* 客户端断开会触发 * 客户端断开会触发
*/ */
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) {
String id = ctx.channel().id().asShortText(); String id = ctx.channel().id().asShortText();
logger.warn("###### 设备断开:{} ######", id);
logger.info("###### 设备断开:{} ######", id);
ChannelCache.getInstance().remove(ctx); ChannelCache.getInstance().remove(ctx);
} }
@ -60,16 +58,10 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* 客户端发消息会触发 * 客户端发消息会触发
*/ */
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String id = ctx.channel().id().asShortText(); String id = ctx.channel().id().asShortText();
BinaryModel binaryModel = (BinaryModel) msg; BinaryModel binaryModel = (BinaryModel) msg;
binaryModel.id = id; binaryModel.id = id;
logger.debug("###### 会话:{}, 客户:{}, 客户序列号:{}, 服务序列号:{}, 长度:{} ######",
binaryModel.uuid,
binaryModel.id,
binaryModel.sendIndex,
binaryModel.receiveIndex,
binaryModel.dataLength);
nettyServer.receiveMsg((BinaryModel) msg); nettyServer.receiveMsg((BinaryModel) msg);
} }


+ 1
- 1
src/main/java/com/inspect/tcpserver/tcp/SystemType.java View File

@ -74,7 +74,7 @@ class PushType {
class ConfigType { class ConfigType {
public static final int dataLength = 25; public static final int dataLength = 25;
public static final String heart_beat_interval = "300";
public static final String heart_beat_interval = "30";
public static final String patroldevice_run_interval = "300"; public static final String patroldevice_run_interval = "300";
public static final String nest_run_interval = "300"; public static final String nest_run_interval = "300";
public static final String env_interval = "300"; public static final String env_interval = "300";


Loading…
Cancel
Save