diff --git a/inspect-main/inspect-main-task/pom.xml b/inspect-main/inspect-main-task/pom.xml index cd5a570..32cbdc1 100644 --- a/inspect-main/inspect-main-task/pom.xml +++ b/inspect-main/inspect-main-task/pom.xml @@ -63,5 +63,11 @@ poi-ooxml 5.2.3 + + org.redisson + redisson + 3.23.5 + + \ No newline at end of file diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java index abfdc84..5647aa4 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java @@ -41,5 +41,7 @@ public class AnalyseConstants { public static final String ALGORITHM_REQUEST_QUEUE = "algorithm:request:queue"; public static final String ALGORITHM_RESPONSE_QUEUE = "algorithm:response:queue"; + public static final String ANALYSE_REQ_DELAY_QUEUE = "ANALYSE_REQ_DELAY_QUEUE"; + public static final String LUMINOSITY_REQUEST_QUEUE = "luminosity:request:queue"; } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java index 3d09ab0..2615c8c 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java @@ -2,7 +2,6 @@ package com.inspect.analysis.service.impl; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.inspect.analysis.constant.AnalyseConstants; -import com.inspect.base.core.utils.HttpClientUtils; import com.inspect.base.redis.service.RedisService; import com.inspect.partrolresult.domain.AnalyseRequest; import com.inspect.partrolresult.service.AnalyseRemoteService; @@ -34,6 +33,9 @@ public class AlgorithmRequestRetryableConsumerManager { @Value("${task.redis.request.idle-sleep-ms:500}") private long idleSleepMs; + @Value("${task.test-mode:false}") + private boolean testMode; + private RedisQueueRetryableConsumerAsync consumer; @Resource @@ -65,14 +67,14 @@ public class AlgorithmRequestRetryableConsumerManager { } catch (IOException e) { // 可以记录失败次数,避免无限重试 int retryCount = Optional.of(request.getRetryCount()).orElse(0); - log.info("AlgorithmRequestRetryableConsumerManager Send failed, will retry: {}, retryCount: {}, msg: {}", request, request.getRetryCount(), e.getMessage()); - if (retryCount < 1) { + log.info("AlgorithmRequestRetryableConsumerManager Failed, retryCount: {}, reqeust: {}, msg: {}", request.getRetryCount(), request, e.getMessage()); + final int TRIES_MAX = testMode ? 50 : 3; + if (retryCount < TRIES_MAX) { request.setRetryCount(retryCount + 1); - redisService.redisTemplate.opsForList().rightPush(AnalyseConstants.ALGORITHM_REQUEST_QUEUE, new Gson().toJson(request)); + redisService.redisTemplate.opsForList().leftPush(AnalyseConstants.ALGORITHM_REQUEST_QUEUE, new Gson().toJson(request)); } else { - log.info("AlgorithmRequestRetryableConsumerManager Max retry reached, dropping request: {}", request); - final String defaultUrl = "http://localhost:" + this.port + AnalyseConstants.ANALYSE_RET_URI; - HttpClientUtils.sendPostAgain(defaultUrl, request.toErrorResultStr()); + log.info("AlgorithmRequestRetryableConsumerManager Compensate, retryCount: {}, request: {}", retryCount, request); + analyseRemoteService.sendCompensateRequest(request); } } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisRedisKeyExpireListener.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisRedisKeyExpireListener.java deleted file mode 100644 index 1b5e568..0000000 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisRedisKeyExpireListener.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.inspect.analysis.service.impl; - -import com.inspect.analysis.constant.AnalyseConstants; -import com.inspect.base.core.constant.Color; -import com.inspect.base.redis.service.RedisService; - -import com.inspect.partrolresult.domain.AnalyseRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; - -@Component -public class AnalysisRedisKeyExpireListener extends KeyExpirationEventMessageListener { - private final Logger log = LoggerFactory.getLogger(AnalysisRedisKeyExpireListener.class); - - @Resource - private RedisService redisService; - - public AnalysisRedisKeyExpireListener(RedisMessageListenerContainer listenerContainer) { - super(listenerContainer); - } - - public void onMessage(Message message, byte[] pattern) { - String expiredKey = message.toString(); - log.info(Color.YELLOW + "Analysis RedisKey Expire: {}, pattern: {}" + Color.END, expiredKey, pattern); - if(expiredKey.startsWith(AnalyseConstants.ANALYSE_FILTER_REQUEST)) { - AnalyseRequest analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(expiredKey); - log.info(Color.YELLOW + "Analysis RedisKey Expire Request: {}" + Color.END, analyseRequest); - } - } - -} diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java index cf71ab3..ca102c9 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java @@ -23,6 +23,7 @@ import com.inspect.message.MessageUtils; import com.inspect.partrolresult.domain.AnalyseRequest; import com.inspect.partrolresult.domain.PatrolResult; import com.inspect.partrolresult.service.AnalyseRemoteService; +import com.inspect.partrolresult.service.DelayQueueService; import com.inspect.partrolresult.service.IPatrolResultService; import com.inspect.resultmain.domain.PatrolTaskResultMain; import com.inspect.resultmain.service.IPatrolTaskResultMainService; @@ -75,92 +76,8 @@ public class AnalysisServiceImpl implements IAnalysisService { @Resource private ResultAnalysisUtils resultAnalysisUtils; - -// @Override -// public void picAnalyseRetNotify(AnalyseResult analyseResult) { -// log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, analyseResult); -// String requestId = analyseResult.getRequestId(); -// String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId); -// if (!redisService.hasKey(keyId)) { -// log.error("picAnalyseRetNotify isTest: {}, NO keyId={}, REQUEST_ID={} in REDIS!", analyseResult.isTest(), keyId, requestId); -// if (!analyseResult.isTest()) { -// return; -// } -// //redisService.setCacheObject(keyId, "123456789"); -// } -// -// String patrolTaskIdObj = redisService.getCacheObject(keyId); -// log.info("picAnalyseRetNotify keyId={}, requestId={}, patrolTaskIdObj={}", keyId, requestId, patrolTaskIdObj); -// analyseResult.setTaskPatrolId(patrolTaskIdObj); -// String keyFilterRequest = AnalyseConstants.ANALYSE_FILTER_REQUEST + requestId; -// log.info("picAnalyseRetNotify keyFilterRequest={}", keyFilterRequest); -// boolean bBigModelExecFlag = false; -// String[] algTypeList = {}; -// AnalyseRequest analyseRequest = null; -// if (redisService.hasKey(keyFilterRequest)) { // 初筛结果 -// analyseResult.setFilter("1"); // 设置初筛标志 -// analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(keyFilterRequest); -// log.info("FILTER_RESULT picAnalyseRetNotify analyseRequest IN REDIS: {}", analyseRequest); -// AnalyseResPoint analyseResPoint = analyseResult.getResultList().get(0).getResults().get(0); -// if (analyseRequest != null && analyseRequest.getObjectList() != null && !analyseRequest.getObjectList().isEmpty()) { -// algTypeList = analyseRequest.getObjectList().get(0).getTypeList(); -// } -// -// boolean bDefect = analyseResPoint.isDefect(); // code=2000代表初筛结果返回正常,value=1代表有缺陷 -// log.info("picAnalyseRetNotify FILTER bDefect={}, algorithmType={}", bDefect, analyseResPoint.getType()); -// analyseResult.reloadReq(analyseRequest);//只有一个,analyseReq.getObjectList().get(0) -// /* -// *大致两种情况:1. 算法是表计(meter); 2. 算法不是表计(meter) -// * 1. 算法不是表计(meter) -// * 1.1 初筛结果有缺陷:继续发给大模型处理 -// * 1.2 初筛结果无缺陷:不用调用大模型,流程就此结束 -// * 2. 算法是表计(meter) -// * 不论初筛结果有没有缺陷,都要继续调用大模型,因为大模型调用了表计(meter)识别算法 -// */ -// final String algType = analyseRequest.getObjectList().get(0).getTypeList()[0]; -// log.info("picAnalyseRetNotify algType IN REDIS: {}", algType); -// if (bDefect || (AlgConstants.METER.equals(algType) -// || AlgConstants.XB.equals(algType) -// || AlgConstants.INFRA_1800.equals(algType) -// || AlgConstants.INFRA_YU3.equals(algType) -// || AlgConstants.INFRA_CAMERA.equals(algType) -// || AlgConstants.INFRA_CAMERA_REVERSE.equals(algType)) -// ) { -// /* -// * 先检查结果有缺无缺陷,如果有缺陷不用判断算法直接调用大模型;* -// * 如果无缺陷,再去判断算法,如果算法是meter就继续调用大模型。* -// */ -// analyseResult.setResult("0"); -// bBigModelExecFlag = true; -//// log.info("picAnalyseRetNotify CALL BIG_MODEL REQUEST_ID={}", requestId); -//// ispAlgorithmRequestService.sendRequest(analyseRequest); -// } else { -// //初筛结果无缺陷并且非表计算法,不用调用大模型,流程就此结束 -// log.info("picAnalyseRetNotify NO BIG_MODEL WOULD CALLED REQUEST_ID={}", requestId); -// } -// } else { -// // 大模型结果 -// log.info("BIG_MODEL_RESULT CALLBACK picAnalyseRetNotify REQUEST_ID={}", requestId); -// analyseResult.setFilter("0"); // 设置大模型标志为0 -// } -// -// //qinyl -// analysisLogService.log(new AnalyseLog(analyseResult.toString(), "1", analyseResult.getTaskPatrolId(), analyseResult.getFilter(), requestId)); -// try { -// analysis(analyseResult); -// } catch (Exception e) { -// log.error("error", e); -// } -// //doAlgorithmAnalysis(analyseResult); -// //calculateProcess(analyseResult); -// log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 end ######" + Color.END); -// -// if (bBigModelExecFlag) { -// log.info("picAnalyseRetNotify CALL BIG_MODEL REQUEST_ID={}", requestId); -//// ispAlgorithmRequestService.sendRequest(analyseRequest); -// analyseRemoteService.sendRequest(analyseRequest, algTypeList, false); -// } -// } + @Resource + private DelayQueueService delayQueueService; @Override public void picAnalyseRetNotify(AnalyseResult analyseResult) { @@ -198,6 +115,9 @@ public class AnalysisServiceImpl implements IAnalysisService { analyseResult.setFilter("1"); // 设置初筛标志 analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(filterRequestRedisKey); log.info("ALGO_RES_UUID: {}, FILTER_RESULT picAnalyseRetNotify analyseRequest IN REDIS: {}", logLabel, analyseRequest); + boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseRequest); + log.info("Redisson Removed Result: {}, analyseRequest: {}", isRemoved? "success" : "fail", analyseRequest); + AnalyseResPoint analyseResPoint = analyseResult.getResultList().get(0).getResults().get(0); if (analyseRequest != null && analyseRequest.getObjectList() != null && !analyseRequest.getObjectList().isEmpty()) { algTypeList = analyseRequest.getObjectList().get(0).getTypeList(); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java index a5ea3d9..0e5e650 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java @@ -5,16 +5,18 @@ import com.inspect.analysis.domain.AnalyseResItem; import com.inspect.analysis.domain.AnalyseResPoint; import com.inspect.analysis.domain.AnalyseResult; import com.inspect.analysis.service.RetryableRequest; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import java.io.Serializable; import java.util.ArrayList; + import java.util.List; -import java.util.Objects; @Setter @Getter +@EqualsAndHashCode(of = "requestId") public class AnalyseRequest implements RetryableRequest, Serializable { private List objectList; private String requestHostIp; @@ -33,6 +35,10 @@ public class AnalyseRequest implements RetryableRequest, Serializable { private int retryCount; private String requestUrl; + public AnalyseRequest() { + + } + @Override public int getRetryCount() { return retryCount; @@ -76,18 +82,4 @@ public class AnalyseRequest implements RetryableRequest, Serializable { public String toString() { return JSONObject.toJSONString(this); } - - @Override - public boolean equals(Object object) { - if (this == object) return true; - if (object == null || getClass() != object.getClass()) return false; - AnalyseRequest that = (AnalyseRequest) object; - return Objects.equals(objectList, that.objectList) && Objects.equals(requestHostIp, that.requestHostIp) && Objects.equals(requestHostPort, that.requestHostPort) && Objects.equals(requestId, that.requestId) && Objects.equals(algorithmPath, that.algorithmPath) && Objects.equals(taskPatrolId, that.taskPatrolId); - } - - @Override - public int hashCode() { - return Objects.hash(objectList, requestHostIp, requestHostPort, requestId, algorithmPath, taskPatrolId); - } - } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java index d3361cf..cde532e 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java @@ -1,5 +1,6 @@ package com.inspect.partrolresult.service; +import com.fasterxml.jackson.databind.ObjectMapper; import com.github.pagehelper.util.StringUtil; import com.inspect.analysis.constant.AnalyseConstants; @@ -9,6 +10,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import com.inspect.base.core.constant.RedisConst; +import com.inspect.base.core.utils.HttpClientUtils; import com.inspect.base.core.utils.StringUtils; import com.inspect.base.redis.service.RedisService; import com.inspect.partrolresult.domain.AnalyseReqItem; @@ -39,6 +41,9 @@ public class AnalyseRemoteService { @Resource private AnalyseRequestRetryableDelegate retryDelegate; + @Resource + private DelayQueueService delayQueueService; + //qinyl public void sendRequest(AnalyseRequest analyseReq, String[] typeList, boolean isFilter) throws IOException { final long requestTimeout = 1L; @@ -60,7 +65,13 @@ public class AnalyseRemoteService { if ("1".equals(analyseFilter) && isFilter) { final String analyzeFilterRequestIdRedisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(requestId); log.info("[FILTER] sendRequest analyseFilterRequestIdRedisKey: {}, analyseReq: {}", analyzeFilterRequestIdRedisKey, analyseReq); - this.redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone(), testMode?3L:requestTimeout, testMode?TimeUnit.MINUTES:TimeUnit.DAYS); + //redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone(), testMode?3L:requestTimeout, testMode?TimeUnit.MINUTES:TimeUnit.DAYS); + redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone()); + if(testMode) { + delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES); + } else { + delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS); + } AnalyseReqItem analyseReqItem = analyseReq.getObjectList().get(0);//只取第一个 analyseReqItem.setTypeList(typeList); analyseReq.setObjectList(Collections.singletonList(analyseReqItem)); @@ -97,5 +108,28 @@ public class AnalyseRemoteService { } } + public void sendCompensateRequest(AnalyseRequest request) { + final String defaultUrl = "http://localhost:" + this.port + AnalyseConstants.ANALYSE_RET_URI; + HttpClientUtils.sendPostAgain(defaultUrl, request.toErrorResultStr()); + } + + public static void main(String[] args) { + try { + ObjectMapper mapper = new ObjectMapper(); + + AnalyseRequest r1 = new AnalyseRequest(); + r1.setRequestId("3177fe1c5d6f44f7bc527d5de8451ab9"); + String json = mapper.writeValueAsString(r1); + + AnalyseRequest r2 = mapper.readValue(json, AnalyseRequest.class); + boolean res = r1.equals(r2); + System.out.println(res); // 应为 true + boolean res2 = r1.hashCode() == r2.hashCode(); + System.out.println(res2); // 应为 true + } catch (Exception e) { + e.printStackTrace(); + } + + } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java new file mode 100644 index 0000000..9547689 --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java @@ -0,0 +1,60 @@ +package com.inspect.partrolresult.service; + +import com.inspect.analysis.constant.AnalyseConstants; +import com.inspect.base.redis.service.RedisService; +import com.inspect.partrolresult.domain.AnalyseRequest; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RBlockingQueue; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Slf4j +@Component +public class AnalyseRequestDelayQueueListener implements InitializingBean { + + private final String ListenerName = "AnalyseRequest-DelayQueue-Listener"; + + @Resource + private RedissonClient redissonClient; + + @Resource + private RedisService redisService; + + @Resource + private AnalyseRemoteService analyseRemoteService; + + @Override + public void afterPropertiesSet() { + // 启动监听线程 + new Thread(() -> { + RBlockingQueue blockingQueue = redissonClient.getBlockingQueue(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE); + log.info("Listening AnalyseRequest Delay Queue Task Launching"); + + while (true) { + try { + AnalyseRequest request = blockingQueue.take(); // 阻塞等待任务到期 + handleRequest(request); + } catch (Exception e) { + log.error("Listening AnalyseRequest Queue Exception: ", e); + } + } + }, ListenerName).start(); + } + + private void handleRequest(AnalyseRequest request) { + String redisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(request.getRequestId()); + Object cached = redisService.redisTemplate.opsForValue().getAndDelete(redisKey); + if (cached != null) { + // 正常到期但业务未处理,走补偿逻辑 + log.info("AnalyseRequest Been Not Consumed, Compensate Triggered: {}", request); + analyseRemoteService.sendCompensateRequest(request); + } else { + // 正常业务已完成,什么也不做 + log.info("AnalyseRequest Been Normally Consumed: {}", request.getRequestId()); + } + } +} + diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java new file mode 100644 index 0000000..def9973 --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java @@ -0,0 +1,10 @@ +package com.inspect.partrolresult.service; + +import java.util.concurrent.TimeUnit; + +public interface DelayQueueService { + void submitRequest(String queueName, T payload, long delay, TimeUnit timeUnit); + + boolean removeRequest(String queueName, T payload); +} + diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonConfig.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonConfig.java new file mode 100644 index 0000000..440329f --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonConfig.java @@ -0,0 +1,17 @@ +package com.inspect.partrolresult.service; + +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RedissonConfig { + @Bean + public RedissonClient redissonClient() { + Config config = new Config(); + config.useSingleServer().setAddress("redis://redis:6379"); + return Redisson.create(config); + } +} diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java new file mode 100644 index 0000000..204bd5a --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java @@ -0,0 +1,45 @@ +package com.inspect.partrolresult.service; + +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RBlockingQueue; +import org.redisson.api.RDelayedQueue; +import org.redisson.api.RedissonClient; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Service +public class RedissonDelayQueueServiceImpl implements DelayQueueService { + + @Resource + private RedissonClient redissonClient; + + /** + * 工作机制如下: + * 1. 先往延迟池delayedQueue存入了一个 ZSET + HASH(Redis结构),并设置了时间戳 + * 2. Redisson 后台调度器线程会在数据到期时将它从延迟 ZSET 中移动到目标的 blockingQueue 中。 + * @param queueName + * @param payload + * @param delay + * @param timeUnit + */ + @Override + public void submitRequest(String queueName, T payload, long delay, TimeUnit timeUnit) { + RBlockingQueue blockingQueue = redissonClient.getBlockingQueue(queueName); + RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingQueue); + + delayedQueue.offer(payload, delay, timeUnit); + log.info("Submit Delay Queue Task To: {}, Delay: {} {}", queueName, delay, timeUnit); + } + + @Override + public boolean removeRequest(String queueName, T payload) { + RBlockingQueue blockingQueue = redissonClient.getBlockingQueue(queueName); + RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingQueue); + + return delayedQueue.remove(payload); + } +} +