diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java index 26636dd..0ee2af3 100644 --- a/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java +++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java @@ -3,8 +3,10 @@ package com.bx.imclient.listener; import com.bx.imcommon.model.IMSendResult; +import java.util.List; + public interface MessageListener { - void process(IMSendResult result); + void process(List> result); } diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java index 05eb5b4..8e1fdcd 100644 --- a/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java +++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java @@ -1,6 +1,7 @@ package com.bx.imclient.listener; +import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSONObject; import com.bx.imclient.annotation.IMListener; import com.bx.imcommon.enums.IMListenerType; @@ -19,19 +20,25 @@ public class MessageListenerMulticaster { @Autowired(required = false) private List messageListeners = Collections.emptyList(); - public void multicast(IMListenerType listenerType, IMSendResult result){ + public void multicast(IMListenerType listenerType, List results){ + if(CollUtil.isEmpty(results)){ + return; + } for(MessageListener listener:messageListeners){ IMListener annotation = listener.getClass().getAnnotation(IMListener.class); if(annotation!=null && (annotation.type().equals(IMListenerType.ALL) || annotation.type().equals(listenerType))){ - // 将data转回对象类型 - if(result.getData() instanceof JSONObject){ - Type superClass = listener.getClass().getGenericInterfaces()[0]; - Type type = ((ParameterizedType) superClass).getActualTypeArguments()[0]; - JSONObject data = (JSONObject)result.getData(); - result.setData(data.toJavaObject(type)); - } + + results.forEach(result->{ + // 将data转回对象类型 + if(result.getData() instanceof JSONObject){ + Type superClass = listener.getClass().getGenericInterfaces()[0]; + Type type = ((ParameterizedType) superClass).getActualTypeArguments()[0]; + JSONObject data = (JSONObject)result.getData(); + result.setData(data.toJavaObject(type)); + } + }); // 回调到调用方处理 - listener.process(result); + listener.process(results); } } } diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java index fafdfe9..6fc1297 100644 --- a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -29,6 +29,7 @@ public class IMSender { private final MessageListenerMulticaster listenerMulticaster; public void sendPrivateMessage(IMPrivateMessage message) { + List results = new LinkedList<>(); for (Integer terminal : message.getRecvTerminals()) { // 获取对方连接的channelId String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.toString()); @@ -44,14 +45,13 @@ public class IMSender { recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getRecvId(), terminal))); recvInfo.setData(message.getData()); redisTemplate.opsForList().rightPush(sendKey, recvInfo); - } else if (message.getSendResult()) { - // 回复消息状态 + } else { IMSendResult result = new IMSendResult(); result.setSender(message.getSender()); result.setReceiver(new IMUserInfo(message.getRecvId(), terminal)); result.setCode(IMSendCode.NOT_ONLINE.code()); result.setData(message.getData()); - listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); + results.add(result); } } // 推送给自己的其他终端 @@ -77,10 +77,14 @@ public class IMSender { } } } - + // 对离线用户回复消息状态 + if(message.getSendResult() && !results.isEmpty()){ + listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); + } } public void sendGroupMessage(IMGroupMessage message) { + // 根据群聊每个成员所连的IM-server,进行分组 Map sendMap = new HashMap<>(); for (Integer terminal : message.getRecvTerminals()) { @@ -118,17 +122,7 @@ public class IMSender { String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, entry.getKey().toString()); redisTemplate.opsForList().rightPush(key, recvInfo); } - // 对离线用户回复消息状态 - if (message.getSendResult()) { - for (IMUserInfo offLineUser : offLineUsers) { - IMSendResult result = new IMSendResult(); - result.setSender(message.getSender()); - result.setReceiver(offLineUser); - result.setCode(IMSendCode.NOT_ONLINE.code()); - result.setData(message.getData()); - listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, result); - } - } + // 推送给自己的其他终端 if (message.getSendToSelf()) { for (Integer terminal : IMTerminalType.codes()) { @@ -152,6 +146,20 @@ public class IMSender { } } } + // 对离线用户回复消息状态 + if(message.getSendResult() && !offLineUsers.isEmpty()){ + List results = new LinkedList<>(); + for (IMUserInfo offLineUser : offLineUsers) { + IMSendResult result = new IMSendResult(); + result.setSender(message.getSender()); + result.setReceiver(offLineUser); + result.setCode(IMSendCode.NOT_ONLINE.code()); + result.setData(message.getData()); + results.add(result); + } + listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results); + } + } public Map> getOnlineTerminal(List userIds){ diff --git a/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java b/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java index 86b8920..154f2f3 100644 --- a/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java @@ -20,13 +20,13 @@ public abstract class AbstractMessageResultTask implements CommandLineRunner { @SneakyThrows @Override public void run() { - try{ + try { pullMessage(); - }catch (Exception e){ - log.error("任务调度异常",e); - Thread.sleep(200); + } catch (Exception e) { + log.error("任务调度异常", e); } - if(!EXECUTOR_SERVICE.isShutdown()){ + if (!EXECUTOR_SERVICE.isShutdown()) { + Thread.sleep(100); EXECUTOR_SERVICE.execute(this); } } @@ -35,10 +35,10 @@ public abstract class AbstractMessageResultTask implements CommandLineRunner { @PreDestroy - public void destroy(){ - log.info("{}线程任务关闭",this.getClass().getSimpleName()); + public void destroy() { + log.info("{}线程任务关闭", this.getClass().getSimpleName()); EXECUTOR_SERVICE.shutdown(); } - public abstract void pullMessage(); + public abstract void pullMessage() throws InterruptedException; } diff --git a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java index 3150061..7089f70 100644 --- a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java @@ -6,14 +6,16 @@ import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; -import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.concurrent.TimeUnit; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + @Component @RequiredArgsConstructor @@ -25,16 +27,33 @@ public class GroupMessageResultResultTask extends AbstractMessageResultTask { @Value("${spring.application.name}") private String appName; + @Value("${im.result.batch:100}") + private int batchSize; + private final MessageListenerMulticaster listenerMulticaster; @Override public void pullMessage() { - String key = StrUtil.join(":",IMRedisKey.IM_RESULT_GROUP_QUEUE,appName); - JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); - if(jsonObject != null) { - IMSendResult result = jsonObject.toJavaObject(IMSendResult.class); - listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE,result); + List results; + do { + results = loadBatch(); + if(!results.isEmpty()){ + listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results); + } + } while (results.size() < batchSize); + } + + List loadBatch() { + String key = StrUtil.join(":", IMRedisKey.IM_RESULT_GROUP_QUEUE, appName); + //这个接口redis6.2以上才支持 + //List list = redisTemplate.opsForList().leftPop(key, 100); + List results = new LinkedList<>(); + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject) && results.size() < batchSize) { + results.add(jsonObject.toJavaObject(IMSendResult.class)); + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } + return results; } } diff --git a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java index 31cc462..9ff6c80 100644 --- a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java @@ -6,7 +6,6 @@ import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; -import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -14,7 +13,9 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.concurrent.TimeUnit; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; @Slf4j @Component @@ -22,21 +23,38 @@ import java.util.concurrent.TimeUnit; public class PrivateMessageResultResultTask extends AbstractMessageResultTask { @Resource(name = "IMRedisTemplate") - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Value("${spring.application.name}") private String appName; + @Value("${im.result.batch:100}") + private int batchSize; + private final MessageListenerMulticaster listenerMulticaster; @Override public void pullMessage() { - String key = StrUtil.join(":",IMRedisKey.IM_RESULT_PRIVATE_QUEUE,appName); - JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); - if(jsonObject != null) { - IMSendResult result = jsonObject.toJavaObject(IMSendResult.class); - listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); + List results; + do { + results = loadBatch(); + if(!results.isEmpty()){ + listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); + } + } while (results.size() < batchSize); + } + + List loadBatch() { + String key = StrUtil.join(":", IMRedisKey.IM_RESULT_PRIVATE_QUEUE, appName); + //这个接口redis6.2以上才支持 + //List list = redisTemplate.opsForList().leftPop(key, 100); + List results = new LinkedList<>(); + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject) && results.size() < batchSize) { + results.add(jsonObject.toJavaObject(IMSendResult.class)); + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } + return results; } } diff --git a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java index 745d606..d927b99 100644 --- a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java +++ b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java @@ -7,10 +7,13 @@ import com.bx.imcommon.enums.IMSendCode; import com.bx.imcommon.model.IMSendResult; import com.bx.implatform.contant.RedisKey; import com.bx.implatform.vo.GroupMessageVO; +import com.bx.implatform.vo.PrivateMessageVO; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; +import java.util.List; + @Slf4j @IMListener(type = IMListenerType.GROUP_MESSAGE) @AllArgsConstructor @@ -19,9 +22,13 @@ public class GroupMessageListener implements MessageListener { private final RedisTemplate redisTemplate; @Override - public void process(IMSendResult result) { - GroupMessageVO messageInfo = result.getData(); - // 空空如也 + public void process(List> results) { + for(IMSendResult result:results){ + GroupMessageVO messageInfo = result.getData(); + if (result.getCode().equals(IMSendCode.SUCCESS.code())) { + log.info("消息送达,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal()); + } + } } } diff --git a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java index 3779365..8ff7d49 100644 --- a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java +++ b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java @@ -1,5 +1,6 @@ package com.bx.implatform.listener; +import cn.hutool.core.collection.CollUtil; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.bx.imclient.annotation.IMListener; import com.bx.imclient.listener.MessageListener; @@ -14,6 +15,10 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + @Slf4j @IMListener(type = IMListenerType.PRIVATE_MESSAGE) public class PrivateMessageListener implements MessageListener { @@ -23,16 +28,23 @@ public class PrivateMessageListener implements MessageListener private IPrivateMessageService privateMessageService; @Override - public void process(IMSendResult result) { - PrivateMessageVO messageInfo = result.getData(); - // 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态 - if (result.getCode().equals(IMSendCode.SUCCESS.code())) { + public void process(List> results) { + Set messageIds = new HashSet<>(); + for(IMSendResult result : results){ + PrivateMessageVO messageInfo = result.getData(); + // 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态 + if (result.getCode().equals(IMSendCode.SUCCESS.code())) { + messageIds.add(messageInfo.getId()); + log.info("消息送达,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal()); + } + } + // 批量修改状态 + if(CollUtil.isNotEmpty(messageIds)){ UpdateWrapper updateWrapper = new UpdateWrapper<>(); - updateWrapper.lambda().eq(PrivateMessage::getId, messageInfo.getId()) + updateWrapper.lambda().in(PrivateMessage::getId, messageIds) .eq(PrivateMessage::getStatus, MessageStatus.UNSEND.code()) .set(PrivateMessage::getStatus, MessageStatus.SENDED.code()); privateMessageService.update(updateWrapper); - log.info("消息已读,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal()); } } diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java index 01a2fd8..0ef663e 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java @@ -65,6 +65,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl private final RedisTemplate redisTemplate; - @Async @Override public void process(IMRecvInfo recvInfo) { IMUserInfo sender = recvInfo.getSender(); diff --git a/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java index 6a4c30d..6752869 100644 --- a/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java @@ -30,9 +30,9 @@ public abstract class AbstractPullMessageTask implements CommandLineRunner { } } catch (Exception e) { log.error("任务调度异常", e); - Thread.sleep(200); } if (!EXECUTOR_SERVICE.isShutdown()) { + Thread.sleep(100); EXECUTOR_SERVICE.execute(this); } } @@ -45,5 +45,5 @@ public abstract class AbstractPullMessageTask implements CommandLineRunner { EXECUTOR_SERVICE.shutdown(); } - public abstract void pullMessage(); + public abstract void pullMessage() throws InterruptedException; } diff --git a/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java index 3690034..6d8e83f 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import java.util.concurrent.TimeUnit; +import java.util.Objects; @Slf4j @Component @@ -25,11 +25,13 @@ public class PullGroupMessageTask extends AbstractPullMessageTask { public void pullMessage() { // 从redis拉取未读消息 String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, IMServerGroup.serverId + ""); - JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key, 10, TimeUnit.SECONDS); - if (jsonObject != null) { + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject)) { IMRecvInfo recvInfo = jsonObject.toJavaObject(IMRecvInfo.class); AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE); processor.process(recvInfo); + // 下一条消息 + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } } diff --git a/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java index b22e43b..5edd0d6 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java @@ -12,25 +12,26 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import java.util.concurrent.TimeUnit; +import java.util.Objects; @Slf4j @Component @RequiredArgsConstructor -public class PullPrivateMessageTask extends AbstractPullMessageTask { +public class PullPrivateMessageTask extends AbstractPullMessageTask { - private final RedisTemplate redisTemplate; + private final RedisTemplate redisTemplate; @Override public void pullMessage() { // 从redis拉取未读消息 - String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE,IMServerGroup.serverId+""); - JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); - if(jsonObject!=null){ + String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, IMServerGroup.serverId + ""); + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject)) { IMRecvInfo recvInfo = jsonObject.toJavaObject(IMRecvInfo.class); AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE); processor.process(recvInfo); + // 下一条消息 + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } } - }