|
|
|
@ -3,7 +3,6 @@ package com.inspect.metadata.mqtt; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.eclipse.paho.client.mqttv3.*; |
|
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
@ -12,50 +11,44 @@ import javax.annotation.PreDestroy; |
|
|
|
@Slf4j |
|
|
|
@Service |
|
|
|
public class MQTTUtils { |
|
|
|
@Value("${huaRuan.mqtt.ip:localhost}") |
|
|
|
protected String ip; |
|
|
|
@Value("${huaRuan.mqtt.port:1883}") |
|
|
|
protected String port; |
|
|
|
@Value("${huaRuan.mqtt.clientId:test}") |
|
|
|
protected String clientId; |
|
|
|
@Value("${huaRuan.mqtt.user:admin}") |
|
|
|
protected String user; |
|
|
|
@Value("${huaRuan.mqtt.pwd:123456}") |
|
|
|
protected String pwd; |
|
|
|
@Value("${huaRuan.mqtt.topicFilters:test}") |
|
|
|
protected String topicFilters; |
|
|
|
@Value("${mqtt.qos:1}") |
|
|
|
protected int[] qos; // QoS 数组 |
|
|
|
|
|
|
|
private MqttClient client = null; |
|
|
|
|
|
|
|
private final MqttProperties properties; |
|
|
|
|
|
|
|
private MqttClient client; |
|
|
|
private Long id = 0L; |
|
|
|
|
|
|
|
public MQTTUtils(MqttProperties properties) { |
|
|
|
this.properties = properties; |
|
|
|
} |
|
|
|
|
|
|
|
public Long getId() { |
|
|
|
return this.id; |
|
|
|
} |
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
public void mqttUtils() { |
|
|
|
log.info("mqttUtil start!!!"); |
|
|
|
public void init() { |
|
|
|
log.info("MQTT Utils initializing..."); |
|
|
|
connectWithRetry(); |
|
|
|
} |
|
|
|
|
|
|
|
private void connectWithRetry() { |
|
|
|
MemoryPersistence persistence = new MemoryPersistence(); |
|
|
|
MqttConnectOptions connOpts = new MqttConnectOptions(); |
|
|
|
connOpts.setUserName(user); |
|
|
|
connOpts.setPassword(pwd.toCharArray()); |
|
|
|
connOpts.setUserName(properties.getUser()); |
|
|
|
connOpts.setPassword(properties.getPwd().toCharArray()); |
|
|
|
connOpts.setCleanSession(true); |
|
|
|
|
|
|
|
String broker = String.format("tcp://%s:%s", ip, port); |
|
|
|
String broker = String.format("tcp://%s:%d", properties.getIp(), properties.getPort()); |
|
|
|
|
|
|
|
while (true) { |
|
|
|
try { |
|
|
|
client = new MqttClient(broker, clientId, persistence); |
|
|
|
client = new MqttClient(broker, properties.getClientId(), persistence); |
|
|
|
client.setCallback(new MqttCallback() { |
|
|
|
@Override |
|
|
|
public void connectionLost(Throwable cause) { |
|
|
|
log.error("Connection lost: {}", cause.getMessage()); |
|
|
|
// 自动重连 |
|
|
|
connectWithRetry(); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -65,7 +58,7 @@ public class MQTTUtils { |
|
|
|
|
|
|
|
@Override |
|
|
|
public void deliveryComplete(IMqttDeliveryToken token) { |
|
|
|
|
|
|
|
log.info("Delivery complete: {}", token.getMessageId()); |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
@ -73,15 +66,15 @@ public class MQTTUtils { |
|
|
|
client.connect(connOpts); |
|
|
|
log.info("Connected to MQTT broker"); |
|
|
|
|
|
|
|
subscribe(new String[]{topicFilters}, qos); |
|
|
|
subscribe(properties.getTopicFilters(), properties.getQos()); |
|
|
|
break; |
|
|
|
} catch (MqttException e) { |
|
|
|
log.error("Failed to connect to MQTT broker. Reason: {}", e.getMessage()); |
|
|
|
try { |
|
|
|
log.info("Retrying connection in 30 seconds..."); |
|
|
|
Thread.sleep(30000); // Wait for 30 seconds before retrying |
|
|
|
Thread.sleep(30000); |
|
|
|
} catch (InterruptedException ie) { |
|
|
|
Thread.currentThread().interrupt(); // Restore interrupted status |
|
|
|
Thread.currentThread().interrupt(); |
|
|
|
log.error("Connection retry interrupted", ie); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -93,19 +86,21 @@ public class MQTTUtils { |
|
|
|
log.info("Subscribing to topicFilters: {}, qos: {}", topicFilters, qos); |
|
|
|
client.subscribe(topicFilters, qos); |
|
|
|
} catch (MqttException e) { |
|
|
|
log.error("Failed to subscribe. Reason: {}", e.getMessage()); |
|
|
|
e.printStackTrace(); |
|
|
|
log.error("Failed to subscribe. Reason: {}", e.getMessage(), e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void disconnect() { |
|
|
|
try { |
|
|
|
log.info("Disconnecting from MQTT broker"); |
|
|
|
client.disconnect(); |
|
|
|
client.close(); |
|
|
|
if (client != null && client.isConnected()) { |
|
|
|
client.disconnect(); |
|
|
|
} |
|
|
|
if (client != null) { |
|
|
|
client.close(); |
|
|
|
} |
|
|
|
} catch (MqttException e) { |
|
|
|
log.error("Failed to disconnect. Reason: {}", e.getMessage()); |
|
|
|
e.printStackTrace(); |
|
|
|
log.error("Failed to disconnect. Reason: {}", e.getMessage(), e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -117,8 +112,7 @@ public class MQTTUtils { |
|
|
|
client.publish(pubTopic, message); |
|
|
|
return true; |
|
|
|
} catch (MqttException e) { |
|
|
|
log.error("Failed to publish message. Reason: {}", e.getMessage()); |
|
|
|
e.printStackTrace(); |
|
|
|
log.error("Failed to publish message. Reason: {}", e.getMessage(), e); |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
@ -131,4 +125,4 @@ public class MQTTUtils { |
|
|
|
public MqttClient getClient() { |
|
|
|
return client; |
|
|
|
} |
|
|
|
} |
|
|
|
} |