From 84902beb0a07902139540eda19398d550c53a818 Mon Sep 17 00:00:00 2001 From: htjcAdmin Date: Tue, 8 Jul 2025 17:06:12 +0800 Subject: [PATCH] =?UTF-8?q?/*=E5=A2=9E=E5=8A=A0=E5=85=89=E6=98=8E=E5=A4=A7?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E5=A4=84=E7=90=86=E6=A1=86=E6=9E=B6*/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../simulator/constant/AlgConstants.java | 2 + .../LuminosityRequestConsumerManager.java | 54 +++++++++++++ .../service/impl/RedisQueueConsumerAsync.java | 80 +++++++++++++++++++ 3 files changed, 136 insertions(+) create mode 100644 src/main/java/com/inspect/simulator/service/impl/LuminosityRequestConsumerManager.java create mode 100644 src/main/java/com/inspect/simulator/service/impl/RedisQueueConsumerAsync.java diff --git a/src/main/java/com/inspect/simulator/constant/AlgConstants.java b/src/main/java/com/inspect/simulator/constant/AlgConstants.java index 9a1c170..38440be 100644 --- a/src/main/java/com/inspect/simulator/constant/AlgConstants.java +++ b/src/main/java/com/inspect/simulator/constant/AlgConstants.java @@ -9,4 +9,6 @@ public class AlgConstants { public static final String XB = "xb"; public static final String ALG_SUBTYPE_CODE = "alg_subtype_code"; + + public static final String LUMINOSITY_REQUEST_QUEUE = "luminosity:request:queue"; } diff --git a/src/main/java/com/inspect/simulator/service/impl/LuminosityRequestConsumerManager.java b/src/main/java/com/inspect/simulator/service/impl/LuminosityRequestConsumerManager.java new file mode 100644 index 0000000..03cb051 --- /dev/null +++ b/src/main/java/com/inspect/simulator/service/impl/LuminosityRequestConsumerManager.java @@ -0,0 +1,54 @@ +package com.inspect.simulator.service.impl; + +import com.inspect.simulator.constant.AlgConstants; +import com.inspect.simulator.domain.algorithm.in.AnalyseRequest; +import com.inspect.simulator.utils.redis.RedisService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import javax.annotation.PreDestroy; + +@Component +public class LuminosityRequestConsumerManager { + private static final Logger log = LoggerFactory.getLogger(LuminosityRequestConsumerManager.class); + + private RedisQueueConsumerAsync consumer; + + @Resource + private RedisService redisService; + + + @PostConstruct + public void initConsumer() { + consumer = new RedisQueueConsumerAsync( + redisService.redisTemplate, + AlgConstants.LUMINOSITY_REQUEST_QUEUE, + AnalyseRequest.class, + 1000, + 500, + request -> { + try { + log.info("LuminosityRequestConsumerManager queueSize: {}, request: {}", getQueueSize(), request); + } catch (Exception e) { + log.info("LuminosityRequestConsumerManager error queueSize: {}, request: {}", getQueueSize(), request); + } + } + ); + + consumer.start(); // 应用启动时即开始消费 + } + + @PreDestroy + public void stopConsumer() { + if (consumer != null) { + consumer.stop(); + } + } + + public long getQueueSize() { + return redisService.redisTemplate.opsForList().size(AlgConstants.LUMINOSITY_REQUEST_QUEUE); + } +} diff --git a/src/main/java/com/inspect/simulator/service/impl/RedisQueueConsumerAsync.java b/src/main/java/com/inspect/simulator/service/impl/RedisQueueConsumerAsync.java new file mode 100644 index 0000000..0e7a23b --- /dev/null +++ b/src/main/java/com/inspect/simulator/service/impl/RedisQueueConsumerAsync.java @@ -0,0 +1,80 @@ +package com.inspect.simulator.service.impl; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.RedisTemplate; + +import java.time.Duration; +import java.util.function.Consumer; + +public class RedisQueueConsumerAsync implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(RedisQueueConsumerAsync.class); + + private final RedisTemplate redisTemplate; + private final Gson gson = new Gson(); + private final String queueKey; + private final Class clazz; + + private final long intervalMs; + private final long idleSleepMs; + + private final Consumer handler; + + private volatile boolean running = false; + + public RedisQueueConsumerAsync(RedisTemplate redisTemplate, + String queueKey, + Class clazz, + long intervalMs, + long idleSleepMs, + Consumer handler) { + this.redisTemplate = redisTemplate; + this.queueKey = queueKey; + this.clazz = clazz; + this.intervalMs = intervalMs; + this.idleSleepMs = idleSleepMs; + this.handler = handler; + } + + public void start() { + if (!running) { + running = true; + new Thread(this, "RedisQueueConsumerAsync-" + queueKey).start(); + } + } + + public void stop() { + running = false; + } + + @Override + public void run() { + while (running) { + try { + String json = redisTemplate.opsForList().leftPop(queueKey, Duration.ofSeconds(1)); + + if (json == null) { + Thread.sleep(idleSleepMs); + continue; + } + + long start = System.currentTimeMillis(); + + T obj = gson.fromJson(json, clazz); + handler.accept(obj); + + long cost = System.currentTimeMillis() - start; + if (cost < intervalMs) { + Thread.sleep(intervalMs - cost); + } + + } catch (Exception e) { + logger.error("RedisQueueConsumerAsync error", e); + } + } + + logger.info("RedisQueueConsumerAsync for [{}] stopped.", queueKey); + } +} +