From 6349c0ff2f7db404e753a587badb48f5d611de1d Mon Sep 17 00:00:00 2001 From: htjcAdmin Date: Thu, 7 Aug 2025 15:54:46 +0800 Subject: [PATCH] =?UTF-8?q?/*=20*=201.=20Redisson=E6=9C=BA=E5=88=B6?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=81=A5=E5=A3=AE=E6=80=A7=EF=BC=8C=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E7=9A=84=E7=BA=BF=E7=A8=8B=E6=94=B9=E4=B8=BA=E7=94=B1?= =?UTF-8?q?Spring=E5=AE=B9=E5=99=A8=E6=89=98=E7=AE=A1=EF=BC=9B=20*=202.=20?= =?UTF-8?q?=20=20Algorithm=E7=AE=97=E6=B3=95=E8=BF=94=E5=9B=9E=E5=A4=84?= =?UTF-8?q?=E7=90=86=E8=BF=87=E7=A8=8B=E4=B8=AD=EF=BC=8C=E5=9C=A8=E6=B8=85?= =?UTF-8?q?=E7=90=86=E5=AF=B9=E5=BA=94=E7=9A=84Redisson=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E7=9A=84=E6=97=B6=E5=80=99=E5=A2=9E=E5=8A=A0=E5=AF=B9=E4=BA=8E?= =?UTF-8?q?=E8=AF=A5=E8=AE=B0=E5=BD=95=E7=9A=84=E5=88=A4=E6=96=AD=EF=BC=8C?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E7=94=B1=E4=BA=8ERedisson=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E4=B9=8B=E5=90=8E=E5=B7=B2=E7=BB=8F=E5=B0=86=E8=AF=A5Redisson?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E5=88=A0=E9=99=A4=E4=B9=8B=E5=90=8E=EF=BC=8C?= =?UTF-8?q?=E5=86=8D=E6=AC=A1=E8=BF=9B=E8=A1=8C=E9=87=8D=E5=A4=8D=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E3=80=82=20*/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../analysis/constant/AnalyseConstants.java | 2 +- .../impl/AnalyseResponseServiceImpl.java | 8 +++- .../AnalyseRequestDelayQueueListener.java | 48 ++++++++++++++++--- .../service/AnalyseRequestServiceImpl.java | 4 +- .../service/DelayQueueService.java | 2 + .../RedissonDelayQueueServiceImpl.java | 8 ++++ 6 files changed, 60 insertions(+), 12 deletions(-) diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java index dcc7803..194fd98 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java @@ -43,7 +43,7 @@ public class AnalyseConstants { public static final String ALGORITHM_REQUEST_QUEUE = "algorithm:request:queue"; public static final String ALGORITHM_RESPONSE_QUEUE = "algorithm:response:queue"; - public static final String ANALYSE_REQ_DELAY_QUEUE = "ANALYSE_REQ_DELAY_QUEUE"; + public static final String ALGORITHM_REQUEST_DELAY_QUEUE = "algorithm:request:delay:queue"; public static final String LUMINOSITY_REQUEST_QUEUE = "luminosity:request:queue"; } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseResponseServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseResponseServiceImpl.java index c4347a0..1150ea9 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseResponseServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseResponseServiceImpl.java @@ -127,8 +127,12 @@ public class AnalyseResponseServiceImpl implements IAnalyseResponseService { analyseRequest.getTaskPatrolId(), requestId, analyseRequest); - boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseRequest); - log.info("ALGORITHM_FILTER_RESULT Redisson Removed requestId: {}, Result: {}, analyseRequest: {}", requestId, isRemoved ? "success" : "fail", analyseRequest); + if(delayQueueService.containRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseRequest)) { + boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseRequest); + log.info("ALGORITHM_FILTER_RESULT Redisson Removed requestId: {}, Result: {}, analyseRequest: {}", requestId, isRemoved ? "success" : "fail", analyseRequest); + } else { + log.info("ALGORITHM_FILTER_RESULT Redisson has Compensated requestId: {}, analyseRequest: {}", requestId, analyseRequest); + } algTypeList = analyseRequest.getObjectList().get(0).getTypeList(); analyseResult.reloadReq(analyseRequest);//只有一个,analyseReq.getObjectList().get(0) diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java index 010574b..b8777b2 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java @@ -4,8 +4,11 @@ import com.inspect.analysis.constant.AnalyseConstants; import com.inspect.base.redis.service.RedisService; import com.inspect.partrolresult.domain.AnalyseRequest; import lombok.extern.slf4j.Slf4j; +import org.redisson.RedissonShutdownException; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; +import org.redisson.client.RedisException; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; @@ -13,8 +16,8 @@ import javax.annotation.Resource; @Slf4j @Component -public class AnalyseRequestDelayQueueListener implements InitializingBean { - private final String ListenerName = "AnalyseRequest-DelayQueue-Listener"; +public class AnalyseRequestDelayQueueListener implements InitializingBean, DisposableBean { + private final String THREAD_NAME = "AnalyseRequest-DelayQueue-Listener"; @Resource private RedissonClient redissonClient; @@ -25,22 +28,44 @@ public class AnalyseRequestDelayQueueListener implements InitializingBean { @Resource private IAnalyseRequestService analyseRequestService; + private Thread listenerThread; + private volatile boolean running = true; + @Override public void afterPropertiesSet() { // 启动监听线程 - new Thread(() -> { - RBlockingQueue blockingQueue = redissonClient.getBlockingQueue(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE); + listenerThread = new Thread(() -> { + RBlockingQueue blockingQueue = redissonClient.getBlockingQueue(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE); log.info("Listening AnalyseRequest Delay Queue Task Launching"); - while (true) { + while (running && !Thread.currentThread().isInterrupted()) { try { AnalyseRequest request = blockingQueue.take(); // 阻塞等待任务到期 handleRequest(request); - } catch (Exception e) { + } catch (RedissonShutdownException e) { + log.warn("Redisson 已关闭,终止监听线程"); + break; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 响应中断 + log.info("监听线程被中断,正在关闭..."); + break; + } catch (RedisException e) { + log.error("Redis 连接异常: {}", e.getMessage(), e); + try { + Thread.sleep(3000); // 避免异常频繁重试 + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + catch (Exception e) { log.error("Listening AnalyseRequest Queue Exception: ", e); } } - }, ListenerName).start(); + }, THREAD_NAME); + + listenerThread.setDaemon(true); // 设置为守护线程,防止主程序不退出 + listenerThread.start(); } private void handleRequest(AnalyseRequest request) { @@ -62,5 +87,14 @@ public class AnalyseRequestDelayQueueListener implements InitializingBean { request.getRequestId()); } } + + @Override + public void destroy() { + log.info("正在关闭 AnalyseRequestDelayQueueListener..."); + running = false; + if (listenerThread != null) { + listenerThread.interrupt(); + } + } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestServiceImpl.java index b237a9c..77321d7 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestServiceImpl.java @@ -119,9 +119,9 @@ public class AnalyseRequestServiceImpl implements IAnalyseRequestService { analyseReq.setRedissonTime(DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss, new Date())); if (testMode) { - delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES); + delayQueueService.submitRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES); } else { - delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS); + delayQueueService.submitRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS); } if (!retryDelegate.callRemoteAnalyseService(analyseReq)) { diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java index def9973..6cbfc3e 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java @@ -6,5 +6,7 @@ public interface DelayQueueService { void submitRequest(String queueName, T payload, long delay, TimeUnit timeUnit); boolean removeRequest(String queueName, T payload); + + boolean containRequest(String queueName, T payload); } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java index 204bd5a..ee95b86 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java @@ -41,5 +41,13 @@ public class RedissonDelayQueueServiceImpl implements DelayQueueService { return delayedQueue.remove(payload); } + + @Override + public boolean containRequest(String queueName, T payload) { + RBlockingQueue blockingQueue = redissonClient.getBlockingQueue(queueName); + RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingQueue); + + return delayedQueue.contains(payload); + } }