Browse Source

/*1. mqtt; 2. 云深处机器人暂停问题修改。*/

master
htjcAdmin 3 months ago
parent
commit
c5a1c8bb39
5 changed files with 203 additions and 19 deletions
  1. +20
    -14
      inspect-main/inspect-main-task/src/main/java/com/inspect/mqtt/MQTTUtil.java
  2. +5
    -5
      inspect-main/inspect-main-task/src/main/java/com/inspect/taskstatus/controller/PatrolTaskStatusController.java
  3. +6
    -0
      inspect-metadata/pom.xml
  4. +46
    -0
      inspect-metadata/src/main/java/com/inspect/metadata/mqtt/HuaRuanMQTTServiceImpl.java
  5. +126
    -0
      inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MQTTUtils.java

+ 20
- 14
inspect-main/inspect-main-task/src/main/java/com/inspect/mqtt/MQTTUtil.java View File

@ -2,6 +2,7 @@ package com.inspect.mqtt;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@ -11,6 +12,7 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class MQTTUtil implements IMQTTUtil {
@Value("${mqtt.ip}")
@ -32,17 +34,19 @@ public class MQTTUtil implements IMQTTUtil {
@PostConstruct
public void mqttUtil() {
log.info("mqttUtil start!!!");
}
public void subscribe(String[] topicFilters, int[] qos) {
try {
this.client.subscribe(topicFilters, qos);
log.info("subscribe topicFilters: {}, qos: {}", topicFilters, qos);
client.subscribe(topicFilters, qos);
} catch (MqttException e) {
System.out.println("reason: " + e.getReasonCode());
System.out.println("msg: " + e.getMessage());
System.out.println("loc: " + e.getLocalizedMessage());
System.out.println("cause: " + e.getCause());
System.out.println("exception: " + e);
log.info("reason: {}", e.getReasonCode());
log.info("msg: {}", e.getMessage());
log.info("loc: {}", e.getLocalizedMessage());
log.info("cause: {}", e.getCause());
log.info("exception: {}", e.getMessage());
e.printStackTrace();
}
@ -51,15 +55,15 @@ public class MQTTUtil implements IMQTTUtil {
public void connect(String broker, String clientId, String userName, String password, MqttCallback callback) {
MemoryPersistence persistence = new MemoryPersistence();
try {
this.client = new MqttClient(broker, clientId, persistence);
client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
connOpts.setCleanSession(true);
this.client.setCallback(callback);
System.out.println("Connecting to broker: " + broker);
this.client.connect(connOpts);
System.out.println("Connected");
client.setCallback(callback);
log.info("Connecting to mqtt broker: {}", broker);
client.connect(connOpts);
log.info("Connected to mqtt broker");
} catch (MqttException e) {
System.out.println("reason: " + e.getReasonCode());
System.out.println("msg: " + e.getMessage());
@ -73,8 +77,9 @@ public class MQTTUtil implements IMQTTUtil {
public void disconnect() {
try {
this.client.disconnect();
this.client.close();
log.info("Disconnect mqtt broker");
client.disconnect();
client.close();
} catch (MqttException e) {
System.out.println("reason: " + e.getReasonCode());
System.out.println("msg: " + e.getMessage());
@ -93,7 +98,8 @@ public class MQTTUtil implements IMQTTUtil {
try {
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
this.client.publish(pubTopic, message);
log.info("Publish mqtt broker");
client.publish(pubTopic, message);
return true;
} catch (MqttException e) {
System.out.println("reason: " + e.getReasonCode());


+ 5
- 5
inspect-main/inspect-main-task/src/main/java/com/inspect/taskstatus/controller/PatrolTaskStatusController.java View File

@ -885,11 +885,11 @@ public class PatrolTaskStatusController extends BaseController {
} else if (posType.equals("1") || posType.equals("2")) {
// 机器人无人机
for (String receiveCode : receiveCodeSet) {
for (String wrj : wrjCode) {
String strSendTaskControl = JSONObject.toJSONString(SendTask.builder().receiveCode(receiveCode).sendCode(sendCode).code(wrj).type("1").command("5").items(Collections.singletonList(PatrolSendTask.builder().value("1").build())).build());
log.info("[TASK] MODE CHANGE: {}", strSendTaskControl);
feignTaskClient.sendCommand(strSendTaskControl);
}
// for (String wrj : wrjCode) {
// String strSendTaskControl = JSONObject.toJSONString(SendTask.builder().receiveCode(receiveCode).sendCode(sendCode).code(wrj).type("1").command("5").items(Collections.singletonList(PatrolSendTask.builder().value("1").build())).build());
// log.info("[TASK] MODE CHANGE: {}", strSendTaskControl);
// feignTaskClient.sendCommand(strSendTaskControl);
// }
SendTask sendTask = new SendTask();
sendTask.setReceiveCode(receiveCode);
sendTask.setSendCode(sendCode);


+ 6
- 0
inspect-metadata/pom.xml View File

@ -110,6 +110,12 @@
<artifactId>jaxb-core</artifactId>
<version>2.3.0.1</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
</dependencies>
<build>


+ 46
- 0
inspect-metadata/src/main/java/com/inspect/metadata/mqtt/HuaRuanMQTTServiceImpl.java View File

@ -0,0 +1,46 @@
package com.inspect.metadata.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class HuaRuanMQTTServiceImpl {
private final MQTTUtils mqttUtils;
@Autowired
public HuaRuanMQTTServiceImpl(MQTTUtils mqttUtils) {
this.mqttUtils = mqttUtils;
// 设置 MQTT 回调
setupMqttCallback();
}
private void setupMqttCallback() {
mqttUtils.getClient().setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
log.error("Connection lost: {}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
log.info("Message arrived from topic {}: {}", topic, new String(message.getPayload()));
handleIncomingMessage(topic, message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("Delivery complete for message with token: {}", token.getMessageId());
}
});
}
private void handleIncomingMessage(String topic, MqttMessage message) {
// 处理接收到的消息
}
}

+ 126
- 0
inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MQTTUtils.java View File

@ -0,0 +1,126 @@
package com.inspect.metadata.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Slf4j
@Service
public class MQTTUtils {
@Value("${huaRuan.mqtt.ip}")
protected String ip;
@Value("${huaRuan.mqtt.clientId}")
protected String clientId;
@Value("${huaRuan.mqtt.user}")
protected String user;
@Value("${huaRuan.mqtt.pwd}")
protected String pwd;
@Value("${huaRuan.mqtt.topicFilters}")
protected String topicFilters;
private MqttClient client = null;
private Long id = 0L;
public Long getId() {
return this.id;
}
@PostConstruct
public void mqttUtils() {
log.info("mqttUtil start!!!");
connectWithRetry();
}
private void connectWithRetry() {
MemoryPersistence persistence = new MemoryPersistence();
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(user);
connOpts.setPassword(pwd.toCharArray());
connOpts.setCleanSession(true);
while (true) {
try {
client = new MqttClient(ip, clientId, persistence);
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
log.error("Connection lost: {}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
// Handle incoming messages here
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Handle delivery completion here
}
});
log.info("Connecting to MQTT broker: {}", ip);
client.connect(connOpts);
log.info("Connected to MQTT broker");
break; // Exit the loop if connected successfully
} catch (MqttException e) {
log.error("Failed to connect to MQTT broker. Reason: {}", e.getMessage());
try {
log.info("Retrying connection in 30 seconds...");
Thread.sleep(30000); // Wait for 30 seconds before retrying
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // Restore interrupted status
log.error("Connection retry interrupted", ie);
}
}
}
}
public void subscribe(String[] topicFilters, int[] qos) {
try {
log.info("Subscribing to topicFilters: {}, qos: {}", topicFilters, qos);
client.subscribe(topicFilters, qos);
} catch (MqttException e) {
log.error("Failed to subscribe. Reason: {}", e.getMessage());
e.printStackTrace();
}
}
public void disconnect() {
try {
log.info("Disconnecting from MQTT broker");
client.disconnect();
client.close();
} catch (MqttException e) {
log.error("Failed to disconnect. Reason: {}", e.getMessage());
e.printStackTrace();
}
}
public boolean publish(int qos, String pubTopic, String content) {
try {
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
log.info("Publishing message to topic: {}", pubTopic);
client.publish(pubTopic, message);
return true;
} catch (MqttException e) {
log.error("Failed to publish message. Reason: {}", e.getMessage());
e.printStackTrace();
return false;
}
}
@PreDestroy
public void cleanup() {
disconnect();
}
public MqttClient getClient() {
return client;
}
}

Loading…
Cancel
Save