|
|
|
@ -0,0 +1,80 @@ |
|
|
|
package com.inspect.simulator.service.impl; |
|
|
|
|
|
|
|
import com.alibaba.nacos.shaded.com.google.gson.Gson; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.springframework.data.redis.core.RedisTemplate; |
|
|
|
|
|
|
|
import java.time.Duration; |
|
|
|
import java.util.function.Consumer; |
|
|
|
|
|
|
|
public class RedisQueueConsumerAsync<T> implements Runnable { |
|
|
|
private static final Logger logger = LoggerFactory.getLogger(RedisQueueConsumerAsync.class); |
|
|
|
|
|
|
|
private final RedisTemplate<String, String> redisTemplate; |
|
|
|
private final Gson gson = new Gson(); |
|
|
|
private final String queueKey; |
|
|
|
private final Class<T> clazz; |
|
|
|
|
|
|
|
private final long intervalMs; |
|
|
|
private final long idleSleepMs; |
|
|
|
|
|
|
|
private final Consumer<T> handler; |
|
|
|
|
|
|
|
private volatile boolean running = false; |
|
|
|
|
|
|
|
public RedisQueueConsumerAsync(RedisTemplate<String, String> redisTemplate, |
|
|
|
String queueKey, |
|
|
|
Class<T> clazz, |
|
|
|
long intervalMs, |
|
|
|
long idleSleepMs, |
|
|
|
Consumer<T> handler) { |
|
|
|
this.redisTemplate = redisTemplate; |
|
|
|
this.queueKey = queueKey; |
|
|
|
this.clazz = clazz; |
|
|
|
this.intervalMs = intervalMs; |
|
|
|
this.idleSleepMs = idleSleepMs; |
|
|
|
this.handler = handler; |
|
|
|
} |
|
|
|
|
|
|
|
public void start() { |
|
|
|
if (!running) { |
|
|
|
running = true; |
|
|
|
new Thread(this, "RedisQueueConsumerAsync-" + queueKey).start(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void stop() { |
|
|
|
running = false; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
while (running) { |
|
|
|
try { |
|
|
|
String json = redisTemplate.opsForList().leftPop(queueKey, Duration.ofSeconds(1)); |
|
|
|
|
|
|
|
if (json == null) { |
|
|
|
Thread.sleep(idleSleepMs); |
|
|
|
continue; |
|
|
|
} |
|
|
|
|
|
|
|
long start = System.currentTimeMillis(); |
|
|
|
|
|
|
|
T obj = gson.fromJson(json, clazz); |
|
|
|
handler.accept(obj); |
|
|
|
|
|
|
|
long cost = System.currentTimeMillis() - start; |
|
|
|
if (cost < intervalMs) { |
|
|
|
Thread.sleep(intervalMs - cost); |
|
|
|
} |
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
logger.error("RedisQueueConsumerAsync error", e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
logger.info("RedisQueueConsumerAsync for [{}] stopped.", queueKey); |
|
|
|
} |
|
|
|
} |
|
|
|
|