From 136b5567c849970e5d6391e238ef01bcd7e8b46e Mon Sep 17 00:00:00 2001 From: blue <825657193@qq.com> Date: Fri, 15 Dec 2023 22:16:40 +0800 Subject: [PATCH 1/7] =?UTF-8?q?im-client=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E4=B8=AA=E6=9C=8D=E5=8A=A1=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/bx/imclient/IMAutoConfiguration.java | 1 + .../main/java/com/bx/imclient/sender/IMSender.java | 6 ++++++ .../imclient/task/GroupMessageResultResultTask.java | 10 ++++++++-- .../imclient/task/PrivateMessageResultResultTask.java | 11 ++++++++--- .../main/java/com/bx/imcommon/model/IMRecvInfo.java | 4 ++++ .../service/impl/GroupMessageServiceImpl.java | 1 + im-platform/src/main/resources/application.yml | 2 ++ .../netty/processor/GroupMessageProcessor.java | 3 ++- .../netty/processor/PrivateMessageProcessor.java | 3 ++- im-ui/src/components/chat/ChatBox.vue | 1 - 10 files changed, 34 insertions(+), 8 deletions(-) diff --git a/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java b/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java index b3a93fc..9f8fa84 100644 --- a/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java +++ b/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java @@ -9,4 +9,5 @@ import org.springframework.context.annotation.Configuration; @Configuration @ComponentScan("com.bx.imclient") public class IMAutoConfiguration { + } diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java index 0eac628..fafdfe9 100644 --- a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -9,6 +9,7 @@ import com.bx.imcommon.enums.IMSendCode; import com.bx.imcommon.enums.IMTerminalType; import com.bx.imcommon.model.*; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; @@ -22,6 +23,9 @@ public class IMSender { @Resource(name="IMRedisTemplate") private RedisTemplate redisTemplate; + @Value("${spring.application.name}") + private String appName; + private final MessageListenerMulticaster listenerMulticaster; public void sendPrivateMessage(IMPrivateMessage message) { @@ -35,6 +39,7 @@ public class IMSender { IMRecvInfo recvInfo = new IMRecvInfo(); recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); recvInfo.setSendResult(message.getSendResult()); + recvInfo.setServiceName(appName); recvInfo.setSender(message.getSender()); recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getRecvId(), terminal))); recvInfo.setData(message.getData()); @@ -106,6 +111,7 @@ public class IMSender { recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); recvInfo.setReceivers(new LinkedList<>(entry.getValue())); recvInfo.setSender(message.getSender()); + recvInfo.setServiceName(appName); recvInfo.setSendResult(message.getSendResult()); recvInfo.setData(message.getData()); // 推送至队列 diff --git a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java index 4e6f08c..3150061 100644 --- a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java @@ -1,11 +1,14 @@ package com.bx.imclient.task; +import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @@ -13,17 +16,20 @@ import javax.annotation.Resource; import java.util.concurrent.TimeUnit; @Component -@AllArgsConstructor +@RequiredArgsConstructor public class GroupMessageResultResultTask extends AbstractMessageResultTask { @Resource(name = "IMRedisTemplate") private RedisTemplate redisTemplate; + @Value("${spring.application.name}") + private String appName; + private final MessageListenerMulticaster listenerMulticaster; @Override public void pullMessage() { - String key = IMRedisKey.IM_RESULT_GROUP_QUEUE; + String key = StrUtil.join(":",IMRedisKey.IM_RESULT_GROUP_QUEUE,appName); JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); if(jsonObject != null) { IMSendResult result = jsonObject.toJavaObject(IMSendResult.class); diff --git a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java index b90a08e..31cc462 100644 --- a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java @@ -1,12 +1,15 @@ package com.bx.imclient.task; +import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @@ -15,18 +18,20 @@ import java.util.concurrent.TimeUnit; @Slf4j @Component -@AllArgsConstructor +@RequiredArgsConstructor public class PrivateMessageResultResultTask extends AbstractMessageResultTask { - @Resource(name = "IMRedisTemplate") private RedisTemplate redisTemplate; + @Value("${spring.application.name}") + private String appName; + private final MessageListenerMulticaster listenerMulticaster; @Override public void pullMessage() { - String key = IMRedisKey.IM_RESULT_PRIVATE_QUEUE; + String key = StrUtil.join(":",IMRedisKey.IM_RESULT_PRIVATE_QUEUE,appName); JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); if(jsonObject != null) { IMSendResult result = jsonObject.toJavaObject(IMSendResult.class); diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java b/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java index 9a6e0df..52c74e6 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java @@ -27,6 +27,10 @@ public class IMRecvInfo { */ private Boolean sendResult; + /** + * 当前服务名(回调发送结果使用) + */ + private String serviceName; /** * 推送消息体 */ diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java index e3ecbaa..b645ab9 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java @@ -86,6 +86,7 @@ public class GroupMessageServiceImpl extends ServiceImpl sendMessage = new IMGroupMessage<>(); sendMessage.setSender(new IMUserInfo(session.getUserId(), session.getTerminal())); sendMessage.setRecvIds(userIds); + sendMessage.setSendResult(false); sendMessage.setData(msgInfo); imClient.sendGroupMessage(sendMessage); log.info("发送群聊消息,发送id:{},群聊id:{},内容:{}", session.getUserId(), dto.getGroupId(), dto.getContent()); diff --git a/im-platform/src/main/resources/application.yml b/im-platform/src/main/resources/application.yml index a682483..889a69c 100644 --- a/im-platform/src/main/resources/application.yml +++ b/im-platform/src/main/resources/application.yml @@ -3,6 +3,8 @@ server: port: 8888 #配置项目的数据源 spring: + application: + name: im-platform mvc: pathmatch: matching-strategy: ant_path_matcher diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java index 9772d20..88b3577 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java @@ -1,5 +1,6 @@ package com.bx.imserver.netty.processor; +import cn.hutool.core.util.StrUtil; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMSendCode; @@ -64,7 +65,7 @@ public class GroupMessageProcessor extends AbstractMessageProcessor result.setCode(sendCode.code()); result.setData(recvInfo.getData()); // 推送到结果队列 - String key = IMRedisKey.IM_RESULT_GROUP_QUEUE; + String key = StrUtil.join(":",IMRedisKey.IM_RESULT_GROUP_QUEUE,recvInfo.getServiceName()); redisTemplate.opsForList().rightPush(key, result); } } diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java index 1f150fd..26803c5 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java @@ -1,5 +1,6 @@ package com.bx.imserver.netty.processor; +import cn.hutool.core.util.StrUtil; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMSendCode; @@ -57,7 +58,7 @@ public class PrivateMessageProcessor extends AbstractMessageProcessor Date: Sat, 16 Dec 2023 15:27:09 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E7=A7=BB=E9=99=A4logback=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- im-platform/src/main/resources/application.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/im-platform/src/main/resources/application.yml b/im-platform/src/main/resources/application.yml index 889a69c..b8ff61b 100644 --- a/im-platform/src/main/resources/application.yml +++ b/im-platform/src/main/resources/application.yml @@ -54,6 +54,3 @@ jwt: refreshToken: expireIn: 604800 #7天 secret: IKDiqVmn0VFU - -logging: - config: classpath:logback-prod.xml From 6fc3a66c5824bbd381e6b9bc8eebae7e9c36b031 Mon Sep 17 00:00:00 2001 From: blue <825657193@qq.com> Date: Fri, 22 Dec 2023 00:52:29 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E6=B6=88=E6=81=AF=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E6=8E=A8=E9=80=81=E4=BD=BF=E7=94=A8=E6=89=B9=E9=87=8F=E6=96=B9?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bx/imclient/listener/MessageListener.java | 4 +- .../listener/MessageListenerMulticaster.java | 25 +++++++----- .../java/com/bx/imclient/sender/IMSender.java | 38 +++++++++++-------- .../task/AbstractMessageResultTask.java | 16 ++++---- .../task/GroupMessageResultResultTask.java | 33 ++++++++++++---- .../task/PrivateMessageResultResultTask.java | 34 +++++++++++++---- .../listener/GroupMessageListener.java | 13 +++++-- .../listener/PrivateMessageListener.java | 24 +++++++++--- .../impl/PrivateMessageServiceImpl.java | 1 + .../processor/GroupMessageProcessor.java | 1 - .../task/AbstractPullMessageTask.java | 4 +- .../imserver/task/PullGroupMessageTask.java | 8 ++-- .../imserver/task/PullPrivateMessageTask.java | 15 ++++---- 13 files changed, 146 insertions(+), 70 deletions(-) diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java index 26636dd..0ee2af3 100644 --- a/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java +++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java @@ -3,8 +3,10 @@ package com.bx.imclient.listener; import com.bx.imcommon.model.IMSendResult; +import java.util.List; + public interface MessageListener { - void process(IMSendResult result); + void process(List> result); } diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java index 05eb5b4..8e1fdcd 100644 --- a/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java +++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java @@ -1,6 +1,7 @@ package com.bx.imclient.listener; +import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSONObject; import com.bx.imclient.annotation.IMListener; import com.bx.imcommon.enums.IMListenerType; @@ -19,19 +20,25 @@ public class MessageListenerMulticaster { @Autowired(required = false) private List messageListeners = Collections.emptyList(); - public void multicast(IMListenerType listenerType, IMSendResult result){ + public void multicast(IMListenerType listenerType, List results){ + if(CollUtil.isEmpty(results)){ + return; + } for(MessageListener listener:messageListeners){ IMListener annotation = listener.getClass().getAnnotation(IMListener.class); if(annotation!=null && (annotation.type().equals(IMListenerType.ALL) || annotation.type().equals(listenerType))){ - // 将data转回对象类型 - if(result.getData() instanceof JSONObject){ - Type superClass = listener.getClass().getGenericInterfaces()[0]; - Type type = ((ParameterizedType) superClass).getActualTypeArguments()[0]; - JSONObject data = (JSONObject)result.getData(); - result.setData(data.toJavaObject(type)); - } + + results.forEach(result->{ + // 将data转回对象类型 + if(result.getData() instanceof JSONObject){ + Type superClass = listener.getClass().getGenericInterfaces()[0]; + Type type = ((ParameterizedType) superClass).getActualTypeArguments()[0]; + JSONObject data = (JSONObject)result.getData(); + result.setData(data.toJavaObject(type)); + } + }); // 回调到调用方处理 - listener.process(result); + listener.process(results); } } } diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java index fafdfe9..6fc1297 100644 --- a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -29,6 +29,7 @@ public class IMSender { private final MessageListenerMulticaster listenerMulticaster; public void sendPrivateMessage(IMPrivateMessage message) { + List results = new LinkedList<>(); for (Integer terminal : message.getRecvTerminals()) { // 获取对方连接的channelId String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.toString()); @@ -44,14 +45,13 @@ public class IMSender { recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getRecvId(), terminal))); recvInfo.setData(message.getData()); redisTemplate.opsForList().rightPush(sendKey, recvInfo); - } else if (message.getSendResult()) { - // 回复消息状态 + } else { IMSendResult result = new IMSendResult(); result.setSender(message.getSender()); result.setReceiver(new IMUserInfo(message.getRecvId(), terminal)); result.setCode(IMSendCode.NOT_ONLINE.code()); result.setData(message.getData()); - listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); + results.add(result); } } // 推送给自己的其他终端 @@ -77,10 +77,14 @@ public class IMSender { } } } - + // 对离线用户回复消息状态 + if(message.getSendResult() && !results.isEmpty()){ + listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); + } } public void sendGroupMessage(IMGroupMessage message) { + // 根据群聊每个成员所连的IM-server,进行分组 Map sendMap = new HashMap<>(); for (Integer terminal : message.getRecvTerminals()) { @@ -118,17 +122,7 @@ public class IMSender { String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, entry.getKey().toString()); redisTemplate.opsForList().rightPush(key, recvInfo); } - // 对离线用户回复消息状态 - if (message.getSendResult()) { - for (IMUserInfo offLineUser : offLineUsers) { - IMSendResult result = new IMSendResult(); - result.setSender(message.getSender()); - result.setReceiver(offLineUser); - result.setCode(IMSendCode.NOT_ONLINE.code()); - result.setData(message.getData()); - listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, result); - } - } + // 推送给自己的其他终端 if (message.getSendToSelf()) { for (Integer terminal : IMTerminalType.codes()) { @@ -152,6 +146,20 @@ public class IMSender { } } } + // 对离线用户回复消息状态 + if(message.getSendResult() && !offLineUsers.isEmpty()){ + List results = new LinkedList<>(); + for (IMUserInfo offLineUser : offLineUsers) { + IMSendResult result = new IMSendResult(); + result.setSender(message.getSender()); + result.setReceiver(offLineUser); + result.setCode(IMSendCode.NOT_ONLINE.code()); + result.setData(message.getData()); + results.add(result); + } + listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results); + } + } public Map> getOnlineTerminal(List userIds){ diff --git a/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java b/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java index 86b8920..154f2f3 100644 --- a/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java @@ -20,13 +20,13 @@ public abstract class AbstractMessageResultTask implements CommandLineRunner { @SneakyThrows @Override public void run() { - try{ + try { pullMessage(); - }catch (Exception e){ - log.error("任务调度异常",e); - Thread.sleep(200); + } catch (Exception e) { + log.error("任务调度异常", e); } - if(!EXECUTOR_SERVICE.isShutdown()){ + if (!EXECUTOR_SERVICE.isShutdown()) { + Thread.sleep(100); EXECUTOR_SERVICE.execute(this); } } @@ -35,10 +35,10 @@ public abstract class AbstractMessageResultTask implements CommandLineRunner { @PreDestroy - public void destroy(){ - log.info("{}线程任务关闭",this.getClass().getSimpleName()); + public void destroy() { + log.info("{}线程任务关闭", this.getClass().getSimpleName()); EXECUTOR_SERVICE.shutdown(); } - public abstract void pullMessage(); + public abstract void pullMessage() throws InterruptedException; } diff --git a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java index 3150061..7089f70 100644 --- a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java @@ -6,14 +6,16 @@ import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; -import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.concurrent.TimeUnit; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + @Component @RequiredArgsConstructor @@ -25,16 +27,33 @@ public class GroupMessageResultResultTask extends AbstractMessageResultTask { @Value("${spring.application.name}") private String appName; + @Value("${im.result.batch:100}") + private int batchSize; + private final MessageListenerMulticaster listenerMulticaster; @Override public void pullMessage() { - String key = StrUtil.join(":",IMRedisKey.IM_RESULT_GROUP_QUEUE,appName); - JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); - if(jsonObject != null) { - IMSendResult result = jsonObject.toJavaObject(IMSendResult.class); - listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE,result); + List results; + do { + results = loadBatch(); + if(!results.isEmpty()){ + listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results); + } + } while (results.size() < batchSize); + } + + List loadBatch() { + String key = StrUtil.join(":", IMRedisKey.IM_RESULT_GROUP_QUEUE, appName); + //这个接口redis6.2以上才支持 + //List list = redisTemplate.opsForList().leftPop(key, 100); + List results = new LinkedList<>(); + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject) && results.size() < batchSize) { + results.add(jsonObject.toJavaObject(IMSendResult.class)); + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } + return results; } } diff --git a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java index 31cc462..9ff6c80 100644 --- a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java @@ -6,7 +6,6 @@ import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; -import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -14,7 +13,9 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.concurrent.TimeUnit; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; @Slf4j @Component @@ -22,21 +23,38 @@ import java.util.concurrent.TimeUnit; public class PrivateMessageResultResultTask extends AbstractMessageResultTask { @Resource(name = "IMRedisTemplate") - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Value("${spring.application.name}") private String appName; + @Value("${im.result.batch:100}") + private int batchSize; + private final MessageListenerMulticaster listenerMulticaster; @Override public void pullMessage() { - String key = StrUtil.join(":",IMRedisKey.IM_RESULT_PRIVATE_QUEUE,appName); - JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); - if(jsonObject != null) { - IMSendResult result = jsonObject.toJavaObject(IMSendResult.class); - listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); + List results; + do { + results = loadBatch(); + if(!results.isEmpty()){ + listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); + } + } while (results.size() < batchSize); + } + + List loadBatch() { + String key = StrUtil.join(":", IMRedisKey.IM_RESULT_PRIVATE_QUEUE, appName); + //这个接口redis6.2以上才支持 + //List list = redisTemplate.opsForList().leftPop(key, 100); + List results = new LinkedList<>(); + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject) && results.size() < batchSize) { + results.add(jsonObject.toJavaObject(IMSendResult.class)); + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } + return results; } } diff --git a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java index 745d606..d927b99 100644 --- a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java +++ b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java @@ -7,10 +7,13 @@ import com.bx.imcommon.enums.IMSendCode; import com.bx.imcommon.model.IMSendResult; import com.bx.implatform.contant.RedisKey; import com.bx.implatform.vo.GroupMessageVO; +import com.bx.implatform.vo.PrivateMessageVO; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; +import java.util.List; + @Slf4j @IMListener(type = IMListenerType.GROUP_MESSAGE) @AllArgsConstructor @@ -19,9 +22,13 @@ public class GroupMessageListener implements MessageListener { private final RedisTemplate redisTemplate; @Override - public void process(IMSendResult result) { - GroupMessageVO messageInfo = result.getData(); - // 空空如也 + public void process(List> results) { + for(IMSendResult result:results){ + GroupMessageVO messageInfo = result.getData(); + if (result.getCode().equals(IMSendCode.SUCCESS.code())) { + log.info("消息送达,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal()); + } + } } } diff --git a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java index 3779365..8ff7d49 100644 --- a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java +++ b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java @@ -1,5 +1,6 @@ package com.bx.implatform.listener; +import cn.hutool.core.collection.CollUtil; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.bx.imclient.annotation.IMListener; import com.bx.imclient.listener.MessageListener; @@ -14,6 +15,10 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + @Slf4j @IMListener(type = IMListenerType.PRIVATE_MESSAGE) public class PrivateMessageListener implements MessageListener { @@ -23,16 +28,23 @@ public class PrivateMessageListener implements MessageListener private IPrivateMessageService privateMessageService; @Override - public void process(IMSendResult result) { - PrivateMessageVO messageInfo = result.getData(); - // 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态 - if (result.getCode().equals(IMSendCode.SUCCESS.code())) { + public void process(List> results) { + Set messageIds = new HashSet<>(); + for(IMSendResult result : results){ + PrivateMessageVO messageInfo = result.getData(); + // 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态 + if (result.getCode().equals(IMSendCode.SUCCESS.code())) { + messageIds.add(messageInfo.getId()); + log.info("消息送达,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal()); + } + } + // 批量修改状态 + if(CollUtil.isNotEmpty(messageIds)){ UpdateWrapper updateWrapper = new UpdateWrapper<>(); - updateWrapper.lambda().eq(PrivateMessage::getId, messageInfo.getId()) + updateWrapper.lambda().in(PrivateMessage::getId, messageIds) .eq(PrivateMessage::getStatus, MessageStatus.UNSEND.code()) .set(PrivateMessage::getStatus, MessageStatus.SENDED.code()); privateMessageService.update(updateWrapper); - log.info("消息已读,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal()); } } diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java index 01a2fd8..0ef663e 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java @@ -65,6 +65,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl private final RedisTemplate redisTemplate; - @Async @Override public void process(IMRecvInfo recvInfo) { IMUserInfo sender = recvInfo.getSender(); diff --git a/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java index 6a4c30d..6752869 100644 --- a/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java @@ -30,9 +30,9 @@ public abstract class AbstractPullMessageTask implements CommandLineRunner { } } catch (Exception e) { log.error("任务调度异常", e); - Thread.sleep(200); } if (!EXECUTOR_SERVICE.isShutdown()) { + Thread.sleep(100); EXECUTOR_SERVICE.execute(this); } } @@ -45,5 +45,5 @@ public abstract class AbstractPullMessageTask implements CommandLineRunner { EXECUTOR_SERVICE.shutdown(); } - public abstract void pullMessage(); + public abstract void pullMessage() throws InterruptedException; } diff --git a/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java index 3690034..6d8e83f 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import java.util.concurrent.TimeUnit; +import java.util.Objects; @Slf4j @Component @@ -25,11 +25,13 @@ public class PullGroupMessageTask extends AbstractPullMessageTask { public void pullMessage() { // 从redis拉取未读消息 String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, IMServerGroup.serverId + ""); - JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key, 10, TimeUnit.SECONDS); - if (jsonObject != null) { + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject)) { IMRecvInfo recvInfo = jsonObject.toJavaObject(IMRecvInfo.class); AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE); processor.process(recvInfo); + // 下一条消息 + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } } diff --git a/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java index b22e43b..5edd0d6 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java @@ -12,25 +12,26 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import java.util.concurrent.TimeUnit; +import java.util.Objects; @Slf4j @Component @RequiredArgsConstructor -public class PullPrivateMessageTask extends AbstractPullMessageTask { +public class PullPrivateMessageTask extends AbstractPullMessageTask { - private final RedisTemplate redisTemplate; + private final RedisTemplate redisTemplate; @Override public void pullMessage() { // 从redis拉取未读消息 - String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE,IMServerGroup.serverId+""); - JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); - if(jsonObject!=null){ + String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, IMServerGroup.serverId + ""); + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject)) { IMRecvInfo recvInfo = jsonObject.toJavaObject(IMRecvInfo.class); AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE); processor.process(recvInfo); + // 下一条消息 + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } } - } From 05ebb1e97acd0f8fb4464dcf262cb33f2dcd4b8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=B0=E5=AE=B6=E8=83=9C?= Date: Thu, 28 Dec 2023 14:49:37 +0800 Subject: [PATCH 4/7] =?UTF-8?q?loadMessage=E6=8E=92=E9=99=A4=E5=8A=A0?= =?UTF-8?q?=E7=BE=A4=E4=B9=8B=E5=89=8D=E7=9A=84=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/GroupMessageServiceImpl.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java index b645ab9..0a2edfd 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java @@ -1,6 +1,8 @@ package com.bx.implatform.service.impl; +import cn.hutool.core.collection.CollStreamUtil; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -146,10 +148,11 @@ public class GroupMessageServiceImpl extends ServiceImpl loadMessage(Long minId) { UserSession session = SessionContext.getSession(); List members = groupMemberService.findByUserId(session.getUserId()); - List ids = members.stream().map(GroupMember::getGroupId).collect(Collectors.toList()); - if (CollectionUtil.isEmpty(ids)) { + if (CollectionUtil.isEmpty(members)) { return new ArrayList<>(); } + Map groupMemberMap = CollStreamUtil.toIdentityMap(members, GroupMember::getGroupId); + Set ids = groupMemberMap.keySet(); // 只能拉取最近1个月的 Date minDate = DateUtils.addMonths(new Date(), -1); LambdaQueryWrapper wrapper = Wrappers.lambdaQuery(); @@ -158,15 +161,21 @@ public class GroupMessageServiceImpl extends ServiceImpl messages = this.list(wrapper); // 转成vo - List vos = messages.stream().map(m -> { - GroupMessageVO vo = BeanUtils.copyProperties(m, GroupMessageVO.class); - // 被@用户列表 - if (StringUtils.isNotBlank(m.getAtUserIds())) { - List atIds = Splitter.on(",").trimResults().splitToList(m.getAtUserIds()); - vo.setAtUserIds(atIds.stream().map(Long::parseLong).collect(Collectors.toList())); - } - return vo; - }).collect(Collectors.toList()); + List vos = messages.stream() + .filter(m -> { + //排除加群之前的消息 + GroupMember member = groupMemberMap.get(m.getGroupId()); + return Objects.nonNull(member) && DateUtil.compare(member.getCreatedTime(), m.getSendTime()) <= 0; + }) + .map(m -> { + GroupMessageVO vo = BeanUtils.copyProperties(m, GroupMessageVO.class); + // 被@用户列表 + if (StringUtils.isNotBlank(m.getAtUserIds()) && Objects.nonNull(vo)) { + List atIds = Splitter.on(",").trimResults().splitToList(m.getAtUserIds()); + vo.setAtUserIds(atIds.stream().map(Long::parseLong).collect(Collectors.toList())); + } + return vo; + }).collect(Collectors.toList()); // 消息状态,数据库没有存群聊的消息状态,需要从redis取 List keys = ids.stream().map(id -> String.join(":", RedisKey.IM_GROUP_READED_POSITION, id.toString(), session.getUserId().toString())) .collect(Collectors.toList()); From ae052321c53ccf20c8415736bb4f8370a9fcbfa2 Mon Sep 17 00:00:00 2001 From: blue <825657193@qq.com> Date: Fri, 29 Dec 2023 01:05:16 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=A9=BA=E5=BE=AA?= =?UTF-8?q?=E7=8E=AF=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/bx/imclient/listener/MessageListenerMulticaster.java | 1 - .../com/bx/imclient/task/GroupMessageResultResultTask.java | 4 ++-- .../com/bx/imclient/task/PrivateMessageResultResultTask.java | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java index 8e1fdcd..bd95284 100644 --- a/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java +++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java @@ -27,7 +27,6 @@ public class MessageListenerMulticaster { for(MessageListener listener:messageListeners){ IMListener annotation = listener.getClass().getAnnotation(IMListener.class); if(annotation!=null && (annotation.type().equals(IMListenerType.ALL) || annotation.type().equals(listenerType))){ - results.forEach(result->{ // 将data转回对象类型 if(result.getData() instanceof JSONObject){ diff --git a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java index 7089f70..400220d 100644 --- a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java @@ -40,13 +40,13 @@ public class GroupMessageResultResultTask extends AbstractMessageResultTask { if(!results.isEmpty()){ listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results); } - } while (results.size() < batchSize); + } while (results.size() >= batchSize); } List loadBatch() { String key = StrUtil.join(":", IMRedisKey.IM_RESULT_GROUP_QUEUE, appName); //这个接口redis6.2以上才支持 - //List list = redisTemplate.opsForList().leftPop(key, 100); + //List list = redisTemplate.opsForList().leftPop(key, batchSize); List results = new LinkedList<>(); JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); while (!Objects.isNull(jsonObject) && results.size() < batchSize) { diff --git a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java index 9ff6c80..2451db1 100644 --- a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java @@ -41,13 +41,13 @@ public class PrivateMessageResultResultTask extends AbstractMessageResultTask { if(!results.isEmpty()){ listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); } - } while (results.size() < batchSize); + } while (results.size() >= batchSize); } List loadBatch() { String key = StrUtil.join(":", IMRedisKey.IM_RESULT_PRIVATE_QUEUE, appName); //这个接口redis6.2以上才支持 - //List list = redisTemplate.opsForList().leftPop(key, 100); + //List list = redisTemplate.opsForList().leftPop(key, batchSize); List results = new LinkedList<>(); JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); while (!Objects.isNull(jsonObject) && results.size() < batchSize) { From 002335285afe740841ceb0465e84a4cf5e88dfeb Mon Sep 17 00:00:00 2001 From: blue <825657193@qq.com> Date: Sat, 6 Jan 2024 18:03:40 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E5=B1=95=E7=A4=BA=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- im-ui/src/api/date.js | 3 ++- im-uniapp/common/date.js | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/im-ui/src/api/date.js b/im-ui/src/api/date.js index 705f1b7..2329860 100644 --- a/im-ui/src/api/date.js +++ b/im-ui/src/api/date.js @@ -21,7 +21,8 @@ let toTimeText = (timeStamp, simple) => { //不属于今年 timeText = formatDateTime(dateTime); if(simple){ - timeText = timeText.substring(2,5); + console.log(timeText) + timeText = timeText.substr(2,5); } } return timeText; diff --git a/im-uniapp/common/date.js b/im-uniapp/common/date.js index 705f1b7..c3a747d 100644 --- a/im-uniapp/common/date.js +++ b/im-uniapp/common/date.js @@ -21,7 +21,7 @@ let toTimeText = (timeStamp, simple) => { //不属于今年 timeText = formatDateTime(dateTime); if(simple){ - timeText = timeText.substring(2,5); + timeText = timeText.substr(2,5); } } return timeText; From 26fca28d041955ced2333c97effd3999b3902582 Mon Sep 17 00:00:00 2001 From: blue <825657193@qq.com> Date: Sat, 6 Jan 2024 18:13:20 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E5=B1=95=E7=A4=BA=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- im-ui/src/api/date.js | 3 +-- im-uniapp/common/date.js | 2 +- im-uniapp/components/chat-item/chat-item.vue | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/im-ui/src/api/date.js b/im-ui/src/api/date.js index 2329860..bba3e43 100644 --- a/im-ui/src/api/date.js +++ b/im-ui/src/api/date.js @@ -21,8 +21,7 @@ let toTimeText = (timeStamp, simple) => { //不属于今年 timeText = formatDateTime(dateTime); if(simple){ - console.log(timeText) - timeText = timeText.substr(2,5); + timeText = timeText.substr(2,8); } } return timeText; diff --git a/im-uniapp/common/date.js b/im-uniapp/common/date.js index c3a747d..bba3e43 100644 --- a/im-uniapp/common/date.js +++ b/im-uniapp/common/date.js @@ -21,7 +21,7 @@ let toTimeText = (timeStamp, simple) => { //不属于今年 timeText = formatDateTime(dateTime); if(simple){ - timeText = timeText.substr(2,5); + timeText = timeText.substr(2,8); } } return timeText; diff --git a/im-uniapp/components/chat-item/chat-item.vue b/im-uniapp/components/chat-item/chat-item.vue index 3ae1a97..8b8e2f5 100644 --- a/im-uniapp/components/chat-item/chat-item.vue +++ b/im-uniapp/components/chat-item/chat-item.vue @@ -7,7 +7,7 @@ {{chat.showName}} - {{$date.toTimeText(chat.lastSendTime)}} + {{$date.toTimeText(chat.lastSendTime,true)}} {{atText}}