From 2c880c2af6bc4e66b5151635f087471010a2767b Mon Sep 17 00:00:00 2001 From: htjcAdmin Date: Thu, 26 Jun 2025 09:05:14 +0800 Subject: [PATCH] =?UTF-8?q?/*=E5=B7=A1=E8=A7=86=E7=82=B9=E4=BD=8D=E5=9F=BA?= =?UTF-8?q?=E7=A1=80=E6=95=B0=E6=8D=AE=E4=B8=8D=E5=AE=8C=E6=95=B4=E7=9A=84?= =?UTF-8?q?=E6=83=85=E5=86=B5=E4=B8=8B=EF=BC=8C=E8=A1=A5=E5=85=85=E5=85=B6?= =?UTF-8?q?=E5=9F=BA=E7=A1=80=E6=95=B0=E6=8D=AE=E5=AE=8C=E6=88=90=E5=B7=A1?= =?UTF-8?q?=E6=A3=80=E6=B5=81=E7=A8=8B=E3=80=82*/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ivs/controller/IvsDeviceController.java | 61 ++++- .../com/inspect/job/task/JobMainTask.java | 216 ++++++++++++++---- .../inspect/job/util/ParallelProcessor.java | 66 ++++++ .../controller/PatrolResultController.java | 19 +- .../task/controller/PatrolTaskController.java | 2 +- 5 files changed, 303 insertions(+), 61 deletions(-) create mode 100644 inspect-job/src/main/java/com/inspect/job/util/ParallelProcessor.java diff --git a/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsDeviceController.java b/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsDeviceController.java index be664ab..0948e58 100644 --- a/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsDeviceController.java +++ b/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsDeviceController.java @@ -3,7 +3,6 @@ package com.inspect.ivs.controller; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import com.inspect.base.core.constant.Color; import com.inspect.base.core.domain.Response; import com.inspect.base.core.utils.StringUtils; import com.inspect.ivs.base.feign.view.SipbDeviceListView; @@ -18,8 +17,11 @@ import com.inspect.ivs.vo.IvsChanSnapVo; import com.inspect.ivs.vo.IvsDevChanListVo; import com.inspect.ivs.vo.IvsDevChanSnapVo; +import java.awt.*; +import java.awt.image.BufferedImage; import java.io.*; import java.util.*; +import java.util.List; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; @@ -32,6 +34,8 @@ import org.springframework.core.io.InputStreamResource; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import javax.imageio.ImageIO; + @RestController @RequestMapping({"/api/v1/device"}) public class IvsDeviceController { @@ -43,6 +47,10 @@ public class IvsDeviceController { private String version; @Value("${ivs.address}") private String address; + + @Value("${task.test-mode:false}") + private boolean testMode; + private final IvsCommonService ivsCommonService; public IvsDeviceController(IvsCommonService ivsCommonService) { @@ -242,10 +250,10 @@ public class IvsDeviceController { } else { int index = 0; - final int TRY_TIMES = 3; + final int TRY_TIMES = 1; final long mDelay = 3000; while (index < TRY_TIMES) { - log.info(Color.MAGENTA + "[IVS] SNAPSHOT: {}, TRY TIMES: {}" + Color.END, ivsDevChanSnapVo.getCameraCode(), index); + log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] SNAPSHOT: {}, TRY TIMES: {}" + com.inspect.base.core.constant.Color.END, ivsDevChanSnapVo.getCameraCode(), index); try { IvsSnapshotView ivsSnapshotView = ivsCommonService.get(UriUtils.parse(IvsConst.URI_PLATFORM_SNAPSHOT, ivsDevChanSnapVo), IvsSnapshotView.class); IvsChanSnapVo ivsChanSnapVo = new IvsChanSnapVo(ivsDevChanSnapVo.getCameraCode(), @@ -253,7 +261,8 @@ public class IvsDeviceController { ivsSnapshotView.getTaskID() ); - if ("prod".equals(activeProfile)) { + //if ("prod".equals(activeProfile)) { + if(!testMode) { log.info("[IVS] prod env get stream delay 9000ms"); Thread.sleep(9000L); } else { @@ -269,27 +278,61 @@ public class IvsDeviceController { && !snapShotView.getSnapshotInfoList().getSnapshotInfos().isEmpty()) { String cameraCode = snapShotView.getSnapshotInfoList().getSnapshotInfos().get(0).getCameraCode(); String pictureUrl = snapShotView.getSnapshotInfoList().getSnapshotInfos().get(0).getPictureUrl(); - log.info(Color.MAGENTA + "[IVS] TRYS: {}, SNAPSHOT cameraCode: {}, pictureUrl: {}" + Color.END, index, cameraCode, pictureUrl); + log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] TRYS: {}, SNAPSHOT cameraCode: {}, pictureUrl: {}" + com.inspect.base.core.constant.Color.END, index, cameraCode, pictureUrl); try (CloseableHttpClient httpClient = HttpClients.createDefault()) { HttpResponse httpResponse = httpClient.execute(new HttpGet(pictureUrl)); InputStream content = httpResponse.getEntity().getContent(); byte[] bytes = readStream(content); InputStream inputStream = new ByteArrayInputStream(bytes); - log.info(Color.MAGENTA + "[IVS] tries: {}, pictureUrl: {}, size: {} END" + Color.END, index, pictureUrl, bytes.length); - return ResponseEntity.ok().body(new InputStreamResource(inputStream)); + log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] tries: {}, pictureUrl: {}, size: {} END" + com.inspect.base.core.constant.Color.END, index, pictureUrl, bytes.length); + + if(bytes.length > 0) { + return ResponseEntity.ok().body(new InputStreamResource(inputStream)); + } } } } catch (Exception e) { - log.warn(Color.MAGENTA + "[IVS] SNAPSHOT EXCEPTION: {}" + Color.END, e.getMessage()); + log.warn(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] SNAPSHOT EXCEPTION: {}" + com.inspect.base.core.constant.Color.END, e.getMessage()); } index++; Thread.sleep(mDelay); } - return ResponseEntity.ok().body(null); + + log.info(com.inspect.base.core.constant.Color.RED + "[IVS] GET PIC FAIL cameraCode: {}, channel: {}, domainCode: {}" + com.inspect.base.core.constant.Color.END, + ivsDevChanSnapVo.getCameraCode(), + ivsDevChanSnapVo.getChannel(), + ivsDevChanSnapVo.getDomainCode() + ); + return ResponseEntity.ok().body(new InputStreamResource(generateErrorImage())); } } + private InputStream generateErrorImage() { + int width = 400, height = 200; + BufferedImage bufferedImage = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB); + Graphics2D g = bufferedImage.createGraphics(); + + // 背景 + g.setColor(Color.WHITE); + g.fillRect(0, 0, width, height); + + // 字体 + g.setColor(Color.RED); + g.setFont(new Font("Arial", Font.BOLD, 20)); + g.drawString("Get Picture Fail", 120, 100); + g.dispose(); + + try { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + ImageIO.write(bufferedImage, "png", os); + return new ByteArrayInputStream(os.toByteArray()); + } catch (IOException e) { + throw new RuntimeException("生成错误图片失败", e); + } + } + + public byte[] readStream(InputStream inStream) throws Exception { ByteArrayOutputStream outStream = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; diff --git a/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java b/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java index 9586b18..2ab24a9 100644 --- a/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java +++ b/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java @@ -39,6 +39,7 @@ import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import feign.RetryableException; import org.slf4j.Logger; @@ -76,6 +77,8 @@ public class JobMainTask { private static int execRecordCounter = 0; + private AtomicInteger asyncTaskPatrolPointCnt = new AtomicInteger(0); + @Value("${liveSIPB.url}") private String liveIVS_URL; @Value("${sftp.ip}") @@ -106,6 +109,9 @@ public class JobMainTask { @Value("${task.scheduler.batch-size:100}") private int taskBatchSize; + @Value("${task.test-mode:false}") + private boolean testMode; + private ExecutorService threadPool; final ShaoXinBigModel shaoXinBigModel; @@ -384,21 +390,29 @@ public class JobMainTask { } private PatrolTaskExecRecord prePointExec(PatrolTaskExecRecord taskExecRecord, PatrolTaskInfo patrolTaskInfo, int infoListSize) { + String uuid = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY); PatrolPresetPos patrolPresetPos = PatrolPresetPos.builder() .patrolPointId(Long.parseLong(patrolTaskInfo.getDeviceId())) .isEnable("1") .build(); List presetPosList = taskExecClient.selectPatrolPresetPosList(patrolPresetPos); - log.info(Color.CYAN + "************************** executing patrolPoint, patrolPointId: {}, patrolPresetPos: {}" + Color.END, patrolTaskInfo.getDeviceId(), JSONObject.toJSONString(presetPosList, true)); + log.info(Color.CYAN + "[UUID] uuid: {}, executing patrolPoint, patrolPointId: {}, patrolPresetPos: {}" + Color.END, uuid, patrolTaskInfo.getDeviceId(), JSONObject.toJSONString(presetPosList, true)); + + if(presetPosList.isEmpty()) { + log.error(Color.RED + "[UUID] uuid: {}, patrol_task_info deviceId: {} no patrol_preset_pos record!!!", uuid, patrolTaskInfo.getDeviceId()); + } - PatrolPresetPos presetPos = !presetPosList.isEmpty() ? presetPosList.get(0) : PatrolPresetPos.builder().presetPosId("0").build(); + PatrolPresetPos presetPos = !presetPosList.isEmpty() + ? presetPosList.get(0) + : PatrolPresetPos.builder().presetPosId("99999999").channelType("vl").channelCode("00000000000000000000#ffffffffffffffffffffffffffffffff").videoNvrCode("ffffffffffffffffffffffffffffffff").build(); String taskPatrolId = taskExecRecord.getTaskPatrolId(); /*摄像头转到预置位*/ boolean noError = setCameraToPreset(presetPos); - if ("prod".equals(activeProfile) && noError) { +// if ("prod".equals(activeProfile) && noError) { + if (!testMode && noError) { log.info("setCameraToPreset no error, delay 20 seconds"); - myDelay(5000); + myDelay(20000); } else { log.info("setCameraToPreset has error, delay 2 seconds"); myDelay(2000); @@ -409,7 +423,7 @@ public class JobMainTask { PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus(); patrolTaskStatus.setTaskPatrolledId(taskPatrolledId); List list = taskExecClient.selectPatrolTaskStatusList(patrolTaskStatus); - if (list.size() > 0) { + if (!list.isEmpty()) { if ("3".equals(list.get(0).getTaskState())) {//暂停 while (true) { log.info("-----------------------task pause: {}", taskPatrolledId); @@ -419,7 +433,7 @@ public class JobMainTask { } list = taskExecClient.selectPatrolTaskStatusList(patrolTaskStatus); - if (list.size() > 0) { + if (!list.isEmpty()) { log.info("-----------------------TaskState: {}", list.get(0).getTaskState()); if ("4".equals(list.get(0).getTaskState())) {//终止 log.info("-----------------------task terminate: {}", taskPatrolledId); @@ -454,16 +468,13 @@ public class JobMainTask { + "/" + DateUtils.getDayEven() + "/" + taskExecRecord.getTaskCode() + "/"; - log.info("basePath: {}", basePath); + log.info("uuid: {}, basePath: {}", uuid, basePath); if (PresetAction.PHOTO.getCode().equals(presetAction.getActionType())) { - String chanType = presetPos.getChannelType(); + final String chanType = presetPos.getChannelType(); fileTypes.append("ir".equals(chanType) ? "1" : "vl".equals(chanType) ? "2" : "").append(","); - int delayMilliSeconds = presetAction.getPhotoGap() * 1000; - final long photoCount = presetAction.getPhotoCount(); boolean bOk = false; - try { - log.info("PHOTO PresetType chanType: {}, videoNvrCode: {}, channelCode: {}", chanType, presetPos.getVideoNvrCode(), presetPos.getChannelCode()); + log.info("PHOTO PresetType uuid: {}, chanType: {}, videoNvrCode: {}, channelCode: {}, patrolPointId: {}", uuid, chanType, presetPos.getVideoNvrCode(), presetPos.getChannelCode(), presetPos.getPatrolPointId()); if ("vl".equals(chanType)) { String paramUrl = liveIVS_URL + "/api/v1/device/channelsnap?serial=" + presetPos.getVideoNvrCode() + "&realtime=true&code=" + presetPos.getChannelCode(); String paramFileName = taskPatrolId.split(StringUtils.UNDERLINE)[1] + StringUtils.UNDERLINE @@ -597,11 +608,11 @@ public class JobMainTask { } synchronized private void addPointRecord(boolean bOk, - PatrolTaskExecRecord taskExecRecord, - PatrolTaskInfo taskInfo, - PatrolPresetPos presetPos, - StringBuffer fileTypes, - StringBuffer filePaths) { + PatrolTaskExecRecord taskExecRecord, + PatrolTaskInfo taskInfo, + PatrolPresetPos presetPos, + StringBuffer fileTypes, + StringBuffer filePaths) { String rectangle = getRectangle(presetPos.getPresetPosId()); String taskPatrolId = getTaskPatrolledId(taskInfo.getDevNo(), taskExecRecord.getTaskPatrolId()); final String deviceId = taskInfo.getDeviceId(); @@ -627,7 +638,13 @@ public class JobMainTask { .build(); taskExecClient.addPatrolTaskPointExecRecord(pointExecRecord); - if (bOk) { + if (!bOk) { + log.info(Color.RED + "pointExecRecord crud add taskPatrolId: {}, deviceId: {}" + Color.END, taskPatrolId, deviceId); + } + + //if (bOk) + if(!testMode) + { String patrolId = taskExecRecord.getTaskPatrolId(); String[] ids = patrolId.split(StringUtils.UNDERLINE); String taskPatrolIdRemote = ids[1] + "_" + ids[2]; @@ -715,22 +732,22 @@ public class JobMainTask { taskExecRecord.setCursorNumber(cursor); //if (bOk) { - success++; - log.info("execute point success, cursor: {}, success: {}, total: {}", cursor, success, total); - taskExecRecord.setFinishNumber(success); - String taskProgress = decimalFormatNum(success, total);// task_progress=success/total -> patrol_task_exec_record - taskExecRecord.setTaskProgress(taskProgress); + success++; + log.info("execute point success, cursor: {}, success: {}, total: {}", cursor, success, total); + taskExecRecord.setFinishNumber(success); + String taskProgress = decimalFormatNum(success, total);// task_progress=success/total -> patrol_task_exec_record + taskExecRecord.setTaskProgress(taskProgress); // List totalInfos = getInfosByRecord(taskExecRecord); - List finishedInfos = taskExecClient.selectPatrolTaskPointExecRecordList(PatrolTaskPointExecRecord.builder().oldTaskPatrolledId(taskExecRecord.getOldTaskPatrolId()).build()); - String totalProgress = decimalFormatNum(finishedInfos.size(), infoListSize);// task_progress=finish/total -> patrol_task_status - log.info("totalProgress: {}, finish: {}, total: {}", totalProgress, finishedInfos.size(), infoListSize); - callRemoteSendMsgRunMode( - taskExecRecord.getTaskCode(), - taskExecRecord.getTaskName(), - taskExecRecord.getTaskPatrolId(), - totalProgress, - TaskStatus.RUNNING.getCode()/*运行中百分比上报*/ - ); + List finishedInfos = taskExecClient.selectPatrolTaskPointExecRecordList(PatrolTaskPointExecRecord.builder().oldTaskPatrolledId(taskExecRecord.getOldTaskPatrolId()).build()); + String totalProgress = decimalFormatNum(finishedInfos.size(), total);// task_progress=finish/total -> patrol_task_status + log.info("totalProgress: {}, finish: {}, batchSize: {}, total: {}", totalProgress, finishedInfos.size(), infoListSize, total); + callRemoteSendMsgRunMode( + taskExecRecord.getTaskCode(), + taskExecRecord.getTaskName(), + taskExecRecord.getTaskPatrolId(), + totalProgress, + TaskStatus.RUNNING.getCode()/*运行中百分比上报*/ + ); // } // else { // log.info("execute point fail, cursor: {}, success: {}, total: {}", cursor, success, total); @@ -987,12 +1004,41 @@ public class JobMainTask { private List getInfosByRecord(final PatrolTaskExecRecord record) { List patrolTaskInfoList = new ArrayList<>(); + List patrolTaskInfoListEx = new ArrayList<>(); try { patrolTaskInfoList = taskExecClient.selectTaskInfoList(PatrolTaskInfo.builder() .devNo(record.getDevNo()) .taskMajorId(record.getTaskId().toString()) .build()); - return patrolTaskInfoMaintainAreaRemove(patrolTaskInfoList);//把检修区域的点位删除 + + for(PatrolTaskInfo patrolTaskInfo : patrolTaskInfoList) { + PatrolPresetPos patrolPresetPos = PatrolPresetPos.builder() + .patrolPointId(Long.parseLong(patrolTaskInfo.getDeviceId())) + .isEnable("1") + .build(); + List presetPosList = taskExecClient.selectPatrolPresetPosList(patrolPresetPos); + PatrolPresetPos presetPos = !presetPosList.isEmpty() ? presetPosList.get(0) : + PatrolPresetPos.builder().presetPosId("1") + .presetPosCode("1") + .channelCode("00000000000000000000#ffffffffffffffffffffffffffffffff") + .channelId("999999") + .patrolPointId(1234567890L) + .channelType("vl") + .isEnable("1") + .eqpBookId("1") + .build(); + String deviceCode = "00000000000000000000"; + try { + String[] channelCodeList = presetPos.getChannelCode().split(StringUtils.HASHTAG); + deviceCode = channelCodeList[0]; + } catch (Exception e) { + log.error("presetPos illegal channelCode: {} with deviceId: {}", presetPos.getChannelCode(), patrolTaskInfo.getDeviceId()); + } + + patrolTaskInfo.setDeviceCode(deviceCode); + patrolTaskInfoListEx.add(patrolTaskInfo); + } + return patrolTaskInfoMaintainAreaRemove(patrolTaskInfoListEx);//把检修区域的点位删除 } catch (Exception e) { log.error("error", e); return patrolTaskInfoList; @@ -1088,26 +1134,77 @@ public class JobMainTask { // log.info("-----------------------2 selectPatrolTaskStatusList is empty"); // } // } +// } + + // 任务分拆异步执行V1.0 +// private void prePointExecImmediate(final PatrolTaskExecRecord patrolTaskExecRecord, List patrolTaskInfoList) { +// int batchSize = taskBatchSize; +// int total = patrolTaskInfoList.size(); +// log.info(Color.CYAN + "batchSize: {}, total: {}" + Color.END, batchSize, total); +// +// int index = 1; +// for (int i = 0; i < total; i += batchSize) { +// int end = Math.min(i + batchSize, total); +// List batch = patrolTaskInfoList.subList(i, end); +// // 提交每一批次任务到线程池 +// final int threadCnt = index; +// threadPool.submit(() -> handlePrePointBatch(threadCnt, patrolTaskExecRecord, batch)); +// index++; +// } // } private void prePointExecImmediate(final PatrolTaskExecRecord patrolTaskExecRecord, List patrolTaskInfoList) { + Map> deviceMap = new HashMap<>(); + for (PatrolTaskInfo patrolTaskInfo : patrolTaskInfoList) { + deviceMap.computeIfAbsent(patrolTaskInfo.getDeviceCode(), k -> new ArrayList<>()).add(patrolTaskInfo); + } + + asyncTaskPatrolPointCnt.set(0); int batchSize = taskBatchSize; int total = patrolTaskInfoList.size(); - log.info(Color.CYAN + "batchSize: {}, total: {}" + Color.END, batchSize, total); - + log.info(Color.CYAN + "threadId: {}, batchSize: {}, total: {}" + Color.END, Thread.currentThread().getId(), batchSize, total); + List currentGroup = new ArrayList<>(); + List> futures = new ArrayList<>(); int index = 1; - for (int i = 0; i < total; i += batchSize) { - int end = Math.min(i + batchSize, total); - List batch = patrolTaskInfoList.subList(i, end); - // 提交每一批次任务到线程池 + for (List sameCodeList : deviceMap.values()) { + if (sameCodeList.size() > batchSize) { + String[] deviceCodeArray = sameCodeList.stream().map(PatrolTaskInfo::getDeviceCode).toArray(String[]::new); + log.info("BATCH_BIG_GROUP deviceCodeArray: {}", String.join(StringUtils.COMMA, deviceCodeArray)); + // 大组直接提交处理 + final int threadCnt = index; + futures.add(CompletableFuture.runAsync(() -> handlePrePointBatch(threadCnt, patrolTaskExecRecord, sameCodeList), threadPool)); + index++; + } else { + if (currentGroup.size() + sameCodeList.size() > batchSize) { + // 当前组满了,提交处理 + final int threadCnt = index; + List finalGroup = new ArrayList<>(currentGroup); + String[] deviceCodeArray = finalGroup.stream().map(PatrolTaskInfo::getDeviceCode).toArray(String[]::new); + log.info("BATCH_FINAL_GROUP deviceCodeArray: {}", String.join(StringUtils.COMMA, deviceCodeArray)); + futures.add(CompletableFuture.runAsync(() -> handlePrePointBatch(threadCnt, patrolTaskExecRecord, finalGroup), threadPool)); + currentGroup.clear(); + index++; + } + currentGroup.addAll(sameCodeList); + } + } + + if (!currentGroup.isEmpty()) { final int threadCnt = index; - threadPool.submit(() -> handlePrePointBatch(threadCnt, patrolTaskExecRecord, batch)); - index++; + List finalGroup = new ArrayList<>(currentGroup); + String[] deviceCodeArray = finalGroup.stream().map(PatrolTaskInfo::getDeviceCode).toArray(String[]::new); + log.info("BATCH_LAST_FINAL_GROUP deviceCodeArray: {}", String.join(StringUtils.COMMA, deviceCodeArray)); + futures.add(CompletableFuture.runAsync(() -> handlePrePointBatch(threadCnt, patrolTaskExecRecord, finalGroup), threadPool)); } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + log.info("CompletableFuture Break Join!"); } private void handlePrePointBatch(final int threadCnt, final PatrolTaskExecRecord patrolTaskExecRecord, final List batch) { - log.info("handlePrePointBatch threadCnt: {}, batch size: {}, devNo: {}, taskId: {}", threadCnt, batch.size(), patrolTaskExecRecord.getDevNo(), patrolTaskExecRecord.getTaskId()); + asyncTaskPatrolPointCnt.getAndAdd(batch.size()); + log.info("handlePrePointBatch threadCnt: {}, asyncTaskPatrolPointCnt: {}, batch size: {}, devNo: {}, taskId: {}", + threadCnt, asyncTaskPatrolPointCnt.get(), batch.size(), patrolTaskExecRecord.getDevNo(), patrolTaskExecRecord.getTaskId()); for (final PatrolTaskInfo taskInfo : batch) { log.info("handlePrePointBatch taskPatrolId: {}, patrolPointId: {}, lineId: {}", patrolTaskExecRecord.getTaskPatrolId(), taskInfo.getDeviceId(), taskInfo.getLineId()); prePointExec(patrolTaskExecRecord, taskInfo, batch.size()); @@ -1127,6 +1224,39 @@ public class JobMainTask { } } + public List> optimizedGroup(List list) { + Map> deviceMap = new HashMap<>(); + + // 遍历原始列表,按 deviceCode 聚合 + for (PatrolTaskInfo item : list) { + deviceMap.computeIfAbsent(item.getDeviceCode(), k -> new ArrayList<>()).add(item); + } + + List> result = new ArrayList<>(); + List currentGroup = new ArrayList<>(); + int maxGroupSize = 100; + + for (List group : deviceMap.values()) { + if (group.size() > maxGroupSize) { + // 超过最大组,单独成组 + result.add(group); + } else { + if (currentGroup.size() + group.size() > maxGroupSize) { + result.add(currentGroup); + currentGroup = new ArrayList<>(); + } + currentGroup.addAll(group); + } + } + + if (!currentGroup.isEmpty()) { + result.add(currentGroup); + } + + return result; + } + + private void execRemoveMaintainArea(List patrolTaskInfoList, List maintAreaList, List deviceIds) { for (MaintainRegion area : maintAreaList) { if (area.getStartTime().getTime() < System.currentTimeMillis() && area.getEndTime().getTime() > System.currentTimeMillis()) { diff --git a/inspect-job/src/main/java/com/inspect/job/util/ParallelProcessor.java b/inspect-job/src/main/java/com/inspect/job/util/ParallelProcessor.java new file mode 100644 index 0000000..f899b09 --- /dev/null +++ b/inspect-job/src/main/java/com/inspect/job/util/ParallelProcessor.java @@ -0,0 +1,66 @@ +package com.inspect.job.util; + +import com.inspect.job.domain.task.PatrolTaskInfo; +import com.inspect.job.task.JobMainTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +public class ParallelProcessor { + private final Logger log = LoggerFactory.getLogger(ParallelProcessor.class); + + private static final int MAX_GROUP_SIZE = 100; + + private final ExecutorService threadPool = new ThreadPoolExecutor( + 8, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)); + + // 模拟处理逻辑 + public void processGroup(List group) { + System.out.println("Thread " + Thread.currentThread().getName() + + " processing group of size " + group.size()); + // 模拟耗时处理 + try { Thread.sleep(200); } catch (InterruptedException ignored) {} + } + + public void run(List allTasks) { + // Step 1: 分组(相同 deviceCode 不拆组) + Map> deviceMap = new HashMap<>(); + for (PatrolTaskInfo task : allTasks) { + deviceMap.computeIfAbsent(task.getDeviceCode(), k -> new ArrayList<>()).add(task); + } + + List> taskGroups = new ArrayList<>(); + List currentGroup = new ArrayList<>(); + + for (List group : deviceMap.values()) { + if (group.size() > MAX_GROUP_SIZE) { + taskGroups.add(group); // 超大组直接入列 + } else { + if (currentGroup.size() + group.size() > MAX_GROUP_SIZE) { + taskGroups.add(currentGroup); + currentGroup = new ArrayList<>(); + } + currentGroup.addAll(group); + } + } + if (!currentGroup.isEmpty()) { + taskGroups.add(currentGroup); + } + + // Step 2: 多线程并发处理 + List> futures = new ArrayList<>(); + for (List group : taskGroups) { + futures.add(CompletableFuture.runAsync(() -> processGroup(group), threadPool)); + } + + // 等待所有任务完成 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + threadPool.shutdown(); + } +} diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java index 4601d91..b3b248d 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java @@ -749,7 +749,6 @@ public class PatrolResultController extends BaseController { if (patrolResultTemp.getAlgId() != null && !patrolResultTemp.getAlgId().isEmpty()) { algInfoList = patrolResultService.selectAlgInfo(patrolResultTemp); } - // 点位配置的算法 logger.info("callRemoteAlgorithm algInfoList: {}", algInfoList); analyseReqItem.setImageUrlList(image.split(StringUtils.COMMA)); @@ -813,7 +812,8 @@ public class PatrolResultController extends BaseController { List judgeList = algInfoList.stream().filter( (item) -> item.getAlgName().contains("判别") ).collect(Collectors.toList()); - // 判别算法 + + // 将AlgName含有判别的点位加入大模型bigModelList列表 if (!judgeList.isEmpty()) { analyseReqItem.setTypeList(judgeList.stream().map(AlgInfo::getAlgSubtypeCode).toArray(String[]::new)); String[] images; @@ -828,7 +828,10 @@ public class PatrolResultController extends BaseController { bigModelList.add(analyseReqItem.clone()); } - algInfoList.removeAll(judgeList);//将已经加入bigModelList中的记录删除(算法名称为缺陷判别) + //将已经加入bigModelList中的记录删除(算法名称为缺陷判别) + algInfoList.removeAll(judgeList); + + // 将AlgSubtypeCode为以下几种类型的点位加入初筛filterList列表 List meterList = (algInfoList).stream().filter((item) -> AlgConstants.METER.equals(item.getAlgSubtypeCode()) @@ -843,8 +846,8 @@ public class PatrolResultController extends BaseController { filterList.add(analyseReqItem.clone()); } - algInfoList.removeAll(meterList);//将已经加入filterList中的记录删除(算法编码为meter的记录) - // 缺陷识别算法 + //将已经加入filterList中的记录删除(算法编码为meter的记录) + algInfoList.removeAll(meterList); List defectList = algInfoList.stream().filter((item) -> "缺陷识别".equals(item.getAlgName())).collect(Collectors.toList()); if (!defectList.isEmpty()) { analyseReqItem.setTypeList(defectList.stream().map(AlgInfo::getAlgSubtypeCode).toArray(String[]::new)); @@ -927,9 +930,9 @@ public class PatrolResultController extends BaseController { } } - logger.info("callRemoteAlgorithm algInfoList2: {}", algInfoList); - logger.info("callRemoteAlgorithm callModel filterList: {}", filterList); - logger.info("callRemoteAlgorithm callModel bigModelList: {}", bigModelList); + logger.info(Color.CYAN + "callRemoteAlgorithm algInfoList2: {}" + Color.END, algInfoList); + logger.info(Color.CYAN + "callRemoteAlgorithm filter size: {}, filterList: {}" + Color.END, filterList.size(), filterList); + logger.info(Color.CYAN + "callRemoteAlgorithm bigModel size: {}, bigModelList: {}" + Color.END, bigModelList.size(), bigModelList); //qinyl 初筛逻辑处理 if (!filterList.isEmpty()) { diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/task/controller/PatrolTaskController.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/task/controller/PatrolTaskController.java index 5ccecfa..3326056 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/task/controller/PatrolTaskController.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/task/controller/PatrolTaskController.java @@ -123,7 +123,7 @@ public class PatrolTaskController extends BaseController { public TableDataInfo list(PatrolTask patrolTask) { startPage(); List taskList = patrolTaskService.selectPatrolTaskList(patrolTask); - logger.info("[TASK] patrolTask: {}, taskList: {}", patrolTask, taskList); + logger.debug("[TASK] patrolTask: {}, taskList: {}", patrolTask, taskList); for (PatrolTask task : taskList) { PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus(); patrolTaskStatus.setTaskCode(task.getTaskCode());