|
|
|
@ -39,6 +39,7 @@ import java.time.LocalTime; |
|
|
|
import java.time.format.DateTimeFormatter; |
|
|
|
import java.util.*; |
|
|
|
import java.util.concurrent.*; |
|
|
|
import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
|
|
|
|
import feign.RetryableException; |
|
|
|
import org.slf4j.Logger; |
|
|
|
@ -76,6 +77,8 @@ public class JobMainTask { |
|
|
|
|
|
|
|
private static int execRecordCounter = 0; |
|
|
|
|
|
|
|
private AtomicInteger asyncTaskPatrolPointCnt = new AtomicInteger(0); |
|
|
|
|
|
|
|
@Value("${liveSIPB.url}") |
|
|
|
private String liveIVS_URL; |
|
|
|
@Value("${sftp.ip}") |
|
|
|
@ -106,6 +109,9 @@ public class JobMainTask { |
|
|
|
@Value("${task.scheduler.batch-size:100}") |
|
|
|
private int taskBatchSize; |
|
|
|
|
|
|
|
@Value("${task.test-mode:false}") |
|
|
|
private boolean testMode; |
|
|
|
|
|
|
|
private ExecutorService threadPool; |
|
|
|
|
|
|
|
final ShaoXinBigModel shaoXinBigModel; |
|
|
|
@ -384,21 +390,29 @@ public class JobMainTask { |
|
|
|
} |
|
|
|
|
|
|
|
private PatrolTaskExecRecord prePointExec(PatrolTaskExecRecord taskExecRecord, PatrolTaskInfo patrolTaskInfo, int infoListSize) { |
|
|
|
String uuid = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY); |
|
|
|
PatrolPresetPos patrolPresetPos = PatrolPresetPos.builder() |
|
|
|
.patrolPointId(Long.parseLong(patrolTaskInfo.getDeviceId())) |
|
|
|
.isEnable("1") |
|
|
|
.build(); |
|
|
|
List<PatrolPresetPos> presetPosList = taskExecClient.selectPatrolPresetPosList(patrolPresetPos); |
|
|
|
log.info(Color.CYAN + "************************** executing patrolPoint, patrolPointId: {}, patrolPresetPos: {}" + Color.END, patrolTaskInfo.getDeviceId(), JSONObject.toJSONString(presetPosList, true)); |
|
|
|
log.info(Color.CYAN + "[UUID] uuid: {}, executing patrolPoint, patrolPointId: {}, patrolPresetPos: {}" + Color.END, uuid, patrolTaskInfo.getDeviceId(), JSONObject.toJSONString(presetPosList, true)); |
|
|
|
|
|
|
|
if(presetPosList.isEmpty()) { |
|
|
|
log.error(Color.RED + "[UUID] uuid: {}, patrol_task_info deviceId: {} no patrol_preset_pos record!!!", uuid, patrolTaskInfo.getDeviceId()); |
|
|
|
} |
|
|
|
|
|
|
|
PatrolPresetPos presetPos = !presetPosList.isEmpty() ? presetPosList.get(0) : PatrolPresetPos.builder().presetPosId("0").build(); |
|
|
|
PatrolPresetPos presetPos = !presetPosList.isEmpty() |
|
|
|
? presetPosList.get(0) |
|
|
|
: PatrolPresetPos.builder().presetPosId("99999999").channelType("vl").channelCode("00000000000000000000#ffffffffffffffffffffffffffffffff").videoNvrCode("ffffffffffffffffffffffffffffffff").build(); |
|
|
|
String taskPatrolId = taskExecRecord.getTaskPatrolId(); |
|
|
|
|
|
|
|
/*摄像头转到预置位*/ |
|
|
|
boolean noError = setCameraToPreset(presetPos); |
|
|
|
if ("prod".equals(activeProfile) && noError) { |
|
|
|
// if ("prod".equals(activeProfile) && noError) { |
|
|
|
if (!testMode && noError) { |
|
|
|
log.info("setCameraToPreset no error, delay 20 seconds"); |
|
|
|
myDelay(5000); |
|
|
|
myDelay(20000); |
|
|
|
} else { |
|
|
|
log.info("setCameraToPreset has error, delay 2 seconds"); |
|
|
|
myDelay(2000); |
|
|
|
@ -409,7 +423,7 @@ public class JobMainTask { |
|
|
|
PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus(); |
|
|
|
patrolTaskStatus.setTaskPatrolledId(taskPatrolledId); |
|
|
|
List<PatrolTaskStatus> list = taskExecClient.selectPatrolTaskStatusList(patrolTaskStatus); |
|
|
|
if (list.size() > 0) { |
|
|
|
if (!list.isEmpty()) { |
|
|
|
if ("3".equals(list.get(0).getTaskState())) {//暂停 |
|
|
|
while (true) { |
|
|
|
log.info("-----------------------task pause: {}", taskPatrolledId); |
|
|
|
@ -419,7 +433,7 @@ public class JobMainTask { |
|
|
|
} |
|
|
|
|
|
|
|
list = taskExecClient.selectPatrolTaskStatusList(patrolTaskStatus); |
|
|
|
if (list.size() > 0) { |
|
|
|
if (!list.isEmpty()) { |
|
|
|
log.info("-----------------------TaskState: {}", list.get(0).getTaskState()); |
|
|
|
if ("4".equals(list.get(0).getTaskState())) {//终止 |
|
|
|
log.info("-----------------------task terminate: {}", taskPatrolledId); |
|
|
|
@ -454,16 +468,13 @@ public class JobMainTask { |
|
|
|
+ "/" + DateUtils.getDayEven() |
|
|
|
+ "/" + taskExecRecord.getTaskCode() |
|
|
|
+ "/"; |
|
|
|
log.info("basePath: {}", basePath); |
|
|
|
log.info("uuid: {}, basePath: {}", uuid, basePath); |
|
|
|
if (PresetAction.PHOTO.getCode().equals(presetAction.getActionType())) { |
|
|
|
String chanType = presetPos.getChannelType(); |
|
|
|
final String chanType = presetPos.getChannelType(); |
|
|
|
fileTypes.append("ir".equals(chanType) ? "1" : "vl".equals(chanType) ? "2" : "").append(","); |
|
|
|
int delayMilliSeconds = presetAction.getPhotoGap() * 1000; |
|
|
|
final long photoCount = presetAction.getPhotoCount(); |
|
|
|
boolean bOk = false; |
|
|
|
|
|
|
|
try { |
|
|
|
log.info("PHOTO PresetType chanType: {}, videoNvrCode: {}, channelCode: {}", chanType, presetPos.getVideoNvrCode(), presetPos.getChannelCode()); |
|
|
|
log.info("PHOTO PresetType uuid: {}, chanType: {}, videoNvrCode: {}, channelCode: {}, patrolPointId: {}", uuid, chanType, presetPos.getVideoNvrCode(), presetPos.getChannelCode(), presetPos.getPatrolPointId()); |
|
|
|
if ("vl".equals(chanType)) { |
|
|
|
String paramUrl = liveIVS_URL + "/api/v1/device/channelsnap?serial=" + presetPos.getVideoNvrCode() + "&realtime=true&code=" + presetPos.getChannelCode(); |
|
|
|
String paramFileName = taskPatrolId.split(StringUtils.UNDERLINE)[1] + StringUtils.UNDERLINE |
|
|
|
@ -597,11 +608,11 @@ public class JobMainTask { |
|
|
|
} |
|
|
|
|
|
|
|
synchronized private void addPointRecord(boolean bOk, |
|
|
|
PatrolTaskExecRecord taskExecRecord, |
|
|
|
PatrolTaskInfo taskInfo, |
|
|
|
PatrolPresetPos presetPos, |
|
|
|
StringBuffer fileTypes, |
|
|
|
StringBuffer filePaths) { |
|
|
|
PatrolTaskExecRecord taskExecRecord, |
|
|
|
PatrolTaskInfo taskInfo, |
|
|
|
PatrolPresetPos presetPos, |
|
|
|
StringBuffer fileTypes, |
|
|
|
StringBuffer filePaths) { |
|
|
|
String rectangle = getRectangle(presetPos.getPresetPosId()); |
|
|
|
String taskPatrolId = getTaskPatrolledId(taskInfo.getDevNo(), taskExecRecord.getTaskPatrolId()); |
|
|
|
final String deviceId = taskInfo.getDeviceId(); |
|
|
|
@ -627,7 +638,13 @@ public class JobMainTask { |
|
|
|
.build(); |
|
|
|
taskExecClient.addPatrolTaskPointExecRecord(pointExecRecord); |
|
|
|
|
|
|
|
if (bOk) { |
|
|
|
if (!bOk) { |
|
|
|
log.info(Color.RED + "pointExecRecord crud add taskPatrolId: {}, deviceId: {}" + Color.END, taskPatrolId, deviceId); |
|
|
|
} |
|
|
|
|
|
|
|
//if (bOk) |
|
|
|
if(!testMode) |
|
|
|
{ |
|
|
|
String patrolId = taskExecRecord.getTaskPatrolId(); |
|
|
|
String[] ids = patrolId.split(StringUtils.UNDERLINE); |
|
|
|
String taskPatrolIdRemote = ids[1] + "_" + ids[2]; |
|
|
|
@ -715,22 +732,22 @@ public class JobMainTask { |
|
|
|
|
|
|
|
taskExecRecord.setCursorNumber(cursor); |
|
|
|
//if (bOk) { |
|
|
|
success++; |
|
|
|
log.info("execute point success, cursor: {}, success: {}, total: {}", cursor, success, total); |
|
|
|
taskExecRecord.setFinishNumber(success); |
|
|
|
String taskProgress = decimalFormatNum(success, total);// task_progress=success/total -> patrol_task_exec_record |
|
|
|
taskExecRecord.setTaskProgress(taskProgress); |
|
|
|
success++; |
|
|
|
log.info("execute point success, cursor: {}, success: {}, total: {}", cursor, success, total); |
|
|
|
taskExecRecord.setFinishNumber(success); |
|
|
|
String taskProgress = decimalFormatNum(success, total);// task_progress=success/total -> patrol_task_exec_record |
|
|
|
taskExecRecord.setTaskProgress(taskProgress); |
|
|
|
// List<PatrolTaskInfo> totalInfos = getInfosByRecord(taskExecRecord); |
|
|
|
List<PatrolTaskPointExecRecord> finishedInfos = taskExecClient.selectPatrolTaskPointExecRecordList(PatrolTaskPointExecRecord.builder().oldTaskPatrolledId(taskExecRecord.getOldTaskPatrolId()).build()); |
|
|
|
String totalProgress = decimalFormatNum(finishedInfos.size(), infoListSize);// task_progress=finish/total -> patrol_task_status |
|
|
|
log.info("totalProgress: {}, finish: {}, total: {}", totalProgress, finishedInfos.size(), infoListSize); |
|
|
|
callRemoteSendMsgRunMode( |
|
|
|
taskExecRecord.getTaskCode(), |
|
|
|
taskExecRecord.getTaskName(), |
|
|
|
taskExecRecord.getTaskPatrolId(), |
|
|
|
totalProgress, |
|
|
|
TaskStatus.RUNNING.getCode()/*运行中百分比上报*/ |
|
|
|
); |
|
|
|
List<PatrolTaskPointExecRecord> finishedInfos = taskExecClient.selectPatrolTaskPointExecRecordList(PatrolTaskPointExecRecord.builder().oldTaskPatrolledId(taskExecRecord.getOldTaskPatrolId()).build()); |
|
|
|
String totalProgress = decimalFormatNum(finishedInfos.size(), total);// task_progress=finish/total -> patrol_task_status |
|
|
|
log.info("totalProgress: {}, finish: {}, batchSize: {}, total: {}", totalProgress, finishedInfos.size(), infoListSize, total); |
|
|
|
callRemoteSendMsgRunMode( |
|
|
|
taskExecRecord.getTaskCode(), |
|
|
|
taskExecRecord.getTaskName(), |
|
|
|
taskExecRecord.getTaskPatrolId(), |
|
|
|
totalProgress, |
|
|
|
TaskStatus.RUNNING.getCode()/*运行中百分比上报*/ |
|
|
|
); |
|
|
|
// } |
|
|
|
// else { |
|
|
|
// log.info("execute point fail, cursor: {}, success: {}, total: {}", cursor, success, total); |
|
|
|
@ -987,12 +1004,41 @@ public class JobMainTask { |
|
|
|
|
|
|
|
private List<PatrolTaskInfo> getInfosByRecord(final PatrolTaskExecRecord record) { |
|
|
|
List<PatrolTaskInfo> patrolTaskInfoList = new ArrayList<>(); |
|
|
|
List<PatrolTaskInfo> patrolTaskInfoListEx = new ArrayList<>(); |
|
|
|
try { |
|
|
|
patrolTaskInfoList = taskExecClient.selectTaskInfoList(PatrolTaskInfo.builder() |
|
|
|
.devNo(record.getDevNo()) |
|
|
|
.taskMajorId(record.getTaskId().toString()) |
|
|
|
.build()); |
|
|
|
return patrolTaskInfoMaintainAreaRemove(patrolTaskInfoList);//把检修区域的点位删除 |
|
|
|
|
|
|
|
for(PatrolTaskInfo patrolTaskInfo : patrolTaskInfoList) { |
|
|
|
PatrolPresetPos patrolPresetPos = PatrolPresetPos.builder() |
|
|
|
.patrolPointId(Long.parseLong(patrolTaskInfo.getDeviceId())) |
|
|
|
.isEnable("1") |
|
|
|
.build(); |
|
|
|
List<PatrolPresetPos> presetPosList = taskExecClient.selectPatrolPresetPosList(patrolPresetPos); |
|
|
|
PatrolPresetPos presetPos = !presetPosList.isEmpty() ? presetPosList.get(0) : |
|
|
|
PatrolPresetPos.builder().presetPosId("1") |
|
|
|
.presetPosCode("1") |
|
|
|
.channelCode("00000000000000000000#ffffffffffffffffffffffffffffffff") |
|
|
|
.channelId("999999") |
|
|
|
.patrolPointId(1234567890L) |
|
|
|
.channelType("vl") |
|
|
|
.isEnable("1") |
|
|
|
.eqpBookId("1") |
|
|
|
.build(); |
|
|
|
String deviceCode = "00000000000000000000"; |
|
|
|
try { |
|
|
|
String[] channelCodeList = presetPos.getChannelCode().split(StringUtils.HASHTAG); |
|
|
|
deviceCode = channelCodeList[0]; |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("presetPos illegal channelCode: {} with deviceId: {}", presetPos.getChannelCode(), patrolTaskInfo.getDeviceId()); |
|
|
|
} |
|
|
|
|
|
|
|
patrolTaskInfo.setDeviceCode(deviceCode); |
|
|
|
patrolTaskInfoListEx.add(patrolTaskInfo); |
|
|
|
} |
|
|
|
return patrolTaskInfoMaintainAreaRemove(patrolTaskInfoListEx);//把检修区域的点位删除 |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("error", e); |
|
|
|
return patrolTaskInfoList; |
|
|
|
@ -1088,26 +1134,77 @@ public class JobMainTask { |
|
|
|
// log.info("-----------------------2 selectPatrolTaskStatusList is empty"); |
|
|
|
// } |
|
|
|
// } |
|
|
|
// } |
|
|
|
|
|
|
|
// 任务分拆异步执行V1.0 |
|
|
|
// private void prePointExecImmediate(final PatrolTaskExecRecord patrolTaskExecRecord, List<PatrolTaskInfo> patrolTaskInfoList) { |
|
|
|
// int batchSize = taskBatchSize; |
|
|
|
// int total = patrolTaskInfoList.size(); |
|
|
|
// log.info(Color.CYAN + "batchSize: {}, total: {}" + Color.END, batchSize, total); |
|
|
|
// |
|
|
|
// int index = 1; |
|
|
|
// for (int i = 0; i < total; i += batchSize) { |
|
|
|
// int end = Math.min(i + batchSize, total); |
|
|
|
// List<PatrolTaskInfo> batch = patrolTaskInfoList.subList(i, end); |
|
|
|
// // 提交每一批次任务到线程池 |
|
|
|
// final int threadCnt = index; |
|
|
|
// threadPool.submit(() -> handlePrePointBatch(threadCnt, patrolTaskExecRecord, batch)); |
|
|
|
// index++; |
|
|
|
// } |
|
|
|
// } |
|
|
|
|
|
|
|
private void prePointExecImmediate(final PatrolTaskExecRecord patrolTaskExecRecord, List<PatrolTaskInfo> patrolTaskInfoList) { |
|
|
|
Map<String, List<PatrolTaskInfo>> deviceMap = new HashMap<>(); |
|
|
|
for (PatrolTaskInfo patrolTaskInfo : patrolTaskInfoList) { |
|
|
|
deviceMap.computeIfAbsent(patrolTaskInfo.getDeviceCode(), k -> new ArrayList<>()).add(patrolTaskInfo); |
|
|
|
} |
|
|
|
|
|
|
|
asyncTaskPatrolPointCnt.set(0); |
|
|
|
int batchSize = taskBatchSize; |
|
|
|
int total = patrolTaskInfoList.size(); |
|
|
|
log.info(Color.CYAN + "batchSize: {}, total: {}" + Color.END, batchSize, total); |
|
|
|
|
|
|
|
log.info(Color.CYAN + "threadId: {}, batchSize: {}, total: {}" + Color.END, Thread.currentThread().getId(), batchSize, total); |
|
|
|
List<PatrolTaskInfo> currentGroup = new ArrayList<>(); |
|
|
|
List<CompletableFuture<Void>> futures = new ArrayList<>(); |
|
|
|
int index = 1; |
|
|
|
for (int i = 0; i < total; i += batchSize) { |
|
|
|
int end = Math.min(i + batchSize, total); |
|
|
|
List<PatrolTaskInfo> batch = patrolTaskInfoList.subList(i, end); |
|
|
|
// 提交每一批次任务到线程池 |
|
|
|
for (List<PatrolTaskInfo> sameCodeList : deviceMap.values()) { |
|
|
|
if (sameCodeList.size() > batchSize) { |
|
|
|
String[] deviceCodeArray = sameCodeList.stream().map(PatrolTaskInfo::getDeviceCode).toArray(String[]::new); |
|
|
|
log.info("BATCH_BIG_GROUP deviceCodeArray: {}", String.join(StringUtils.COMMA, deviceCodeArray)); |
|
|
|
// 大组直接提交处理 |
|
|
|
final int threadCnt = index; |
|
|
|
futures.add(CompletableFuture.runAsync(() -> handlePrePointBatch(threadCnt, patrolTaskExecRecord, sameCodeList), threadPool)); |
|
|
|
index++; |
|
|
|
} else { |
|
|
|
if (currentGroup.size() + sameCodeList.size() > batchSize) { |
|
|
|
// 当前组满了,提交处理 |
|
|
|
final int threadCnt = index; |
|
|
|
List<PatrolTaskInfo> finalGroup = new ArrayList<>(currentGroup); |
|
|
|
String[] deviceCodeArray = finalGroup.stream().map(PatrolTaskInfo::getDeviceCode).toArray(String[]::new); |
|
|
|
log.info("BATCH_FINAL_GROUP deviceCodeArray: {}", String.join(StringUtils.COMMA, deviceCodeArray)); |
|
|
|
futures.add(CompletableFuture.runAsync(() -> handlePrePointBatch(threadCnt, patrolTaskExecRecord, finalGroup), threadPool)); |
|
|
|
currentGroup.clear(); |
|
|
|
index++; |
|
|
|
} |
|
|
|
currentGroup.addAll(sameCodeList); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (!currentGroup.isEmpty()) { |
|
|
|
final int threadCnt = index; |
|
|
|
threadPool.submit(() -> handlePrePointBatch(threadCnt, patrolTaskExecRecord, batch)); |
|
|
|
index++; |
|
|
|
List<PatrolTaskInfo> finalGroup = new ArrayList<>(currentGroup); |
|
|
|
String[] deviceCodeArray = finalGroup.stream().map(PatrolTaskInfo::getDeviceCode).toArray(String[]::new); |
|
|
|
log.info("BATCH_LAST_FINAL_GROUP deviceCodeArray: {}", String.join(StringUtils.COMMA, deviceCodeArray)); |
|
|
|
futures.add(CompletableFuture.runAsync(() -> handlePrePointBatch(threadCnt, patrolTaskExecRecord, finalGroup), threadPool)); |
|
|
|
} |
|
|
|
|
|
|
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); |
|
|
|
log.info("CompletableFuture Break Join!"); |
|
|
|
} |
|
|
|
|
|
|
|
private void handlePrePointBatch(final int threadCnt, final PatrolTaskExecRecord patrolTaskExecRecord, final List<PatrolTaskInfo> batch) { |
|
|
|
log.info("handlePrePointBatch threadCnt: {}, batch size: {}, devNo: {}, taskId: {}", threadCnt, batch.size(), patrolTaskExecRecord.getDevNo(), patrolTaskExecRecord.getTaskId()); |
|
|
|
asyncTaskPatrolPointCnt.getAndAdd(batch.size()); |
|
|
|
log.info("handlePrePointBatch threadCnt: {}, asyncTaskPatrolPointCnt: {}, batch size: {}, devNo: {}, taskId: {}", |
|
|
|
threadCnt, asyncTaskPatrolPointCnt.get(), batch.size(), patrolTaskExecRecord.getDevNo(), patrolTaskExecRecord.getTaskId()); |
|
|
|
for (final PatrolTaskInfo taskInfo : batch) { |
|
|
|
log.info("handlePrePointBatch taskPatrolId: {}, patrolPointId: {}, lineId: {}", patrolTaskExecRecord.getTaskPatrolId(), taskInfo.getDeviceId(), taskInfo.getLineId()); |
|
|
|
prePointExec(patrolTaskExecRecord, taskInfo, batch.size()); |
|
|
|
@ -1127,6 +1224,39 @@ public class JobMainTask { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public List<List<PatrolTaskInfo>> optimizedGroup(List<PatrolTaskInfo> list) { |
|
|
|
Map<String, List<PatrolTaskInfo>> deviceMap = new HashMap<>(); |
|
|
|
|
|
|
|
// 遍历原始列表,按 deviceCode 聚合 |
|
|
|
for (PatrolTaskInfo item : list) { |
|
|
|
deviceMap.computeIfAbsent(item.getDeviceCode(), k -> new ArrayList<>()).add(item); |
|
|
|
} |
|
|
|
|
|
|
|
List<List<PatrolTaskInfo>> result = new ArrayList<>(); |
|
|
|
List<PatrolTaskInfo> currentGroup = new ArrayList<>(); |
|
|
|
int maxGroupSize = 100; |
|
|
|
|
|
|
|
for (List<PatrolTaskInfo> group : deviceMap.values()) { |
|
|
|
if (group.size() > maxGroupSize) { |
|
|
|
// 超过最大组,单独成组 |
|
|
|
result.add(group); |
|
|
|
} else { |
|
|
|
if (currentGroup.size() + group.size() > maxGroupSize) { |
|
|
|
result.add(currentGroup); |
|
|
|
currentGroup = new ArrayList<>(); |
|
|
|
} |
|
|
|
currentGroup.addAll(group); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (!currentGroup.isEmpty()) { |
|
|
|
result.add(currentGroup); |
|
|
|
} |
|
|
|
|
|
|
|
return result; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void execRemoveMaintainArea(List<PatrolTaskInfo> patrolTaskInfoList, List<MaintainRegion> maintAreaList, List<String> deviceIds) { |
|
|
|
for (MaintainRegion area : maintAreaList) { |
|
|
|
if (area.getStartTime().getTime() < System.currentTimeMillis() && area.getEndTime().getTime() > System.currentTimeMillis()) { |
|
|
|
|