Browse Source

/*智巡向初筛发送的请求得不到初筛回应,会导致任务一直处于执行状态,因此增加补偿机制,如果在设定的时间段内没有收到初筛的回复,则对业务进行自闭环以保证

巡视任务的完整。*/
master
htjcAdmin 5 months ago
parent
commit
a93ff90611
11 changed files with 197 additions and 146 deletions
  1. +6
    -0
      inspect-main/inspect-main-task/pom.xml
  2. +2
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java
  3. +9
    -7
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java
  4. +0
    -37
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisRedisKeyExpireListener.java
  5. +6
    -86
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java
  6. +7
    -15
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java
  7. +35
    -1
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java
  8. +60
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java
  9. +10
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java
  10. +17
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonConfig.java
  11. +45
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java

+ 6
- 0
inspect-main/inspect-main-task/pom.xml View File

@ -63,5 +63,11 @@
<artifactId>poi-ooxml</artifactId>
<version>5.2.3</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.23.5</version>
</dependency>
</dependencies>
</project>

+ 2
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java View File

@ -41,5 +41,7 @@ public class AnalyseConstants {
public static final String ALGORITHM_REQUEST_QUEUE = "algorithm:request:queue";
public static final String ALGORITHM_RESPONSE_QUEUE = "algorithm:response:queue";
public static final String ANALYSE_REQ_DELAY_QUEUE = "ANALYSE_REQ_DELAY_QUEUE";
public static final String LUMINOSITY_REQUEST_QUEUE = "luminosity:request:queue";
}

+ 9
- 7
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java View File

@ -2,7 +2,6 @@ package com.inspect.analysis.service.impl;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.base.core.utils.HttpClientUtils;
import com.inspect.base.redis.service.RedisService;
import com.inspect.partrolresult.domain.AnalyseRequest;
import com.inspect.partrolresult.service.AnalyseRemoteService;
@ -34,6 +33,9 @@ public class AlgorithmRequestRetryableConsumerManager {
@Value("${task.redis.request.idle-sleep-ms:500}")
private long idleSleepMs;
@Value("${task.test-mode:false}")
private boolean testMode;
private RedisQueueRetryableConsumerAsync<AnalyseRequest> consumer;
@Resource
@ -65,14 +67,14 @@ public class AlgorithmRequestRetryableConsumerManager {
} catch (IOException e) {
// 可以记录失败次数避免无限重试
int retryCount = Optional.of(request.getRetryCount()).orElse(0);
log.info("AlgorithmRequestRetryableConsumerManager Send failed, will retry: {}, retryCount: {}, msg: {}", request, request.getRetryCount(), e.getMessage());
if (retryCount < 1) {
log.info("AlgorithmRequestRetryableConsumerManager Failed, retryCount: {}, reqeust: {}, msg: {}", request.getRetryCount(), request, e.getMessage());
final int TRIES_MAX = testMode ? 50 : 3;
if (retryCount < TRIES_MAX) {
request.setRetryCount(retryCount + 1);
redisService.redisTemplate.opsForList().rightPush(AnalyseConstants.ALGORITHM_REQUEST_QUEUE, new Gson().toJson(request));
redisService.redisTemplate.opsForList().leftPush(AnalyseConstants.ALGORITHM_REQUEST_QUEUE, new Gson().toJson(request));
} else {
log.info("AlgorithmRequestRetryableConsumerManager Max retry reached, dropping request: {}", request);
final String defaultUrl = "http://localhost:" + this.port + AnalyseConstants.ANALYSE_RET_URI;
HttpClientUtils.sendPostAgain(defaultUrl, request.toErrorResultStr());
log.info("AlgorithmRequestRetryableConsumerManager Compensate, retryCount: {}, request: {}", retryCount, request);
analyseRemoteService.sendCompensateRequest(request);
}
}
}


+ 0
- 37
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisRedisKeyExpireListener.java View File

@ -1,37 +0,0 @@
package com.inspect.analysis.service.impl;
import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.base.core.constant.Color;
import com.inspect.base.redis.service.RedisService;
import com.inspect.partrolresult.domain.AnalyseRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class AnalysisRedisKeyExpireListener extends KeyExpirationEventMessageListener {
private final Logger log = LoggerFactory.getLogger(AnalysisRedisKeyExpireListener.class);
@Resource
private RedisService redisService;
public AnalysisRedisKeyExpireListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
log.info(Color.YELLOW + "Analysis RedisKey Expire: {}, pattern: {}" + Color.END, expiredKey, pattern);
if(expiredKey.startsWith(AnalyseConstants.ANALYSE_FILTER_REQUEST)) {
AnalyseRequest analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(expiredKey);
log.info(Color.YELLOW + "Analysis RedisKey Expire Request: {}" + Color.END, analyseRequest);
}
}
}

+ 6
- 86
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java View File

@ -23,6 +23,7 @@ import com.inspect.message.MessageUtils;
import com.inspect.partrolresult.domain.AnalyseRequest;
import com.inspect.partrolresult.domain.PatrolResult;
import com.inspect.partrolresult.service.AnalyseRemoteService;
import com.inspect.partrolresult.service.DelayQueueService;
import com.inspect.partrolresult.service.IPatrolResultService;
import com.inspect.resultmain.domain.PatrolTaskResultMain;
import com.inspect.resultmain.service.IPatrolTaskResultMainService;
@ -75,92 +76,8 @@ public class AnalysisServiceImpl implements IAnalysisService {
@Resource
private ResultAnalysisUtils resultAnalysisUtils;
// @Override
// public void picAnalyseRetNotify(AnalyseResult analyseResult) {
// log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 start, analyseResult:{} ######" + Color.END, analyseResult);
// String requestId = analyseResult.getRequestId();
// String keyId = AnalyseConstants.ANALYSE_REQUEST_ID.concat(requestId);
// if (!redisService.hasKey(keyId)) {
// log.error("picAnalyseRetNotify isTest: {}, NO keyId={}, REQUEST_ID={} in REDIS!", analyseResult.isTest(), keyId, requestId);
// if (!analyseResult.isTest()) {
// return;
// }
// //redisService.setCacheObject(keyId, "123456789");
// }
//
// String patrolTaskIdObj = redisService.getCacheObject(keyId);
// log.info("picAnalyseRetNotify keyId={}, requestId={}, patrolTaskIdObj={}", keyId, requestId, patrolTaskIdObj);
// analyseResult.setTaskPatrolId(patrolTaskIdObj);
// String keyFilterRequest = AnalyseConstants.ANALYSE_FILTER_REQUEST + requestId;
// log.info("picAnalyseRetNotify keyFilterRequest={}", keyFilterRequest);
// boolean bBigModelExecFlag = false;
// String[] algTypeList = {};
// AnalyseRequest analyseRequest = null;
// if (redisService.hasKey(keyFilterRequest)) { // 初筛结果
// analyseResult.setFilter("1"); // 设置初筛标志
// analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(keyFilterRequest);
// log.info("FILTER_RESULT picAnalyseRetNotify analyseRequest IN REDIS: {}", analyseRequest);
// AnalyseResPoint analyseResPoint = analyseResult.getResultList().get(0).getResults().get(0);
// if (analyseRequest != null && analyseRequest.getObjectList() != null && !analyseRequest.getObjectList().isEmpty()) {
// algTypeList = analyseRequest.getObjectList().get(0).getTypeList();
// }
//
// boolean bDefect = analyseResPoint.isDefect(); // code=2000代表初筛结果返回正常value=1代表有缺陷
// log.info("picAnalyseRetNotify FILTER bDefect={}, algorithmType={}", bDefect, analyseResPoint.getType());
// analyseResult.reloadReq(analyseRequest);//只有一个,analyseReq.getObjectList().get(0)
// /*
// *大致两种情况1. 算法是表计(meter); 2. 算法不是表计(meter)
// * 1. 算法不是表计(meter)
// * 1.1 初筛结果有缺陷继续发给大模型处理
// * 1.2 初筛结果无缺陷不用调用大模型流程就此结束
// * 2. 算法是表计(meter)
// * 不论初筛结果有没有缺陷都要继续调用大模型因为大模型调用了表计(meter)识别算法
// */
// final String algType = analyseRequest.getObjectList().get(0).getTypeList()[0];
// log.info("picAnalyseRetNotify algType IN REDIS: {}", algType);
// if (bDefect || (AlgConstants.METER.equals(algType)
// || AlgConstants.XB.equals(algType)
// || AlgConstants.INFRA_1800.equals(algType)
// || AlgConstants.INFRA_YU3.equals(algType)
// || AlgConstants.INFRA_CAMERA.equals(algType)
// || AlgConstants.INFRA_CAMERA_REVERSE.equals(algType))
// ) {
// /*
// * 先检查结果有缺无缺陷如果有缺陷不用判断算法直接调用大模型*
// * 如果无缺陷再去判断算法如果算法是meter就继续调用大模型*
// */
// analyseResult.setResult("0");
// bBigModelExecFlag = true;
//// log.info("picAnalyseRetNotify CALL BIG_MODEL REQUEST_ID={}", requestId);
//// ispAlgorithmRequestService.sendRequest(analyseRequest);
// } else {
// //初筛结果无缺陷并且非表计算法不用调用大模型流程就此结束
// log.info("picAnalyseRetNotify NO BIG_MODEL WOULD CALLED REQUEST_ID={}", requestId);
// }
// } else {
// // 大模型结果
// log.info("BIG_MODEL_RESULT CALLBACK picAnalyseRetNotify REQUEST_ID={}", requestId);
// analyseResult.setFilter("0"); // 设置大模型标志为0
// }
//
// //qinyl
// analysisLogService.log(new AnalyseLog(analyseResult.toString(), "1", analyseResult.getTaskPatrolId(), analyseResult.getFilter(), requestId));
// try {
// analysis(analyseResult);
// } catch (Exception e) {
// log.error("error", e);
// }
// //doAlgorithmAnalysis(analyseResult);
// //calculateProcess(analyseResult);
// log.info(Color.CYAN + "###### 分析算法模块返回的表计识别结果 end ######" + Color.END);
//
// if (bBigModelExecFlag) {
// log.info("picAnalyseRetNotify CALL BIG_MODEL REQUEST_ID={}", requestId);
//// ispAlgorithmRequestService.sendRequest(analyseRequest);
// analyseRemoteService.sendRequest(analyseRequest, algTypeList, false);
// }
// }
@Resource
private DelayQueueService<AnalyseRequest> delayQueueService;
@Override
public void picAnalyseRetNotify(AnalyseResult analyseResult) {
@ -198,6 +115,9 @@ public class AnalysisServiceImpl implements IAnalysisService {
analyseResult.setFilter("1"); // 设置初筛标志
analyseRequest = (AnalyseRequest) redisService.redisTemplate.opsForValue().getAndDelete(filterRequestRedisKey);
log.info("ALGO_RES_UUID: {}, FILTER_RESULT picAnalyseRetNotify analyseRequest IN REDIS: {}", logLabel, analyseRequest);
boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseRequest);
log.info("Redisson Removed Result: {}, analyseRequest: {}", isRemoved? "success" : "fail", analyseRequest);
AnalyseResPoint analyseResPoint = analyseResult.getResultList().get(0).getResults().get(0);
if (analyseRequest != null && analyseRequest.getObjectList() != null && !analyseRequest.getObjectList().isEmpty()) {
algTypeList = analyseRequest.getObjectList().get(0).getTypeList();


+ 7
- 15
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java View File

@ -5,16 +5,18 @@ import com.inspect.analysis.domain.AnalyseResItem;
import com.inspect.analysis.domain.AnalyseResPoint;
import com.inspect.analysis.domain.AnalyseResult;
import com.inspect.analysis.service.RetryableRequest;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@Setter
@Getter
@EqualsAndHashCode(of = "requestId")
public class AnalyseRequest implements RetryableRequest, Serializable {
private List<AnalyseReqItem> objectList;
private String requestHostIp;
@ -33,6 +35,10 @@ public class AnalyseRequest implements RetryableRequest, Serializable {
private int retryCount;
private String requestUrl;
public AnalyseRequest() {
}
@Override
public int getRetryCount() {
return retryCount;
@ -76,18 +82,4 @@ public class AnalyseRequest implements RetryableRequest, Serializable {
public String toString() {
return JSONObject.toJSONString(this);
}
@Override
public boolean equals(Object object) {
if (this == object) return true;
if (object == null || getClass() != object.getClass()) return false;
AnalyseRequest that = (AnalyseRequest) object;
return Objects.equals(objectList, that.objectList) && Objects.equals(requestHostIp, that.requestHostIp) && Objects.equals(requestHostPort, that.requestHostPort) && Objects.equals(requestId, that.requestId) && Objects.equals(algorithmPath, that.algorithmPath) && Objects.equals(taskPatrolId, that.taskPatrolId);
}
@Override
public int hashCode() {
return Objects.hash(objectList, requestHostIp, requestHostPort, requestId, algorithmPath, taskPatrolId);
}
}

+ 35
- 1
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java View File

@ -1,5 +1,6 @@
package com.inspect.partrolresult.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.util.StringUtil;
import com.inspect.analysis.constant.AnalyseConstants;
@ -9,6 +10,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.inspect.base.core.constant.RedisConst;
import com.inspect.base.core.utils.HttpClientUtils;
import com.inspect.base.core.utils.StringUtils;
import com.inspect.base.redis.service.RedisService;
import com.inspect.partrolresult.domain.AnalyseReqItem;
@ -39,6 +41,9 @@ public class AnalyseRemoteService {
@Resource
private AnalyseRequestRetryableDelegate retryDelegate;
@Resource
private DelayQueueService<AnalyseRequest> delayQueueService;
//qinyl
public void sendRequest(AnalyseRequest analyseReq, String[] typeList, boolean isFilter) throws IOException {
final long requestTimeout = 1L;
@ -60,7 +65,13 @@ public class AnalyseRemoteService {
if ("1".equals(analyseFilter) && isFilter) {
final String analyzeFilterRequestIdRedisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(requestId);
log.info("[FILTER] sendRequest analyseFilterRequestIdRedisKey: {}, analyseReq: {}", analyzeFilterRequestIdRedisKey, analyseReq);
this.redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone(), testMode?3L:requestTimeout, testMode?TimeUnit.MINUTES:TimeUnit.DAYS);
//redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone(), testMode?3L:requestTimeout, testMode?TimeUnit.MINUTES:TimeUnit.DAYS);
redisService.setCacheObject(analyzeFilterRequestIdRedisKey, analyseReq.clone());
if(testMode) {
delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES);
} else {
delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS);
}
AnalyseReqItem analyseReqItem = analyseReq.getObjectList().get(0);//只取第一个
analyseReqItem.setTypeList(typeList);
analyseReq.setObjectList(Collections.singletonList(analyseReqItem));
@ -97,5 +108,28 @@ public class AnalyseRemoteService {
}
}
public void sendCompensateRequest(AnalyseRequest request) {
final String defaultUrl = "http://localhost:" + this.port + AnalyseConstants.ANALYSE_RET_URI;
HttpClientUtils.sendPostAgain(defaultUrl, request.toErrorResultStr());
}
public static void main(String[] args) {
try {
ObjectMapper mapper = new ObjectMapper();
AnalyseRequest r1 = new AnalyseRequest();
r1.setRequestId("3177fe1c5d6f44f7bc527d5de8451ab9");
String json = mapper.writeValueAsString(r1);
AnalyseRequest r2 = mapper.readValue(json, AnalyseRequest.class);
boolean res = r1.equals(r2);
System.out.println(res); // 应为 true
boolean res2 = r1.hashCode() == r2.hashCode();
System.out.println(res2); // 应为 true
} catch (Exception e) {
e.printStackTrace();
}
}
}

+ 60
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java View File

@ -0,0 +1,60 @@
package com.inspect.partrolresult.service;
import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.base.redis.service.RedisService;
import com.inspect.partrolresult.domain.AnalyseRequest;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
public class AnalyseRequestDelayQueueListener implements InitializingBean {
private final String ListenerName = "AnalyseRequest-DelayQueue-Listener";
@Resource
private RedissonClient redissonClient;
@Resource
private RedisService redisService;
@Resource
private AnalyseRemoteService analyseRemoteService;
@Override
public void afterPropertiesSet() {
// 启动监听线程
new Thread(() -> {
RBlockingQueue<AnalyseRequest> blockingQueue = redissonClient.getBlockingQueue(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE);
log.info("Listening AnalyseRequest Delay Queue Task Launching");
while (true) {
try {
AnalyseRequest request = blockingQueue.take(); // 阻塞等待任务到期
handleRequest(request);
} catch (Exception e) {
log.error("Listening AnalyseRequest Queue Exception: ", e);
}
}
}, ListenerName).start();
}
private void handleRequest(AnalyseRequest request) {
String redisKey = AnalyseConstants.ANALYSE_FILTER_REQUEST.concat(request.getRequestId());
Object cached = redisService.redisTemplate.opsForValue().getAndDelete(redisKey);
if (cached != null) {
// 正常到期但业务未处理走补偿逻辑
log.info("AnalyseRequest Been Not Consumed, Compensate Triggered: {}", request);
analyseRemoteService.sendCompensateRequest(request);
} else {
// 正常业务已完成什么也不做
log.info("AnalyseRequest Been Normally Consumed: {}", request.getRequestId());
}
}
}

+ 10
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java View File

@ -0,0 +1,10 @@
package com.inspect.partrolresult.service;
import java.util.concurrent.TimeUnit;
public interface DelayQueueService<T> {
void submitRequest(String queueName, T payload, long delay, TimeUnit timeUnit);
boolean removeRequest(String queueName, T payload);
}

+ 17
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonConfig.java View File

@ -0,0 +1,17 @@
package com.inspect.partrolresult.service;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://redis:6379");
return Redisson.create(config);
}
}

+ 45
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java View File

@ -0,0 +1,45 @@
package com.inspect.partrolresult.service;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class RedissonDelayQueueServiceImpl<T> implements DelayQueueService<T> {
@Resource
private RedissonClient redissonClient;
/**
* 工作机制如下
* 1. 先往延迟池delayedQueue存入了一个 ZSET + HASHRedis结构并设置了时间戳
* 2. Redisson 后台调度器线程会在数据到期时将它从延迟 ZSET 中移动到目标的 blockingQueue
* @param queueName
* @param payload
* @param delay
* @param timeUnit
*/
@Override
public void submitRequest(String queueName, T payload, long delay, TimeUnit timeUnit) {
RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
delayedQueue.offer(payload, delay, timeUnit);
log.info("Submit Delay Queue Task To: {}, Delay: {} {}", queueName, delay, timeUnit);
}
@Override
public boolean removeRequest(String queueName, T payload) {
RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
return delayedQueue.remove(payload);
}
}

Loading…
Cancel
Save