|
|
|
@ -28,8 +28,7 @@ import java.util.concurrent.TimeUnit; |
|
|
|
@Component |
|
|
|
public class RedisMQPullTask implements CommandLineRunner { |
|
|
|
|
|
|
|
private static final ScheduledThreadPoolExecutor EXECUTOR_SERVICE = |
|
|
|
ThreadPoolExecutorFactory.getThreadPoolExecutor(); |
|
|
|
private static final ScheduledThreadPoolExecutor EXECUTOR = ThreadPoolExecutorFactory.getThreadPoolExecutor(); |
|
|
|
|
|
|
|
@Autowired(required = false) |
|
|
|
private List<RedisMQConsumer> consumers = Collections.emptyList(); |
|
|
|
@ -48,11 +47,14 @@ public class RedisMQPullTask implements CommandLineRunner { |
|
|
|
// 获取泛型类型
|
|
|
|
Type superClass = consumer.getClass().getGenericSuperclass(); |
|
|
|
Type type = ((ParameterizedType)superClass).getActualTypeArguments()[0]; |
|
|
|
EXECUTOR_SERVICE.execute(new Runnable() { |
|
|
|
EXECUTOR.execute(new Runnable() { |
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
List<Object> datas = new LinkedList<>(); |
|
|
|
try { |
|
|
|
if(redisMQTemplate.isClose()){ |
|
|
|
return; |
|
|
|
} |
|
|
|
if (consumer.isReady()) { |
|
|
|
String key = consumer.generateKey(); |
|
|
|
// 拉取一个批次的数据
|
|
|
|
@ -71,17 +73,16 @@ public class RedisMQPullTask implements CommandLineRunner { |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("数据消费异常,队列:{}", queue, e); |
|
|
|
EXECUTOR_SERVICE.schedule(this, period, TimeUnit.MICROSECONDS); |
|
|
|
return; |
|
|
|
} |
|
|
|
// 继续消费数据
|
|
|
|
if (!EXECUTOR_SERVICE.isShutdown()) { |
|
|
|
if (!EXECUTOR.isShutdown()) { |
|
|
|
if (datas.size() < batchSize) { |
|
|
|
// 数据已经消费完,等待下一个周期继续拉取
|
|
|
|
EXECUTOR_SERVICE.schedule(this, period, TimeUnit.MICROSECONDS); |
|
|
|
EXECUTOR.schedule(this, period, TimeUnit.MICROSECONDS); |
|
|
|
} else { |
|
|
|
// 数据没有消费完,直接开启下一个消费周期
|
|
|
|
EXECUTOR_SERVICE.execute(this); |
|
|
|
EXECUTOR.execute(this); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -107,6 +108,7 @@ public class RedisMQPullTask implements CommandLineRunner { |
|
|
|
|
|
|
|
@PreDestroy |
|
|
|
public void destory() { |
|
|
|
log.info("消费线程停止..."); |
|
|
|
ThreadPoolExecutorFactory.shutDown(); |
|
|
|
} |
|
|
|
} |
|
|
|
|