Sfoglia il codice sorgente

增加mqtt订阅断线自动重连

wukai 1 mese fa
parent
commit
9b04d9a929
1 ha cambiato i file con 104 aggiunte e 56 eliminazioni
  1. 104 56
      jjt-admin/src/main/java/com/jjt/biz/mqtt/MqttSubscriber.java

+ 104 - 56
jjt-admin/src/main/java/com/jjt/biz/mqtt/MqttSubscriber.java

@@ -23,6 +23,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * MQTT订阅者
@@ -51,70 +52,117 @@ public class MqttSubscriber {
     private static final String BROKER = "tcp://47.109.90.136:1883";
     private static final String TOPIC = "/wh/house/#";
     private static final String CLIENT_ID = "JJT_MQTT_SENSOR_CLIENT";
+    
+    // 添加重连标志
+    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
 
     @PostConstruct
     public void connect() {
         // 使用Ruoyi的线程池执行MQTT连接任务
         threadPoolTaskExecutor.execute(() -> {
-            int retryCount = 0;
-            final int maxRetries = 5;
+            connectWithRetry();
+        });
+    }
+    
+    /**
+     * 带重试机制的连接方法
+     */
+    private void connectWithRetry() {
+        int retryCount = 0;
+        final int maxRetries = 5;
+
+        while (retryCount < maxRetries) {
+            try {
+                logger.info("开始连接MQTT测试服务器: {} 并订阅主题: {} (尝试 {}/{})", BROKER, TOPIC, retryCount + 1, maxRetries);
+                // 如果client已经存在,先关闭它
+                if (client != null) {
+                    try {
+                        client.close();
+                    } catch (Exception e) {
+                        // 忽略关闭异常
+                    }
+                }
+                
+                client = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());
+                MqttConnectOptions options = new MqttConnectOptions();
+                options.setCleanSession(true);
+                // 增加连接超时时间
+                options.setConnectionTimeout(60);
+                // 增加keepalive间隔
+                options.setKeepAliveInterval(120);
+                // 启用自动重连
+                options.setAutomaticReconnect(true);
+                // 设置遗嘱消息
+                options.setWill("client/disconnect", "Client disconnected".getBytes(), 1, true);
+
+                client.setCallback(new MqttCallback() {
+                    @Override
+                    public void connectionLost(Throwable cause) {
+                        logger.error("MQTT测试连接断开,原因: ", cause);
+                        // 启动重连机制
+                        handleReconnect();
+                    }
 
-            while (retryCount < maxRetries) {
-                try {
-                    logger.info("开始连接MQTT测试服务器: {} 并订阅主题: {} (尝试 {}/{})", BROKER, TOPIC, retryCount + 1, maxRetries);
-                    client = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());
-                    MqttConnectOptions options = new MqttConnectOptions();
-                    options.setCleanSession(true);
-                    options.setConnectionTimeout(30);
-                    options.setKeepAliveInterval(60);
-
-                    client.setCallback(new MqttCallback() {
-                        @Override
-                        public void connectionLost(Throwable cause) {
-                            logger.error("MQTT测试连接断开,原因: ", cause);
-                        }
-
-                        @Override
-                        public void messageArrived(String topic, MqttMessage message) throws Exception {
-                            String payload = new String(message.getPayload());
-                            // 根据示例输出格式记录日志
-                            logger.info("Topic: {} QoS: {}", topic, message.getQos());
-                            logger.info("{}", payload);
-
-                            // 解析并记录数据
-                            parseAndLogData(topic, payload);
-
-                            // 记录时间戳
-                            logger.info("{}", new java.sql.Timestamp(System.currentTimeMillis()));
-                        }
-
-                        @Override
-                        public void deliveryComplete(IMqttDeliveryToken token) {
-                            // 对于订阅者来说,通常不需要处理发送完成事件
-                        }
-                    });
-
-                    client.connect(options);
-                    client.subscribe(TOPIC, 0);
-                    logger.info("成功连接到MQTT测试服务器: {} 并订阅主题: {}", BROKER, TOPIC);
-                    break; // 成功连接后跳出循环
-
-                } catch (MqttException e) {
-                    retryCount++;
-                    logger.error("MQTT测试连接失败 (尝试 {}/{}): {}", retryCount, maxRetries, e.getMessage());
-                    if (retryCount < maxRetries) {
-                        try {
-                            Thread.sleep(5000); // 等待5秒后重试
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            break;
-                        }
-                    } else {
-                        logger.error("MQTT测试连接失败,已达到最大重试次数", e);
+                    @Override
+                    public void messageArrived(String topic, MqttMessage message) throws Exception {
+                        String payload = new String(message.getPayload());
+                        // 根据示例输出格式记录日志
+                        logger.info("Topic: {} QoS: {}", topic, message.getQos());
+                        logger.info("{}", payload);
+
+                        // 解析并记录数据
+                        parseAndLogData(topic, payload);
+
+                        // 记录时间戳
+                        logger.info("{}", new java.sql.Timestamp(System.currentTimeMillis()));
                     }
+
+                    @Override
+                    public void deliveryComplete(IMqttDeliveryToken token) {
+                        // 对于订阅者来说,通常不需要处理发送完成事件
+                    }
+                });
+
+                client.connect(options);
+                client.subscribe(TOPIC, 0);
+                logger.info("成功连接到MQTT测试服务器: {} 并订阅主题: {}", BROKER, TOPIC);
+                break; // 成功连接后跳出循环
+
+            } catch (MqttException e) {
+                retryCount++;
+                logger.error("MQTT测试连接失败 (尝试 {}/{}): {}", retryCount, maxRetries, e.getMessage());
+                if (retryCount < maxRetries) {
+                    try {
+                        Thread.sleep(5000); // 等待5秒后重试
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
+                } else {
+                    logger.error("MQTT测试连接失败,已达到最大重试次数", e);
                 }
             }
-        });
+        }
+    }
+    
+    /**
+     * 处理重连
+     */
+    private void handleReconnect() {
+        // 使用原子操作确保只有一个重连线程
+        if (reconnecting.compareAndSet(false, true)) {
+            threadPoolTaskExecutor.execute(() -> {
+                try {
+                    logger.info("开始尝试重连MQTT服务器...");
+                    Thread.sleep(5000); // 等待5秒后重连
+                    connectWithRetry();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    reconnecting.set(false);
+                }
+            });
+        }
     }
 
     /**
@@ -183,4 +231,4 @@ public class MqttSubscriber {
     public boolean isConnected() {
         return client != null && client.isConnected();
     }
-}
+}