Browse Source

/*任务并发策略对应修改*/

master
htjcAdmin 6 months ago
parent
commit
c9bf2279d9
7 changed files with 163 additions and 13 deletions
  1. +12
    -8
      inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java
  2. +1
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java
  3. +2
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java
  4. +3
    -3
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java
  5. +46
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java
  6. +96
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java
  7. +3
    -2
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java

+ 12
- 8
inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java View File

@ -593,7 +593,7 @@ public class JobMainTask {
return rectangles.toString();
}
private void addPointRecord(boolean bOk,
synchronized private void addPointRecord(boolean bOk,
PatrolTaskExecRecord taskExecRecord,
PatrolTaskInfo taskInfo,
PatrolPresetPos presetPos,
@ -695,7 +695,7 @@ public class JobMainTask {
}
}
private int addCursorNumber(String taskPatrolId, boolean bOk, int infoListSize) {
synchronized private int addCursorNumber(String taskPatrolId, boolean bOk, int infoListSize) {
PatrolTaskExecRecord taskExecRecord = taskExecClient.selectPatrolTaskExecRecordByTaskPatrolId(taskPatrolId);
int total = taskExecRecord.getTotalNumber();
@ -752,7 +752,7 @@ public class JobMainTask {
redisService.deleteObjectOfTask(RedisConst.TASK_CURRENT_CODE, curTaskCode);
}
private void windUpRecord(String taskPatrolId, int infoListSize) {
synchronized private void windUpRecord(String taskPatrolId, int infoListSize) {
PatrolTaskExecRecord execRecord = taskExecClient.selectPatrolTaskExecRecordByTaskPatrolId(taskPatrolId);
final int total = execRecord.getTotalNumber();
final int cursor = execRecord.getCursorNumber();
@ -818,24 +818,28 @@ public class JobMainTask {
StringBuffer fileTypes,
StringBuffer filePaths) {
log.info(Color.CYAN + "**************************addPointRecord start" + Color.END);
log.info(Color.CYAN + "recordPersistTrace bOk: {}" + Color.END, bOk);
try {
addPointRecord(bOk, taskExecRecord, taskInfo, presetPos, fileTypes, filePaths);
} catch (Exception e) {
log.error("error", e);
}
log.info(Color.CYAN + "**************************addPointRecord end" + Color.END);
//log.info(Color.CYAN + "**************************addPointRecord end" + Color.END);
log.info(Color.CYAN + "**************************addCursorNumber start" + Color.END);
log.info(Color.CYAN + "addCursorNumberStart" + Color.END);
int cursorNumber = 0;
try {
cursorNumber = addCursorNumber(taskExecRecord.getTaskPatrolId(), bOk, infoListSize);
} catch (Exception e) {
log.error("error", e);
}
log.info(Color.CYAN + "**************************addCursorNumber end" + Color.END);
if (cursorNumber == infoListSize) {// devNo下对应的点执行完毕
PatrolTaskExecRecord taskExecRecordEx = taskExecClient.selectPatrolTaskExecRecordByTaskPatrolId(taskExecRecord.getTaskPatrolId());
int totalNumber = taskExecRecordEx.getTotalNumber();
log.info(Color.CYAN + "addCursorNumberEnd: cursorNumber: {}, batchNumber: {}, total: {}" + Color.END, cursorNumber, infoListSize, totalNumber);
//if (cursorNumber == infoListSize) {// devNo下对应的点执行完毕
if (cursorNumber == totalNumber) {// devNo下对应的点执行完毕
log.info(Color.CYAN + "**************************windUpRecord start" + Color.END);
windUpRecord(taskExecRecord.getTaskPatrolId(), infoListSize);
log.info(Color.CYAN + "**************************windUpRecord end" + Color.END);


+ 1
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java View File

@ -38,4 +38,5 @@ public class AnalyseConstants {
public static final String MAX_NUM = "MAX_NUM";
public static final String ALGORITHM_REQUEST_QUEUE = "algorithm:request:queue";
public static final String ALGORITHM_RESPONSE_QUEUE = "algorithm:response:queue";
}

+ 2
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalysisService.java View File

@ -4,4 +4,6 @@ import com.inspect.analysis.domain.AnalyseResult;
public interface IAnalysisService {
void picAnalyseRetNotify(AnalyseResult analyseResult);
void handleAlgorithmResult(final String analyseResultJson);
}

inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmProcessConsumer.java → inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java View File


+ 46
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java View File

@ -0,0 +1,46 @@
package com.inspect.analysis.service.impl;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.analysis.service.IAnalysisService;
import com.inspect.base.redis.service.RedisService;
import com.inspect.partrolresult.domain.AnalyseRequest;
import com.inspect.partrolresult.service.AnalyseRemoteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class AlgorithmResponseProcessConsumer {
private final Logger logger = LoggerFactory.getLogger(AlgorithmResponseProcessConsumer.class);
@Resource
private RedisService redisService;
@Resource
private IAnalysisService analysisService;
@Scheduled(fixedDelay = 1000)
public void pollAndProcess() {
String responseDataInRedis;
RedisTemplate<String, String> redisTemplate = redisService.redisTemplate;
while ((responseDataInRedis = redisTemplate.opsForList().leftPop(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE)) != null) {
try {
logger.info("AlgorithmResponseProcessConsumer pollAndProcess queueSize: {}, responseData:{}", getQueueSize(), responseDataInRedis);
// AnalyseRequest analyseRequest = new Gson().fromJson(responseDataInRedis, AnalyseRequest.class);
// analyseRemoteService.sendRequest(analyseRequest, analyseRequest.getTypeList(), analyseRequest.isFilter());
analysisService.handleAlgorithmResult(responseDataInRedis);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
public long getQueueSize() {
return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE);
}
}

+ 96
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java View File

@ -72,8 +72,103 @@ public class AnalysisServiceImpl implements IAnalysisService {
@Resource
private ResultAnalysisUtils resultAnalysisUtils;
// @Override
// public void picAnalyseRetNotify(AnalyseResult analyseResult) {
// log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, analyseResult);
// String requestId = analyseResult.getRequestId();
// String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId);
// if (!redisService.hasKey(keyId)) {
// log.error("picAnalyseRetNotify isTest: {}, NO keyId={}, REQUEST_ID={} in REDIS!", analyseResult.isTest(), keyId, requestId);
// if (!analyseResult.isTest()) {
// return;
// }
// //redisService.setCacheObject(keyId, "123456789");
// }
//
// String patrolTaskIdObj = redisService.getCacheObject(keyId);
// log.info("picAnalyseRetNotify keyId={}, requestId={}, patrolTaskIdObj={}", keyId, requestId, patrolTaskIdObj);
// analyseResult.setTaskPatrolId(patrolTaskIdObj);
// String keyFilterRequest = AnalyseConstants.ANALYSE_FILTER_REQUEST + requestId;
// log.info("picAnalyseRetNotify keyFilterRequest={}", keyFilterRequest);
// boolean bBigModelExecFlag = false;
// String[] algTypeList = {};
// AnalyseRequest analyseRequest = null;
// if (redisService.hasKey(keyFilterRequest)) { // 初筛结果
// analyseResult.setFilter("1"); // 设置初筛标志
// analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(keyFilterRequest);
// log.info("FILTER_RESULT picAnalyseRetNotify analyseRequest IN REDIS: {}", analyseRequest);
// AnalyseResPoint analyseResPoint = analyseResult.getResultList().get(0).getResults().get(0);
// if (analyseRequest != null && analyseRequest.getObjectList() != null && !analyseRequest.getObjectList().isEmpty()) {
// algTypeList = analyseRequest.getObjectList().get(0).getTypeList();
// }
//
// boolean bDefect = analyseResPoint.isDefect(); // code=2000代表初筛结果返回正常value=1代表有缺陷
// log.info("picAnalyseRetNotify FILTER bDefect={}, algorithmType={}", bDefect, analyseResPoint.getType());
// analyseResult.reloadReq(analyseRequest);//只有一个,analyseReq.getObjectList().get(0)
// /*
// *大致两种情况1. 算法是表计(meter); 2. 算法不是表计(meter)
// * 1. 算法不是表计(meter)
// * 1.1 初筛结果有缺陷继续发给大模型处理
// * 1.2 初筛结果无缺陷不用调用大模型流程就此结束
// * 2. 算法是表计(meter)
// * 不论初筛结果有没有缺陷都要继续调用大模型因为大模型调用了表计(meter)识别算法
// */
// final String algType = analyseRequest.getObjectList().get(0).getTypeList()[0];
// log.info("picAnalyseRetNotify algType IN REDIS: {}", algType);
// if (bDefect || (AlgConstants.METER.equals(algType)
// || AlgConstants.XB.equals(algType)
// || AlgConstants.INFRA_1800.equals(algType)
// || AlgConstants.INFRA_YU3.equals(algType)
// || AlgConstants.INFRA_CAMERA.equals(algType)
// || AlgConstants.INFRA_CAMERA_REVERSE.equals(algType))
// ) {
// /*
// * 先检查结果有缺无缺陷如果有缺陷不用判断算法直接调用大模型*
// * 如果无缺陷再去判断算法如果算法是meter就继续调用大模型*
// */
// analyseResult.setResult("0");
// bBigModelExecFlag = true;
//// log.info("picAnalyseRetNotify CALL BIG_MODEL REQUEST_ID={}", requestId);
//// ispAlgorithmRequestService.sendRequest(analyseRequest);
// } else {
// //初筛结果无缺陷并且非表计算法不用调用大模型流程就此结束
// log.info("picAnalyseRetNotify NO BIG_MODEL WOULD CALLED REQUEST_ID={}", requestId);
// }
// } else {
// // 大模型结果
// log.info("BIG_MODEL_RESULT CALLBACK picAnalyseRetNotify REQUEST_ID={}", requestId);
// analyseResult.setFilter("0"); // 设置大模型标志为0
// }
//
// //qinyl
// analysisLogService.log(new AnalyseLog(analyseResult.toString(), "1", analyseResult.getTaskPatrolId(), analyseResult.getFilter(), requestId));
// try {
// analysis(analyseResult);
// } catch (Exception e) {
// log.error("error", e);
// }
// //doAlgorithmAnalysis(analyseResult);
// //calculateProcess(analyseResult);
// log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 end ######" + Color.END);
//
// if (bBigModelExecFlag) {
// log.info("picAnalyseRetNotify CALL BIG_MODEL REQUEST_ID={}", requestId);
//// ispAlgorithmRequestService.sendRequest(analyseRequest);
// analyseRemoteService.sendRequest(analyseRequest, algTypeList, false);
// }
// }
@Override
public void picAnalyseRetNotify(AnalyseResult analyseResult) {
log.info("Algorithm Result To Redis!");
redisService.redisTemplate.opsForList().rightPush(
AnalyseConstants.ALGORITHM_RESPONSE_QUEUE,
new Gson().toJson(analyseResult)
);
}
public void handleAlgorithmResult(final String analyseResultJson) {
AnalyseResult analyseResult = new Gson().fromJson(analyseResultJson, AnalyseResult.class);
log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, analyseResult);
String requestId = analyseResult.getRequestId();
String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId);
@ -158,6 +253,7 @@ public class AnalysisServiceImpl implements IAnalysisService {
}
}
public synchronized void doAlgorithmAnalysis(AnalyseResult analyseResult) {
log.info("doAlgorithmAnalysis analyseResult: {}", analyseResult);
int size = 0;


+ 3
- 2
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java View File

@ -45,9 +45,10 @@ public class AnalyseRemoteService {
analyseReq.setRequestId(requestId);
String taskSetKey = AnalyseConstants.ANALYSE_TASK_REQUEST + taskPatrolId;
String requestSetKey = AnalyseConstants.ANALYSE_REQUEST_ALG + requestId;
redisService.setCacheObject(AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId), taskPatrolId, 20L, TimeUnit.MINUTES);
final long requestTimeout = 720;
redisService.setCacheObject(AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId), taskPatrolId, requestTimeout, TimeUnit.MINUTES);
redisService.redisTemplate.opsForSet().add(taskSetKey, requestId);
redisService.expire(taskSetKey, 20L, TimeUnit.MINUTES);
redisService.expire(taskSetKey, requestTimeout, TimeUnit.MINUTES);
String requestUrl;
String analyseFilter = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSE_IS_FILTER);


Loading…
Cancel
Save