Browse Source

/*

* 1. Redisson机制增加健壮性,使用的线程改为由Spring容器托管;
* 2.
  Algorithm算法返回处理过程中,在清理对应的Redisson记录的时候增加对于该记录的判断,防止由于Redisson超时之后已经将该Redisson记录删除之后,再次进行重复删除。
*/
master
htjcAdmin 4 months ago
parent
commit
6349c0ff2f
6 changed files with 60 additions and 12 deletions
  1. +1
    -1
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java
  2. +6
    -2
      inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseResponseServiceImpl.java
  3. +41
    -7
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java
  4. +2
    -2
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestServiceImpl.java
  5. +2
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java
  6. +8
    -0
      inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java

+ 1
- 1
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/constant/AnalyseConstants.java View File

@ -43,7 +43,7 @@ public class AnalyseConstants {
public static final String ALGORITHM_REQUEST_QUEUE = "algorithm:request:queue";
public static final String ALGORITHM_RESPONSE_QUEUE = "algorithm:response:queue";
public static final String ANALYSE_REQ_DELAY_QUEUE = "ANALYSE_REQ_DELAY_QUEUE";
public static final String ALGORITHM_REQUEST_DELAY_QUEUE = "algorithm:request:delay:queue";
public static final String LUMINOSITY_REQUEST_QUEUE = "luminosity:request:queue";
}

+ 6
- 2
inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalyseResponseServiceImpl.java View File

@ -127,8 +127,12 @@ public class AnalyseResponseServiceImpl implements IAnalyseResponseService {
analyseRequest.getTaskPatrolId(),
requestId,
analyseRequest);
boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseRequest);
log.info("ALGORITHM_FILTER_RESULT Redisson Removed requestId: {}, Result: {}, analyseRequest: {}", requestId, isRemoved ? "success" : "fail", analyseRequest);
if(delayQueueService.containRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseRequest)) {
boolean isRemoved = delayQueueService.removeRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseRequest);
log.info("ALGORITHM_FILTER_RESULT Redisson Removed requestId: {}, Result: {}, analyseRequest: {}", requestId, isRemoved ? "success" : "fail", analyseRequest);
} else {
log.info("ALGORITHM_FILTER_RESULT Redisson has Compensated requestId: {}, analyseRequest: {}", requestId, analyseRequest);
}
algTypeList = analyseRequest.getObjectList().get(0).getTypeList();
analyseResult.reloadReq(analyseRequest);//只有一个,analyseReq.getObjectList().get(0)


+ 41
- 7
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestDelayQueueListener.java View File

@ -4,8 +4,11 @@ import com.inspect.analysis.constant.AnalyseConstants;
import com.inspect.base.redis.service.RedisService;
import com.inspect.partrolresult.domain.AnalyseRequest;
import lombok.extern.slf4j.Slf4j;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
@ -13,8 +16,8 @@ import javax.annotation.Resource;
@Slf4j
@Component
public class AnalyseRequestDelayQueueListener implements InitializingBean {
private final String ListenerName = "AnalyseRequest-DelayQueue-Listener";
public class AnalyseRequestDelayQueueListener implements InitializingBean, DisposableBean {
private final String THREAD_NAME = "AnalyseRequest-DelayQueue-Listener";
@Resource
private RedissonClient redissonClient;
@ -25,22 +28,44 @@ public class AnalyseRequestDelayQueueListener implements InitializingBean {
@Resource
private IAnalyseRequestService analyseRequestService;
private Thread listenerThread;
private volatile boolean running = true;
@Override
public void afterPropertiesSet() {
// 启动监听线程
new Thread(() -> {
RBlockingQueue<AnalyseRequest> blockingQueue = redissonClient.getBlockingQueue(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE);
listenerThread = new Thread(() -> {
RBlockingQueue<AnalyseRequest> blockingQueue = redissonClient.getBlockingQueue(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE);
log.info("Listening AnalyseRequest Delay Queue Task Launching");
while (true) {
while (running && !Thread.currentThread().isInterrupted()) {
try {
AnalyseRequest request = blockingQueue.take(); // 阻塞等待任务到期
handleRequest(request);
} catch (Exception e) {
} catch (RedissonShutdownException e) {
log.warn("Redisson 已关闭,终止监听线程");
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 响应中断
log.info("监听线程被中断,正在关闭...");
break;
} catch (RedisException e) {
log.error("Redis 连接异常: {}", e.getMessage(), e);
try {
Thread.sleep(3000); // 避免异常频繁重试
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
catch (Exception e) {
log.error("Listening AnalyseRequest Queue Exception: ", e);
}
}
}, ListenerName).start();
}, THREAD_NAME);
listenerThread.setDaemon(true); // 设置为守护线程防止主程序不退出
listenerThread.start();
}
private void handleRequest(AnalyseRequest request) {
@ -62,5 +87,14 @@ public class AnalyseRequestDelayQueueListener implements InitializingBean {
request.getRequestId());
}
}
@Override
public void destroy() {
log.info("正在关闭 AnalyseRequestDelayQueueListener...");
running = false;
if (listenerThread != null) {
listenerThread.interrupt();
}
}
}

+ 2
- 2
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestServiceImpl.java View File

@ -119,9 +119,9 @@ public class AnalyseRequestServiceImpl implements IAnalyseRequestService {
analyseReq.setRedissonTime(DateUtils.parseDateToStr(DateUtils.yyyyMMddHHmmss, new Date()));
if (testMode) {
delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES);
delayQueueService.submitRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseReq, (3L), TimeUnit.MINUTES);
} else {
delayQueueService.submitRequest(AnalyseConstants.ANALYSE_REQ_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS);
delayQueueService.submitRequest(AnalyseConstants.ALGORITHM_REQUEST_DELAY_QUEUE, analyseReq, requestTimeout, TimeUnit.DAYS);
}
if (!retryDelegate.callRemoteAnalyseService(analyseReq)) {


+ 2
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/DelayQueueService.java View File

@ -6,5 +6,7 @@ public interface DelayQueueService<T> {
void submitRequest(String queueName, T payload, long delay, TimeUnit timeUnit);
boolean removeRequest(String queueName, T payload);
boolean containRequest(String queueName, T payload);
}

+ 8
- 0
inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/RedissonDelayQueueServiceImpl.java View File

@ -41,5 +41,13 @@ public class RedissonDelayQueueServiceImpl<T> implements DelayQueueService<T> {
return delayedQueue.remove(payload);
}
@Override
public boolean containRequest(String queueName, T payload) {
RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
return delayedQueue.contains(payload);
}
}

Loading…
Cancel
Save