diff --git a/inspect-job/src/main/java/com/inspect/job/client/TaskExecClient.java b/inspect-job/src/main/java/com/inspect/job/client/TaskExecClient.java index 1418ffc..7f3639d 100644 --- a/inspect-job/src/main/java/com/inspect/job/client/TaskExecClient.java +++ b/inspect-job/src/main/java/com/inspect/job/client/TaskExecClient.java @@ -98,6 +98,9 @@ public interface TaskExecClient { @PostMapping({"/exec/taskResultExised"}) boolean taskResultExited(@RequestParam("patrolDeviceCode") String deviceCode, @RequestParam("taskPatrolledId") String patrolId); + @GetMapping({"/exec/getFilterPatrolTaskList"}) + List getFilterPatrolTaskList(final PatrolTask patrolTask); + @PostMapping({"/exec/makeCurrentDayTask"}) Map> makeCurrentDayTask(); diff --git a/inspect-job/src/main/java/com/inspect/job/domain/task/PatrolDevice.java b/inspect-job/src/main/java/com/inspect/job/domain/task/PatrolDevice.java index f7a1c2d..125ef5f 100644 --- a/inspect-job/src/main/java/com/inspect/job/domain/task/PatrolDevice.java +++ b/inspect-job/src/main/java/com/inspect/job/domain/task/PatrolDevice.java @@ -17,4 +17,6 @@ public class PatrolDevice { private String receiveCode; @JSONField(name = "Type") private String type; + @JSONField(name = "TotalNumber") + private Integer totalNumber; } diff --git a/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java b/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java index 2ab24a9..41b0c76 100644 --- a/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java +++ b/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java @@ -643,12 +643,12 @@ public class JobMainTask { } //if (bOk) - if(!testMode) + //if(!testMode) { String patrolId = taskExecRecord.getTaskPatrolId(); String[] ids = patrolId.split(StringUtils.UNDERLINE); String taskPatrolIdRemote = ids[1] + "_" + ids[2]; - callRemoteSendMsgCtrlMode(taskPatrolIdRemote, pointExecRecord);/*上报巡视结果*/ + callRemoteSendMsgCtrlMode(taskPatrolIdRemote, taskExecRecord.getTotalNumber(), pointExecRecord);/*上报巡视结果*/ } } @@ -675,7 +675,10 @@ public class JobMainTask { } } - private void callRemoteSendMsgCtrlMode(final String taskPatrolId, PatrolTaskPointExecRecord pointExecRecord) { + private void callRemoteSendMsgCtrlMode( + final String taskPatrolId, + final int totalNumber, + PatrolTaskPointExecRecord pointExecRecord) { PatrolResult patrolResult = PatrolResult.builder() .taskPatrolledId(taskPatrolId) .deviceId(pointExecRecord.getDeviceId()) @@ -705,6 +708,7 @@ public class JobMainTask { .receiveCode("") .sendCode("") .items(patrolResults) + .totalNumber(totalNumber) .build(); Object patrolResultDataJsonObj = JSONArray.toJSON(patrolResultData); try { @@ -933,16 +937,18 @@ public class JobMainTask { } @PostMapping({"/immediatelyExecTask"}) - public void immediatelyExecTask(@RequestBody PatrolTask patrolTask) { - String taskCode = redisService.getCacheObjectOfTask(RedisConst.TASK_CURRENT_CODE, patrolTask.getTaskCode()); + public void immediatelyExecTask(@RequestBody PatrolTask patrolTaskParam) { + List patrolTasks = taskExecClient.getFilterPatrolTaskList(patrolTaskParam); + PatrolTask patrolTask = patrolTasks.get(0); + String taskCode = redisService.getCacheObjectOfTask(RedisConst.TASK_CURRENT_CODE, patrolTaskParam.getTaskCode()); // if (StringUtils.isNotEmpty(taskCode)) { // log.info(Color.CYAN + "TASK SLOT IS TAKEN BY: {}" + Color.END, taskCode); // return; // } PatrolTaskInfo patrolTaskInfo = new PatrolTaskInfo(); - List patrolTaskList = new ArrayList<>(); - patrolTaskList.add(patrolTask); + // patrolTaskList.add(patrolTask); + List patrolTaskList = new ArrayList<>(patrolTasks); List cameraPatrolTasks = new ArrayList<>(); assembleVideoTask(patrolTaskList, cameraPatrolTasks); String redisImmediatelyExecTaskTime = redisService.getCacheObjectOfTask(RedisConst.IMMEDIATELY_EXEC_TASK_TIME, patrolTask.getTaskCode()); @@ -974,7 +980,8 @@ public class JobMainTask { Date time = patrolTask.getFixedStartTime(); String timeStr = time == null ? immediatelyExecTaskTime : DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss, time); patrolTaskExecRecord.setTaskPatrolId(devNos[0] + "_" + patrolTask.getTaskCode() + "_" + immediatelyExecTaskTime);// 这边暂定devNos[0] - patrolTaskExecRecord.setOldTaskPatrolId(patrolTask.getTaskCode() + "_" + timeStr + "_" + immediatelyExecTaskTime); + //patrolTaskExecRecord.setOldTaskPatrolId(patrolTask.getTaskCode() + "_" + timeStr + "_" + immediatelyExecTaskTime); + patrolTaskExecRecord.setOldTaskPatrolId(patrolTask.getTaskCode() + "_" + timeStr); patrolTaskExecRecord.setTaskName(patrolTask.getTaskName()); patrolTaskExecRecord.setTaskCode(patrolTask.getTaskCode()); patrolTaskExecRecord.setTaskState(TaskStatus.RUNNING.getCode()); @@ -1725,7 +1732,8 @@ public class JobMainTask { private String getOldTaskPatrolId(final PatrolTask patrolTask) { String time = DateUtils.format(DateUtils.yyyyMMddHHmmss, patrolTask.getFixedStartTime()); - return patrolTask.getTaskCode() + "_" + time + "_" + time; + //return patrolTask.getTaskCode() + "_" + time + "_" + time; + return patrolTask.getTaskCode() + "_" + time; } @SuppressWarnings({"unused"}) diff --git a/inspect-main/inspect-main-task-exec/src/main/java/com/inspect/exec/controller/PatrolTaskExecController.java b/inspect-main/inspect-main-task-exec/src/main/java/com/inspect/exec/controller/PatrolTaskExecController.java index 749f7ca..042b265 100644 --- a/inspect-main/inspect-main-task-exec/src/main/java/com/inspect/exec/controller/PatrolTaskExecController.java +++ b/inspect-main/inspect-main-task-exec/src/main/java/com/inspect/exec/controller/PatrolTaskExecController.java @@ -470,6 +470,11 @@ public class PatrolTaskExecController extends BaseController { return IntervalType.BY_DATE.getCode().equals(patrolTask.getIntervalType()); } + @PostMapping({"/getFilterPatrolTaskList"}) + List getFilterPatrolTaskList(PatrolTask task) { + return filterPatrolTaskList(task); + } + @PostMapping({"/makeCurrentDayTask"}) public Map> makeCurrentDayTask() { try { @@ -778,7 +783,7 @@ public class PatrolTaskExecController extends BaseController { } - private List getPatrolTasks(PatrolTask task) { + private List remakePatrolTaskList(PatrolTask task) { String[] devNos = task.getDevNo().split(StringUtils.COMMA); String[] devTypes = task.getDevType().split(StringUtils.COMMA); List patrolTaskList = new ArrayList<>(); @@ -842,6 +847,19 @@ public class PatrolTaskExecController extends BaseController { return patrolTaskList; } + private List filterPatrolTaskList(PatrolTask taskParam) { + List patrolTaskListEx = patrolTaskService.selectPatrolTaskList(taskParam); + if(patrolTaskListEx.isEmpty()) + return new ArrayList<>(); + + PatrolTask task = patrolTaskListEx.get(0); + return remakePatrolTaskList(task); + } + + private List getPatrolTasks(PatrolTask task) { + return remakePatrolTaskList(task); + } + @PostMapping({"/getPatrolTaskStatusByTaskPatrolledId"}) public PatrolTaskStatus getPatrolTaskStatusByTaskPatrolledId(String taskPatrolledId) { PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus(); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java index 7694da9..fad70f3 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java @@ -19,6 +19,7 @@ public class AnalyseResult implements Serializable { private String filter = "0"; private String result = "1"; private boolean isTest = false; + private int totalNumber; public void setResultsList(List resultsList) { this.resultsList = resultsList; @@ -29,6 +30,7 @@ public class AnalyseResult implements Serializable { } public void reloadReq(AnalyseRequest analyseRequest) { + this.totalNumber = analyseRequest.getTotalNumber(); AnalyseResItem analyseResItem = this.resultList.get(0); List resultList = new ArrayList<>(); this.setResultsList(resultList); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java index e1a4ced..c9886da 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java @@ -5,5 +5,5 @@ import com.inspect.analysis.domain.AnalyseResult; public interface IAnalysisService { void picAnalyseRetNotify(AnalyseResult analyseResult); - void handleAlgorithmResult(final String analyseResultJson); + void handleAlgorithmResult(final AnalyseResult analyseResult); } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestConsumerManager.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestConsumerManager.java new file mode 100644 index 0000000..e77bfb9 --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestConsumerManager.java @@ -0,0 +1,60 @@ +package com.inspect.analysis.service.impl; + +import com.inspect.analysis.constant.AnalyseConstants; +import com.inspect.base.core.constant.Color; +import com.inspect.base.redis.service.RedisService; +import com.inspect.partrolresult.domain.AnalyseRequest; +import com.inspect.partrolresult.service.AnalyseRemoteService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import javax.annotation.PreDestroy; + +@Component +public class AlgorithmRequestConsumerManager { + private static final Logger log = LoggerFactory.getLogger(AlgorithmRequestConsumerManager.class); + + private RedisQueueConsumerAsync consumer; + + @Resource + private RedisService redisService; + + @Resource + private AnalyseRemoteService analyseRemoteService; + + @PostConstruct + public void initConsumer() { + consumer = new RedisQueueConsumerAsync( + redisService.redisTemplate, + AnalyseConstants.ALGORITHM_REQUEST_QUEUE, + AnalyseRequest.class, + 1000, + 500, + request -> { + try { + log.info("AlgorithmRequestConsumerManager queueSize: {}, request: {}", getQueueSize(), request); + analyseRemoteService.sendRequest(request, request.getTypeList(), request.isFilter()); + } catch (Exception e) { + log.info(Color.RED + "AlgorithmRequestConsumerManager error queueSize: {}, request: {}" + Color.END, getQueueSize(), request); + } + } + ); + + consumer.start(); // 应用启动时即开始消费 + } + + @PreDestroy + public void stopConsumer() { + if (consumer != null) { + consumer.stop(); + } + } + + public long getQueueSize() { + return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_REQUEST_QUEUE); + } +} + diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java index 4906724..aa11c31 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java @@ -14,11 +14,15 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.time.Duration; -@Component +//@Component public class AlgorithmRequestProcessConsumer { private final Logger logger = LoggerFactory.getLogger(AlgorithmRequestProcessConsumer.class); + @Value("${task.test-mode:false}") + private boolean testMode; + @Resource private RedisService redisService; @@ -26,20 +30,36 @@ public class AlgorithmRequestProcessConsumer { private AnalyseRemoteService analyseRemoteService; //@Scheduled(fixedDelay = 1000) +// @Scheduled(fixedDelayString = "${task.scheduler.request.delay-ms:3000}") +// public void pollAndProcess() { +// //logger.info(Color.YELLOW + "AlgorithmRequestProcessConsumerTracer" + Color.END); +// String requestDataInRedis; +// RedisTemplate redisTemplate = redisService.redisTemplate; +// while ((requestDataInRedis = redisTemplate.opsForList().leftPop(AnalyseConstants.ALGORITHM_REQUEST_QUEUE, Duration.ofMillis(500))) != null) { +// try { +// logger.info(Color.CYAN + "AlgorithmRequestProcessConsumerTracer queueSize: {}, requestData: {}" + Color.END, getQueueSize(), requestDataInRedis); +// AnalyseRequest analyseRequest = new Gson().fromJson(requestDataInRedis, AnalyseRequest.class); +// analyseRemoteService.sendRequest(analyseRequest, analyseRequest.getTypeList(), analyseRequest.isFilter()); +// } catch (Exception e) { +// logger.error(e.getMessage(), e); +// } +// } +// } + @Scheduled(fixedDelayString = "${task.scheduler.request.delay-ms:3000}") public void pollAndProcess() { - //logger.info(Color.YELLOW + "AlgorithmRequestProcessConsumerTracer" + Color.END); - String requestDataInRedis; - RedisTemplate redisTemplate = redisService.redisTemplate; - while ((requestDataInRedis = redisTemplate.opsForList().leftPop(AnalyseConstants.ALGORITHM_REQUEST_QUEUE)) != null) { - try { - logger.info(Color.CYAN + "AlgorithmRequestProcessConsumerTracer queueSize: {}, requestData: {}" + Color.END, getQueueSize(), requestDataInRedis); - AnalyseRequest analyseRequest = new Gson().fromJson(requestDataInRedis, AnalyseRequest.class); - analyseRemoteService.sendRequest(analyseRequest, analyseRequest.getTypeList(), analyseRequest.isFilter()); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } + RedisQueueConsumer consumer = new RedisQueueConsumer( + redisService.redisTemplate, + AnalyseConstants.ALGORITHM_REQUEST_QUEUE, + AnalyseRequest.class, + 1000, // 每条消息间隔 1000ms 处理 + 500, // 空队列时等待 500ms + request -> { + analyseRemoteService.sendRequest(request, request.getTypeList(), request.isFilter()); + } + ); + + consumer.consumeLoop(); // 开始消费本轮队列 } public long getQueueSize() { diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java new file mode 100644 index 0000000..c63b18d --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java @@ -0,0 +1,62 @@ +package com.inspect.analysis.service.impl; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.inspect.analysis.constant.AnalyseConstants; +import com.inspect.analysis.domain.AnalyseResult; +import com.inspect.analysis.service.IAnalysisService; +import com.inspect.base.core.constant.Color; +import com.inspect.base.redis.service.RedisService; +import com.inspect.partrolresult.service.AnalyseRemoteService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.annotation.Resource; + +@Component +public class AlgorithmResponseConsumerManager { + private static final Logger log = LoggerFactory.getLogger(AlgorithmResponseConsumerManager.class); + + private RedisQueueConsumerAsync consumer; + + @Resource + private RedisService redisService; + + @Resource + private IAnalysisService analysisService; + + @PostConstruct + public void initConsumer() { + consumer = new RedisQueueConsumerAsync( + redisService.redisTemplate, + AnalyseConstants.ALGORITHM_RESPONSE_QUEUE, + AnalyseResult.class, + 1000, + 500, + response -> { + try { + log.info("AlgorithmResponseConsumerManager queueSize: {}, response: {}", getQueueSize(), response); + analysisService.handleAlgorithmResult(response); + } catch (Exception e) { + log.info(Color.RED + "AlgorithmResponseConsumerManager error queueSize: {}, request: {}" + Color.END, getQueueSize(), response); + } + } + ); + + consumer.start(); // 应用启动时即开始消费 + } + + @PreDestroy + public void stopConsumer() { + if (consumer != null) { + consumer.stop(); + } + } + + public long getQueueSize() { + return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE); + } +} + diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java index 0231149..2b5672a 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java @@ -2,6 +2,7 @@ package com.inspect.analysis.service.impl; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.inspect.analysis.constant.AnalyseConstants; +import com.inspect.analysis.domain.AnalyseResult; import com.inspect.analysis.service.IAnalysisService; import com.inspect.base.core.constant.Color; import com.inspect.base.redis.service.RedisService; @@ -15,7 +16,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; -@Component +//@Component public class AlgorithmResponseProcessConsumer { private final Logger logger = LoggerFactory.getLogger(AlgorithmResponseProcessConsumer.class); @@ -36,7 +37,8 @@ public class AlgorithmResponseProcessConsumer { logger.info(Color.CYAN + "AlgorithmResponseProcessConsumerTracer queueSize: {}, responseData: {}" + Color.END, getQueueSize(), responseDataInRedis); // AnalyseRequest analyseRequest = new Gson().fromJson(responseDataInRedis, AnalyseRequest.class); // analyseRemoteService.sendRequest(analyseRequest, analyseRequest.getTypeList(), analyseRequest.isFilter()); - analysisService.handleAlgorithmResult(responseDataInRedis); + AnalyseResult analyseResult = new Gson().fromJson(responseDataInRedis, AnalyseResult.class); + analysisService.handleAlgorithmResult(analyseResult); } catch (Exception e) { logger.error(e.getMessage(), e); } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java index ccf2ed5..4ed24e5 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java @@ -35,8 +35,8 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.text.DecimalFormat; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Service @@ -73,6 +73,7 @@ public class AnalysisServiceImpl implements IAnalysisService { @Resource private ResultAnalysisUtils resultAnalysisUtils; + // @Override // public void picAnalyseRetNotify(AnalyseResult analyseResult) { // log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, analyseResult); @@ -168,9 +169,9 @@ public class AnalysisServiceImpl implements IAnalysisService { ); } - public void handleAlgorithmResult(final String analyseResultJson) { + public void handleAlgorithmResult(final AnalyseResult analyseResult) { String logLabel = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY); - AnalyseResult analyseResult = new Gson().fromJson(analyseResultJson, AnalyseResult.class); + log.info(Color.CYAN + "ALGO_RES_UUID: {}, ###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, logLabel, analyseResult); String requestId = analyseResult.getRequestId(); String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId); @@ -250,6 +251,7 @@ public class AnalysisServiceImpl implements IAnalysisService { } else { log.info("ALGO_RES_UUID: {}, UNKNOWN_RESULT, OR TIMEOUT BEYOND A DAY FROM REMOTE ALGORITHM, requestId: {}, analyseResult: {}", logLabel, requestId, analyseResult); + return; } //qinyl @@ -834,6 +836,11 @@ public class AnalysisServiceImpl implements IAnalysisService { } } + // 计算初筛算法的进度 + if("1".equals(analyseResult.getFilter())) { + calcRemoteAlgorithmProgress(analyseResult.getTotalNumber(), analyseResult.getTaskPatrolId()); + } + //this.senWebsocket(websocketDataList); } @@ -880,6 +887,35 @@ public class AnalysisServiceImpl implements IAnalysisService { return resultAnalysis; } + private void calcRemoteAlgorithmProgress(final int totalNumber, final String taskPatrolledId) { + ResultAnalysis resultAnalysis = new ResultAnalysis(); + resultAnalysis.setTaskPatrolId(taskPatrolledId); + resultAnalysis.setFilter("1"); + List resultAnalysisList = resultAnalysisService.selectResultAnalysisList(resultAnalysis); + String algorithmProgress = decimalFormatNum(resultAnalysisList.size(), totalNumber); + log.info(Color.GREEN + "calcRemoteAlgorithmProgress: curNumber: {}, totalNumer: {}, algorithmProgress: {}" + Color.END, + resultAnalysisList.size(), totalNumber, algorithmProgress); + + PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus(); + patrolTaskStatus.setTaskPatrolledId(taskPatrolledId); + List patrolTaskStatusList = patrolTaskStatusService.selectPatrolTaskStatusList(patrolTaskStatus); + if(!patrolTaskStatusList.isEmpty()) { + patrolTaskStatus = patrolTaskStatusList.get(0); + patrolTaskStatus.setTaskEstimatedTime(algorithmProgress); + patrolTaskStatusService.updatePatrolTaskStatus(patrolTaskStatus); + } + } + + private String decimalFormatNum(int current, int all) { + DecimalFormat nf = new DecimalFormat("#"); + nf.setMaximumIntegerDigits(3); + float decimalFormatNum = (float) (current) / (float) all; + if (decimalFormatNum > 1) { + decimalFormatNum = 1; + } + return nf.format(decimalFormatNum * 100.0F); + } + private AlgValue selectAlgMap(String devId, String type) { log.info("selectAlgMap patrolPointId: {}, type: {}", devId, type); AlgValue algValue = null; diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueConsumer.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueConsumer.java new file mode 100644 index 0000000..a37cb91 --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueConsumer.java @@ -0,0 +1,66 @@ +package com.inspect.analysis.service.impl; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.RedisTemplate; + +import java.time.Duration; +import java.util.function.Consumer; + +public class RedisQueueConsumer { + private static final Logger logger = LoggerFactory.getLogger(RedisQueueConsumer.class); + + private final RedisTemplate redisTemplate; + private final Gson gson = new Gson(); + private final String queueKey; + private final Class clazz; + + private final long intervalMs; + private final long idleSleepMs; + + private final Consumer handler; + + public RedisQueueConsumer(RedisTemplate redisTemplate, + String queueKey, + Class clazz, + long intervalMs, + long idleSleepMs, + Consumer handler) { + this.redisTemplate = redisTemplate; + this.queueKey = queueKey; + this.clazz = clazz; + this.intervalMs = intervalMs; + this.idleSleepMs = idleSleepMs; + this.handler = handler; + } + + public void consumeLoop() { + while (true) { + try { + // 阻塞式弹出元素(避免忙等) + String json = redisTemplate.opsForList() + .leftPop(queueKey, Duration.ofSeconds(1)); + + if (json == null) { + Thread.sleep(idleSleepMs); + break; // 本轮结束,等待下次 @Scheduled 调度 + } + + long start = System.currentTimeMillis(); + + T obj = gson.fromJson(json, clazz); + handler.accept(obj); // 业务处理回调 + + // 节流处理 + long cost = System.currentTimeMillis() - start; + if (cost < intervalMs) { + Thread.sleep(intervalMs - cost); + } + + } catch (Exception e) { + logger.error("RedisQueueConsumer error", e); + } + } + } +} diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueConsumerAsync.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueConsumerAsync.java new file mode 100644 index 0000000..638ce2a --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueConsumerAsync.java @@ -0,0 +1,80 @@ +package com.inspect.analysis.service.impl; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.RedisTemplate; + +import java.time.Duration; +import java.util.function.Consumer; + +public class RedisQueueConsumerAsync implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(RedisQueueConsumerAsync.class); + + private final RedisTemplate redisTemplate; + private final Gson gson = new Gson(); + private final String queueKey; + private final Class clazz; + + private final long intervalMs; + private final long idleSleepMs; + + private final Consumer handler; + + private volatile boolean running = false; + + public RedisQueueConsumerAsync(RedisTemplate redisTemplate, + String queueKey, + Class clazz, + long intervalMs, + long idleSleepMs, + Consumer handler) { + this.redisTemplate = redisTemplate; + this.queueKey = queueKey; + this.clazz = clazz; + this.intervalMs = intervalMs; + this.idleSleepMs = idleSleepMs; + this.handler = handler; + } + + public void start() { + if (!running) { + running = true; + new Thread(this, "RedisQueueConsumerAsync-" + queueKey).start(); + } + } + + public void stop() { + running = false; + } + + @Override + public void run() { + while (running) { + try { + String json = redisTemplate.opsForList().leftPop(queueKey, Duration.ofSeconds(1)); + + if (json == null) { + Thread.sleep(idleSleepMs); + continue; + } + + long start = System.currentTimeMillis(); + + T obj = gson.fromJson(json, clazz); + handler.accept(obj); + + long cost = System.currentTimeMillis() - start; + if (cost < intervalMs) { + Thread.sleep(intervalMs - cost); + } + + } catch (Exception e) { + logger.error("RedisQueueConsumerAsync error", e); + } + } + + logger.info("RedisQueueConsumerAsync for [{}] stopped.", queueKey); + } +} + diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java index b3b248d..97a5b79 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java @@ -700,7 +700,7 @@ public class PatrolResultController extends BaseController { }); } - public String callRemoteAlgorithm(List patrolResultList) { + public String callRemoteAlgorithm(final int totalNumber, List patrolResultList) { String result = ""; PatrolTaskFtp ftp = patrolTaskFtpService.selectPatrolTaskFtpByLineId(2L); String ANALYSIS_URL = patrolTaskService.selectConfigByKey("ANALYSIS_URL");// 小模型分析路径 @@ -945,6 +945,7 @@ public class PatrolResultController extends BaseController { analyseReq.setSftpHostPort(port); analyseReq.setSftpUsername(username); analyseReq.setSftpPassword(password); + analyseReq.setTotalNumber(totalNumber); if (filterList.get(0).getImageUrlList() != null && filterList.get(0).getImageUrlList().length > 0) { // 初筛算法调用改为异步调用 final String meterFilter = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSE_IS_METER_FILTER); @@ -969,6 +970,7 @@ public class PatrolResultController extends BaseController { analyseReq.setSftpHostPort(port); analyseReq.setSftpUsername(username); analyseReq.setSftpPassword(password); + analyseReq.setTotalNumber(totalNumber); if (bigModelList.get(0).getImageUrlList() != null && bigModelList.get(0).getImageUrlList().length > 0) { // 大模型算法调用改为异步调用 //analyseRemoteService.sendRequest(analyseReq, bigModelList.get(0).getTypeList(), true); @@ -1088,7 +1090,7 @@ public class PatrolResultController extends BaseController { PatrolTaskFtp patrolTaskFtp = patrolTaskFtpService.selectPatrolTaskFtpByLineId(2L); int i = 1; List patrolDeviceCodeList = new ArrayList<>(); - List listEqpBook; + //List listEqpBook; if (type.equals(StaEnum.RunState.getCode())) { logger.info(Color.CYAN + "###### RECEIVE DATA TO ANALYSIS [41] start ######" + Color.END); try { @@ -1103,62 +1105,64 @@ public class PatrolResultController extends BaseController { final String jsonArrayStr = String.valueOf(jsonArray); List patrolTaskStatusListItems = JSONArray.parseArray(jsonArrayStr, PatrolTaskStatus.class); if (!patrolTaskStatusListItems.isEmpty()) { - for (PatrolTaskStatus item : patrolTaskStatusListItems) { + for (PatrolTaskStatus patrolTaskStatusItem : patrolTaskStatusListItems) { PatrolTask patrolTask = new PatrolTask(); - patrolTask.setTaskCode(item.getTaskCode()); + patrolTask.setTaskCode(patrolTaskStatusItem.getTaskCode()); List list = patrolTaskService.selectPatrolTaskList(patrolTask); if (CollectionUtils.isEmpty(list)) { - logger.info("TASK CODE: {} NO EXIST, SKIP IT!", item.getTaskCode()); + logger.info("TASK CODE: {} NO EXIST, SKIP IT!", patrolTaskStatusItem.getTaskCode()); } else { try { - sendWebsocket(item); + sendWebsocket(patrolTaskStatusItem); } catch (Exception e) { logger.error("error", e); } - if (StringUtils.isNotEmpty(item.getTaskPatrolledId())) { + if (StringUtils.isNotEmpty(patrolTaskStatusItem.getTaskPatrolledId())) { String str = ""; PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus(); - patrolTaskStatus.setTaskPatrolledId(item.getTaskPatrolledId()); - if (StringUtils.isNotEmpty(item.getPosType())) { - patrolTaskStatus.setPosType(item.getPosType()); - str = item.getPosType(); + patrolTaskStatus.setTaskPatrolledId(patrolTaskStatusItem.getTaskPatrolledId()); + if (StringUtils.isNotEmpty(patrolTaskStatusItem.getPosType())) { + patrolTaskStatus.setPosType(patrolTaskStatusItem.getPosType()); + str = patrolTaskStatusItem.getPosType(); } else if (StringUtils.isNotEmpty(sendCode)) { str = sendCodeToDevType(sendCode); patrolTaskStatus.setPosType(str); if ("1".equals(str) && "E100-001".equals(sendCode)) { - listEqpBook = baseDataClient.queryEqpBookCode(sendCode); - item.setCode(((BasedataEqpBookChannel) listEqpBook.get(0)).getChannelCode()); + List listEqpBook = baseDataClient.queryEqpBookCode(sendCode); + patrolTaskStatusItem.setCode(listEqpBook.get(0).getChannelCode()); } } - listEqpBook = iPatrolTaskStatusService.selectPatrolTaskStatusList(patrolTaskStatus); - if (item.getTaskState() != null && - (item.getTaskState().equals(TaskStatus.DONE.getCode()) - || item.getTaskState().equals(TaskStatus.HALTED.getCode()) - || item.getTaskState().equals(TaskStatus.EXPIRED.getCode()))) { - item.setTaskState(item.getTaskState()); - item.setEndTime(DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss2, new Date())); + List patrolTaskStatusList = iPatrolTaskStatusService.selectPatrolTaskStatusList(patrolTaskStatus); + if (patrolTaskStatusItem.getTaskState() != null && + (patrolTaskStatusItem.getTaskState().equals(TaskStatus.DONE.getCode()) + || patrolTaskStatusItem.getTaskState().equals(TaskStatus.HALTED.getCode()) + || patrolTaskStatusItem.getTaskState().equals(TaskStatus.EXPIRED.getCode()))) { + patrolTaskStatusItem.setTaskState(patrolTaskStatusItem.getTaskState()); + patrolTaskStatusItem.setEndTime(DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss2, new Date())); } - if (!listEqpBook.isEmpty()) { - item.setLineId(((PatrolTaskStatus) listEqpBook.get(0)).getLineId()); + if (!patrolTaskStatusList.isEmpty()) { + patrolTaskStatusItem.setLineId(patrolTaskStatusList.get(0).getLineId()); - if (item.getTaskName().contains("联合") && !item.getTaskName().startsWith(sendCode)) { - item.setTaskName(sendCode + "-" + item.getTaskName()); + if (patrolTaskStatusItem.getTaskName().contains("联合") && !patrolTaskStatusItem.getTaskName().startsWith(sendCode)) { + patrolTaskStatusItem.setTaskName(sendCode + "-" + patrolTaskStatusItem.getTaskName()); } - i = iPatrolTaskStatusService.updatePatrolTaskStatus(item); + logger.info(Color.CYAN + "patrol_task_status taskPatrolledId: {}, progress: {}" + Color.END, + patrolTaskStatusItem.getTaskPatrolledId(), patrolTaskStatusItem.getTaskProgress()); + i = iPatrolTaskStatusService.updatePatrolTaskStatus(patrolTaskStatusItem); if ("E100-001".equals(sendCode)) { - if ("100".equals(item.getTaskProgress())) { - saveDataToResultAnalysis(item.getTaskPatrolledId()); + if ("100".equals(patrolTaskStatusItem.getTaskProgress())) { + saveDataToResultAnalysis(patrolTaskStatusItem.getTaskPatrolledId()); } } } else { - item.setPosType(str); - item.setCreateTime(new Date()); - item.setStartTime(new Date()); - if (item.getTaskName().contains("联合") && !item.getTaskName().startsWith(sendCode)) { - item.setTaskName(sendCode + "-" + item.getTaskName()); + patrolTaskStatusItem.setPosType(str); + patrolTaskStatusItem.setCreateTime(new Date()); + patrolTaskStatusItem.setStartTime(new Date()); + if (patrolTaskStatusItem.getTaskName().contains("联合") && !patrolTaskStatusItem.getTaskName().startsWith(sendCode)) { + patrolTaskStatusItem.setTaskName(sendCode + "-" + patrolTaskStatusItem.getTaskName()); } - i = iPatrolTaskStatusService.insertPatrolTaskStatus(item); + i = iPatrolTaskStatusService.insertPatrolTaskStatus(patrolTaskStatusItem); } } } @@ -1176,8 +1180,9 @@ public class PatrolResultController extends BaseController { logger.info(Color.CYAN + "###### RECEIVE DATA TO ANALYSIS [61] start ######" + Color.END); String Jqtype = ""; HashSet superiorFilePaths = new HashSet<>(); + JsonRootBean jsonRootBean; try { - JsonRootBean jsonRootBean = JSONObject.parseObject(messageBody, JsonRootBean.class); + jsonRootBean = JSONObject.parseObject(messageBody, JsonRootBean.class); List itemsInfoList = jsonRootBean.getItems(); logger.info("点位数量: {}, 消息体: \n {}", itemsInfoList.size(), messageBody); for (ItemsInfo itemsInfo : itemsInfoList) { @@ -1217,6 +1222,7 @@ public class PatrolResultController extends BaseController { } } catch (Exception e) { logger.error("error", e); + jsonRootBean = new JsonRootBean(); } int mainID = -1; @@ -1252,9 +1258,9 @@ public class PatrolResultController extends BaseController { taskResultMain.setTaskId(String.valueOf(tasks.get(0).getTaskId())); taskResultMain.setTaskPatrolledId(patrolResultList.get(0).getTaskPatrolledId()); logger.info("taskResultMain: {}", taskResultMain); - listEqpBook = iPatrolTaskResultMainService.selectPatrolTaskResultMainList(taskResultMain); - if (!listEqpBook.isEmpty()) { - mainID = ((PatrolTaskResultMain) listEqpBook.get(0)).getLineId().intValue(); + List patrolTaskResultMainList = iPatrolTaskResultMainService.selectPatrolTaskResultMainList(taskResultMain); + if (!patrolTaskResultMainList.isEmpty()) { + mainID = ((PatrolTaskResultMain) patrolTaskResultMainList.get(0)).getLineId().intValue(); logger.info("exist PatrolTaskResultMain Id: {}", mainID); } else { PatrolTaskResultMain patrolTaskResultMain = new PatrolTaskResultMain(); @@ -1472,7 +1478,7 @@ public class PatrolResultController extends BaseController { if (!resultList.isEmpty()) { logger.info(Color.CYAN + "+++++++ callAlgorithm start +++++++" + Color.END); //callLocalAlgorithm(resultList); - callRemoteAlgorithm(resultList); + callRemoteAlgorithm(jsonRootBean.getTotalNumber(), resultList); logger.info(Color.CYAN + "+++++++ callAlgorithm end +++++++" + Color.END); } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java index 29c23c7..1db09cc 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java @@ -28,6 +28,7 @@ public class AnalyseRequest implements Serializable { private boolean isFilter; private String[] typeList; private String algorithmType; + private int totalNumber; public AnalyseRequest clone() { return JSONObject.parseObject(JSONObject.toJSONString(this), AnalyseRequest.class); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/JsonRootBean.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/JsonRootBean.java index 761501f..8dd957c 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/JsonRootBean.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/JsonRootBean.java @@ -25,6 +25,10 @@ public class JsonRootBean { name = "Type" ) private String type; + @JSONField( + name = "TotalNumber" + ) + private Integer totalNumber; @JSONField( name = "Items" )