diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java index dcc7803..194fd98 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java @@ -43,7 +43,7 @@ public class AnalyseConstants { public static final String ALGORITHM_REQUEST_QUEUE = "algorithm:request:queue"; public static final String ALGORITHM_RESPONSE_QUEUE = "algorithm:response:queue"; - public static final String ANALYSE_REQ_DELAY_QUEUE = "ANALYSE_REQ_DELAY_QUEUE"; + public static final String ALGORITHM_REQUEST_DELAY_QUEUE = "algorithm:request:delay:queue"; public static final String LUMINOSITY_REQUEST_QUEUE = "luminosity:request:queue"; } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseResponseServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseResponseServiceImpl.java index c4347a0..1150ea9 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseResponseServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseResponseServiceImpl.java @@ -127,8 +127,12 @@ public class AnalyseResponseServiceImpl implements IAnalyseResponseService { analyseRequest.getTaskPatrolId(), requestId, analyseRequest); - boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseRequest); - log.info("ALGORITHM_FILTER_RESULT Redisson Removed requestId: {}, Result: {}, analyseRequest: {}", requestId, isRemoved ? "success" : "fail", analyseRequest); + if(delayQueueService.containRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseRequest)) { + boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseRequest); + log.info("ALGORITHM_FILTER_RESULT Redisson Removed requestId: {}, Result: {}, analyseRequest: {}", requestId, isRemoved ? "success" : "fail", analyseRequest); + } else { + log.info("ALGORITHM_FILTER_RESULT Redisson has Compensated requestId: {}, analyseRequest: {}", requestId, analyseRequest); + } algTypeList = analyseRequest.getObjectList().get(0).getTypeList(); analyseResult.reloadReq(analyseRequest);//只有一个,analyseReq.getObjectList().get(0) diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java index 010574b..b8777b2 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java @@ -4,8 +4,11 @@ import com.inspect.analysis.constant.AnalyseConstants; import com.inspect.base.redis.service.RedisService; import com.inspect.partrolresult.domain.AnalyseRequest; import lombok.extern.slf4j.Slf4j; +import org.redisson.RedissonShutdownException; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; +import org.redisson.client.RedisException; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; @@ -13,8 +16,8 @@ import javax.annotation.Resource; @Slf4j @Component -public class AnalyseRequestDelayQueueListener implements InitializingBean { - private final String ListenerName = "AnalyseRequest-DelayQueue-Listener"; +public class AnalyseRequestDelayQueueListener implements InitializingBean, DisposableBean { + private final String THREAD_NAME = "AnalyseRequest-DelayQueue-Listener"; @Resource private RedissonClient redissonClient; @@ -25,22 +28,44 @@ public class AnalyseRequestDelayQueueListener implements InitializingBean { @Resource private IAnalyseRequestService analyseRequestService; + private Thread listenerThread; + private volatile boolean running = true; + @Override public void afterPropertiesSet() { // 启动监听线程 - new Thread(() -> { - RBlockingQueue blockingQueue = redissonClient.getBlockingQueue(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE); + listenerThread = new Thread(() -> { + RBlockingQueue blockingQueue = redissonClient.getBlockingQueue(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE); log.info("Listening AnalyseRequest Delay Queue Task Launching"); - while (true) { + while (running && !Thread.currentThread().isInterrupted()) { try { AnalyseRequest request = blockingQueue.take(); // 阻塞等待任务到期 handleRequest(request); - } catch (Exception e) { + } catch (RedissonShutdownException e) { + log.warn("Redisson 已关闭,终止监听线程"); + break; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 响应中断 + log.info("监听线程被中断,正在关闭..."); + break; + } catch (RedisException e) { + log.error("Redis 连接异常: {}", e.getMessage(), e); + try { + Thread.sleep(3000); // 避免异常频繁重试 + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + catch (Exception e) { log.error("Listening AnalyseRequest Queue Exception: ", e); } } - }, ListenerName).start(); + }, THREAD_NAME); + + listenerThread.setDaemon(true); // 设置为守护线程,防止主程序不退出 + listenerThread.start(); } private void handleRequest(AnalyseRequest request) { @@ -62,5 +87,14 @@ public class AnalyseRequestDelayQueueListener implements InitializingBean { request.getRequestId()); } } + + @Override + public void destroy() { + log.info("正在关闭 AnalyseRequestDelayQueueListener..."); + running = false; + if (listenerThread != null) { + listenerThread.interrupt(); + } + } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestServiceImpl.java index b237a9c..77321d7 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestServiceImpl.java @@ -119,9 +119,9 @@ public class AnalyseRequestServiceImpl implements IAnalyseRequestService { analyseReq.setRedissonTime(DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss, new Date())); if (testMode) { - delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES); + delayQueueService.submitRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES); } else { - delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS); + delayQueueService.submitRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS); } if (!retryDelegate.callRemoteAnalyseService(analyseReq)) { diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java index def9973..6cbfc3e 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java @@ -6,5 +6,7 @@ public interface DelayQueueService { void submitRequest(String queueName, T payload, long delay, TimeUnit timeUnit); boolean removeRequest(String queueName, T payload); + + boolean containRequest(String queueName, T payload); } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java index 204bd5a..ee95b86 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java @@ -41,5 +41,13 @@ public class RedissonDelayQueueServiceImpl implements DelayQueueService { return delayedQueue.remove(payload); } + + @Override + public boolean containRequest(String queueName, T payload) { + RBlockingQueue blockingQueue = redissonClient.getBlockingQueue(queueName); + RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingQueue); + + return delayedQueue.contains(payload); + } }