Browse Source

/*

1.
智能巡视与远程算法之间的异步通信机制优化:由定时任务改为常驻内存任务方式,优化内存资源使用以及并发稳定性。
2. 修改巡视结果算法名称无法展示问题。
3. 开发远程算法进度功能。
*/
master
htjcAdmin 5 months ago
parent
commit
2c2de24c83
16 changed files with 437 additions and 67 deletions
  1. +3
    -0
      inspect-job/src/main/java/com/inspect/job/client/TaskExecClient.java
  2. +2
    -0
      inspect-job/src/main/java/com/inspect/job/domain/task/PatrolDevice.java
  3. +17
    -9
      inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java
  4. +19
    -1
      inspect-main/inspect-main-task-exec/src/main/java/com/inspect/exec/controller/PatrolTaskExecController.java
  5. +2
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java
  6. +1
    -1
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java
  7. +60
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestConsumerManager.java
  8. +33
    -13
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java
  9. +62
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java
  10. +4
    -2
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java
  11. +39
    -3
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java
  12. +66
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueConsumer.java
  13. +80
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueConsumerAsync.java
  14. +44
    -38
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java
  15. +1
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java
  16. +4
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/JsonRootBean.java

+ 3
- 0
inspect-job/src/main/java/com/inspect/job/client/TaskExecClient.java View File

@ -98,6 +98,9 @@ public interface TaskExecClient {
@PostMapping({"/exec/taskResultExised"})
boolean taskResultExited(@RequestParam("patrolDeviceCode") String deviceCode, @RequestParam("taskPatrolledId") String patrolId);
@GetMapping({"/exec/getFilterPatrolTaskList"})
List<PatrolTask> getFilterPatrolTaskList(final PatrolTask patrolTask);
@PostMapping({"/exec/makeCurrentDayTask"})
Map<String, List<PatrolTask>> makeCurrentDayTask();


+ 2
- 0
inspect-job/src/main/java/com/inspect/job/domain/task/PatrolDevice.java View File

@ -17,4 +17,6 @@ public class PatrolDevice {
private String receiveCode;
@JSONField(name = "Type")
private String type;
@JSONField(name = "TotalNumber")
private Integer totalNumber;
}

+ 17
- 9
inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java View File

@ -643,12 +643,12 @@ public class JobMainTask {
}
//if (bOk)
if(!testMode)
//if(!testMode)
{
String patrolId = taskExecRecord.getTaskPatrolId();
String[] ids = patrolId.split(StringUtils.UNDERLINE);
String taskPatrolIdRemote = ids[1] + "_" + ids[2];
callRemoteSendMsgCtrlMode(taskPatrolIdRemote, pointExecRecord);/*上报巡视结果*/
callRemoteSendMsgCtrlMode(taskPatrolIdRemote, taskExecRecord.getTotalNumber(), pointExecRecord);/*上报巡视结果*/
}
}
@ -675,7 +675,10 @@ public class JobMainTask {
}
}
private void callRemoteSendMsgCtrlMode(final String taskPatrolId, PatrolTaskPointExecRecord pointExecRecord) {
private void callRemoteSendMsgCtrlMode(
final String taskPatrolId,
final int totalNumber,
PatrolTaskPointExecRecord pointExecRecord) {
PatrolResult patrolResult = PatrolResult.builder()
.taskPatrolledId(taskPatrolId)
.deviceId(pointExecRecord.getDeviceId())
@ -705,6 +708,7 @@ public class JobMainTask {
.receiveCode("")
.sendCode("")
.items(patrolResults)
.totalNumber(totalNumber)
.build();
Object patrolResultDataJsonObj = JSONArray.toJSON(patrolResultData);
try {
@ -933,16 +937,18 @@ public class JobMainTask {
}
@PostMapping({"/immediatelyExecTask"})
public void immediatelyExecTask(@RequestBody PatrolTask patrolTask) {
String taskCode = redisService.getCacheObjectOfTask(RedisConst.TASK_CURRENT_CODE, patrolTask.getTaskCode());
public void immediatelyExecTask(@RequestBody PatrolTask patrolTaskParam) {
List<PatrolTask> patrolTasks = taskExecClient.getFilterPatrolTaskList(patrolTaskParam);
PatrolTask patrolTask = patrolTasks.get(0);
String taskCode = redisService.getCacheObjectOfTask(RedisConst.TASK_CURRENT_CODE, patrolTaskParam.getTaskCode());
// if (StringUtils.isNotEmpty(taskCode)) {
// log.info(Color.CYAN + "TASK SLOT IS TAKEN BY: {}" + Color.END, taskCode);
// return;
// }
PatrolTaskInfo patrolTaskInfo = new PatrolTaskInfo();
List<PatrolTask> patrolTaskList = new ArrayList<>();
patrolTaskList.add(patrolTask);
// patrolTaskList.add(patrolTask);
List<PatrolTask> patrolTaskList = new ArrayList<>(patrolTasks);
List<PatrolTask> cameraPatrolTasks = new ArrayList<>();
assembleVideoTask(patrolTaskList, cameraPatrolTasks);
String redisImmediatelyExecTaskTime = redisService.getCacheObjectOfTask(RedisConst.IMMEDIATELY_EXEC_TASK_TIME, patrolTask.getTaskCode());
@ -974,7 +980,8 @@ public class JobMainTask {
Date time = patrolTask.getFixedStartTime();
String timeStr = time == null ? immediatelyExecTaskTime : DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss, time);
patrolTaskExecRecord.setTaskPatrolId(devNos[0] + "_" + patrolTask.getTaskCode() + "_" + immediatelyExecTaskTime);// 这边暂定devNos[0]
patrolTaskExecRecord.setOldTaskPatrolId(patrolTask.getTaskCode() + "_" + timeStr + "_" + immediatelyExecTaskTime);
//patrolTaskExecRecord.setOldTaskPatrolId(patrolTask.getTaskCode() + "_" + timeStr + "_" + immediatelyExecTaskTime);
patrolTaskExecRecord.setOldTaskPatrolId(patrolTask.getTaskCode() + "_" + timeStr);
patrolTaskExecRecord.setTaskName(patrolTask.getTaskName());
patrolTaskExecRecord.setTaskCode(patrolTask.getTaskCode());
patrolTaskExecRecord.setTaskState(TaskStatus.RUNNING.getCode());
@ -1725,7 +1732,8 @@ public class JobMainTask {
private String getOldTaskPatrolId(final PatrolTask patrolTask) {
String time = DateUtils.format(DateUtils.yyyyMMddHHmmss, patrolTask.getFixedStartTime());
return patrolTask.getTaskCode() + "_" + time + "_" + time;
//return patrolTask.getTaskCode() + "_" + time + "_" + time;
return patrolTask.getTaskCode() + "_" + time;
}
@SuppressWarnings({"unused"})


+ 19
- 1
inspect-main/inspect-main-task-exec/src/main/java/com/inspect/exec/controller/PatrolTaskExecController.java View File

@ -470,6 +470,11 @@ public class PatrolTaskExecController extends BaseController {
return IntervalType.BY_DATE.getCode().equals(patrolTask.getIntervalType());
}
@PostMapping({"/getFilterPatrolTaskList"})
List<PatrolTask> getFilterPatrolTaskList(PatrolTask task) {
return filterPatrolTaskList(task);
}
@PostMapping({"/makeCurrentDayTask"})
public Map<String, List<PatrolTask>> makeCurrentDayTask() {
try {
@ -778,7 +783,7 @@ public class PatrolTaskExecController extends BaseController {
}
private List<PatrolTask> getPatrolTasks(PatrolTask task) {
private List<PatrolTask> remakePatrolTaskList(PatrolTask task) {
String[] devNos = task.getDevNo().split(StringUtils.COMMA);
String[] devTypes = task.getDevType().split(StringUtils.COMMA);
List<PatrolTask> patrolTaskList = new ArrayList<>();
@ -842,6 +847,19 @@ public class PatrolTaskExecController extends BaseController {
return patrolTaskList;
}
private List<PatrolTask> filterPatrolTaskList(PatrolTask taskParam) {
List<PatrolTask> patrolTaskListEx = patrolTaskService.selectPatrolTaskList(taskParam);
if(patrolTaskListEx.isEmpty())
return new ArrayList<>();
PatrolTask task = patrolTaskListEx.get(0);
return remakePatrolTaskList(task);
}
private List<PatrolTask> getPatrolTasks(PatrolTask task) {
return remakePatrolTaskList(task);
}
@PostMapping({"/getPatrolTaskStatusByTaskPatrolledId"})
public PatrolTaskStatus getPatrolTaskStatusByTaskPatrolledId(String taskPatrolledId) {
PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus();


+ 2
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java View File

@ -19,6 +19,7 @@ public class AnalyseResult implements Serializable {
private String filter = "0";
private String result = "1";
private boolean isTest = false;
private int totalNumber;
public void setResultsList(List<AnalyseResItem> resultsList) {
this.resultsList = resultsList;
@ -29,6 +30,7 @@ public class AnalyseResult implements Serializable {
}
public void reloadReq(AnalyseRequest analyseRequest) {
this.totalNumber = analyseRequest.getTotalNumber();
AnalyseResItem analyseResItem = this.resultList.get(0);
List<AnalyseResItem> resultList = new ArrayList<>();
this.setResultsList(resultList);


+ 1
- 1
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java View File

@ -5,5 +5,5 @@ import com.inspect.analysis.domain.AnalyseResult;
public interface IAnalysisService {
void picAnalyseRetNotify(AnalyseResult analyseResult);
void handleAlgorithmResult(final String analyseResultJson);
void handleAlgorithmResult(final AnalyseResult analyseResult);
}

+ 60
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestConsumerManager.java View File

@ -0,0 +1,60 @@
package com.inspect.analysis.service.impl;
import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.base.core.constant.Color;
import com.inspect.base.redis.service.RedisService;
import com.inspect.partrolresult.domain.AnalyseRequest;
import com.inspect.partrolresult.service.AnalyseRemoteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.annotation.PreDestroy;
@Component
public class AlgorithmRequestConsumerManager {
private static final Logger log = LoggerFactory.getLogger(AlgorithmRequestConsumerManager.class);
private RedisQueueConsumerAsync<AnalyseRequest> consumer;
@Resource
private RedisService redisService;
@Resource
private AnalyseRemoteService analyseRemoteService;
@PostConstruct
public void initConsumer() {
consumer = new RedisQueueConsumerAsync<AnalyseRequest>(
redisService.redisTemplate,
AnalyseConstants.ALGORITHM_REQUEST_QUEUE,
AnalyseRequest.class,
1000,
500,
request -> {
try {
log.info("AlgorithmRequestConsumerManager queueSize: {}, request: {}", getQueueSize(), request);
analyseRemoteService.sendRequest(request, request.getTypeList(), request.isFilter());
} catch (Exception e) {
log.info(Color.RED + "AlgorithmRequestConsumerManager error queueSize: {}, request: {}" + Color.END, getQueueSize(), request);
}
}
);
consumer.start(); // 应用启动时即开始消费
}
@PreDestroy
public void stopConsumer() {
if (consumer != null) {
consumer.stop();
}
}
public long getQueueSize() {
return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_REQUEST_QUEUE);
}
}

+ 33
- 13
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java View File

@ -14,11 +14,15 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Duration;
@Component
//@Component
public class AlgorithmRequestProcessConsumer {
private final Logger logger = LoggerFactory.getLogger(AlgorithmRequestProcessConsumer.class);
@Value("${task.test-mode:false}")
private boolean testMode;
@Resource
private RedisService redisService;
@ -26,20 +30,36 @@ public class AlgorithmRequestProcessConsumer {
private AnalyseRemoteService analyseRemoteService;
//@Scheduled(fixedDelay = 1000)
// @Scheduled(fixedDelayString = "${task.scheduler.request.delay-ms:3000}")
// public void pollAndProcess() {
// //logger.info(Color.YELLOW + "AlgorithmRequestProcessConsumerTracer" + Color.END);
// String requestDataInRedis;
// RedisTemplate<String, String> redisTemplate = redisService.redisTemplate;
// while ((requestDataInRedis = redisTemplate.opsForList().leftPop(AnalyseConstants.ALGORITHM_REQUEST_QUEUE, Duration.ofMillis(500))) != null) {
// try {
// logger.info(Color.CYAN + "AlgorithmRequestProcessConsumerTracer queueSize: {}, requestData: {}" + Color.END, getQueueSize(), requestDataInRedis);
// AnalyseRequest analyseRequest = new Gson().fromJson(requestDataInRedis, AnalyseRequest.class);
// analyseRemoteService.sendRequest(analyseRequest, analyseRequest.getTypeList(), analyseRequest.isFilter());
// } catch (Exception e) {
// logger.error(e.getMessage(), e);
// }
// }
// }
@Scheduled(fixedDelayString = "${task.scheduler.request.delay-ms:3000}")
public void pollAndProcess() {
//logger.info(Color.YELLOW + "AlgorithmRequestProcessConsumerTracer" + Color.END);
String requestDataInRedis;
RedisTemplate<String, String> redisTemplate = redisService.redisTemplate;
while ((requestDataInRedis = redisTemplate.opsForList().leftPop(AnalyseConstants.ALGORITHM_REQUEST_QUEUE)) != null) {
try {
logger.info(Color.CYAN + "AlgorithmRequestProcessConsumerTracer queueSize: {}, requestData: {}" + Color.END, getQueueSize(), requestDataInRedis);
AnalyseRequest analyseRequest = new Gson().fromJson(requestDataInRedis, AnalyseRequest.class);
analyseRemoteService.sendRequest(analyseRequest, analyseRequest.getTypeList(), analyseRequest.isFilter());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
RedisQueueConsumer<AnalyseRequest> consumer = new RedisQueueConsumer<AnalyseRequest>(
redisService.redisTemplate,
AnalyseConstants.ALGORITHM_REQUEST_QUEUE,
AnalyseRequest.class,
1000, // 每条消息间隔 1000ms 处理
500, // 空队列时等待 500ms
request -> {
analyseRemoteService.sendRequest(request, request.getTypeList(), request.isFilter());
}
);
consumer.consumeLoop(); // 开始消费本轮队列
}
public long getQueueSize() {


+ 62
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java View File

@ -0,0 +1,62 @@
package com.inspect.analysis.service.impl;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.analysis.domain.AnalyseResult;
import com.inspect.analysis.service.IAnalysisService;
import com.inspect.base.core.constant.Color;
import com.inspect.base.redis.service.RedisService;
import com.inspect.partrolresult.service.AnalyseRemoteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
@Component
public class AlgorithmResponseConsumerManager {
private static final Logger log = LoggerFactory.getLogger(AlgorithmResponseConsumerManager.class);
private RedisQueueConsumerAsync<AnalyseResult> consumer;
@Resource
private RedisService redisService;
@Resource
private IAnalysisService analysisService;
@PostConstruct
public void initConsumer() {
consumer = new RedisQueueConsumerAsync<AnalyseResult>(
redisService.redisTemplate,
AnalyseConstants.ALGORITHM_RESPONSE_QUEUE,
AnalyseResult.class,
1000,
500,
response -> {
try {
log.info("AlgorithmResponseConsumerManager queueSize: {}, response: {}", getQueueSize(), response);
analysisService.handleAlgorithmResult(response);
} catch (Exception e) {
log.info(Color.RED + "AlgorithmResponseConsumerManager error queueSize: {}, request: {}" + Color.END, getQueueSize(), response);
}
}
);
consumer.start(); // 应用启动时即开始消费
}
@PreDestroy
public void stopConsumer() {
if (consumer != null) {
consumer.stop();
}
}
public long getQueueSize() {
return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE);
}
}

+ 4
- 2
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java View File

@ -2,6 +2,7 @@ package com.inspect.analysis.service.impl;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.analysis.domain.AnalyseResult;
import com.inspect.analysis.service.IAnalysisService;
import com.inspect.base.core.constant.Color;
import com.inspect.base.redis.service.RedisService;
@ -15,7 +16,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
//@Component
public class AlgorithmResponseProcessConsumer {
private final Logger logger = LoggerFactory.getLogger(AlgorithmResponseProcessConsumer.class);
@ -36,7 +37,8 @@ public class AlgorithmResponseProcessConsumer {
logger.info(Color.CYAN + "AlgorithmResponseProcessConsumerTracer queueSize: {}, responseData: {}" + Color.END, getQueueSize(), responseDataInRedis);
// AnalyseRequest analyseRequest = new Gson().fromJson(responseDataInRedis, AnalyseRequest.class);
// analyseRemoteService.sendRequest(analyseRequest, analyseRequest.getTypeList(), analyseRequest.isFilter());
analysisService.handleAlgorithmResult(responseDataInRedis);
AnalyseResult analyseResult = new Gson().fromJson(responseDataInRedis, AnalyseResult.class);
analysisService.handleAlgorithmResult(analyseResult);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}


+ 39
- 3
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java View File

@ -35,8 +35,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.DecimalFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Service
@ -73,6 +73,7 @@ public class AnalysisServiceImpl implements IAnalysisService {
@Resource
private ResultAnalysisUtils resultAnalysisUtils;
// @Override
// public void picAnalyseRetNotify(AnalyseResult analyseResult) {
// log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, analyseResult);
@ -168,9 +169,9 @@ public class AnalysisServiceImpl implements IAnalysisService {
);
}
public void handleAlgorithmResult(final String analyseResultJson) {
public void handleAlgorithmResult(final AnalyseResult analyseResult) {
String logLabel = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY);
AnalyseResult analyseResult = new Gson().fromJson(analyseResultJson, AnalyseResult.class);
log.info(Color.CYAN + "ALGO_RES_UUID: {}, ###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, logLabel, analyseResult);
String requestId = analyseResult.getRequestId();
String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId);
@ -250,6 +251,7 @@ public class AnalysisServiceImpl implements IAnalysisService {
} else {
log.info("ALGO_RES_UUID: {}, UNKNOWN_RESULT, OR TIMEOUT BEYOND A DAY FROM REMOTE ALGORITHM, requestId: {}, analyseResult: {}",
logLabel, requestId, analyseResult);
return;
}
//qinyl
@ -834,6 +836,11 @@ public class AnalysisServiceImpl implements IAnalysisService {
}
}
// 计算初筛算法的进度
if("1".equals(analyseResult.getFilter())) {
calcRemoteAlgorithmProgress(analyseResult.getTotalNumber(), analyseResult.getTaskPatrolId());
}
//this.senWebsocket(websocketDataList);
}
@ -880,6 +887,35 @@ public class AnalysisServiceImpl implements IAnalysisService {
return resultAnalysis;
}
private void calcRemoteAlgorithmProgress(final int totalNumber, final String taskPatrolledId) {
ResultAnalysis resultAnalysis = new ResultAnalysis();
resultAnalysis.setTaskPatrolId(taskPatrolledId);
resultAnalysis.setFilter("1");
List<ResultAnalysis> resultAnalysisList = resultAnalysisService.selectResultAnalysisList(resultAnalysis);
String algorithmProgress = decimalFormatNum(resultAnalysisList.size(), totalNumber);
log.info(Color.GREEN + "calcRemoteAlgorithmProgress: curNumber: {}, totalNumer: {}, algorithmProgress: {}" + Color.END,
resultAnalysisList.size(), totalNumber, algorithmProgress);
PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus();
patrolTaskStatus.setTaskPatrolledId(taskPatrolledId);
List<PatrolTaskStatus> patrolTaskStatusList = patrolTaskStatusService.selectPatrolTaskStatusList(patrolTaskStatus);
if(!patrolTaskStatusList.isEmpty()) {
patrolTaskStatus = patrolTaskStatusList.get(0);
patrolTaskStatus.setTaskEstimatedTime(algorithmProgress);
patrolTaskStatusService.updatePatrolTaskStatus(patrolTaskStatus);
}
}
private String decimalFormatNum(int current, int all) {
DecimalFormat nf = new DecimalFormat("#");
nf.setMaximumIntegerDigits(3);
float decimalFormatNum = (float) (current) / (float) all;
if (decimalFormatNum > 1) {
decimalFormatNum = 1;
}
return nf.format(decimalFormatNum * 100.0F);
}
private AlgValue selectAlgMap(String devId, String type) {
log.info("selectAlgMap patrolPointId: {}, type: {}", devId, type);
AlgValue algValue = null;


+ 66
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueConsumer.java View File

@ -0,0 +1,66 @@
package com.inspect.analysis.service.impl;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import java.time.Duration;
import java.util.function.Consumer;
public class RedisQueueConsumer<T> {
private static final Logger logger = LoggerFactory.getLogger(RedisQueueConsumer.class);
private final RedisTemplate<String, String> redisTemplate;
private final Gson gson = new Gson();
private final String queueKey;
private final Class<T> clazz;
private final long intervalMs;
private final long idleSleepMs;
private final Consumer<T> handler;
public RedisQueueConsumer(RedisTemplate<String, String> redisTemplate,
String queueKey,
Class<T> clazz,
long intervalMs,
long idleSleepMs,
Consumer<T> handler) {
this.redisTemplate = redisTemplate;
this.queueKey = queueKey;
this.clazz = clazz;
this.intervalMs = intervalMs;
this.idleSleepMs = idleSleepMs;
this.handler = handler;
}
public void consumeLoop() {
while (true) {
try {
// 阻塞式弹出元素避免忙等
String json = redisTemplate.opsForList()
.leftPop(queueKey, Duration.ofSeconds(1));
if (json == null) {
Thread.sleep(idleSleepMs);
break; // 本轮结束等待下次 @Scheduled 调度
}
long start = System.currentTimeMillis();
T obj = gson.fromJson(json, clazz);
handler.accept(obj); // 业务处理回调
// 节流处理
long cost = System.currentTimeMillis() - start;
if (cost < intervalMs) {
Thread.sleep(intervalMs - cost);
}
} catch (Exception e) {
logger.error("RedisQueueConsumer error", e);
}
}
}
}

+ 80
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueConsumerAsync.java View File

@ -0,0 +1,80 @@
package com.inspect.analysis.service.impl;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import java.time.Duration;
import java.util.function.Consumer;
public class RedisQueueConsumerAsync<T> implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(RedisQueueConsumerAsync.class);
private final RedisTemplate<String, String> redisTemplate;
private final Gson gson = new Gson();
private final String queueKey;
private final Class<T> clazz;
private final long intervalMs;
private final long idleSleepMs;
private final Consumer<T> handler;
private volatile boolean running = false;
public RedisQueueConsumerAsync(RedisTemplate<String, String> redisTemplate,
String queueKey,
Class<T> clazz,
long intervalMs,
long idleSleepMs,
Consumer<T> handler) {
this.redisTemplate = redisTemplate;
this.queueKey = queueKey;
this.clazz = clazz;
this.intervalMs = intervalMs;
this.idleSleepMs = idleSleepMs;
this.handler = handler;
}
public void start() {
if (!running) {
running = true;
new Thread(this, "RedisQueueConsumerAsync-" + queueKey).start();
}
}
public void stop() {
running = false;
}
@Override
public void run() {
while (running) {
try {
String json = redisTemplate.opsForList().leftPop(queueKey, Duration.ofSeconds(1));
if (json == null) {
Thread.sleep(idleSleepMs);
continue;
}
long start = System.currentTimeMillis();
T obj = gson.fromJson(json, clazz);
handler.accept(obj);
long cost = System.currentTimeMillis() - start;
if (cost < intervalMs) {
Thread.sleep(intervalMs - cost);
}
} catch (Exception e) {
logger.error("RedisQueueConsumerAsync error", e);
}
}
logger.info("RedisQueueConsumerAsync for [{}] stopped.", queueKey);
}
}

+ 44
- 38
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java View File

@ -700,7 +700,7 @@ public class PatrolResultController extends BaseController {
});
}
public String callRemoteAlgorithm(List<PatrolResult> patrolResultList) {
public String callRemoteAlgorithm(final int totalNumber, List<PatrolResult> patrolResultList) {
String result = "";
PatrolTaskFtp ftp = patrolTaskFtpService.selectPatrolTaskFtpByLineId(2L);
String ANALYSIS_URL = patrolTaskService.selectConfigByKey("ANALYSIS_URL");// 小模型分析路径
@ -945,6 +945,7 @@ public class PatrolResultController extends BaseController {
analyseReq.setSftpHostPort(port);
analyseReq.setSftpUsername(username);
analyseReq.setSftpPassword(password);
analyseReq.setTotalNumber(totalNumber);
if (filterList.get(0).getImageUrlList() != null && filterList.get(0).getImageUrlList().length > 0) {
// 初筛算法调用改为异步调用
final String meterFilter = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSE_IS_METER_FILTER);
@ -969,6 +970,7 @@ public class PatrolResultController extends BaseController {
analyseReq.setSftpHostPort(port);
analyseReq.setSftpUsername(username);
analyseReq.setSftpPassword(password);
analyseReq.setTotalNumber(totalNumber);
if (bigModelList.get(0).getImageUrlList() != null && bigModelList.get(0).getImageUrlList().length > 0) {
// 大模型算法调用改为异步调用
//analyseRemoteService.sendRequest(analyseReq, bigModelList.get(0).getTypeList(), true);
@ -1088,7 +1090,7 @@ public class PatrolResultController extends BaseController {
PatrolTaskFtp patrolTaskFtp = patrolTaskFtpService.selectPatrolTaskFtpByLineId(2L);
int i = 1;
List<String> patrolDeviceCodeList = new ArrayList<>();
List listEqpBook;
//List listEqpBook;
if (type.equals(StaEnum.RunState.getCode())) {
logger.info(Color.CYAN + "###### RECEIVE DATA TO ANALYSIS [41] start ######" + Color.END);
try {
@ -1103,62 +1105,64 @@ public class PatrolResultController extends BaseController {
final String jsonArrayStr = String.valueOf(jsonArray);
List<PatrolTaskStatus> patrolTaskStatusListItems = JSONArray.parseArray(jsonArrayStr, PatrolTaskStatus.class);
if (!patrolTaskStatusListItems.isEmpty()) {
for (PatrolTaskStatus item : patrolTaskStatusListItems) {
for (PatrolTaskStatus patrolTaskStatusItem : patrolTaskStatusListItems) {
PatrolTask patrolTask = new PatrolTask();
patrolTask.setTaskCode(item.getTaskCode());
patrolTask.setTaskCode(patrolTaskStatusItem.getTaskCode());
List<PatrolTask> list = patrolTaskService.selectPatrolTaskList(patrolTask);
if (CollectionUtils.isEmpty(list)) {
logger.info("TASK CODE: {} NO EXIST, SKIP IT!", item.getTaskCode());
logger.info("TASK CODE: {} NO EXIST, SKIP IT!", patrolTaskStatusItem.getTaskCode());
} else {
try {
sendWebsocket(item);
sendWebsocket(patrolTaskStatusItem);
} catch (Exception e) {
logger.error("error", e);
}
if (StringUtils.isNotEmpty(item.getTaskPatrolledId())) {
if (StringUtils.isNotEmpty(patrolTaskStatusItem.getTaskPatrolledId())) {
String str = "";
PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus();
patrolTaskStatus.setTaskPatrolledId(item.getTaskPatrolledId());
if (StringUtils.isNotEmpty(item.getPosType())) {
patrolTaskStatus.setPosType(item.getPosType());
str = item.getPosType();
patrolTaskStatus.setTaskPatrolledId(patrolTaskStatusItem.getTaskPatrolledId());
if (StringUtils.isNotEmpty(patrolTaskStatusItem.getPosType())) {
patrolTaskStatus.setPosType(patrolTaskStatusItem.getPosType());
str = patrolTaskStatusItem.getPosType();
} else if (StringUtils.isNotEmpty(sendCode)) {
str = sendCodeToDevType(sendCode);
patrolTaskStatus.setPosType(str);
if ("1".equals(str) && "E100-001".equals(sendCode)) {
listEqpBook = baseDataClient.queryEqpBookCode(sendCode);
item.setCode(((BasedataEqpBookChannel) listEqpBook.get(0)).getChannelCode());
List<BasedataEqpBookChannel> listEqpBook = baseDataClient.queryEqpBookCode(sendCode);
patrolTaskStatusItem.setCode(listEqpBook.get(0).getChannelCode());
}
}
listEqpBook = iPatrolTaskStatusService.selectPatrolTaskStatusList(patrolTaskStatus);
if (item.getTaskState() != null &&
(item.getTaskState().equals(TaskStatus.DONE.getCode())
|| item.getTaskState().equals(TaskStatus.HALTED.getCode())
|| item.getTaskState().equals(TaskStatus.EXPIRED.getCode()))) {
item.setTaskState(item.getTaskState());
item.setEndTime(DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss2, new Date()));
List<PatrolTaskStatus> patrolTaskStatusList = iPatrolTaskStatusService.selectPatrolTaskStatusList(patrolTaskStatus);
if (patrolTaskStatusItem.getTaskState() != null &&
(patrolTaskStatusItem.getTaskState().equals(TaskStatus.DONE.getCode())
|| patrolTaskStatusItem.getTaskState().equals(TaskStatus.HALTED.getCode())
|| patrolTaskStatusItem.getTaskState().equals(TaskStatus.EXPIRED.getCode()))) {
patrolTaskStatusItem.setTaskState(patrolTaskStatusItem.getTaskState());
patrolTaskStatusItem.setEndTime(DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss2, new Date()));
}
if (!listEqpBook.isEmpty()) {
item.setLineId(((PatrolTaskStatus) listEqpBook.get(0)).getLineId());
if (!patrolTaskStatusList.isEmpty()) {
patrolTaskStatusItem.setLineId(patrolTaskStatusList.get(0).getLineId());
if (item.getTaskName().contains("联合") && !item.getTaskName().startsWith(sendCode)) {
item.setTaskName(sendCode + "-" + item.getTaskName());
if (patrolTaskStatusItem.getTaskName().contains("联合") && !patrolTaskStatusItem.getTaskName().startsWith(sendCode)) {
patrolTaskStatusItem.setTaskName(sendCode + "-" + patrolTaskStatusItem.getTaskName());
}
i = iPatrolTaskStatusService.updatePatrolTaskStatus(item);
logger.info(Color.CYAN + "patrol_task_status taskPatrolledId: {}, progress: {}" + Color.END,
patrolTaskStatusItem.getTaskPatrolledId(), patrolTaskStatusItem.getTaskProgress());
i = iPatrolTaskStatusService.updatePatrolTaskStatus(patrolTaskStatusItem);
if ("E100-001".equals(sendCode)) {
if ("100".equals(item.getTaskProgress())) {
saveDataToResultAnalysis(item.getTaskPatrolledId());
if ("100".equals(patrolTaskStatusItem.getTaskProgress())) {
saveDataToResultAnalysis(patrolTaskStatusItem.getTaskPatrolledId());
}
}
} else {
item.setPosType(str);
item.setCreateTime(new Date());
item.setStartTime(new Date());
if (item.getTaskName().contains("联合") && !item.getTaskName().startsWith(sendCode)) {
item.setTaskName(sendCode + "-" + item.getTaskName());
patrolTaskStatusItem.setPosType(str);
patrolTaskStatusItem.setCreateTime(new Date());
patrolTaskStatusItem.setStartTime(new Date());
if (patrolTaskStatusItem.getTaskName().contains("联合") && !patrolTaskStatusItem.getTaskName().startsWith(sendCode)) {
patrolTaskStatusItem.setTaskName(sendCode + "-" + patrolTaskStatusItem.getTaskName());
}
i = iPatrolTaskStatusService.insertPatrolTaskStatus(item);
i = iPatrolTaskStatusService.insertPatrolTaskStatus(patrolTaskStatusItem);
}
}
}
@ -1176,8 +1180,9 @@ public class PatrolResultController extends BaseController {
logger.info(Color.CYAN + "###### RECEIVE DATA TO ANALYSIS [61] start ######" + Color.END);
String Jqtype = "";
HashSet<String> superiorFilePaths = new HashSet<>();
JsonRootBean jsonRootBean;
try {
JsonRootBean jsonRootBean = JSONObject.parseObject(messageBody, JsonRootBean.class);
jsonRootBean = JSONObject.parseObject(messageBody, JsonRootBean.class);
List<ItemsInfo> itemsInfoList = jsonRootBean.getItems();
logger.info("点位数量: {}, 消息体: \n {}", itemsInfoList.size(), messageBody);
for (ItemsInfo itemsInfo : itemsInfoList) {
@ -1217,6 +1222,7 @@ public class PatrolResultController extends BaseController {
}
} catch (Exception e) {
logger.error("error", e);
jsonRootBean = new JsonRootBean();
}
int mainID = -1;
@ -1252,9 +1258,9 @@ public class PatrolResultController extends BaseController {
taskResultMain.setTaskId(String.valueOf(tasks.get(0).getTaskId()));
taskResultMain.setTaskPatrolledId(patrolResultList.get(0).getTaskPatrolledId());
logger.info("taskResultMain: {}", taskResultMain);
listEqpBook = iPatrolTaskResultMainService.selectPatrolTaskResultMainList(taskResultMain);
if (!listEqpBook.isEmpty()) {
mainID = ((PatrolTaskResultMain) listEqpBook.get(0)).getLineId().intValue();
List<PatrolTaskResultMain> patrolTaskResultMainList = iPatrolTaskResultMainService.selectPatrolTaskResultMainList(taskResultMain);
if (!patrolTaskResultMainList.isEmpty()) {
mainID = ((PatrolTaskResultMain) patrolTaskResultMainList.get(0)).getLineId().intValue();
logger.info("exist PatrolTaskResultMain Id: {}", mainID);
} else {
PatrolTaskResultMain patrolTaskResultMain = new PatrolTaskResultMain();
@ -1472,7 +1478,7 @@ public class PatrolResultController extends BaseController {
if (!resultList.isEmpty()) {
logger.info(Color.CYAN + "+++++++ callAlgorithm start +++++++" + Color.END);
//callLocalAlgorithm(resultList);
callRemoteAlgorithm(resultList);
callRemoteAlgorithm(jsonRootBean.getTotalNumber(), resultList);
logger.info(Color.CYAN + "+++++++ callAlgorithm end +++++++" + Color.END);
}
}


+ 1
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java View File

@ -28,6 +28,7 @@ public class AnalyseRequest implements Serializable {
private boolean isFilter;
private String[] typeList;
private String algorithmType;
private int totalNumber;
public AnalyseRequest clone() {
return JSONObject.parseObject(JSONObject.toJSONString(this), AnalyseRequest.class);


+ 4
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/JsonRootBean.java View File

@ -25,6 +25,10 @@ public class JsonRootBean {
name = "Type"
)
private String type;
@JSONField(
name = "TotalNumber"
)
private Integer totalNumber;
@JSONField(
name = "Items"
)


Loading…
Cancel
Save