diff --git a/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java b/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java index bfef263..8f6aa53 100644 --- a/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java +++ b/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java @@ -593,7 +593,7 @@ public class JobMainTask { return rectangles.toString(); } - private void addPointRecord(boolean bOk, + synchronized private void addPointRecord(boolean bOk, PatrolTaskExecRecord taskExecRecord, PatrolTaskInfo taskInfo, PatrolPresetPos presetPos, @@ -695,7 +695,7 @@ public class JobMainTask { } } - private int addCursorNumber(String taskPatrolId, boolean bOk, int infoListSize) { + synchronized private int addCursorNumber(String taskPatrolId, boolean bOk, int infoListSize) { PatrolTaskExecRecord taskExecRecord = taskExecClient.selectPatrolTaskExecRecordByTaskPatrolId(taskPatrolId); int total = taskExecRecord.getTotalNumber(); @@ -752,7 +752,7 @@ public class JobMainTask { redisService.deleteObjectOfTask(RedisConst.TASK_CURRENT_CODE, curTaskCode); } - private void windUpRecord(String taskPatrolId, int infoListSize) { + synchronized private void windUpRecord(String taskPatrolId, int infoListSize) { PatrolTaskExecRecord execRecord = taskExecClient.selectPatrolTaskExecRecordByTaskPatrolId(taskPatrolId); final int total = execRecord.getTotalNumber(); final int cursor = execRecord.getCursorNumber(); @@ -818,24 +818,28 @@ public class JobMainTask { StringBuffer fileTypes, StringBuffer filePaths) { - log.info(Color.CYAN + "**************************addPointRecord start" + Color.END); + log.info(Color.CYAN + "recordPersistTrace bOk: {}" + Color.END, bOk); try { addPointRecord(bOk, taskExecRecord, taskInfo, presetPos, fileTypes, filePaths); } catch (Exception e) { log.error("error", e); } - log.info(Color.CYAN + "**************************addPointRecord end" + Color.END); + //log.info(Color.CYAN + "**************************addPointRecord end" + Color.END); - log.info(Color.CYAN + "**************************addCursorNumber start" + Color.END); + log.info(Color.CYAN + "addCursorNumberStart" + Color.END); int cursorNumber = 0; try { cursorNumber = addCursorNumber(taskExecRecord.getTaskPatrolId(), bOk, infoListSize); } catch (Exception e) { log.error("error", e); } - log.info(Color.CYAN + "**************************addCursorNumber end" + Color.END); - if (cursorNumber == infoListSize) {// devNo下对应的点执行完毕 + PatrolTaskExecRecord taskExecRecordEx = taskExecClient.selectPatrolTaskExecRecordByTaskPatrolId(taskExecRecord.getTaskPatrolId()); + int totalNumber = taskExecRecordEx.getTotalNumber(); + log.info(Color.CYAN + "addCursorNumberEnd: cursorNumber: {}, batchNumber: {}, total: {}" + Color.END, cursorNumber, infoListSize, totalNumber); + + //if (cursorNumber == infoListSize) {// devNo下对应的点执行完毕 + if (cursorNumber == totalNumber) {// devNo下对应的点执行完毕 log.info(Color.CYAN + "**************************windUpRecord start" + Color.END); windUpRecord(taskExecRecord.getTaskPatrolId(), infoListSize); log.info(Color.CYAN + "**************************windUpRecord end" + Color.END); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java index 01b7eab..4936b93 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java @@ -38,4 +38,5 @@ public class AnalyseConstants { public static final String MAX_NUM = "MAX_NUM"; public static final String ALGORITHM_REQUEST_QUEUE = "algorithm:request:queue"; + public static final String ALGORITHM_RESPONSE_QUEUE = "algorithm:response:queue"; } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java index 1084532..e1a4ced 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java @@ -4,4 +4,6 @@ import com.inspect.analysis.domain.AnalyseResult; public interface IAnalysisService { void picAnalyseRetNotify(AnalyseResult analyseResult); + + void handleAlgorithmResult(final String analyseResultJson); } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmProcessConsumer.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java similarity index 93% rename from inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmProcessConsumer.java rename to inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java index eb2e396..f89a8bd 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmProcessConsumer.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java @@ -14,8 +14,8 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component -public class AlgorithmProcessConsumer { - private final Logger logger = LoggerFactory.getLogger(AlgorithmProcessConsumer.class); +public class AlgorithmRequestProcessConsumer { + private final Logger logger = LoggerFactory.getLogger(AlgorithmRequestProcessConsumer.class); @Resource private RedisService redisService; @@ -23,7 +23,7 @@ public class AlgorithmProcessConsumer { @Resource private AnalyseRemoteService analyseRemoteService; - @Scheduled(fixedDelay = 3000) + @Scheduled(fixedDelay = 1000) public void pollAndProcess() { String requestDataInRedis; RedisTemplate redisTemplate = redisService.redisTemplate; diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java new file mode 100644 index 0000000..1cb40e7 --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java @@ -0,0 +1,46 @@ +package com.inspect.analysis.service.impl; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.inspect.analysis.constant.AnalyseConstants; +import com.inspect.analysis.service.IAnalysisService; +import com.inspect.base.redis.service.RedisService; +import com.inspect.partrolresult.domain.AnalyseRequest; +import com.inspect.partrolresult.service.AnalyseRemoteService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component +public class AlgorithmResponseProcessConsumer { + private final Logger logger = LoggerFactory.getLogger(AlgorithmResponseProcessConsumer.class); + + @Resource + private RedisService redisService; + + @Resource + private IAnalysisService analysisService; + + @Scheduled(fixedDelay = 1000) + public void pollAndProcess() { + String responseDataInRedis; + RedisTemplate redisTemplate = redisService.redisTemplate; + while ((responseDataInRedis = redisTemplate.opsForList().leftPop(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE)) != null) { + try { + logger.info("AlgorithmResponseProcessConsumer pollAndProcess queueSize: {}, responseData:{}", getQueueSize(), responseDataInRedis); +// AnalyseRequest analyseRequest = new Gson().fromJson(responseDataInRedis, AnalyseRequest.class); +// analyseRemoteService.sendRequest(analyseRequest, analyseRequest.getTypeList(), analyseRequest.isFilter()); + analysisService.handleAlgorithmResult(responseDataInRedis); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + } + + public long getQueueSize() { + return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE); + } +} diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java index e446c11..9e928e4 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java @@ -72,8 +72,103 @@ public class AnalysisServiceImpl implements IAnalysisService { @Resource private ResultAnalysisUtils resultAnalysisUtils; +// @Override +// public void picAnalyseRetNotify(AnalyseResult analyseResult) { +// log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, analyseResult); +// String requestId = analyseResult.getRequestId(); +// String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId); +// if (!redisService.hasKey(keyId)) { +// log.error("picAnalyseRetNotify isTest: {}, NO keyId={}, REQUEST_ID={} in REDIS!", analyseResult.isTest(), keyId, requestId); +// if (!analyseResult.isTest()) { +// return; +// } +// //redisService.setCacheObject(keyId, "123456789"); +// } +// +// String patrolTaskIdObj = redisService.getCacheObject(keyId); +// log.info("picAnalyseRetNotify keyId={}, requestId={}, patrolTaskIdObj={}", keyId, requestId, patrolTaskIdObj); +// analyseResult.setTaskPatrolId(patrolTaskIdObj); +// String keyFilterRequest = AnalyseConstants.ANALYSE_FILTER_REQUEST + requestId; +// log.info("picAnalyseRetNotify keyFilterRequest={}", keyFilterRequest); +// boolean bBigModelExecFlag = false; +// String[] algTypeList = {}; +// AnalyseRequest analyseRequest = null; +// if (redisService.hasKey(keyFilterRequest)) { // 初筛结果 +// analyseResult.setFilter("1"); // 设置初筛标志 +// analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(keyFilterRequest); +// log.info("FILTER_RESULT picAnalyseRetNotify analyseRequest IN REDIS: {}", analyseRequest); +// AnalyseResPoint analyseResPoint = analyseResult.getResultList().get(0).getResults().get(0); +// if (analyseRequest != null && analyseRequest.getObjectList() != null && !analyseRequest.getObjectList().isEmpty()) { +// algTypeList = analyseRequest.getObjectList().get(0).getTypeList(); +// } +// +// boolean bDefect = analyseResPoint.isDefect(); // code=2000代表初筛结果返回正常,value=1代表有缺陷 +// log.info("picAnalyseRetNotify FILTER bDefect={}, algorithmType={}", bDefect, analyseResPoint.getType()); +// analyseResult.reloadReq(analyseRequest);//只有一个,analyseReq.getObjectList().get(0) +// /* +// *大致两种情况:1. 算法是表计(meter); 2. 算法不是表计(meter) +// * 1. 算法不是表计(meter) +// * 1.1 初筛结果有缺陷:继续发给大模型处理 +// * 1.2 初筛结果无缺陷:不用调用大模型,流程就此结束 +// * 2. 算法是表计(meter) +// * 不论初筛结果有没有缺陷,都要继续调用大模型,因为大模型调用了表计(meter)识别算法 +// */ +// final String algType = analyseRequest.getObjectList().get(0).getTypeList()[0]; +// log.info("picAnalyseRetNotify algType IN REDIS: {}", algType); +// if (bDefect || (AlgConstants.METER.equals(algType) +// || AlgConstants.XB.equals(algType) +// || AlgConstants.INFRA_1800.equals(algType) +// || AlgConstants.INFRA_YU3.equals(algType) +// || AlgConstants.INFRA_CAMERA.equals(algType) +// || AlgConstants.INFRA_CAMERA_REVERSE.equals(algType)) +// ) { +// /* +// * 先检查结果有缺无缺陷,如果有缺陷不用判断算法直接调用大模型;* +// * 如果无缺陷,再去判断算法,如果算法是meter就继续调用大模型。* +// */ +// analyseResult.setResult("0"); +// bBigModelExecFlag = true; +//// log.info("picAnalyseRetNotify CALL BIG_MODEL REQUEST_ID={}", requestId); +//// ispAlgorithmRequestService.sendRequest(analyseRequest); +// } else { +// //初筛结果无缺陷并且非表计算法,不用调用大模型,流程就此结束 +// log.info("picAnalyseRetNotify NO BIG_MODEL WOULD CALLED REQUEST_ID={}", requestId); +// } +// } else { +// // 大模型结果 +// log.info("BIG_MODEL_RESULT CALLBACK picAnalyseRetNotify REQUEST_ID={}", requestId); +// analyseResult.setFilter("0"); // 设置大模型标志为0 +// } +// +// //qinyl +// analysisLogService.log(new AnalyseLog(analyseResult.toString(), "1", analyseResult.getTaskPatrolId(), analyseResult.getFilter(), requestId)); +// try { +// analysis(analyseResult); +// } catch (Exception e) { +// log.error("error", e); +// } +// //doAlgorithmAnalysis(analyseResult); +// //calculateProcess(analyseResult); +// log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 end ######" + Color.END); +// +// if (bBigModelExecFlag) { +// log.info("picAnalyseRetNotify CALL BIG_MODEL REQUEST_ID={}", requestId); +//// ispAlgorithmRequestService.sendRequest(analyseRequest); +// analyseRemoteService.sendRequest(analyseRequest, algTypeList, false); +// } +// } + @Override public void picAnalyseRetNotify(AnalyseResult analyseResult) { + log.info("Algorithm Result To Redis!"); + redisService.redisTemplate.opsForList().rightPush( + AnalyseConstants.ALGORITHM_RESPONSE_QUEUE, + new Gson().toJson(analyseResult) + ); + } + + public void handleAlgorithmResult(final String analyseResultJson) { + AnalyseResult analyseResult = new Gson().fromJson(analyseResultJson, AnalyseResult.class); log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, analyseResult); String requestId = analyseResult.getRequestId(); String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId); @@ -158,6 +253,7 @@ public class AnalysisServiceImpl implements IAnalysisService { } } + public synchronized void doAlgorithmAnalysis(AnalyseResult analyseResult) { log.info("doAlgorithmAnalysis analyseResult: {}", analyseResult); int size = 0; diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java index d0d7665..f5bead8 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java @@ -45,9 +45,10 @@ public class AnalyseRemoteService { analyseReq.setRequestId(requestId); String taskSetKey = AnalyseConstants.ANALYSE_TASK_REQUEST + taskPatrolId; String requestSetKey = AnalyseConstants.ANALYSE_REQUEST_ALG + requestId; - redisService.setCacheObject(AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId), taskPatrolId, 20L, TimeUnit.MINUTES); + final long requestTimeout = 720; + redisService.setCacheObject(AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId), taskPatrolId, requestTimeout, TimeUnit.MINUTES); redisService.redisTemplate.opsForSet().add(taskSetKey, requestId); - redisService.expire(taskSetKey, 20L, TimeUnit.MINUTES); + redisService.expire(taskSetKey, requestTimeout, TimeUnit.MINUTES); String requestUrl; String analyseFilter = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSE_IS_FILTER);