diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/mqtt/MQTTUtil.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/mqtt/MQTTUtil.java index ca1195f..c39e2f5 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/mqtt/MQTTUtil.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/mqtt/MQTTUtil.java @@ -2,6 +2,7 @@ package com.inspect.mqtt; import javax.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; @@ -11,6 +12,7 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +@Slf4j @Service public class MQTTUtil implements IMQTTUtil { @Value("${mqtt.ip}") @@ -32,17 +34,19 @@ public class MQTTUtil implements IMQTTUtil { @PostConstruct public void mqttUtil() { + log.info("mqttUtil start!!!"); } public void subscribe(String[] topicFilters, int[] qos) { try { - this.client.subscribe(topicFilters, qos); + log.info("subscribe topicFilters: {}, qos: {}", topicFilters, qos); + client.subscribe(topicFilters, qos); } catch (MqttException e) { - System.out.println("reason: " + e.getReasonCode()); - System.out.println("msg: " + e.getMessage()); - System.out.println("loc: " + e.getLocalizedMessage()); - System.out.println("cause: " + e.getCause()); - System.out.println("exception: " + e); + log.info("reason: {}", e.getReasonCode()); + log.info("msg: {}", e.getMessage()); + log.info("loc: {}", e.getLocalizedMessage()); + log.info("cause: {}", e.getCause()); + log.info("exception: {}", e.getMessage()); e.printStackTrace(); } @@ -51,15 +55,15 @@ public class MQTTUtil implements IMQTTUtil { public void connect(String broker, String clientId, String userName, String password, MqttCallback callback) { MemoryPersistence persistence = new MemoryPersistence(); try { - this.client = new MqttClient(broker, clientId, persistence); + client = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(userName); connOpts.setPassword(password.toCharArray()); connOpts.setCleanSession(true); - this.client.setCallback(callback); - System.out.println("Connecting to broker: " + broker); - this.client.connect(connOpts); - System.out.println("Connected"); + client.setCallback(callback); + log.info("Connecting to mqtt broker: {}", broker); + client.connect(connOpts); + log.info("Connected to mqtt broker"); } catch (MqttException e) { System.out.println("reason: " + e.getReasonCode()); System.out.println("msg: " + e.getMessage()); @@ -73,8 +77,9 @@ public class MQTTUtil implements IMQTTUtil { public void disconnect() { try { - this.client.disconnect(); - this.client.close(); + log.info("Disconnect mqtt broker"); + client.disconnect(); + client.close(); } catch (MqttException e) { System.out.println("reason: " + e.getReasonCode()); System.out.println("msg: " + e.getMessage()); @@ -93,7 +98,8 @@ public class MQTTUtil implements IMQTTUtil { try { MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); - this.client.publish(pubTopic, message); + log.info("Publish mqtt broker"); + client.publish(pubTopic, message); return true; } catch (MqttException e) { System.out.println("reason: " + e.getReasonCode()); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/taskstatus/controller/PatrolTaskStatusController.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/taskstatus/controller/PatrolTaskStatusController.java index 0145707..362ece4 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/taskstatus/controller/PatrolTaskStatusController.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/taskstatus/controller/PatrolTaskStatusController.java @@ -885,11 +885,11 @@ public class PatrolTaskStatusController extends BaseController { } else if (posType.equals("1") || posType.equals("2")) { // 机器人,无人机 for (String receiveCode : receiveCodeSet) { - for (String wrj : wrjCode) { - String strSendTaskControl = JSONObject.toJSONString(SendTask.builder().receiveCode(receiveCode).sendCode(sendCode).code(wrj).type("1").command("5").items(Collections.singletonList(PatrolSendTask.builder().value("1").build())).build()); - log.info("[TASK] MODE CHANGE: {}", strSendTaskControl); - feignTaskClient.sendCommand(strSendTaskControl); - } +// for (String wrj : wrjCode) { +// String strSendTaskControl = JSONObject.toJSONString(SendTask.builder().receiveCode(receiveCode).sendCode(sendCode).code(wrj).type("1").command("5").items(Collections.singletonList(PatrolSendTask.builder().value("1").build())).build()); +// log.info("[TASK] MODE CHANGE: {}", strSendTaskControl); +// feignTaskClient.sendCommand(strSendTaskControl); +// } SendTask sendTask = new SendTask(); sendTask.setReceiveCode(receiveCode); sendTask.setSendCode(sendCode); diff --git a/inspect-metadata/pom.xml b/inspect-metadata/pom.xml index 5a5d50a..3327848 100644 --- a/inspect-metadata/pom.xml +++ b/inspect-metadata/pom.xml @@ -110,6 +110,12 @@ jaxb-core 2.3.0.1 + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.2 + diff --git a/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/HuaRuanMQTTServiceImpl.java b/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/HuaRuanMQTTServiceImpl.java new file mode 100644 index 0000000..d59e1db --- /dev/null +++ b/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/HuaRuanMQTTServiceImpl.java @@ -0,0 +1,46 @@ +package com.inspect.metadata.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class HuaRuanMQTTServiceImpl { + + private final MQTTUtils mqttUtils; + @Autowired + public HuaRuanMQTTServiceImpl(MQTTUtils mqttUtils) { + this.mqttUtils = mqttUtils; + + // 设置 MQTT 回调 + setupMqttCallback(); + } + + private void setupMqttCallback() { + mqttUtils.getClient().setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + log.error("Connection lost: {}", cause.getMessage()); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + log.info("Message arrived from topic {}: {}", topic, new String(message.getPayload())); + handleIncomingMessage(topic, message); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + log.info("Delivery complete for message with token: {}", token.getMessageId()); + } + }); + } + + private void handleIncomingMessage(String topic, MqttMessage message) { + // 处理接收到的消息 + } +} diff --git a/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MQTTUtils.java b/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MQTTUtils.java new file mode 100644 index 0000000..6aa1ea7 --- /dev/null +++ b/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MQTTUtils.java @@ -0,0 +1,126 @@ +package com.inspect.metadata.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +@Slf4j +@Service +public class MQTTUtils { + @Value("${huaRuan.mqtt.ip}") + protected String ip; + @Value("${huaRuan.mqtt.clientId}") + protected String clientId; + @Value("${huaRuan.mqtt.user}") + protected String user; + @Value("${huaRuan.mqtt.pwd}") + protected String pwd; + @Value("${huaRuan.mqtt.topicFilters}") + protected String topicFilters; + + private MqttClient client = null; + private Long id = 0L; + + public Long getId() { + return this.id; + } + + @PostConstruct + public void mqttUtils() { + log.info("mqttUtil start!!!"); + connectWithRetry(); + } + + private void connectWithRetry() { + MemoryPersistence persistence = new MemoryPersistence(); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setUserName(user); + connOpts.setPassword(pwd.toCharArray()); + connOpts.setCleanSession(true); + + while (true) { + try { + client = new MqttClient(ip, clientId, persistence); + client.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + log.error("Connection lost: {}", cause.getMessage()); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + // Handle incoming messages here + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // Handle delivery completion here + } + }); + + log.info("Connecting to MQTT broker: {}", ip); + client.connect(connOpts); + log.info("Connected to MQTT broker"); + break; // Exit the loop if connected successfully + } catch (MqttException e) { + log.error("Failed to connect to MQTT broker. Reason: {}", e.getMessage()); + try { + log.info("Retrying connection in 30 seconds..."); + Thread.sleep(30000); // Wait for 30 seconds before retrying + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // Restore interrupted status + log.error("Connection retry interrupted", ie); + } + } + } + } + + public void subscribe(String[] topicFilters, int[] qos) { + try { + log.info("Subscribing to topicFilters: {}, qos: {}", topicFilters, qos); + client.subscribe(topicFilters, qos); + } catch (MqttException e) { + log.error("Failed to subscribe. Reason: {}", e.getMessage()); + e.printStackTrace(); + } + } + + public void disconnect() { + try { + log.info("Disconnecting from MQTT broker"); + client.disconnect(); + client.close(); + } catch (MqttException e) { + log.error("Failed to disconnect. Reason: {}", e.getMessage()); + e.printStackTrace(); + } + } + + public boolean publish(int qos, String pubTopic, String content) { + try { + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(qos); + log.info("Publishing message to topic: {}", pubTopic); + client.publish(pubTopic, message); + return true; + } catch (MqttException e) { + log.error("Failed to publish message. Reason: {}", e.getMessage()); + e.printStackTrace(); + return false; + } + } + + @PreDestroy + public void cleanup() { + disconnect(); + } + + public MqttClient getClient() { + return client; + } +}