From a547d2d2063e9be8aeb46e9e8c97ba64f2cda456 Mon Sep 17 00:00:00 2001 From: htjcAdmin Date: Sat, 26 Jul 2025 22:09:51 +0800 Subject: [PATCH] =?UTF-8?q?/*=E9=92=88=E5=AF=B9=E9=87=91=E5=8D=8E=E7=AB=99?= =?UTF-8?q?=E6=99=BA=E8=83=BD=E5=B7=A1=E8=A7=86=E4=BB=BB=E5=8A=A1=E5=87=BA?= =?UTF-8?q?=E7=8E=B0=E7=9A=84bug=E8=BF=9B=E8=A1=8C=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E3=80=82*/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/IvsResourceRetryableDelegate.java | 2 +- .../com/inspect/job/task/JobMainTask.java | 40 ++++++-- .../analysis/domain/AnalyseResult.java | 4 + .../analysis/service/IAnalyseRespService.java | 4 +- ...orithmRequestRetryableConsumerManager.java | 15 +-- .../AlgorithmResponseConsumerManager.java | 12 +-- .../AlgorithmResponseProcessConsumer.java | 48 --------- ...rithmResponseRetryableConsumerManager.java | 74 ++++++++++++++ .../service/impl/AnalyseRespServiceImpl.java | 98 +++++++++++-------- .../RedisQueueRetryableConsumerAsync.java | 31 +++--- .../partrolresult/domain/AnalyseRequest.java | 5 + .../service/AnalyseRemoteService.java | 48 +++++---- .../AnalyseRequestDelayQueueListener.java | 17 +++- .../AnalyseRequestRetryableDelegate.java | 14 ++- .../task/controller/PatrolTaskController.java | 8 +- 15 files changed, 266 insertions(+), 154 deletions(-) delete mode 100644 inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java create mode 100644 inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseRetryableConsumerManager.java diff --git a/inspect-ivs/src/main/java/com/inspect/ivs/service/IvsResourceRetryableDelegate.java b/inspect-ivs/src/main/java/com/inspect/ivs/service/IvsResourceRetryableDelegate.java index c805560..763c9c8 100644 --- a/inspect-ivs/src/main/java/com/inspect/ivs/service/IvsResourceRetryableDelegate.java +++ b/inspect-ivs/src/main/java/com/inspect/ivs/service/IvsResourceRetryableDelegate.java @@ -45,7 +45,7 @@ import java.util.UUID; public class IvsResourceRetryableDelegate { private static final Logger log = LoggerFactory.getLogger(IvsResourceRetryableDelegate.class); - private static final int RETRYABLE_MIN = 10; + private static final int RETRYABLE_MIN = 5; private static final int RETRYABLE_MAX = 20; diff --git a/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java b/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java index c2fe8cb..7a1db61 100644 --- a/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java +++ b/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java @@ -498,15 +498,19 @@ public class JobMainTask { + "/" + DateUtils.getDayEven() + "/" + taskExecRecord.getTaskCode() + "/"; - log.info("uuid: {}, basePath: {}", uuid, basePath); + log.info("PRE_POINT_EXEC patrolPointId: {}, actionId: {}, actionType: {}, deviceId: {}", + taskExecRecord.getOldTaskPatrolId(), + presetAction.getPresetActionId(), + presetAction.getActionType(), + presetPos.getPatrolPointId()); String taskPatrolId = taskExecRecord.getTaskPatrolId(); if (PresetAction.PHOTO.getCode().equals(presetAction.getActionType())) { final String chanType = presetPos.getChannelType(); fileTypes.append("ir".equals(chanType) ? "1" : "vl".equals(chanType) ? "2" : "").append(","); //boolean bOk = false; try { - log.info("PHOTO_PRESET_TYPE uuid: {}, chanType: {}, patrolPointId: {}, channelCode: {}, videoNvrCode: {}", - uuid, + log.info("PHOTO_PRESET_TYPE taskPatrolId: {}, chanType: {}, patrolPointId: {}, channelCode: {}, videoNvrCode: {}", + taskExecRecord.getOldTaskPatrolId(), chanType, presetPos.getPatrolPointId(), presetPos.getChannelCode(), @@ -621,8 +625,8 @@ public class JobMainTask { // recordPersist(taskExecRecord, infoListSize, patrolTaskInfo, presetPos, fileTypes, filePaths); } else if (PresetAction.VIDEO.getCode().equals(presetAction.getActionType())) { - log.info("VIDEO_PRESET_TYPE uuid: {}, chanType: {}, patrolPointId: {}, channelCode: {}, videoNvrCode: {}", - uuid, + log.info("VIDEO_PRESET_TYPE taskPatrolId: {}, chanType: {}, patrolPointId: {}, channelCode: {}, videoNvrCode: {}", + taskExecRecord.getOldTaskPatrolId(), presetPos.getChannelType(), presetPos.getPatrolPointId(), presetPos.getChannelCode(), @@ -660,6 +664,13 @@ public class JobMainTask { } } }, 0, 1000L); + } else { + log.info("UNKNOWN_PRESET_TYPE taskPatrolId: {}, chanType: {}, patrolPointId: {}, channelCode: {}, videoNvrCode: {}", + taskExecRecord.getOldTaskPatrolId(), + presetPos.getChannelType(), + presetPos.getPatrolPointId(), + presetPos.getChannelCode(), + presetPos.getVideoNvrCode()); } return taskExecRecord; @@ -1300,14 +1311,18 @@ public class JobMainTask { } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - TaskStatusManager.remove(patrolTaskExecRecord.getOldTaskPatrolId()); - log.info("CompletableFuture Break Join: taskPatrolId: {}", patrolTaskExecRecord.getOldTaskPatrolId()); + consummateCompletedTask(patrolTaskExecRecord); } private void handlePrePointBatch(final int threadCnt, final PatrolTaskExecRecord patrolTaskExecRecord, final List taskInfoBatch) { asyncTaskPatrolPointCnt.getAndAdd(taskInfoBatch.size()); - log.info("handlePrePointBatch threadCnt: {}, asyncTaskPatrolPointCnt: {}, batch size: {}, devNo: {}, taskId: {}", - threadCnt, asyncTaskPatrolPointCnt.get(), taskInfoBatch.size(), patrolTaskExecRecord.getDevNo(), patrolTaskExecRecord.getTaskId()); + log.info("HANDLE_PRESET_POINT_BATCH patrolPointId: {}, threadCnt: {}, asyncTaskPatrolPointCnt: {}, batch size: {}, devNo: {}, taskId: {}", + patrolTaskExecRecord.getOldTaskPatrolId(), + threadCnt, + asyncTaskPatrolPointCnt.get(), + taskInfoBatch.size(), + patrolTaskExecRecord.getDevNo(), + patrolTaskExecRecord.getTaskId()); final String taskPatrolId = patrolTaskExecRecord.getOldTaskPatrolId(); StatusMonitor monitor = TaskStatusManager.get(patrolTaskExecRecord.getOldTaskPatrolId()); @@ -1345,6 +1360,13 @@ public class JobMainTask { } } + private void consummateCompletedTask(final PatrolTaskExecRecord patrolTaskExecRecord) { + TaskStatusManager.remove(patrolTaskExecRecord.getOldTaskPatrolId()); + boolean isConsummate = redisService.deleteObjectOfTask(RedisConst.TASK_CURRENT_CODE, patrolTaskExecRecord.getTaskCode()); + log.info("CONSUMMATE_COMPLETED_TASK taskPatrolId: {}, isConsummate: {}", + patrolTaskExecRecord.getOldTaskPatrolId(), + isConsummate ? "false" : "true"); + } public List> optimizedGroup(List list) { Map> deviceMap = new HashMap<>(); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java index 95fe93a..bc46639 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java @@ -4,6 +4,7 @@ package com.inspect.analysis.domain; import com.inspect.partrolresult.domain.AnalyseReqItem; import com.inspect.partrolresult.domain.AnalyseRequest; import lombok.Getter; +import lombok.Setter; import java.io.Serializable; import java.util.ArrayList; @@ -11,6 +12,7 @@ import java.util.List; import java.util.Objects; @Getter +@Setter public class AnalyseResult implements Serializable { private String requestId; private List resultList; @@ -20,6 +22,7 @@ public class AnalyseResult implements Serializable { private String result = "1"; private boolean isTest = false; private int totalNumber; + private boolean isCompensate = false; // false-normal; true-compensate public void setResultsList(List resultsList) { this.resultsList = resultsList; @@ -98,6 +101,7 @@ public class AnalyseResult implements Serializable { ", resultList=" + resultList + ", resultsList=" + resultsList + ", taskPatrolId='" + taskPatrolId + '\'' + + ", isCompensate='" + isCompensate + '\'' + ", filter='" + filter + '\'' + ", result='" + result + '\'' + '}'; diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalyseRespService.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalyseRespService.java index 7dd3860..4fadfd1 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalyseRespService.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalyseRespService.java @@ -2,8 +2,10 @@ package com.inspect.analysis.service; import com.inspect.analysis.domain.AnalyseResult; +import java.io.IOException; + public interface IAnalyseRespService { void picAnalyseRetNotify(AnalyseResult analyseResult); - void handleAlgorithmResult(final AnalyseResult analyseResult); + void handleAlgorithmResult(final AnalyseResult analyseResult) throws IOException; } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java index 2615c8c..2d9f2b5 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java @@ -5,6 +5,7 @@ import com.inspect.analysis.constant.AnalyseConstants; import com.inspect.base.redis.service.RedisService; import com.inspect.partrolresult.domain.AnalyseRequest; import com.inspect.partrolresult.service.AnalyseRemoteService; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -20,15 +21,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; +@Slf4j @Component public class AlgorithmRequestRetryableConsumerManager { - private static final Logger log = LoggerFactory.getLogger(AlgorithmRequestRetryableConsumerManager.class); @Value("${server.port}") private String port; - @Value("${task.redis.request.interval-ms:2000}") - private long intervalMs; + @Value("${task.redis.request.permits-per-second:0.5}") + private double permitsPerSecond; @Value("${task.redis.request.idle-sleep-ms:500}") private long idleSleepMs; @@ -52,7 +53,7 @@ public class AlgorithmRequestRetryableConsumerManager { redisService.redisTemplate, AnalyseConstants.ALGORITHM_REQUEST_QUEUE, AnalyseRequest.class, - intervalMs, + permitsPerSecond, idleSleepMs, request -> executor.submit(() -> processRequest(request)) // 交给线程池处理 ); @@ -62,18 +63,18 @@ public class AlgorithmRequestRetryableConsumerManager { private void processRequest(AnalyseRequest request) { try { - log.info("AlgorithmRequestRetryableConsumerManager Processing queueSize: {}, request: {}", getQueueSize(), request); + log.info("ALGORITHM_REQUEST_RETRYABLE_CONSUME_MANAGE queueSize: {}, request: {}", getQueueSize(), request); analyseRemoteService.sendRequest(request, request.getTypeList(), request.isFilter()); } catch (IOException e) { // 可以记录失败次数,避免无限重试 int retryCount = Optional.of(request.getRetryCount()).orElse(0); - log.info("AlgorithmRequestRetryableConsumerManager Failed, retryCount: {}, reqeust: {}, msg: {}", request.getRetryCount(), request, e.getMessage()); + log.info("ALGORITHM_REQUEST_RETRYABLE_CONSUME_MANAGE failed, retryCount: {}, reqeust: {}, msg: {}", request.getRetryCount(), request, e.getMessage()); final int TRIES_MAX = testMode ? 50 : 3; if (retryCount < TRIES_MAX) { request.setRetryCount(retryCount + 1); redisService.redisTemplate.opsForList().leftPush(AnalyseConstants.ALGORITHM_REQUEST_QUEUE, new Gson().toJson(request)); } else { - log.info("AlgorithmRequestRetryableConsumerManager Compensate, retryCount: {}, request: {}", retryCount, request); + log.info("ALGORITHM_REQUEST_RETRYABLE_CONSUME_MANAGE compensate, retryCount: {}, request: {}", retryCount, request); analyseRemoteService.sendCompensateRequest(request); } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java index 07b08aa..645cc91 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java @@ -5,17 +5,17 @@ import com.inspect.analysis.domain.AnalyseResult; import com.inspect.analysis.service.IAnalyseRespService; import com.inspect.base.core.constant.Color; import com.inspect.base.redis.service.RedisService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; +import java.io.IOException; -@Component +@Slf4j +//@Component public class AlgorithmResponseConsumerManager { - private static final Logger log = LoggerFactory.getLogger(AlgorithmResponseConsumerManager.class); private RedisQueueConsumerAsync consumer; @@ -37,13 +37,13 @@ public class AlgorithmResponseConsumerManager { try { log.info("AlgorithmResponseConsumerManager queueSize: {}, response: {}", getQueueSize(), response); analysisService.handleAlgorithmResult(response); - } catch (Exception e) { + } catch (IOException e) { log.info(Color.RED + "AlgorithmResponseConsumerManager error queueSize: {}, request: {}" + Color.END, getQueueSize(), response); } } ); - consumer.start(); // 应用启动时即开始消费 + consumer.start(); } @PreDestroy diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java deleted file mode 100644 index 1f71949..0000000 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.inspect.analysis.service.impl; - -import com.alibaba.nacos.shaded.com.google.gson.Gson; -import com.inspect.analysis.constant.AnalyseConstants; -import com.inspect.analysis.domain.AnalyseResult; -import com.inspect.analysis.service.IAnalyseRespService; -import com.inspect.base.core.constant.Color; -import com.inspect.base.redis.service.RedisService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; - -import javax.annotation.Resource; - -//@Component -public class AlgorithmResponseProcessConsumer { - private final Logger logger = LoggerFactory.getLogger(AlgorithmResponseProcessConsumer.class); - - @Resource - private RedisService redisService; - - @Resource - private IAnalyseRespService analysisService; - - //@Scheduled(fixedDelay = 1000) - @Scheduled(fixedDelayString = "${task.scheduler.response.delay-ms:3000}") - public void pollAndProcess() { - //logger.info(Color.CYAN + "AlgorithmResponseProcessConsumerTracer" + Color.END); - String responseDataInRedis; - RedisTemplate redisTemplate = redisService.redisTemplate; - while ((responseDataInRedis = redisTemplate.opsForList().leftPop(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE)) != null) { - try { - logger.info(Color.CYAN + "AlgorithmResponseProcessConsumerTracer queueSize: {}, responseData: {}" + Color.END, getQueueSize(), responseDataInRedis); -// AnalyseRequest analyseRequest = new Gson().fromJson(responseDataInRedis, AnalyseRequest.class); -// analyseRemoteService.sendRequest(analyseRequest, analyseRequest.getTypeList(), analyseRequest.isFilter()); - AnalyseResult analyseResult = new Gson().fromJson(responseDataInRedis, AnalyseResult.class); - analysisService.handleAlgorithmResult(analyseResult); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - } - - public long getQueueSize() { - return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE); - } -} diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseRetryableConsumerManager.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseRetryableConsumerManager.java new file mode 100644 index 0000000..633ce38 --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseRetryableConsumerManager.java @@ -0,0 +1,74 @@ +package com.inspect.analysis.service.impl; + +import com.inspect.analysis.constant.AnalyseConstants; +import com.inspect.analysis.domain.AnalyseResult; +import com.inspect.analysis.service.IAnalyseRespService; +import com.inspect.base.core.constant.Color; +import com.inspect.base.redis.service.RedisService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.annotation.Resource; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +@Component +public class AlgorithmResponseRetryableConsumerManager { + + private RedisQueueRetryableConsumerAsync consumer; + @Resource + private RedisService redisService; + + @Resource + private IAnalyseRespService analysisService; + + private final ExecutorService executor = Executors.newFixedThreadPool(10); + + @PostConstruct + public void initConsumer() { + consumer = new RedisQueueRetryableConsumerAsync( + redisService.redisTemplate, + AnalyseConstants.ALGORITHM_RESPONSE_QUEUE, + AnalyseResult.class, + 1.0, + 500, + response -> executor.submit(() -> { + try { + log.info("ALGORITHM_RESPONSE_RETRYABLE_CONSUME_MANAGE queueSize: {}, reqeustId: {}, patrolId: {}", + getQueueSize(), + response.getRequestId(), + response.getTaskPatrolId()); + long start = System.currentTimeMillis(); + analysisService.handleAlgorithmResult(response); + log.info("ALGORITHM_RESPONSE_RETRYABLE_CONSUME_MANAGE COST: {}, reqeustId: {}, patrolId: {}", + System.currentTimeMillis() - start, + response.getRequestId(), + response.getTaskPatrolId()); + } catch (IOException e) { + log.info(Color.RED + "ALGORITHM_RESPONSE_RETRYABLE_CONSUME_MANAGE error queueSize: {}, request: {}, Msg: {}" + Color.END, + getQueueSize(), + response, + e.getMessage()); + } + }) + ); + + consumer.start(); + } + + @PreDestroy + public void stopConsumer() { + if (consumer != null) { + consumer.stop(); + } + } + + public long getQueueSize() { + return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE); + } +} + diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseRespServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseRespServiceImpl.java index 8d07ad8..c02a214 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseRespServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseRespServiceImpl.java @@ -81,21 +81,30 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService { @Override public void picAnalyseRetNotify(AnalyseResult analyseResult) { - log.info("Algorithm Result To Redis!"); + log.info("RESULT_TO_ALGORITHM_RESPONSE_QUEUE: patrolPointId: {}, isCompensate: {}, requestId: {}, RESULT: {}", + analyseResult.getTaskPatrolId(), + analyseResult.isCompensate(), + analyseResult.getRequestId(), + analyseResult + ); redisService.redisTemplate.opsForList().rightPush( AnalyseConstants.ALGORITHM_RESPONSE_QUEUE, new Gson().toJson(analyseResult) ); } - public void handleAlgorithmResult(final AnalyseResult analyseResult) { - String logLabel = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY); - - log.info(Color.CYAN + "ALGO_RES_UUID: {}, ###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, logLabel, analyseResult); + public void handleAlgorithmResult(final AnalyseResult analyseResult) throws IOException { + //String logLabel = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY); String requestId = analyseResult.getRequestId(); + log.info(Color.GREEN + "HANDLE_ALGORITHM_RESULT_S patrolPointId: {}, filter: {}, requestId: {}, analyseResult:{}" + Color.END, + analyseResult.getTaskPatrolId(), + analyseResult.getFilter(), + requestId, + analyseResult); + String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId); if (!redisService.hasKey(keyId)) { - log.error("ALGO_RES_UUID: {}, picAnalyseRetNotify isTest: {}, NO keyId={}, REQUEST_ID={} in REDIS!", logLabel, analyseResult.isTest(), keyId, requestId); + log.error("ALGO_RES_UUID: {}, picAnalyseRetNotify isTest: {}, NO keyId={}, REQUEST_ID={} in REDIS!", requestId, analyseResult.isTest(), keyId, requestId); if (!analyseResult.isTest()) { return; } @@ -103,31 +112,33 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService { } String patrolTaskIdObj = redisService.getCacheObject(keyId); - log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify keyId={}, requestId={}, patrolTaskIdObj={}", logLabel, keyId, requestId, patrolTaskIdObj); + log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify keyId={}, requestId={}, patrolTaskIdObj={}", requestId, keyId, requestId, patrolTaskIdObj); analyseResult.setTaskPatrolId(patrolTaskIdObj); final String filterRequestRedisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST + requestId; final String bigModelRequestRedisKey = AnalyseConstants.ANALYSE_AI_REQUEST + requestId; - log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify filterRequestRedisKey={}, bigModelRequestRedisKey={}", logLabel, filterRequestRedisKey, bigModelRequestRedisKey); + log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify filterRequestRedisKey={}, bigModelRequestRedisKey={}", requestId, filterRequestRedisKey, bigModelRequestRedisKey); boolean bBigModelExecFlag = false; String[] algTypeList = {}; AnalyseRequest analyseRequest = null; if (redisService.hasKey(filterRequestRedisKey)) { // 初筛结果 analyseResult.setFilter("1"); // 设置初筛标志 analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(filterRequestRedisKey); - log.info("ALGO_RES_UUID: {}, FILTER_RESULT picAnalyseRetNotify analyseRequest IN REDIS: {}", logLabel, analyseRequest); - boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseRequest); - log.info("Redisson Removed Result: {}, analyseRequest: {}", isRemoved? "success" : "fail", analyseRequest); - - AnalyseResPoint analyseResPoint = analyseResult.getResultList().get(0).getResults().get(0); if (analyseRequest != null && analyseRequest.getObjectList() != null && !analyseRequest.getObjectList().isEmpty()) { + log.info("ALGORITHM_FILTER_RESULT taskPatrolId: {}, requestId: {}, DATA IN REDIS: {}", + analyseRequest.getTaskPatrolId(), + requestId, + analyseRequest); + boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseRequest); + log.info("ALGORITHM_FILTER_RESULT Redisson Removed requestId: {}, Result: {}, analyseRequest: {}", requestId, isRemoved ? "success" : "fail", analyseRequest); + algTypeList = analyseRequest.getObjectList().get(0).getTypeList(); analyseResult.reloadReq(analyseRequest);//只有一个,analyseReq.getObjectList().get(0) - log.info("ALGO_RES_UUID: {}, FILTER_RESULT picAnalyseRetNotify algTypeList: {}, analyseResult reload: {}", logLabel, algTypeList, analyseResult); + log.info("ALGORITHM_FILTER_RESULT RELOAD requestId: {}, algTypeList: {}, analyseResult: {}", + requestId, + algTypeList, + analyseResult); } - boolean bDefect = analyseResPoint.isDefect(); // code=2000代表初筛结果返回正常,value=1代表有缺陷 - log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify FILTER bDefect={}, algorithmType={}", logLabel, bDefect, analyseResPoint.getType()); - /* *大致两种情况:1. 算法是表计(meter); 2. 算法不是表计(meter) * 1. 算法不是表计(meter) @@ -136,8 +147,15 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService { * 2. 算法是表计(meter) * 不论初筛结果有没有缺陷,都要继续调用大模型,因为大模型调用了表计(meter)识别算法 */ + AnalyseResPoint analyseResPoint = analyseResult.getResultList().get(0).getResults().get(0); + boolean bDefect = analyseResPoint.isDefect(); // code=2000代表初筛结果返回正常,value=1代表有缺陷 + final String algType = analyseRequest.getObjectList().get(0).getTypeList()[0]; - log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify algType IN REDIS: {}", logLabel, algType); + log.info("ALGORITHM_FILTER_RESULT CHECK DEFECT requestId: {}, bDefect: {}, algType: {}, algTypeInRedis: {}", + requestId, + bDefect, + analyseResPoint.getType(), + algType); if (bDefect || (AlgConstants.METER.equals(algType) || AlgConstants.XB.equals(algType) || AlgConstants.INFRA_1800.equals(algType) @@ -155,18 +173,16 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService { // ispAlgorithmRequestService.sendRequest(analyseRequest); } else { //初筛结果无缺陷并且非表计算法,不用调用大模型,流程就此结束 - log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify NO BIG_MODEL WOULD CALLED REQUEST_ID={}", logLabel, requestId); + log.info("ALGORITHM_FILTER_RESULT CALL_NO_BIG_MODEL requestId: {}", requestId); } } else if (redisService.hasKey(bigModelRequestRedisKey)) { // 大模型结果 analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(bigModelRequestRedisKey); - log.info("ALGO_RES_UUID: {}, BIG_MODEL_RESULT picAnalyseRetNotify analyseRequest IN REDIS: {}", logLabel, analyseRequest); - if(analyseRequest != null) { + if (analyseRequest != null) { analyseResult.reloadReq(analyseRequest); - log.info("ALGO_RES_UUID: {}, BIG_MODEL_RESULT picAnalyseRetNotify type: {}, analyseResult reload: {}", - logLabel, - analyseResult.getResultList().get(0).getResults().get(0).getType(), - analyseResult - ); + log.info("ALGORITHM_BIG_MODEL_RESULT taskPatrolId: {}, requestId: {}, DATA IN REDIS: {}", + analyseRequest.getTaskPatrolId(), + requestId, + analyseRequest); // 发给光明大模型处理 redisService.redisTemplate.opsForList().rightPush( @@ -177,8 +193,10 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService { analyseResult.setFilter("0"); // 设置大模型标志为0 } else { - log.info("ALGO_RES_UUID: {}, UNKNOWN_RESULT, OR TIMEOUT BEYOND A DAY FROM REMOTE ALGORITHM, requestId: {}, analyseResult: {}", - logLabel, requestId, analyseResult); + log.info("ALGORITHM_UNKNOWN_RESULT taskPatrolId: {}, requestId: {}, result: {}", + analyseResult.getTaskPatrolId(), + requestId, + analyseResult); return; } @@ -191,20 +209,22 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService { } //doAlgorithmAnalysis(analyseResult); //calculateProcess(analyseResult); - log.info(Color.CYAN + "ALGO_RES_UUID: {}, ###### 分析算法模块返回的表计识别结果 end ######" + Color.END, logLabel); + //log.info(Color.CYAN + "ALGO_RES_UUID: {}, ###### 分析算法模块返回的表计识别结果 end ######" + Color.END, requestId); if (bBigModelExecFlag) { -// ispAlgorithmRequestService.sendRequest(analyseRequest); - log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify CALL BIG_MODEL REQUEST_ID={}", logLabel, requestId); -// final long requestTimeout = 1L; -// final String analyzeBigModelRequestIdRedisKey = AnalyseConstants.ANALYSE_AI_REQUEST.concat(requestId); -// redisService.setCacheObject(analyzeBigModelRequestIdRedisKey, analyseRequest.clone(), requestTimeout, TimeUnit.DAYS); + log.info("ALGORITHM_RESULT CALL_BIG_MODEL , taskPatrolId: {}, requestId: {}", analyseResult.getTaskPatrolId(), requestId); try { + analyseRequest.setFilter(false); analyseRemoteService.sendRequest(analyseRequest, algTypeList, Boolean.FALSE); } catch (IOException e) { - log.error("大模型算法调用异常: ", e); + log.info("ALGORITHM_RESULT BIG_MODEL_CALL Exception requestId: {}, Msg: {}", requestId, e.getMessage()); + throw e; } } + + log.info(Color.GREEN + "HANDLE_ALGORITHM_RESULT_E patrolPointId: {}, requestId: {}" + Color.END, + analyseResult.getTaskPatrolId(), + requestId); } @@ -769,7 +789,7 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService { } // 计算初筛算法的进度 - if("1".equals(analyseResult.getFilter())) { + if ("1".equals(analyseResult.getFilter())) { calcRemoteAlgorithmProgress(analyseResult.getTotalNumber(), analyseResult.getTaskPatrolId()); } @@ -831,10 +851,10 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService { PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus(); patrolTaskStatus.setTaskPatrolledId(taskPatrolledId); List patrolTaskStatusList = patrolTaskStatusService.selectPatrolTaskStatusList(patrolTaskStatus); - if(!patrolTaskStatusList.isEmpty()) { + if (!patrolTaskStatusList.isEmpty()) { patrolTaskStatus = patrolTaskStatusList.get(0); patrolTaskStatus.setTaskEstimatedTime(algorithmProgress); - if("100.0".equals(algorithmProgress)) { + if ("100.0".equals(algorithmProgress)) { patrolTaskStatus.setEndTime(DateUtil.formatDateTime(new Date())); log.info(Color.GREEN + "DONE calcRemoteAlgorithmProgress: curNumber: {}, totalNumer: {}, algorithmProgress: {}, taskPatrolledId: {}, db-taskProgress: {}" + Color.END, resultAnalysisList.size(), totalNumber, algorithmProgress, taskPatrolledId, patrolTaskStatus.getTaskProgress()); @@ -845,7 +865,7 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService { log.info(Color.GREEN + "RUNNING calcRemoteAlgorithmProgress: curNumber: {}, totalNumer: {}, algorithmProgress: {}, taskPatrolledId: {}, status: {}" + Color.END, resultAnalysisList.size(), totalNumber, algorithmProgress, taskPatrolledId, patrolTaskStatus.getTaskState()); - if(TaskStatus.RUNNING.getCode().equals(patrolTaskStatus.getTaskState())) { + if (TaskStatus.RUNNING.getCode().equals(patrolTaskStatus.getTaskState())) { patrolTaskStatus.setTaskState(TaskStatus.RUNNING.getCode()); patrolTaskStatusService.updatePatrolTaskStatus(patrolTaskStatus); } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueRetryableConsumerAsync.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueRetryableConsumerAsync.java index a416926..2ecf1b6 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueRetryableConsumerAsync.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueRetryableConsumerAsync.java @@ -1,32 +1,31 @@ package com.inspect.analysis.service.impl; import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.google.common.util.concurrent.RateLimiter; import com.inspect.analysis.service.RetryableRequest; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; import java.time.Duration; import java.util.concurrent.*; import java.util.function.Consumer; +@Slf4j public class RedisQueueRetryableConsumerAsync implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(RedisQueueRetryableConsumerAsync.class); private final RedisTemplate redisTemplate; private final Gson gson = new Gson(); private final String queueKey; private final Class clazz; - private final long intervalMs; private final long idleSleepMs; private final Consumer handler; private volatile boolean running = false; - //private final ExecutorService executor = Executors.newFixedThreadPool(10); + private final RateLimiter rateLimiter; public ThreadPoolExecutor getExecutor() { return executor; @@ -35,7 +34,8 @@ public class RedisQueueRetryableConsumerAsync implements Runnable { private final ThreadPoolExecutor executor = new ThreadPoolExecutor( 4, // core threads 10, // max threads - 60, TimeUnit.SECONDS, + 60, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), // 队列容量 new ThreadPoolExecutor.CallerRunsPolicy() // 背压策略:调用者线程执行任务 ); @@ -43,15 +43,15 @@ public class RedisQueueRetryableConsumerAsync implements Runnable { public RedisQueueRetryableConsumerAsync(RedisTemplate redisTemplate, String queueKey, Class clazz, - long intervalMs, + double permitsPerSecond, // 每秒许可数,控制速率 long idleSleepMs, Consumer handler) { this.redisTemplate = redisTemplate; this.queueKey = queueKey; this.clazz = clazz; - this.intervalMs = intervalMs; this.idleSleepMs = idleSleepMs; this.handler = handler; + this.rateLimiter = RateLimiter.create(permitsPerSecond); } public void start() { @@ -76,11 +76,13 @@ public class RedisQueueRetryableConsumerAsync implements Runnable { continue; } + rateLimiter.acquire(); + T obj = gson.fromJson(json, clazz); // 判断线程池是否拥堵 if (executor.getQueue().size() > 80) { - logger.warn("RedisQueueRetryableConsumerAsync Thread pool queue is almost full, pushing back to Redis: {}", queueKey); + log.warn("RedisQueueRetryableConsumerAsync Thread pool queue is almost full, pushing back to Redis: {}", queueKey); // 推回 Redis 保证不丢数据 redisTemplate.opsForList().rightPush(queueKey, json); Thread.sleep(1000); @@ -91,7 +93,7 @@ public class RedisQueueRetryableConsumerAsync implements Runnable { try { handler.accept(obj); } catch (Exception e) { - logger.warn("RedisQueueRetryableConsumerAsync handler failed for obj={}, retrying...", json, e); + log.warn("RedisQueueRetryableConsumerAsync handler failed for obj={}, retrying...", json, e); // 反序列化带 retryCount 的对象 if (obj instanceof RetryableRequest) { @@ -101,22 +103,19 @@ public class RedisQueueRetryableConsumerAsync implements Runnable { rr.setRetryCount(retry + 1); redisTemplate.opsForList().rightPush(queueKey, gson.toJson(rr)); } else { - logger.error("RedisQueueRetryableConsumerAsync max retries reached for: {}", json); + log.error("RedisQueueRetryableConsumerAsync max retries reached for: {}", json); } } else { redisTemplate.opsForList().rightPush(queueKey, json); // 简单重入队 } } }); - - Thread.sleep(intervalMs); - } catch (Exception e) { - logger.error("RedisQueueRetryableConsumerAsync error", e); + log.error("RedisQueueRetryableConsumerAsync error", e); } } - logger.info("RedisQueueRetryableConsumerAsync for [{}] stopped.", queueKey); + log.info("RedisQueueRetryableConsumerAsync for [{}] stopped.", queueKey); } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java index 8d6195d..f9ece7b 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java @@ -12,6 +12,7 @@ import lombok.Setter; import java.io.Serializable; import java.util.ArrayList; +import java.util.Date; import java.util.List; @Setter @@ -34,6 +35,7 @@ public class AnalyseRequest implements RetryableRequest, Serializable { private int totalNumber; private int retryCount; private String requestUrl; + private String redissonTime; public AnalyseRequest() { @@ -56,6 +58,9 @@ public class AnalyseRequest implements RetryableRequest, Serializable { public String toErrorResultStr() { AnalyseResult analyseResult = new AnalyseResult(); analyseResult.setRequestId(this.requestId); + analyseResult.setCompensate(true); + analyseResult.setFilter(this.isFilter ? "1" : "0"); + analyseResult.setTaskPatrolId(this.getTaskPatrolId()); List list = new ArrayList<>(); analyseResult.setResultsList(list); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java index b385f73..a29591b 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java @@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit; import com.inspect.base.core.constant.AlgConstants; import com.inspect.base.core.constant.RedisConst; +import com.inspect.base.core.utils.DateUtils; import com.inspect.base.core.utils.HttpClientUtils; import com.inspect.base.core.utils.StringUtils; import com.inspect.base.redis.service.RedisService; @@ -46,7 +47,9 @@ public class AnalyseRemoteService { //qinyl public void sendRequest(AnalyseRequest analyseReq, String[] typeList, boolean isFilter) throws IOException { final long requestTimeout = 1L; - String requestId = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY); + //String requestId = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY); + String requestId = StringUtils.isNotEmpty(analyseReq.getRequestId()) ? + analyseReq.getRequestId() : UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY); String taskPatrolId = analyseReq.getTaskPatrolId(); redisService.setCacheObject(RedisConst.REQUEST_UUID + requestId, taskPatrolId, requestTimeout, TimeUnit.DAYS); //log.info("CALL_REMOTE_ANALYZE isFilter: {}, requestId: {}, taskPatrolId: {}", isFilter, requestId, taskPatrolId); @@ -63,14 +66,18 @@ public class AnalyseRemoteService { String analyseFilter = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSE_IS_FILTER); if ("1".equals(analyseFilter) && isFilter) { final String analyzeFilterRequestIdRedisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(requestId); - log.info("[FILTER] sendRequest analyseFilterRequestIdRedisKey: {}, analyseReq: {}", analyzeFilterRequestIdRedisKey, analyseReq); + log.info("SEND_REQUEST_FILTER taskPatrolId: {}, requestId: {}, analyseFilterRequestIdRedisKey: {}, analyseReq: {}", + analyseReq.getTaskPatrolId(), + requestId, + analyzeFilterRequestIdRedisKey, + analyseReq); //redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone(), testMode?3L:requestTimeout, testMode?TimeUnit.MINUTES:TimeUnit.DAYS); redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone()); - if(testMode) { - delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES); - } else { - delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS); - } +// if(testMode) { +// delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES); +// } else { +// delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS); +// } AnalyseReqItem analyseReqItem = analyseReq.getObjectList().get(0);//只取第一个 analyseReqItem.setTypeList(typeList); analyseReq.setObjectList(Collections.singletonList(analyseReqItem)); @@ -85,14 +92,17 @@ public class AnalyseRemoteService { if(checkInfraredType(typeList)) { requestUrl = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSIS_BIG_URL_INFRARED); - log.info("BIG_MODEL_INFRARED requestUrl: {}", requestUrl); + log.info("BIG_MODEL_INFRARED requestId: {}, requestUrl: {}", requestId, requestUrl); } else { requestUrl = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSIS_BIG_URL); - log.info("BIG_MODEL_OTHER requestUrl: {}", requestUrl); + log.info("BIG_MODEL_OTHER requestId: {}, requestUrl: {}", requestId, requestUrl); } final String analyzeBigModelRequestIdRedisKey = AnalyseConstants.ANALYSE_AI_REQUEST.concat(requestId); - log.info("BIG_MODEL analyzeBigModelRequestIdRedisKey: {}, analyseReq: {}", analyzeBigModelRequestIdRedisKey, analyseReq); + log.info("SEND_REQUEST_BIG_MODEL requestId: {}, analyzeBigModelRequestIdRedisKey: {}, analyseReq: {}", + requestId, + analyzeBigModelRequestIdRedisKey, + analyseReq); redisService.setCacheObject(analyzeBigModelRequestIdRedisKey, analyseReq.clone(), requestTimeout, TimeUnit.DAYS); } @@ -102,15 +112,19 @@ public class AnalyseRemoteService { analyseReq.setFilter(isFilter); analyseReq.setRequestUrl(requestUrl.concat(AnalyseConstants.ANALYSE_URI)); + analyseReq.setRedissonTime(DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss, new Date())); - try { - retryDelegate.callRemoteAnalyseService(analyseReq); - } catch (IOException e) { - log.info("FINALLY FAIL CALL REMOTE ANALYSE SERVICE, requestId: {}, taskPatrolId: {}, error: {}", + if(testMode) { + delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES); + } else { + delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS); + } + + if(!retryDelegate.callRemoteAnalyseService(analyseReq)) { + log.info("FINALLY FAIL CALL REMOTE ANALYSE SERVICE, requestId: {}, taskPatrolId: {}", analyseReq.getRequestId(), - analyseReq.getTaskPatrolId(), - e.getMessage()); - throw e; + analyseReq.getTaskPatrolId()); + throw new IOException("Remote analyse service failed"); } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java index 9547689..dafee77 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java @@ -45,15 +45,22 @@ public class AnalyseRequestDelayQueueListener implements InitializingBean { } private void handleRequest(AnalyseRequest request) { - String redisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(request.getRequestId()); - Object cached = redisService.redisTemplate.opsForValue().getAndDelete(redisKey); + String redisKey = request.isFilter() ? AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(request.getRequestId()) + : AnalyseConstants.ANALYSE_AI_REQUEST.concat(request.getRequestId()); + //Object cached = redisService.redisTemplate.opsForValue().getAndDelete(redisKey); + Object cached = redisService.redisTemplate.opsForValue().get(redisKey); if (cached != null) { // 正常到期但业务未处理,走补偿逻辑 - log.info("AnalyseRequest Been Not Consumed, Compensate Triggered: {}", request); + log.info("ANALYSE_REQUEST_DELAY_QUEUE COMPENSATE patrolPointId: {}, requestId: {}, BODY: {}", + request.getTaskPatrolId(), + request.getRequestId(), + request); analyseRemoteService.sendCompensateRequest(request); } else { - // 正常业务已完成,什么也不做 - log.info("AnalyseRequest Been Normally Consumed: {}", request.getRequestId()); + // 正常业务已完成,什么也不做,正常情况下不会进入这个逻辑 + log.info("ANALYSE_REQUEST_DELAY_QUEUE CONSUMED: patrolPointId: {}, requestId: {}", + request.getTaskPatrolId(), + request.getRequestId()); } } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestRetryableDelegate.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestRetryableDelegate.java index c62d17b..cb5a9f6 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestRetryableDelegate.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestRetryableDelegate.java @@ -6,8 +6,11 @@ import com.inspect.analysis.utils.SimpleHttpClient; import com.inspect.partrolresult.domain.AnalyseRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.core.io.InputStreamResource; +import org.springframework.http.ResponseEntity; import org.springframework.retry.RetryContext; import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Recover; import org.springframework.retry.annotation.Retryable; import org.springframework.retry.support.RetrySynchronizationManager; import org.springframework.stereotype.Component; @@ -22,7 +25,7 @@ public class AnalyseRequestRetryableDelegate { value = IOException.class, maxAttempts = 5, backoff = @Backoff(delay = 2000, multiplier = 1.0)) - public void callRemoteAnalyseService(final AnalyseRequest analyseReq) throws IOException { + public boolean callRemoteAnalyseService(final AnalyseRequest analyseReq) throws IOException { RetryContext retryContext = RetrySynchronizationManager.getContext(); int retryCount = retryContext != null ? retryContext.getRetryCount() : 0; log.info("CALL_REMOTE_ANALYZE retryCount: {}, requestId: {}, isFilter: {}, URL: {}, PARAMS: {}", @@ -32,5 +35,14 @@ public class AnalyseRequestRetryableDelegate { if (!"200".equals(JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE))) { log.info("CALL_REMOTE_ANALYZE FAIL: {}", JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE)); } + + return true; + } + + @SuppressWarnings("unused") + @Recover + public boolean recover(IOException e, final AnalyseRequest analyseReq) { + log.info("CALL_REMOTE_ANALYZE RECOVER: requestId: {}", analyseReq.getRequestId()); + return false; } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/task/controller/PatrolTaskController.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/task/controller/PatrolTaskController.java index b976062..061ce43 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/task/controller/PatrolTaskController.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/task/controller/PatrolTaskController.java @@ -150,10 +150,10 @@ public class PatrolTaskController extends BaseController { } } - if(StringUtils.isNotEmpty(patrolTask.getAreaName())) { + if (StringUtils.isNotEmpty(patrolTask.getAreaName())) { List taskListEx = new ArrayList<>(); for (PatrolTask task : taskList) { - if(StringUtils.isNotEmpty(task.getAreaName()) && task.getAreaName().contains(patrolTask.getAreaName())) { + if (StringUtils.isNotEmpty(task.getAreaName()) && task.getAreaName().contains(patrolTask.getAreaName())) { taskListEx.add(task); } } @@ -2769,8 +2769,8 @@ public class PatrolTaskController extends BaseController { cell10.setCellValue(MessageUtils.get("分析图片")); Cell cell11 = row.createCell(11); cell11.setCellValue(MessageUtils.get("分析结果")); - HSSFPatriarch patriarch = (HSSFPatriarch)sheet.createDrawingPatriarch(); - for(int i = 0; i < newList.size(); ++i) { + HSSFPatriarch patriarch = (HSSFPatriarch) sheet.createDrawingPatriarch(); + for (int i = 0; i < newList.size(); ++i) { Row row1 = sheet.createRow(i + 1); Cell cell12 = row1.createCell(0); cell12.setCellValue(i + 1);