Browse Source

/*针对金华站智能巡视任务出现的bug进行修改。*/

master
htjcAdmin 5 months ago
parent
commit
a547d2d206
15 changed files with 266 additions and 154 deletions
  1. +1
    -1
      inspect-ivs/src/main/java/com/inspect/ivs/service/IvsResourceRetryableDelegate.java
  2. +31
    -9
      inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java
  3. +4
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java
  4. +3
    -1
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalyseRespService.java
  5. +8
    -7
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java
  6. +6
    -6
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java
  7. +0
    -48
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java
  8. +74
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseRetryableConsumerManager.java
  9. +59
    -39
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseRespServiceImpl.java
  10. +15
    -16
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueRetryableConsumerAsync.java
  11. +5
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java
  12. +31
    -17
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java
  13. +12
    -5
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java
  14. +13
    -1
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestRetryableDelegate.java
  15. +4
    -4
      inspect-main/inspect-main-task/src/main/java/com/inspect/task/controller/PatrolTaskController.java

+ 1
- 1
inspect-ivs/src/main/java/com/inspect/ivs/service/IvsResourceRetryableDelegate.java View File

@ -45,7 +45,7 @@ import java.util.UUID;
public class IvsResourceRetryableDelegate { public class IvsResourceRetryableDelegate {
private static final Logger log = LoggerFactory.getLogger(IvsResourceRetryableDelegate.class); private static final Logger log = LoggerFactory.getLogger(IvsResourceRetryableDelegate.class);
private static final int RETRYABLE_MIN = 10;
private static final int RETRYABLE_MIN = 5;
private static final int RETRYABLE_MAX = 20; private static final int RETRYABLE_MAX = 20;


+ 31
- 9
inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java View File

@ -498,15 +498,19 @@ public class JobMainTask {
+ "/" + DateUtils.getDayEven() + "/" + DateUtils.getDayEven()
+ "/" + taskExecRecord.getTaskCode() + "/" + taskExecRecord.getTaskCode()
+ "/"; + "/";
log.info("uuid: {}, basePath: {}", uuid, basePath);
log.info("PRE_POINT_EXEC patrolPointId: {}, actionId: {}, actionType: {}, deviceId: {}",
taskExecRecord.getOldTaskPatrolId(),
presetAction.getPresetActionId(),
presetAction.getActionType(),
presetPos.getPatrolPointId());
String taskPatrolId = taskExecRecord.getTaskPatrolId(); String taskPatrolId = taskExecRecord.getTaskPatrolId();
if (PresetAction.PHOTO.getCode().equals(presetAction.getActionType())) { if (PresetAction.PHOTO.getCode().equals(presetAction.getActionType())) {
final String chanType = presetPos.getChannelType(); final String chanType = presetPos.getChannelType();
fileTypes.append("ir".equals(chanType) ? "1" : "vl".equals(chanType) ? "2" : "").append(","); fileTypes.append("ir".equals(chanType) ? "1" : "vl".equals(chanType) ? "2" : "").append(",");
//boolean bOk = false; //boolean bOk = false;
try { try {
log.info("PHOTO_PRESET_TYPE uuid: {}, chanType: {}, patrolPointId: {}, channelCode: {}, videoNvrCode: {}",
uuid,
log.info("PHOTO_PRESET_TYPE taskPatrolId: {}, chanType: {}, patrolPointId: {}, channelCode: {}, videoNvrCode: {}",
taskExecRecord.getOldTaskPatrolId(),
chanType, chanType,
presetPos.getPatrolPointId(), presetPos.getPatrolPointId(),
presetPos.getChannelCode(), presetPos.getChannelCode(),
@ -621,8 +625,8 @@ public class JobMainTask {
// recordPersist(taskExecRecord, infoListSize, patrolTaskInfo, presetPos, fileTypes, filePaths); // recordPersist(taskExecRecord, infoListSize, patrolTaskInfo, presetPos, fileTypes, filePaths);
} else if (PresetAction.VIDEO.getCode().equals(presetAction.getActionType())) { } else if (PresetAction.VIDEO.getCode().equals(presetAction.getActionType())) {
log.info("VIDEO_PRESET_TYPE uuid: {}, chanType: {}, patrolPointId: {}, channelCode: {}, videoNvrCode: {}",
uuid,
log.info("VIDEO_PRESET_TYPE taskPatrolId: {}, chanType: {}, patrolPointId: {}, channelCode: {}, videoNvrCode: {}",
taskExecRecord.getOldTaskPatrolId(),
presetPos.getChannelType(), presetPos.getChannelType(),
presetPos.getPatrolPointId(), presetPos.getPatrolPointId(),
presetPos.getChannelCode(), presetPos.getChannelCode(),
@ -660,6 +664,13 @@ public class JobMainTask {
} }
} }
}, 0, 1000L); }, 0, 1000L);
} else {
log.info("UNKNOWN_PRESET_TYPE taskPatrolId: {}, chanType: {}, patrolPointId: {}, channelCode: {}, videoNvrCode: {}",
taskExecRecord.getOldTaskPatrolId(),
presetPos.getChannelType(),
presetPos.getPatrolPointId(),
presetPos.getChannelCode(),
presetPos.getVideoNvrCode());
} }
return taskExecRecord; return taskExecRecord;
@ -1300,14 +1311,18 @@ public class JobMainTask {
} }
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
TaskStatusManager.remove(patrolTaskExecRecord.getOldTaskPatrolId());
log.info("CompletableFuture Break Join: taskPatrolId: {}", patrolTaskExecRecord.getOldTaskPatrolId());
consummateCompletedTask(patrolTaskExecRecord);
} }
private void handlePrePointBatch(final int threadCnt, final PatrolTaskExecRecord patrolTaskExecRecord, final List<PatrolTaskInfo> taskInfoBatch) { private void handlePrePointBatch(final int threadCnt, final PatrolTaskExecRecord patrolTaskExecRecord, final List<PatrolTaskInfo> taskInfoBatch) {
asyncTaskPatrolPointCnt.getAndAdd(taskInfoBatch.size()); asyncTaskPatrolPointCnt.getAndAdd(taskInfoBatch.size());
log.info("handlePrePointBatch threadCnt: {}, asyncTaskPatrolPointCnt: {}, batch size: {}, devNo: {}, taskId: {}",
threadCnt, asyncTaskPatrolPointCnt.get(), taskInfoBatch.size(), patrolTaskExecRecord.getDevNo(), patrolTaskExecRecord.getTaskId());
log.info("HANDLE_PRESET_POINT_BATCH patrolPointId: {}, threadCnt: {}, asyncTaskPatrolPointCnt: {}, batch size: {}, devNo: {}, taskId: {}",
patrolTaskExecRecord.getOldTaskPatrolId(),
threadCnt,
asyncTaskPatrolPointCnt.get(),
taskInfoBatch.size(),
patrolTaskExecRecord.getDevNo(),
patrolTaskExecRecord.getTaskId());
final String taskPatrolId = patrolTaskExecRecord.getOldTaskPatrolId(); final String taskPatrolId = patrolTaskExecRecord.getOldTaskPatrolId();
StatusMonitor monitor = TaskStatusManager.get(patrolTaskExecRecord.getOldTaskPatrolId()); StatusMonitor monitor = TaskStatusManager.get(patrolTaskExecRecord.getOldTaskPatrolId());
@ -1345,6 +1360,13 @@ public class JobMainTask {
} }
} }
private void consummateCompletedTask(final PatrolTaskExecRecord patrolTaskExecRecord) {
TaskStatusManager.remove(patrolTaskExecRecord.getOldTaskPatrolId());
boolean isConsummate = redisService.deleteObjectOfTask(RedisConst.TASK_CURRENT_CODE, patrolTaskExecRecord.getTaskCode());
log.info("CONSUMMATE_COMPLETED_TASK taskPatrolId: {}, isConsummate: {}",
patrolTaskExecRecord.getOldTaskPatrolId(),
isConsummate ? "false" : "true");
}
public List<List<PatrolTaskInfo>> optimizedGroup(List<PatrolTaskInfo> list) { public List<List<PatrolTaskInfo>> optimizedGroup(List<PatrolTaskInfo> list) {
Map<String, List<PatrolTaskInfo>> deviceMap = new HashMap<>(); Map<String, List<PatrolTaskInfo>> deviceMap = new HashMap<>();


+ 4
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java View File

@ -4,6 +4,7 @@ package com.inspect.analysis.domain;
import com.inspect.partrolresult.domain.AnalyseReqItem; import com.inspect.partrolresult.domain.AnalyseReqItem;
import com.inspect.partrolresult.domain.AnalyseRequest; import com.inspect.partrolresult.domain.AnalyseRequest;
import lombok.Getter; import lombok.Getter;
import lombok.Setter;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
@ -11,6 +12,7 @@ import java.util.List;
import java.util.Objects; import java.util.Objects;
@Getter @Getter
@Setter
public class AnalyseResult implements Serializable { public class AnalyseResult implements Serializable {
private String requestId; private String requestId;
private List<AnalyseResItem> resultList; private List<AnalyseResItem> resultList;
@ -20,6 +22,7 @@ public class AnalyseResult implements Serializable {
private String result = "1"; private String result = "1";
private boolean isTest = false; private boolean isTest = false;
private int totalNumber; private int totalNumber;
private boolean isCompensate = false; // false-normal; true-compensate
public void setResultsList(List<AnalyseResItem> resultsList) { public void setResultsList(List<AnalyseResItem> resultsList) {
this.resultsList = resultsList; this.resultsList = resultsList;
@ -98,6 +101,7 @@ public class AnalyseResult implements Serializable {
", resultList=" + resultList + ", resultList=" + resultList +
", resultsList=" + resultsList + ", resultsList=" + resultsList +
", taskPatrolId='" + taskPatrolId + '\'' + ", taskPatrolId='" + taskPatrolId + '\'' +
", isCompensate='" + isCompensate + '\'' +
", filter='" + filter + '\'' + ", filter='" + filter + '\'' +
", result='" + result + '\'' + ", result='" + result + '\'' +
'}'; '}';


+ 3
- 1
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/IAnalyseRespService.java View File

@ -2,8 +2,10 @@ package com.inspect.analysis.service;
import com.inspect.analysis.domain.AnalyseResult; import com.inspect.analysis.domain.AnalyseResult;
import java.io.IOException;
public interface IAnalyseRespService { public interface IAnalyseRespService {
void picAnalyseRetNotify(AnalyseResult analyseResult); void picAnalyseRetNotify(AnalyseResult analyseResult);
void handleAlgorithmResult(final AnalyseResult analyseResult);
void handleAlgorithmResult(final AnalyseResult analyseResult) throws IOException;
} }

+ 8
- 7
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java View File

@ -5,6 +5,7 @@ import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.base.redis.service.RedisService; import com.inspect.base.redis.service.RedisService;
import com.inspect.partrolresult.domain.AnalyseRequest; import com.inspect.partrolresult.domain.AnalyseRequest;
import com.inspect.partrolresult.service.AnalyseRemoteService; import com.inspect.partrolresult.service.AnalyseRemoteService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -20,15 +21,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
@Component @Component
public class AlgorithmRequestRetryableConsumerManager { public class AlgorithmRequestRetryableConsumerManager {
private static final Logger log = LoggerFactory.getLogger(AlgorithmRequestRetryableConsumerManager.class);
@Value("${server.port}") @Value("${server.port}")
private String port; private String port;
@Value("${task.redis.request.interval-ms:2000}")
private long intervalMs;
@Value("${task.redis.request.permits-per-second:0.5}")
private double permitsPerSecond;
@Value("${task.redis.request.idle-sleep-ms:500}") @Value("${task.redis.request.idle-sleep-ms:500}")
private long idleSleepMs; private long idleSleepMs;
@ -52,7 +53,7 @@ public class AlgorithmRequestRetryableConsumerManager {
redisService.redisTemplate, redisService.redisTemplate,
AnalyseConstants.ALGORITHM_REQUEST_QUEUE, AnalyseConstants.ALGORITHM_REQUEST_QUEUE,
AnalyseRequest.class, AnalyseRequest.class,
intervalMs,
permitsPerSecond,
idleSleepMs, idleSleepMs,
request -> executor.submit(() -> processRequest(request)) // 交给线程池处理 request -> executor.submit(() -> processRequest(request)) // 交给线程池处理
); );
@ -62,18 +63,18 @@ public class AlgorithmRequestRetryableConsumerManager {
private void processRequest(AnalyseRequest request) { private void processRequest(AnalyseRequest request) {
try { try {
log.info("AlgorithmRequestRetryableConsumerManager Processing queueSize: {}, request: {}", getQueueSize(), request);
log.info("ALGORITHM_REQUEST_RETRYABLE_CONSUME_MANAGE queueSize: {}, request: {}", getQueueSize(), request);
analyseRemoteService.sendRequest(request, request.getTypeList(), request.isFilter()); analyseRemoteService.sendRequest(request, request.getTypeList(), request.isFilter());
} catch (IOException e) { } catch (IOException e) {
// 可以记录失败次数避免无限重试 // 可以记录失败次数避免无限重试
int retryCount = Optional.of(request.getRetryCount()).orElse(0); int retryCount = Optional.of(request.getRetryCount()).orElse(0);
log.info("AlgorithmRequestRetryableConsumerManager Failed, retryCount: {}, reqeust: {}, msg: {}", request.getRetryCount(), request, e.getMessage());
log.info("ALGORITHM_REQUEST_RETRYABLE_CONSUME_MANAGE failed, retryCount: {}, reqeust: {}, msg: {}", request.getRetryCount(), request, e.getMessage());
final int TRIES_MAX = testMode ? 50 : 3; final int TRIES_MAX = testMode ? 50 : 3;
if (retryCount < TRIES_MAX) { if (retryCount < TRIES_MAX) {
request.setRetryCount(retryCount + 1); request.setRetryCount(retryCount + 1);
redisService.redisTemplate.opsForList().leftPush(AnalyseConstants.ALGORITHM_REQUEST_QUEUE, new Gson().toJson(request)); redisService.redisTemplate.opsForList().leftPush(AnalyseConstants.ALGORITHM_REQUEST_QUEUE, new Gson().toJson(request));
} else { } else {
log.info("AlgorithmRequestRetryableConsumerManager Compensate, retryCount: {}, request: {}", retryCount, request);
log.info("ALGORITHM_REQUEST_RETRYABLE_CONSUME_MANAGE compensate, retryCount: {}, request: {}", retryCount, request);
analyseRemoteService.sendCompensateRequest(request); analyseRemoteService.sendCompensateRequest(request);
} }
} }


+ 6
- 6
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseConsumerManager.java View File

@ -5,17 +5,17 @@ import com.inspect.analysis.domain.AnalyseResult;
import com.inspect.analysis.service.IAnalyseRespService; import com.inspect.analysis.service.IAnalyseRespService;
import com.inspect.base.core.constant.Color; import com.inspect.base.core.constant.Color;
import com.inspect.base.redis.service.RedisService; import com.inspect.base.redis.service.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException;
@Component
@Slf4j
//@Component
public class AlgorithmResponseConsumerManager { public class AlgorithmResponseConsumerManager {
private static final Logger log = LoggerFactory.getLogger(AlgorithmResponseConsumerManager.class);
private RedisQueueConsumerAsync<AnalyseResult> consumer; private RedisQueueConsumerAsync<AnalyseResult> consumer;
@ -37,13 +37,13 @@ public class AlgorithmResponseConsumerManager {
try { try {
log.info("AlgorithmResponseConsumerManager queueSize: {}, response: {}", getQueueSize(), response); log.info("AlgorithmResponseConsumerManager queueSize: {}, response: {}", getQueueSize(), response);
analysisService.handleAlgorithmResult(response); analysisService.handleAlgorithmResult(response);
} catch (Exception e) {
} catch (IOException e) {
log.info(Color.RED + "AlgorithmResponseConsumerManager error queueSize: {}, request: {}" + Color.END, getQueueSize(), response); log.info(Color.RED + "AlgorithmResponseConsumerManager error queueSize: {}, request: {}" + Color.END, getQueueSize(), response);
} }
} }
); );
consumer.start(); // 应用启动时即开始消费
consumer.start();
} }
@PreDestroy @PreDestroy


+ 0
- 48
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseProcessConsumer.java View File

@ -1,48 +0,0 @@
package com.inspect.analysis.service.impl;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.analysis.domain.AnalyseResult;
import com.inspect.analysis.service.IAnalyseRespService;
import com.inspect.base.core.constant.Color;
import com.inspect.base.redis.service.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource;
//@Component
public class AlgorithmResponseProcessConsumer {
private final Logger logger = LoggerFactory.getLogger(AlgorithmResponseProcessConsumer.class);
@Resource
private RedisService redisService;
@Resource
private IAnalyseRespService analysisService;
//@Scheduled(fixedDelay = 1000)
@Scheduled(fixedDelayString = "${task.scheduler.response.delay-ms:3000}")
public void pollAndProcess() {
//logger.info(Color.CYAN + "AlgorithmResponseProcessConsumerTracer" + Color.END);
String responseDataInRedis;
RedisTemplate<String, String> redisTemplate = redisService.redisTemplate;
while ((responseDataInRedis = redisTemplate.opsForList().leftPop(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE)) != null) {
try {
logger.info(Color.CYAN + "AlgorithmResponseProcessConsumerTracer queueSize: {}, responseData: {}" + Color.END, getQueueSize(), responseDataInRedis);
// AnalyseRequest analyseRequest = new Gson().fromJson(responseDataInRedis, AnalyseRequest.class);
// analyseRemoteService.sendRequest(analyseRequest, analyseRequest.getTypeList(), analyseRequest.isFilter());
AnalyseResult analyseResult = new Gson().fromJson(responseDataInRedis, AnalyseResult.class);
analysisService.handleAlgorithmResult(analyseResult);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
public long getQueueSize() {
return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE);
}
}

+ 74
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmResponseRetryableConsumerManager.java View File

@ -0,0 +1,74 @@
package com.inspect.analysis.service.impl;
import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.analysis.domain.AnalyseResult;
import com.inspect.analysis.service.IAnalyseRespService;
import com.inspect.base.core.constant.Color;
import com.inspect.base.redis.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
@Component
public class AlgorithmResponseRetryableConsumerManager {
private RedisQueueRetryableConsumerAsync<AnalyseResult> consumer;
@Resource
private RedisService redisService;
@Resource
private IAnalyseRespService analysisService;
private final ExecutorService executor = Executors.newFixedThreadPool(10);
@PostConstruct
public void initConsumer() {
consumer = new RedisQueueRetryableConsumerAsync<AnalyseResult>(
redisService.redisTemplate,
AnalyseConstants.ALGORITHM_RESPONSE_QUEUE,
AnalyseResult.class,
1.0,
500,
response -> executor.submit(() -> {
try {
log.info("ALGORITHM_RESPONSE_RETRYABLE_CONSUME_MANAGE queueSize: {}, reqeustId: {}, patrolId: {}",
getQueueSize(),
response.getRequestId(),
response.getTaskPatrolId());
long start = System.currentTimeMillis();
analysisService.handleAlgorithmResult(response);
log.info("ALGORITHM_RESPONSE_RETRYABLE_CONSUME_MANAGE COST: {}, reqeustId: {}, patrolId: {}",
System.currentTimeMillis() - start,
response.getRequestId(),
response.getTaskPatrolId());
} catch (IOException e) {
log.info(Color.RED + "ALGORITHM_RESPONSE_RETRYABLE_CONSUME_MANAGE error queueSize: {}, request: {}, Msg: {}" + Color.END,
getQueueSize(),
response,
e.getMessage());
}
})
);
consumer.start();
}
@PreDestroy
public void stopConsumer() {
if (consumer != null) {
consumer.stop();
}
}
public long getQueueSize() {
return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_RESPONSE_QUEUE);
}
}

+ 59
- 39
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseRespServiceImpl.java View File

@ -81,21 +81,30 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService {
@Override @Override
public void picAnalyseRetNotify(AnalyseResult analyseResult) { public void picAnalyseRetNotify(AnalyseResult analyseResult) {
log.info("Algorithm Result To Redis!");
log.info("RESULT_TO_ALGORITHM_RESPONSE_QUEUE: patrolPointId: {}, isCompensate: {}, requestId: {}, RESULT: {}",
analyseResult.getTaskPatrolId(),
analyseResult.isCompensate(),
analyseResult.getRequestId(),
analyseResult
);
redisService.redisTemplate.opsForList().rightPush( redisService.redisTemplate.opsForList().rightPush(
AnalyseConstants.ALGORITHM_RESPONSE_QUEUE, AnalyseConstants.ALGORITHM_RESPONSE_QUEUE,
new Gson().toJson(analyseResult) new Gson().toJson(analyseResult)
); );
} }
public void handleAlgorithmResult(final AnalyseResult analyseResult) {
String logLabel = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY);
log.info(Color.CYAN + "ALGO_RES_UUID: {}, ###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, logLabel, analyseResult);
public void handleAlgorithmResult(final AnalyseResult analyseResult) throws IOException {
//String logLabel = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY);
String requestId = analyseResult.getRequestId(); String requestId = analyseResult.getRequestId();
log.info(Color.GREEN + "HANDLE_ALGORITHM_RESULT_S patrolPointId: {}, filter: {}, requestId: {}, analyseResult:{}" + Color.END,
analyseResult.getTaskPatrolId(),
analyseResult.getFilter(),
requestId,
analyseResult);
String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId); String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId);
if (!redisService.hasKey(keyId)) { if (!redisService.hasKey(keyId)) {
log.error("ALGO_RES_UUID: {}, picAnalyseRetNotify isTest: {}, NO keyId={}, REQUEST_ID={} in REDIS!", logLabel, analyseResult.isTest(), keyId, requestId);
log.error("ALGO_RES_UUID: {}, picAnalyseRetNotify isTest: {}, NO keyId={}, REQUEST_ID={} in REDIS!", requestId, analyseResult.isTest(), keyId, requestId);
if (!analyseResult.isTest()) { if (!analyseResult.isTest()) {
return; return;
} }
@ -103,31 +112,33 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService {
} }
String patrolTaskIdObj = redisService.getCacheObject(keyId); String patrolTaskIdObj = redisService.getCacheObject(keyId);
log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify keyId={}, requestId={}, patrolTaskIdObj={}", logLabel, keyId, requestId, patrolTaskIdObj);
log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify keyId={}, requestId={}, patrolTaskIdObj={}", requestId, keyId, requestId, patrolTaskIdObj);
analyseResult.setTaskPatrolId(patrolTaskIdObj); analyseResult.setTaskPatrolId(patrolTaskIdObj);
final String filterRequestRedisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST + requestId; final String filterRequestRedisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST + requestId;
final String bigModelRequestRedisKey = AnalyseConstants.ANALYSE_AI_REQUEST + requestId; final String bigModelRequestRedisKey = AnalyseConstants.ANALYSE_AI_REQUEST + requestId;
log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify filterRequestRedisKey={}, bigModelRequestRedisKey={}", logLabel, filterRequestRedisKey, bigModelRequestRedisKey);
log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify filterRequestRedisKey={}, bigModelRequestRedisKey={}", requestId, filterRequestRedisKey, bigModelRequestRedisKey);
boolean bBigModelExecFlag = false; boolean bBigModelExecFlag = false;
String[] algTypeList = {}; String[] algTypeList = {};
AnalyseRequest analyseRequest = null; AnalyseRequest analyseRequest = null;
if (redisService.hasKey(filterRequestRedisKey)) { // 初筛结果 if (redisService.hasKey(filterRequestRedisKey)) { // 初筛结果
analyseResult.setFilter("1"); // 设置初筛标志 analyseResult.setFilter("1"); // 设置初筛标志
analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(filterRequestRedisKey); analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(filterRequestRedisKey);
log.info("ALGO_RES_UUID: {}, FILTER_RESULT picAnalyseRetNotify analyseRequest IN REDIS: {}", logLabel, analyseRequest);
boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseRequest);
log.info("Redisson Removed Result: {}, analyseRequest: {}", isRemoved? "success" : "fail", analyseRequest);
AnalyseResPoint analyseResPoint = analyseResult.getResultList().get(0).getResults().get(0);
if (analyseRequest != null && analyseRequest.getObjectList() != null && !analyseRequest.getObjectList().isEmpty()) { if (analyseRequest != null && analyseRequest.getObjectList() != null && !analyseRequest.getObjectList().isEmpty()) {
log.info("ALGORITHM_FILTER_RESULT taskPatrolId: {}, requestId: {}, DATA IN REDIS: {}",
analyseRequest.getTaskPatrolId(),
requestId,
analyseRequest);
boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseRequest);
log.info("ALGORITHM_FILTER_RESULT Redisson Removed requestId: {}, Result: {}, analyseRequest: {}", requestId, isRemoved ? "success" : "fail", analyseRequest);
algTypeList = analyseRequest.getObjectList().get(0).getTypeList(); algTypeList = analyseRequest.getObjectList().get(0).getTypeList();
analyseResult.reloadReq(analyseRequest);//只有一个,analyseReq.getObjectList().get(0) analyseResult.reloadReq(analyseRequest);//只有一个,analyseReq.getObjectList().get(0)
log.info("ALGO_RES_UUID: {}, FILTER_RESULT picAnalyseRetNotify algTypeList: {}, analyseResult reload: {}", logLabel, algTypeList, analyseResult);
log.info("ALGORITHM_FILTER_RESULT RELOAD requestId: {}, algTypeList: {}, analyseResult: {}",
requestId,
algTypeList,
analyseResult);
} }
boolean bDefect = analyseResPoint.isDefect(); // code=2000代表初筛结果返回正常value=1代表有缺陷
log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify FILTER bDefect={}, algorithmType={}", logLabel, bDefect, analyseResPoint.getType());
/* /*
*大致两种情况1. 算法是表计(meter); 2. 算法不是表计(meter) *大致两种情况1. 算法是表计(meter); 2. 算法不是表计(meter)
* 1. 算法不是表计(meter) * 1. 算法不是表计(meter)
@ -136,8 +147,15 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService {
* 2. 算法是表计(meter) * 2. 算法是表计(meter)
* 不论初筛结果有没有缺陷都要继续调用大模型因为大模型调用了表计(meter)识别算法 * 不论初筛结果有没有缺陷都要继续调用大模型因为大模型调用了表计(meter)识别算法
*/ */
AnalyseResPoint analyseResPoint = analyseResult.getResultList().get(0).getResults().get(0);
boolean bDefect = analyseResPoint.isDefect(); // code=2000代表初筛结果返回正常value=1代表有缺陷
final String algType = analyseRequest.getObjectList().get(0).getTypeList()[0]; final String algType = analyseRequest.getObjectList().get(0).getTypeList()[0];
log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify algType IN REDIS: {}", logLabel, algType);
log.info("ALGORITHM_FILTER_RESULT CHECK DEFECT requestId: {}, bDefect: {}, algType: {}, algTypeInRedis: {}",
requestId,
bDefect,
analyseResPoint.getType(),
algType);
if (bDefect || (AlgConstants.METER.equals(algType) if (bDefect || (AlgConstants.METER.equals(algType)
|| AlgConstants.XB.equals(algType) || AlgConstants.XB.equals(algType)
|| AlgConstants.INFRA_1800.equals(algType) || AlgConstants.INFRA_1800.equals(algType)
@ -155,18 +173,16 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService {
// ispAlgorithmRequestService.sendRequest(analyseRequest); // ispAlgorithmRequestService.sendRequest(analyseRequest);
} else { } else {
//初筛结果无缺陷并且非表计算法不用调用大模型流程就此结束 //初筛结果无缺陷并且非表计算法不用调用大模型流程就此结束
log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify NO BIG_MODEL WOULD CALLED REQUEST_ID={}", logLabel, requestId);
log.info("ALGORITHM_FILTER_RESULT CALL_NO_BIG_MODEL requestId: {}", requestId);
} }
} else if (redisService.hasKey(bigModelRequestRedisKey)) { // 大模型结果 } else if (redisService.hasKey(bigModelRequestRedisKey)) { // 大模型结果
analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(bigModelRequestRedisKey); analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(bigModelRequestRedisKey);
log.info("ALGO_RES_UUID: {}, BIG_MODEL_RESULT picAnalyseRetNotify analyseRequest IN REDIS: {}", logLabel, analyseRequest);
if(analyseRequest != null) {
if (analyseRequest != null) {
analyseResult.reloadReq(analyseRequest); analyseResult.reloadReq(analyseRequest);
log.info("ALGO_RES_UUID: {}, BIG_MODEL_RESULT picAnalyseRetNotify type: {}, analyseResult reload: {}",
logLabel,
analyseResult.getResultList().get(0).getResults().get(0).getType(),
analyseResult
);
log.info("ALGORITHM_BIG_MODEL_RESULT taskPatrolId: {}, requestId: {}, DATA IN REDIS: {}",
analyseRequest.getTaskPatrolId(),
requestId,
analyseRequest);
// 发给光明大模型处理 // 发给光明大模型处理
redisService.redisTemplate.opsForList().rightPush( redisService.redisTemplate.opsForList().rightPush(
@ -177,8 +193,10 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService {
analyseResult.setFilter("0"); // 设置大模型标志为0 analyseResult.setFilter("0"); // 设置大模型标志为0
} else { } else {
log.info("ALGO_RES_UUID: {}, UNKNOWN_RESULT, OR TIMEOUT BEYOND A DAY FROM REMOTE ALGORITHM, requestId: {}, analyseResult: {}",
logLabel, requestId, analyseResult);
log.info("ALGORITHM_UNKNOWN_RESULT taskPatrolId: {}, requestId: {}, result: {}",
analyseResult.getTaskPatrolId(),
requestId,
analyseResult);
return; return;
} }
@ -191,20 +209,22 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService {
} }
//doAlgorithmAnalysis(analyseResult); //doAlgorithmAnalysis(analyseResult);
//calculateProcess(analyseResult); //calculateProcess(analyseResult);
log.info(Color.CYAN + "ALGO_RES_UUID: {}, ###### 分析算法模块返回的表计识别结果 end ######" + Color.END, logLabel);
//log.info(Color.CYAN + "ALGO_RES_UUID: {}, ###### 分析算法模块返回的表计识别结果 end ######" + Color.END, requestId);
if (bBigModelExecFlag) { if (bBigModelExecFlag) {
// ispAlgorithmRequestService.sendRequest(analyseRequest);
log.info("ALGO_RES_UUID: {}, picAnalyseRetNotify CALL BIG_MODEL REQUEST_ID={}", logLabel, requestId);
// final long requestTimeout = 1L;
// final String analyzeBigModelRequestIdRedisKey = AnalyseConstants.ANALYSE_AI_REQUEST.concat(requestId);
// redisService.setCacheObject(analyzeBigModelRequestIdRedisKey, analyseRequest.clone(), requestTimeout, TimeUnit.DAYS);
log.info("ALGORITHM_RESULT CALL_BIG_MODEL , taskPatrolId: {}, requestId: {}", analyseResult.getTaskPatrolId(), requestId);
try { try {
analyseRequest.setFilter(false);
analyseRemoteService.sendRequest(analyseRequest, algTypeList, Boolean.FALSE); analyseRemoteService.sendRequest(analyseRequest, algTypeList, Boolean.FALSE);
} catch (IOException e) { } catch (IOException e) {
log.error("大模型算法调用异常: ", e);
log.info("ALGORITHM_RESULT BIG_MODEL_CALL Exception requestId: {}, Msg: {}", requestId, e.getMessage());
throw e;
} }
} }
log.info(Color.GREEN + "HANDLE_ALGORITHM_RESULT_E patrolPointId: {}, requestId: {}" + Color.END,
analyseResult.getTaskPatrolId(),
requestId);
} }
@ -769,7 +789,7 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService {
} }
// 计算初筛算法的进度 // 计算初筛算法的进度
if("1".equals(analyseResult.getFilter())) {
if ("1".equals(analyseResult.getFilter())) {
calcRemoteAlgorithmProgress(analyseResult.getTotalNumber(), analyseResult.getTaskPatrolId()); calcRemoteAlgorithmProgress(analyseResult.getTotalNumber(), analyseResult.getTaskPatrolId());
} }
@ -831,10 +851,10 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService {
PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus(); PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus();
patrolTaskStatus.setTaskPatrolledId(taskPatrolledId); patrolTaskStatus.setTaskPatrolledId(taskPatrolledId);
List<PatrolTaskStatus> patrolTaskStatusList = patrolTaskStatusService.selectPatrolTaskStatusList(patrolTaskStatus); List<PatrolTaskStatus> patrolTaskStatusList = patrolTaskStatusService.selectPatrolTaskStatusList(patrolTaskStatus);
if(!patrolTaskStatusList.isEmpty()) {
if (!patrolTaskStatusList.isEmpty()) {
patrolTaskStatus = patrolTaskStatusList.get(0); patrolTaskStatus = patrolTaskStatusList.get(0);
patrolTaskStatus.setTaskEstimatedTime(algorithmProgress); patrolTaskStatus.setTaskEstimatedTime(algorithmProgress);
if("100.0".equals(algorithmProgress)) {
if ("100.0".equals(algorithmProgress)) {
patrolTaskStatus.setEndTime(DateUtil.formatDateTime(new Date())); patrolTaskStatus.setEndTime(DateUtil.formatDateTime(new Date()));
log.info(Color.GREEN + "DONE calcRemoteAlgorithmProgress: curNumber: {}, totalNumer: {}, algorithmProgress: {}, taskPatrolledId: {}, db-taskProgress: {}" + Color.END, log.info(Color.GREEN + "DONE calcRemoteAlgorithmProgress: curNumber: {}, totalNumer: {}, algorithmProgress: {}, taskPatrolledId: {}, db-taskProgress: {}" + Color.END,
resultAnalysisList.size(), totalNumber, algorithmProgress, taskPatrolledId, patrolTaskStatus.getTaskProgress()); resultAnalysisList.size(), totalNumber, algorithmProgress, taskPatrolledId, patrolTaskStatus.getTaskProgress());
@ -845,7 +865,7 @@ public class AnalyseRespServiceImpl implements IAnalyseRespService {
log.info(Color.GREEN + "RUNNING calcRemoteAlgorithmProgress: curNumber: {}, totalNumer: {}, algorithmProgress: {}, taskPatrolledId: {}, status: {}" + Color.END, log.info(Color.GREEN + "RUNNING calcRemoteAlgorithmProgress: curNumber: {}, totalNumer: {}, algorithmProgress: {}, taskPatrolledId: {}, status: {}" + Color.END,
resultAnalysisList.size(), totalNumber, algorithmProgress, taskPatrolledId, patrolTaskStatus.getTaskState()); resultAnalysisList.size(), totalNumber, algorithmProgress, taskPatrolledId, patrolTaskStatus.getTaskState());
if(TaskStatus.RUNNING.getCode().equals(patrolTaskStatus.getTaskState())) {
if (TaskStatus.RUNNING.getCode().equals(patrolTaskStatus.getTaskState())) {
patrolTaskStatus.setTaskState(TaskStatus.RUNNING.getCode()); patrolTaskStatus.setTaskState(TaskStatus.RUNNING.getCode());
patrolTaskStatusService.updatePatrolTaskStatus(patrolTaskStatus); patrolTaskStatusService.updatePatrolTaskStatus(patrolTaskStatus);
} }


+ 15
- 16
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueRetryableConsumerAsync.java View File

@ -1,32 +1,31 @@
package com.inspect.analysis.service.impl; package com.inspect.analysis.service.impl;
import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.google.common.util.concurrent.RateLimiter;
import com.inspect.analysis.service.RetryableRequest; import com.inspect.analysis.service.RetryableRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.Consumer; import java.util.function.Consumer;
@Slf4j
public class RedisQueueRetryableConsumerAsync<T> implements Runnable { public class RedisQueueRetryableConsumerAsync<T> implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(RedisQueueRetryableConsumerAsync.class);
private final RedisTemplate<String, String> redisTemplate; private final RedisTemplate<String, String> redisTemplate;
private final Gson gson = new Gson(); private final Gson gson = new Gson();
private final String queueKey; private final String queueKey;
private final Class<T> clazz; private final Class<T> clazz;
private final long intervalMs;
private final long idleSleepMs; private final long idleSleepMs;
private final Consumer<T> handler; private final Consumer<T> handler;
private volatile boolean running = false; private volatile boolean running = false;
//private final ExecutorService executor = Executors.newFixedThreadPool(10);
private final RateLimiter rateLimiter;
public ThreadPoolExecutor getExecutor() { public ThreadPoolExecutor getExecutor() {
return executor; return executor;
@ -35,7 +34,8 @@ public class RedisQueueRetryableConsumerAsync<T> implements Runnable {
private final ThreadPoolExecutor executor = new ThreadPoolExecutor( private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, // core threads 4, // core threads
10, // max threads 10, // max threads
60, TimeUnit.SECONDS,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), // 队列容量 new ArrayBlockingQueue<>(100), // 队列容量
new ThreadPoolExecutor.CallerRunsPolicy() // 背压策略调用者线程执行任务 new ThreadPoolExecutor.CallerRunsPolicy() // 背压策略调用者线程执行任务
); );
@ -43,15 +43,15 @@ public class RedisQueueRetryableConsumerAsync<T> implements Runnable {
public RedisQueueRetryableConsumerAsync(RedisTemplate<String, String> redisTemplate, public RedisQueueRetryableConsumerAsync(RedisTemplate<String, String> redisTemplate,
String queueKey, String queueKey,
Class<T> clazz, Class<T> clazz,
long intervalMs,
double permitsPerSecond, // 每秒许可数控制速率
long idleSleepMs, long idleSleepMs,
Consumer<T> handler) { Consumer<T> handler) {
this.redisTemplate = redisTemplate; this.redisTemplate = redisTemplate;
this.queueKey = queueKey; this.queueKey = queueKey;
this.clazz = clazz; this.clazz = clazz;
this.intervalMs = intervalMs;
this.idleSleepMs = idleSleepMs; this.idleSleepMs = idleSleepMs;
this.handler = handler; this.handler = handler;
this.rateLimiter = RateLimiter.create(permitsPerSecond);
} }
public void start() { public void start() {
@ -76,11 +76,13 @@ public class RedisQueueRetryableConsumerAsync<T> implements Runnable {
continue; continue;
} }
rateLimiter.acquire();
T obj = gson.fromJson(json, clazz); T obj = gson.fromJson(json, clazz);
// 判断线程池是否拥堵 // 判断线程池是否拥堵
if (executor.getQueue().size() > 80) { if (executor.getQueue().size() > 80) {
logger.warn("RedisQueueRetryableConsumerAsync Thread pool queue is almost full, pushing back to Redis: {}", queueKey);
log.warn("RedisQueueRetryableConsumerAsync Thread pool queue is almost full, pushing back to Redis: {}", queueKey);
// 推回 Redis 保证不丢数据 // 推回 Redis 保证不丢数据
redisTemplate.opsForList().rightPush(queueKey, json); redisTemplate.opsForList().rightPush(queueKey, json);
Thread.sleep(1000); Thread.sleep(1000);
@ -91,7 +93,7 @@ public class RedisQueueRetryableConsumerAsync<T> implements Runnable {
try { try {
handler.accept(obj); handler.accept(obj);
} catch (Exception e) { } catch (Exception e) {
logger.warn("RedisQueueRetryableConsumerAsync handler failed for obj={}, retrying...", json, e);
log.warn("RedisQueueRetryableConsumerAsync handler failed for obj={}, retrying...", json, e);
// 反序列化带 retryCount 的对象 // 反序列化带 retryCount 的对象
if (obj instanceof RetryableRequest) { if (obj instanceof RetryableRequest) {
@ -101,22 +103,19 @@ public class RedisQueueRetryableConsumerAsync<T> implements Runnable {
rr.setRetryCount(retry + 1); rr.setRetryCount(retry + 1);
redisTemplate.opsForList().rightPush(queueKey, gson.toJson(rr)); redisTemplate.opsForList().rightPush(queueKey, gson.toJson(rr));
} else { } else {
logger.error("RedisQueueRetryableConsumerAsync max retries reached for: {}", json);
log.error("RedisQueueRetryableConsumerAsync max retries reached for: {}", json);
} }
} else { } else {
redisTemplate.opsForList().rightPush(queueKey, json); // 简单重入队 redisTemplate.opsForList().rightPush(queueKey, json); // 简单重入队
} }
} }
}); });
Thread.sleep(intervalMs);
} catch (Exception e) { } catch (Exception e) {
logger.error("RedisQueueRetryableConsumerAsync error", e);
log.error("RedisQueueRetryableConsumerAsync error", e);
} }
} }
logger.info("RedisQueueRetryableConsumerAsync for [{}] stopped.", queueKey);
log.info("RedisQueueRetryableConsumerAsync for [{}] stopped.", queueKey);
} }
} }

+ 5
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java View File

@ -12,6 +12,7 @@ import lombok.Setter;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date;
import java.util.List; import java.util.List;
@Setter @Setter
@ -34,6 +35,7 @@ public class AnalyseRequest implements RetryableRequest, Serializable {
private int totalNumber; private int totalNumber;
private int retryCount; private int retryCount;
private String requestUrl; private String requestUrl;
private String redissonTime;
public AnalyseRequest() { public AnalyseRequest() {
@ -56,6 +58,9 @@ public class AnalyseRequest implements RetryableRequest, Serializable {
public String toErrorResultStr() { public String toErrorResultStr() {
AnalyseResult analyseResult = new AnalyseResult(); AnalyseResult analyseResult = new AnalyseResult();
analyseResult.setRequestId(this.requestId); analyseResult.setRequestId(this.requestId);
analyseResult.setCompensate(true);
analyseResult.setFilter(this.isFilter ? "1" : "0");
analyseResult.setTaskPatrolId(this.getTaskPatrolId());
List<AnalyseResItem> list = new ArrayList<>(); List<AnalyseResItem> list = new ArrayList<>();
analyseResult.setResultsList(list); analyseResult.setResultsList(list);


+ 31
- 17
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java View File

@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit;
import com.inspect.base.core.constant.AlgConstants; import com.inspect.base.core.constant.AlgConstants;
import com.inspect.base.core.constant.RedisConst; import com.inspect.base.core.constant.RedisConst;
import com.inspect.base.core.utils.DateUtils;
import com.inspect.base.core.utils.HttpClientUtils; import com.inspect.base.core.utils.HttpClientUtils;
import com.inspect.base.core.utils.StringUtils; import com.inspect.base.core.utils.StringUtils;
import com.inspect.base.redis.service.RedisService; import com.inspect.base.redis.service.RedisService;
@ -46,7 +47,9 @@ public class AnalyseRemoteService {
//qinyl //qinyl
public void sendRequest(AnalyseRequest analyseReq, String[] typeList, boolean isFilter) throws IOException { public void sendRequest(AnalyseRequest analyseReq, String[] typeList, boolean isFilter) throws IOException {
final long requestTimeout = 1L; final long requestTimeout = 1L;
String requestId = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY);
//String requestId = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY);
String requestId = StringUtils.isNotEmpty(analyseReq.getRequestId()) ?
analyseReq.getRequestId() : UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY);
String taskPatrolId = analyseReq.getTaskPatrolId(); String taskPatrolId = analyseReq.getTaskPatrolId();
redisService.setCacheObject(RedisConst.REQUEST_UUID + requestId, taskPatrolId, requestTimeout, TimeUnit.DAYS); redisService.setCacheObject(RedisConst.REQUEST_UUID + requestId, taskPatrolId, requestTimeout, TimeUnit.DAYS);
//log.info("CALL_REMOTE_ANALYZE isFilter: {}, requestId: {}, taskPatrolId: {}", isFilter, requestId, taskPatrolId); //log.info("CALL_REMOTE_ANALYZE isFilter: {}, requestId: {}, taskPatrolId: {}", isFilter, requestId, taskPatrolId);
@ -63,14 +66,18 @@ public class AnalyseRemoteService {
String analyseFilter = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSE_IS_FILTER); String analyseFilter = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSE_IS_FILTER);
if ("1".equals(analyseFilter) && isFilter) { if ("1".equals(analyseFilter) && isFilter) {
final String analyzeFilterRequestIdRedisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(requestId); final String analyzeFilterRequestIdRedisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(requestId);
log.info("[FILTER] sendRequest analyseFilterRequestIdRedisKey: {}, analyseReq: {}", analyzeFilterRequestIdRedisKey, analyseReq);
log.info("SEND_REQUEST_FILTER taskPatrolId: {}, requestId: {}, analyseFilterRequestIdRedisKey: {}, analyseReq: {}",
analyseReq.getTaskPatrolId(),
requestId,
analyzeFilterRequestIdRedisKey,
analyseReq);
//redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone(), testMode?3L:requestTimeout, testMode?TimeUnit.MINUTES:TimeUnit.DAYS); //redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone(), testMode?3L:requestTimeout, testMode?TimeUnit.MINUTES:TimeUnit.DAYS);
redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone()); redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone());
if(testMode) {
delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES);
} else {
delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS);
}
// if(testMode) {
// delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES);
// } else {
// delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS);
// }
AnalyseReqItem analyseReqItem = analyseReq.getObjectList().get(0);//只取第一个 AnalyseReqItem analyseReqItem = analyseReq.getObjectList().get(0);//只取第一个
analyseReqItem.setTypeList(typeList); analyseReqItem.setTypeList(typeList);
analyseReq.setObjectList(Collections.singletonList(analyseReqItem)); analyseReq.setObjectList(Collections.singletonList(analyseReqItem));
@ -85,14 +92,17 @@ public class AnalyseRemoteService {
if(checkInfraredType(typeList)) { if(checkInfraredType(typeList)) {
requestUrl = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSIS_BIG_URL_INFRARED); requestUrl = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSIS_BIG_URL_INFRARED);
log.info("BIG_MODEL_INFRARED requestUrl: {}", requestUrl);
log.info("BIG_MODEL_INFRARED requestId: {}, requestUrl: {}", requestId, requestUrl);
} else { } else {
requestUrl = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSIS_BIG_URL); requestUrl = patrolTaskService.selectConfigByKey(AnalyseConstants.ANALYSIS_BIG_URL);
log.info("BIG_MODEL_OTHER requestUrl: {}", requestUrl);
log.info("BIG_MODEL_OTHER requestId: {}, requestUrl: {}", requestId, requestUrl);
} }
final String analyzeBigModelRequestIdRedisKey = AnalyseConstants.ANALYSE_AI_REQUEST.concat(requestId); final String analyzeBigModelRequestIdRedisKey = AnalyseConstants.ANALYSE_AI_REQUEST.concat(requestId);
log.info("BIG_MODEL analyzeBigModelRequestIdRedisKey: {}, analyseReq: {}", analyzeBigModelRequestIdRedisKey, analyseReq);
log.info("SEND_REQUEST_BIG_MODEL requestId: {}, analyzeBigModelRequestIdRedisKey: {}, analyseReq: {}",
requestId,
analyzeBigModelRequestIdRedisKey,
analyseReq);
redisService.setCacheObject(analyzeBigModelRequestIdRedisKey, analyseReq.clone(), requestTimeout, TimeUnit.DAYS); redisService.setCacheObject(analyzeBigModelRequestIdRedisKey, analyseReq.clone(), requestTimeout, TimeUnit.DAYS);
} }
@ -102,15 +112,19 @@ public class AnalyseRemoteService {
analyseReq.setFilter(isFilter); analyseReq.setFilter(isFilter);
analyseReq.setRequestUrl(requestUrl.concat(AnalyseConstants.ANALYSE_URI)); analyseReq.setRequestUrl(requestUrl.concat(AnalyseConstants.ANALYSE_URI));
analyseReq.setRedissonTime(DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss, new Date()));
try {
retryDelegate.callRemoteAnalyseService(analyseReq);
} catch (IOException e) {
log.info("FINALLY FAIL CALL REMOTE ANALYSE SERVICE, requestId: {}, taskPatrolId: {}, error: {}",
if(testMode) {
delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES);
} else {
delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS);
}
if(!retryDelegate.callRemoteAnalyseService(analyseReq)) {
log.info("FINALLY FAIL CALL REMOTE ANALYSE SERVICE, requestId: {}, taskPatrolId: {}",
analyseReq.getRequestId(), analyseReq.getRequestId(),
analyseReq.getTaskPatrolId(),
e.getMessage());
throw e;
analyseReq.getTaskPatrolId());
throw new IOException("Remote analyse service failed");
} }
} }


+ 12
- 5
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java View File

@ -45,15 +45,22 @@ public class AnalyseRequestDelayQueueListener implements InitializingBean {
} }
private void handleRequest(AnalyseRequest request) { private void handleRequest(AnalyseRequest request) {
String redisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(request.getRequestId());
Object cached = redisService.redisTemplate.opsForValue().getAndDelete(redisKey);
String redisKey = request.isFilter() ? AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(request.getRequestId())
: AnalyseConstants.ANALYSE_AI_REQUEST.concat(request.getRequestId());
//Object cached = redisService.redisTemplate.opsForValue().getAndDelete(redisKey);
Object cached = redisService.redisTemplate.opsForValue().get(redisKey);
if (cached != null) { if (cached != null) {
// 正常到期但业务未处理走补偿逻辑 // 正常到期但业务未处理走补偿逻辑
log.info("AnalyseRequest Been Not Consumed, Compensate Triggered: {}", request);
log.info("ANALYSE_REQUEST_DELAY_QUEUE COMPENSATE patrolPointId: {}, requestId: {}, BODY: {}",
request.getTaskPatrolId(),
request.getRequestId(),
request);
analyseRemoteService.sendCompensateRequest(request); analyseRemoteService.sendCompensateRequest(request);
} else { } else {
// 正常业务已完成什么也不做
log.info("AnalyseRequest Been Normally Consumed: {}", request.getRequestId());
// 正常业务已完成什么也不做正常情况下不会进入这个逻辑
log.info("ANALYSE_REQUEST_DELAY_QUEUE CONSUMED: patrolPointId: {}, requestId: {}",
request.getTaskPatrolId(),
request.getRequestId());
} }
} }
} }


+ 13
- 1
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestRetryableDelegate.java View File

@ -6,8 +6,11 @@ import com.inspect.analysis.utils.SimpleHttpClient;
import com.inspect.partrolresult.domain.AnalyseRequest; import com.inspect.partrolresult.domain.AnalyseRequest;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.ResponseEntity;
import org.springframework.retry.RetryContext; import org.springframework.retry.RetryContext;
import org.springframework.retry.annotation.Backoff; import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable; import org.springframework.retry.annotation.Retryable;
import org.springframework.retry.support.RetrySynchronizationManager; import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -22,7 +25,7 @@ public class AnalyseRequestRetryableDelegate {
value = IOException.class, value = IOException.class,
maxAttempts = 5, maxAttempts = 5,
backoff = @Backoff(delay = 2000, multiplier = 1.0)) backoff = @Backoff(delay = 2000, multiplier = 1.0))
public void callRemoteAnalyseService(final AnalyseRequest analyseReq) throws IOException {
public boolean callRemoteAnalyseService(final AnalyseRequest analyseReq) throws IOException {
RetryContext retryContext = RetrySynchronizationManager.getContext(); RetryContext retryContext = RetrySynchronizationManager.getContext();
int retryCount = retryContext != null ? retryContext.getRetryCount() : 0; int retryCount = retryContext != null ? retryContext.getRetryCount() : 0;
log.info("CALL_REMOTE_ANALYZE retryCount: {}, requestId: {}, isFilter: {}, URL: {}, PARAMS: {}", log.info("CALL_REMOTE_ANALYZE retryCount: {}, requestId: {}, isFilter: {}, URL: {}, PARAMS: {}",
@ -32,5 +35,14 @@ public class AnalyseRequestRetryableDelegate {
if (!"200".equals(JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE))) { if (!"200".equals(JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE))) {
log.info("CALL_REMOTE_ANALYZE FAIL: {}", JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE)); log.info("CALL_REMOTE_ANALYZE FAIL: {}", JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE));
} }
return true;
}
@SuppressWarnings("unused")
@Recover
public boolean recover(IOException e, final AnalyseRequest analyseReq) {
log.info("CALL_REMOTE_ANALYZE RECOVER: requestId: {}", analyseReq.getRequestId());
return false;
} }
} }

+ 4
- 4
inspect-main/inspect-main-task/src/main/java/com/inspect/task/controller/PatrolTaskController.java View File

@ -150,10 +150,10 @@ public class PatrolTaskController extends BaseController {
} }
} }
if(StringUtils.isNotEmpty(patrolTask.getAreaName())) {
if (StringUtils.isNotEmpty(patrolTask.getAreaName())) {
List<PatrolTask> taskListEx = new ArrayList<>(); List<PatrolTask> taskListEx = new ArrayList<>();
for (PatrolTask task : taskList) { for (PatrolTask task : taskList) {
if(StringUtils.isNotEmpty(task.getAreaName()) && task.getAreaName().contains(patrolTask.getAreaName())) {
if (StringUtils.isNotEmpty(task.getAreaName()) && task.getAreaName().contains(patrolTask.getAreaName())) {
taskListEx.add(task); taskListEx.add(task);
} }
} }
@ -2769,8 +2769,8 @@ public class PatrolTaskController extends BaseController {
cell10.setCellValue(MessageUtils.get("分析图片")); cell10.setCellValue(MessageUtils.get("分析图片"));
Cell cell11 = row.createCell(11); Cell cell11 = row.createCell(11);
cell11.setCellValue(MessageUtils.get("分析结果")); cell11.setCellValue(MessageUtils.get("分析结果"));
HSSFPatriarch patriarch = (HSSFPatriarch)sheet.createDrawingPatriarch();
for(int i = 0; i < newList.size(); ++i) {
HSSFPatriarch patriarch = (HSSFPatriarch) sheet.createDrawingPatriarch();
for (int i = 0; i < newList.size(); ++i) {
Row row1 = sheet.createRow(i + 1); Row row1 = sheet.createRow(i + 1);
Cell cell12 = row1.createCell(0); Cell cell12 = row1.createCell(0);
cell12.setCellValue(i + 1); cell12.setCellValue(i + 1);


Loading…
Cancel
Save