From 9af94c637123eaaa56915de2e6f93e00fa16bd87 Mon Sep 17 00:00:00 2001 From: htjcAdmin Date: Sun, 28 Sep 2025 16:00:16 +0800 Subject: [PATCH] =?UTF-8?q?/*=E5=8D=8E=E8=BD=AFMQTT=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8=E4=BF=A1=E6=81=AF=E6=94=B9=E4=B8=BA=E7=94=B1@Configur?= =?UTF-8?q?ationProperties=E6=9D=A5=E8=AF=BB=E5=8F=96=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8=E9=85=8D=E7=BD=AE=E4=BF=A1=E6=81=AF*/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/inspect/metadata/mqtt/MQTTUtils.java | 66 +++++++++---------- .../inspect/metadata/mqtt/MqttProperties.java | 31 +++++++++ 2 files changed, 61 insertions(+), 36 deletions(-) create mode 100644 inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MqttProperties.java diff --git a/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MQTTUtils.java b/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MQTTUtils.java index 40175d1..1de7537 100644 --- a/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MQTTUtils.java +++ b/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MQTTUtils.java @@ -3,7 +3,6 @@ package com.inspect.metadata.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; @@ -12,50 +11,44 @@ import javax.annotation.PreDestroy; @Slf4j @Service public class MQTTUtils { - @Value("${huaRuan.mqtt.ip:localhost}") - protected String ip; - @Value("${huaRuan.mqtt.port:1883}") - protected String port; - @Value("${huaRuan.mqtt.clientId:test}") - protected String clientId; - @Value("${huaRuan.mqtt.user:admin}") - protected String user; - @Value("${huaRuan.mqtt.pwd:123456}") - protected String pwd; - @Value("${huaRuan.mqtt.topicFilters:test}") - protected String topicFilters; - @Value("${mqtt.qos:1}") - protected int[] qos; // QoS 数组 - - private MqttClient client = null; + + private final MqttProperties properties; + + private MqttClient client; private Long id = 0L; + public MQTTUtils(MqttProperties properties) { + this.properties = properties; + } + public Long getId() { return this.id; } @PostConstruct - public void mqttUtils() { - log.info("mqttUtil start!!!"); + public void init() { + log.info("MQTT Utils initializing..."); connectWithRetry(); } private void connectWithRetry() { MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setUserName(user); - connOpts.setPassword(pwd.toCharArray()); + connOpts.setUserName(properties.getUser()); + connOpts.setPassword(properties.getPwd().toCharArray()); connOpts.setCleanSession(true); - String broker = String.format("tcp://%s:%s", ip, port); + String broker = String.format("tcp://%s:%d", properties.getIp(), properties.getPort()); while (true) { try { - client = new MqttClient(broker, clientId, persistence); + client = new MqttClient(broker, properties.getClientId(), persistence); client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { log.error("Connection lost: {}", cause.getMessage()); + // 自动重连 + connectWithRetry(); } @Override @@ -65,7 +58,7 @@ public class MQTTUtils { @Override public void deliveryComplete(IMqttDeliveryToken token) { - + log.info("Delivery complete: {}", token.getMessageId()); } }); @@ -73,15 +66,15 @@ public class MQTTUtils { client.connect(connOpts); log.info("Connected to MQTT broker"); - subscribe(new String[]{topicFilters}, qos); + subscribe(properties.getTopicFilters(), properties.getQos()); break; } catch (MqttException e) { log.error("Failed to connect to MQTT broker. Reason: {}", e.getMessage()); try { log.info("Retrying connection in 30 seconds..."); - Thread.sleep(30000); // Wait for 30 seconds before retrying + Thread.sleep(30000); } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); // Restore interrupted status + Thread.currentThread().interrupt(); log.error("Connection retry interrupted", ie); } } @@ -93,19 +86,21 @@ public class MQTTUtils { log.info("Subscribing to topicFilters: {}, qos: {}", topicFilters, qos); client.subscribe(topicFilters, qos); } catch (MqttException e) { - log.error("Failed to subscribe. Reason: {}", e.getMessage()); - e.printStackTrace(); + log.error("Failed to subscribe. Reason: {}", e.getMessage(), e); } } public void disconnect() { try { log.info("Disconnecting from MQTT broker"); - client.disconnect(); - client.close(); + if (client != null && client.isConnected()) { + client.disconnect(); + } + if (client != null) { + client.close(); + } } catch (MqttException e) { - log.error("Failed to disconnect. Reason: {}", e.getMessage()); - e.printStackTrace(); + log.error("Failed to disconnect. Reason: {}", e.getMessage(), e); } } @@ -117,8 +112,7 @@ public class MQTTUtils { client.publish(pubTopic, message); return true; } catch (MqttException e) { - log.error("Failed to publish message. Reason: {}", e.getMessage()); - e.printStackTrace(); + log.error("Failed to publish message. Reason: {}", e.getMessage(), e); return false; } } @@ -131,4 +125,4 @@ public class MQTTUtils { public MqttClient getClient() { return client; } -} +} \ No newline at end of file diff --git a/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MqttProperties.java b/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MqttProperties.java new file mode 100644 index 0000000..3a094ec --- /dev/null +++ b/inspect-metadata/src/main/java/com/inspect/metadata/mqtt/MqttProperties.java @@ -0,0 +1,31 @@ +package com.inspect.metadata.mqtt; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "hua-ruan.mqtt") +public class MqttProperties { + /** MQTT Broker IP */ + private String ip = "localhost"; + + /** MQTT Broker Port */ + private int port = 1883; + + /** 客户端 ID */ + private String clientId = "test"; + + /** 用户名 */ + private String user = "admin"; + + /** 密码 */ + private String pwd = "123456"; + + /** 订阅的 topic 列表 */ + private String[] topicFilters = new String[0]; + + /** QoS 数组 */ + private int[] qos = new int[]{1}; +}