diff --git a/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java b/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java index 001ffd2..346cfab 100644 --- a/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java @@ -1,37 +1,21 @@ package com.bx.imclient.task; -import com.bx.imcommon.util.ThreadPoolExecutorFactory; -import lombok.SneakyThrows; +import cn.hutool.core.util.StrUtil; +import com.bx.imcommon.mq.RedisMQConsumer; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.CommandLineRunner; - -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import org.springframework.beans.factory.annotation.Value; @Slf4j -public abstract class AbstractMessageResultTask implements CommandLineRunner { +public abstract class AbstractMessageResultTask extends RedisMQConsumer { - private static final ScheduledThreadPoolExecutor EXECUTOR_SERVICE = ThreadPoolExecutorFactory.getThreadPoolExecutor(); + @Value("${spring.application.name}") + private String appName; @Override - public void run(String... args) { - // 初始化定时器 - EXECUTOR_SERVICE.execute(new Runnable() { - @SneakyThrows - @Override - public void run() { - try { - pullMessage(); - } catch (Exception e) { - log.error("任务调度异常", e); - } - if (!EXECUTOR_SERVICE.isShutdown()) { - EXECUTOR_SERVICE.schedule(this,100, TimeUnit.MICROSECONDS); - } - } - }); + public String generateKey() { + return StrUtil.join(":", super.generateKey(), appName); } - public abstract void pullMessage() throws InterruptedException; + } diff --git a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java index d9fa639..c8be7e2 100644 --- a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java @@ -1,58 +1,27 @@ package com.bx.imclient.task; -import cn.hutool.core.util.StrUtil; -import com.alibaba.fastjson.JSONObject; import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; -import com.bx.imcommon.mq.RedisMQTemplate; +import com.bx.imcommon.mq.RedisMQListener; import lombok.RequiredArgsConstructor; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import java.util.LinkedList; + import java.util.List; -import java.util.Objects; @Component @RequiredArgsConstructor -public class GroupMessageResultResultTask extends AbstractMessageResultTask { - - @Autowired - private RedisMQTemplate redisMQTemplate; - - @Value("${spring.application.name}") - private String appName; - - @Value("${im.result.batch:100}") - private int batchSize; +@RedisMQListener(queue = IMRedisKey.IM_RESULT_GROUP_QUEUE, batchSize = 100) +public class GroupMessageResultResultTask extends AbstractMessageResultTask { private final MessageListenerMulticaster listenerMulticaster; @Override - public void pullMessage() { - List results; - do { - results = loadBatch(); - if(!results.isEmpty()){ - listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results); - } - } while (results.size() >= batchSize); + public void onMessage(List results) { + listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results); } - List loadBatch() { - String key = StrUtil.join(":", IMRedisKey.IM_RESULT_GROUP_QUEUE, appName); - //这个接口redis6.2以上才支持 - //List list = redisMQTemplate.opsForList().leftPop(key, batchSize); - List results = new LinkedList<>(); - JSONObject jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key); - while (!Objects.isNull(jsonObject) && results.size() < batchSize) { - results.add(jsonObject.toJavaObject(IMSendResult.class)); - jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key); - } - return results; - } } diff --git a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java index ac7355c..38e5ac1 100644 --- a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java @@ -1,59 +1,26 @@ package com.bx.imclient.task; -import cn.hutool.core.util.StrUtil; -import com.alibaba.fastjson.JSONObject; import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; -import com.bx.imcommon.mq.RedisMQTemplate; +import com.bx.imcommon.mq.RedisMQListener; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import java.util.LinkedList; + import java.util.List; -import java.util.Objects; -@Slf4j + @Component @RequiredArgsConstructor -public class PrivateMessageResultResultTask extends AbstractMessageResultTask { - - @Autowired - private RedisMQTemplate redisMQTemplate; - - @Value("${spring.application.name}") - private String appName; - - @Value("${im.result.batch:100}") - private int batchSize; +@RedisMQListener(queue = IMRedisKey.IM_RESULT_PRIVATE_QUEUE, batchSize = 100) +public class PrivateMessageResultResultTask extends AbstractMessageResultTask { private final MessageListenerMulticaster listenerMulticaster; @Override - public void pullMessage() { - List results; - do { - results = loadBatch(); - if(!results.isEmpty()){ - listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); - } - } while (results.size() >= batchSize); - } - - List loadBatch() { - String key = StrUtil.join(":", IMRedisKey.IM_RESULT_PRIVATE_QUEUE, appName); - //这个接口redis6.2以上才支持 - //List list = redisMQTemplate.opsForList().leftPop(key, batchSize); - List results = new LinkedList<>(); - JSONObject jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key); - while (!Objects.isNull(jsonObject) && results.size() < batchSize) { - results.add(jsonObject.toJavaObject(IMSendResult.class)); - jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key); - } - return results; + public void onMessage(List results) { + listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); } } diff --git a/im-client/src/main/java/com/bx/imclient/task/SystemMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/SystemMessageResultResultTask.java index 7bd3491..a42ca94 100644 --- a/im-client/src/main/java/com/bx/imclient/task/SystemMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/SystemMessageResultResultTask.java @@ -1,32 +1,19 @@ package com.bx.imclient.task; -import cn.hutool.core.util.StrUtil; -import com.alibaba.fastjson.JSONObject; import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; -import com.bx.imcommon.mq.RedisMQConsumer; import com.bx.imcommon.mq.RedisMQListener; -import com.bx.imcommon.mq.RedisMQTemplate; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import java.util.LinkedList; import java.util.List; -import java.util.Objects; -@Slf4j @Component @RequiredArgsConstructor -@RedisMQListener(queue = IMRedisKey.IM_RESULT_SYSTEM_QUEUE, batchSize = 100, period = 100) -public class SystemMessageResultResultTask extends RedisMQConsumer { - - @Value("${spring.application.name}") - private String appName; +@RedisMQListener(queue = IMRedisKey.IM_RESULT_SYSTEM_QUEUE, batchSize = 100) +public class SystemMessageResultResultTask extends AbstractMessageResultTask { private final MessageListenerMulticaster listenerMulticaster; @@ -35,9 +22,4 @@ public class SystemMessageResultResultTask extends RedisMQConsumer listenerMulticaster.multicast(IMListenerType.SYSTEM_MESSAGE, results); } - @Override - public String generateKey() { - return StrUtil.join(":", IMRedisKey.IM_RESULT_SYSTEM_QUEUE, appName); - } - } diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java index e3ec88d..cd997fd 100644 --- a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java @@ -7,13 +7,29 @@ import java.util.List; */ public abstract class RedisMQConsumer { + /** + * 消费消息回调(单条) + */ public void onMessage(T data){} + /** + * 消费消息回调(批量) + */ public void onMessage(List datas){} + /** + * 生成redis队列完整key + */ public String generateKey(){ // 默认队列名就是redis的key RedisMQListener annotation = this.getClass().getAnnotation(RedisMQListener.class); return annotation.queue(); } + + /** + * 队列是否就绪,返回true才会开始消费 + */ + public Boolean isReady(){ + return true; + } } diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java index e53ae27..7b0d4c6 100644 --- a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java @@ -49,18 +49,20 @@ public class RedisMQPullTask implements CommandLineRunner { public void run() { List datas = new LinkedList<>(); try { - // 拉取一个批次的数据 - List objects = pullBatch(key, batchSize); - for (Object obj : objects) { - if (obj instanceof JSONObject) { - JSONObject jsonObject = (JSONObject)obj; - Object data = jsonObject.toJavaObject(type); - consumer.onMessage(data); - datas.add(data); + if(consumer.isReady()){ + // 拉取一个批次的数据 + List objects = pullBatch(key, batchSize); + for (Object obj : objects) { + if (obj instanceof JSONObject) { + JSONObject jsonObject = (JSONObject)obj; + Object data = jsonObject.toJavaObject(type); + consumer.onMessage(data); + datas.add(data); + } + } + if(!datas.isEmpty()){ + consumer.onMessage(datas); } - } - if(!datas.isEmpty()){ - consumer.onMessage(datas); } } catch (Exception e) { log.error("数据消费异常,队列:{}", key, e); diff --git a/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java index 905c5fc..c6b2290 100644 --- a/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java @@ -1,45 +1,23 @@ package com.bx.imserver.task; -import com.bx.imcommon.util.ThreadPoolExecutorFactory; +import com.bx.imcommon.mq.RedisMQConsumer; import com.bx.imserver.netty.IMServerGroup; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; - -import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; @Slf4j -public abstract class AbstractPullMessageTask implements CommandLineRunner { - - private static final ScheduledThreadPoolExecutor EXECUTOR_SERVICE = ThreadPoolExecutorFactory.getThreadPoolExecutor(); +public abstract class AbstractPullMessageTask extends RedisMQConsumer { @Autowired private IMServerGroup serverGroup; @Override - public void run(String... args) { - EXECUTOR_SERVICE.execute(new Runnable() { - @SneakyThrows - @Override - public void run() { - try { - if (serverGroup.isReady()) { - pullMessage(); - } - } catch (Exception e) { - log.error("任务调度异常", e); - } - if (!EXECUTOR_SERVICE.isShutdown()) { - EXECUTOR_SERVICE.schedule(this,100, TimeUnit.MICROSECONDS); - } - } - }); + public String generateKey() { + return String.join(":", super.generateKey(), IMServerGroup.serverId + ""); } - - public abstract void pullMessage() throws InterruptedException; + @Override + public Boolean isReady() { + return serverGroup.isReady(); + } } diff --git a/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java index 48d37b5..2d0db6b 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java @@ -1,38 +1,25 @@ package com.bx.imserver.task; -import com.alibaba.fastjson.JSONObject; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMRecvInfo; -import com.bx.imserver.netty.IMServerGroup; +import com.bx.imcommon.mq.RedisMQListener; import com.bx.imserver.netty.processor.AbstractMessageProcessor; import com.bx.imserver.netty.processor.ProcessorFactory; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import java.util.Objects; - @Slf4j @Component @RequiredArgsConstructor -public class PullGroupMessageTask extends AbstractPullMessageTask { - - private final RedisTemplate redisTemplate; +@RedisMQListener(queue = IMRedisKey.IM_MESSAGE_GROUP_QUEUE, batchSize = 10) +public class PullGroupMessageTask extends AbstractPullMessageTask { @Override - public void pullMessage() { - // 从redis拉取消息 - String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, IMServerGroup.serverId + ""); - JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); - while (!Objects.isNull(jsonObject)) { - IMRecvInfo recvInfo = jsonObject.toJavaObject(IMRecvInfo.class); - AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE); - processor.process(recvInfo); - // 下一条消息 - jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); - } + public void onMessage(IMRecvInfo recvInfo) { + AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE); + processor.process(recvInfo); } } diff --git a/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java index 007414e..b9b3545 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java @@ -1,37 +1,25 @@ package com.bx.imserver.task; -import com.alibaba.fastjson.JSONObject; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMRecvInfo; -import com.bx.imserver.netty.IMServerGroup; +import com.bx.imcommon.mq.RedisMQListener; import com.bx.imserver.netty.processor.AbstractMessageProcessor; import com.bx.imserver.netty.processor.ProcessorFactory; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import java.util.Objects; - @Slf4j @Component @RequiredArgsConstructor -public class PullPrivateMessageTask extends AbstractPullMessageTask { - - private final RedisTemplate redisTemplate; +@RedisMQListener(queue = IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, batchSize = 10) +public class PullPrivateMessageTask extends AbstractPullMessageTask { @Override - public void pullMessage() { - // 从redis拉取消息 - String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, IMServerGroup.serverId + ""); - JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); - while (!Objects.isNull(jsonObject)) { - IMRecvInfo recvInfo = jsonObject.toJavaObject(IMRecvInfo.class); - AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE); - processor.process(recvInfo); - // 下一条消息 - jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); - } + public void onMessage(IMRecvInfo recvInfo) { + AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE); + processor.process(recvInfo); } + } diff --git a/im-server/src/main/java/com/bx/imserver/task/PullSystemMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullSystemMessageTask.java index c4a3f6d..5e12f1b 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullSystemMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullSystemMessageTask.java @@ -3,9 +3,7 @@ package com.bx.imserver.task; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMRecvInfo; -import com.bx.imcommon.mq.RedisMQConsumer; import com.bx.imcommon.mq.RedisMQListener; -import com.bx.imserver.netty.IMServerGroup; import com.bx.imserver.netty.processor.AbstractMessageProcessor; import com.bx.imserver.netty.processor.ProcessorFactory; import lombok.extern.slf4j.Slf4j; @@ -19,7 +17,7 @@ import org.springframework.stereotype.Component; @Slf4j @Component @RedisMQListener(queue = IMRedisKey.IM_MESSAGE_SYSTEM_QUEUE,batchSize = 10) -public class PullSystemMessageTask extends RedisMQConsumer { +public class PullSystemMessageTask extends AbstractPullMessageTask { @Override public void onMessage(IMRecvInfo recvInfo) { @@ -27,8 +25,4 @@ public class PullSystemMessageTask extends RedisMQConsumer { processor.process(recvInfo); } - public String generateKey(){ - // 队列名:im:message:system:{服务id} - return String.join(":", IMRedisKey.IM_MESSAGE_SYSTEM_QUEUE, IMServerGroup.serverId + ""); - } }