From 9be62faf3829b506110801cdd67bf630bc2a9399 Mon Sep 17 00:00:00 2001 From: xsx <825657193@qq.com> Date: Thu, 5 Jun 2025 23:18:07 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=8B=89=E5=8F=96=E7=A6=BB=E7=BA=BF=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=94=B9=E4=B8=BA=E5=BC=82=E6=AD=A5=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=EF=BC=8C=E9=98=B2=E6=AD=A2=E8=AF=B7=E6=B1=82=E8=B6=85=E6=97=B6?= =?UTF-8?q?=202.=E5=89=8D=E7=AB=AF=E6=B6=88=E6=81=AF=E5=88=97=E8=A1=A8?= =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=8C=E6=B8=85=E9=99=A4=E5=A4=9A=E4=BD=99?= =?UTF-8?q?div?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/GroupMessageServiceImpl.java | 168 +++++++++--------- .../impl/PrivateMessageServiceImpl.java | 42 +++-- im-web/src/components/chat/ChatBox.vue | 21 +-- 3 files changed, 121 insertions(+), 110 deletions(-) diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java index 5aa5857..f62e80f 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java @@ -14,6 +14,7 @@ import com.bx.imcommon.enums.IMTerminalType; import com.bx.imcommon.model.IMGroupMessage; import com.bx.imcommon.model.IMUserInfo; import com.bx.imcommon.util.CommaTextUtils; +import com.bx.imcommon.util.ThreadPoolExecutorFactory; import com.bx.implatform.contant.Constant; import com.bx.implatform.contant.RedisKey; import com.bx.implatform.dto.GroupMessageDTO; @@ -40,19 +41,21 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.*; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @Slf4j @Service @RequiredArgsConstructor -public class GroupMessageServiceImpl extends ServiceImpl implements - GroupMessageService { +public class GroupMessageServiceImpl extends ServiceImpl + implements GroupMessageService { private final GroupService groupService; private final GroupMemberService groupMemberService; private final RedisTemplate redisTemplate; private final IMClient imClient; private final SensitiveFilterUtil sensitiveFilterUtil; + private static final ScheduledThreadPoolExecutor EXECUTOR = ThreadPoolExecutorFactory.getThreadPoolExecutor(); @Override public GroupMessageVO sendMessage(GroupMessageDTO dto) { @@ -67,7 +70,8 @@ public class GroupMessageServiceImpl extends ServiceImpl userIds = groupMemberService.findUserIdsByGroupId(group.getId()); if (dto.getReceipt() && userIds.size() > Constant.MAX_LARGE_GROUP_MEMBER) { // 大群的回执消息过于消耗资源,不允许发送 - throw new GlobalException(String.format("当前群聊大于%s人,不支持发送回执消息", Constant.MAX_LARGE_GROUP_MEMBER)); + throw new GlobalException( + String.format("当前群聊大于%s人,不支持发送回执消息", Constant.MAX_LARGE_GROUP_MEMBER)); } // 不用发给自己 userIds = userIds.stream().filter(id -> !session.getUserId().equals(id)).collect(Collectors.toList()); @@ -78,11 +82,10 @@ public class GroupMessageServiceImpl extends ServiceImpl members = groupMemberService.findByUserId(session.getUserId()); Map groupMemberMap = CollStreamUtil.toIdentityMap(members, GroupMember::getGroupId); Set groupIds = groupMemberMap.keySet(); - if(CollectionUtil.isEmpty(groupIds)){ + if (CollectionUtil.isEmpty(groupIds)) { // 关闭加载中标志 - this.sendLoadingMessage(false); + this.sendLoadingMessage(false, session); return; } - // 开启加载中标志 - this.sendLoadingMessage(true); - // 只能拉取最近3个月的,最多拉取3000条 + + // 只能拉取最近3个月的,移动端只拉最近一个月 int months = session.getTerminal().equals(IMTerminalType.APP.code()) ? 1 : 3; Date minDate = DateUtils.addMonths(new Date(), -months); LambdaQueryWrapper wrapper = Wrappers.lambdaQuery(); - wrapper.gt(GroupMessage::getId, minId) - .gt(GroupMessage::getSendTime, minDate) - .in(GroupMessage::getGroupId, groupIds) - .orderByAsc(GroupMessage::getId); + wrapper.gt(GroupMessage::getId, minId).gt(GroupMessage::getSendTime, minDate) + .in(GroupMessage::getGroupId, groupIds).orderByAsc(GroupMessage::getId); List messages = this.list(wrapper); // 通过群聊对消息进行分组 - Map> messageGroupMap = messages.stream().collect(Collectors.groupingBy(GroupMessage::getGroupId)); + Map> messageGroupMap = + messages.stream().collect(Collectors.groupingBy(GroupMessage::getGroupId)); // 退群前的消息 List quitMembers = groupMemberService.findQuitInMonth(session.getUserId()); - for(GroupMember quitMember: quitMembers){ + for (GroupMember quitMember : quitMembers) { wrapper = Wrappers.lambdaQuery(); - wrapper.gt(GroupMessage::getId, minId) - .between(GroupMessage::getSendTime, minDate,quitMember.getQuitTime()) + wrapper.gt(GroupMessage::getId, minId).between(GroupMessage::getSendTime, minDate, quitMember.getQuitTime()) .eq(GroupMessage::getGroupId, quitMember.getGroupId()) - .ne(GroupMessage::getStatus, MessageStatus.RECALL.code()) - .orderByAsc(GroupMessage::getId); + .ne(GroupMessage::getStatus, MessageStatus.RECALL.code()).orderByAsc(GroupMessage::getId); List groupMessages = this.list(wrapper); - messageGroupMap.put(quitMember.getGroupId(),groupMessages); - groupMemberMap.put(quitMember.getGroupId(),quitMember); + messageGroupMap.put(quitMember.getGroupId(), groupMessages); + groupMemberMap.put(quitMember.getGroupId(), quitMember); } - // 推送消息 - AtomicInteger sendCount = new AtomicInteger(); - messageGroupMap.forEach((groupId, groupMessages) -> { - // 填充消息状态 - String key = StrUtil.join(":", RedisKey.IM_GROUP_READED_POSITION, groupId); - Object o = redisTemplate.opsForHash().get(key, session.getUserId().toString()); - long readedMaxId = Objects.isNull(o) ? -1 : Long.parseLong(o.toString()); - Map maxIdMap = null; - for(GroupMessage m:groupMessages){ - // 排除加群之前的消息 - GroupMember member = groupMemberMap.get(m.getGroupId()); - if(DateUtil.compare(member.getCreatedTime(), m.getSendTime()) > 0){ - continue; + EXECUTOR.execute(() -> { + // 开启加载中标志 + this.sendLoadingMessage(true, session); + // 推送消息 + AtomicInteger sendCount = new AtomicInteger(); + messageGroupMap.forEach((groupId, groupMessages) -> { + // 第一次拉取时,一个群最多推送1w条消息,防止前端接收能力溢出导致卡顿 + List sendMessages = groupMessages; + if (minId <= 0 && groupMessages.size() > 10000) { + sendMessages = groupMessages.subList(groupMessages.size() - 10000, groupMessages.size()); } - // 排除不需要接收的消息 - List recvIds = CommaTextUtils.asList(m.getRecvIds()); - if(!recvIds.isEmpty() && !recvIds.contains(session.getUserId().toString())){ - continue; - } - // 组装vo - GroupMessageVO vo = BeanUtils.copyProperties(m, GroupMessageVO.class); - // 被@用户列表 - List atIds = CommaTextUtils.asList(m.getAtUserIds()); - vo.setAtUserIds(atIds.stream().map(Long::parseLong).collect(Collectors.toList())); - // 填充状态 - vo.setStatus(readedMaxId >= m.getId() ? MessageStatus.READED.code() : MessageStatus.UNSEND.code()); - // 针对回执消息填充已读人数 - if(m.getReceipt()){ - if(Objects.isNull(maxIdMap)) { - maxIdMap = redisTemplate.opsForHash().entries(key); + // 填充消息状态 + String key = StrUtil.join(":", RedisKey.IM_GROUP_READED_POSITION, groupId); + Object o = redisTemplate.opsForHash().get(key, session.getUserId().toString()); + long readedMaxId = Objects.isNull(o) ? -1 : Long.parseLong(o.toString()); + Map maxIdMap = null; + for (GroupMessage m : sendMessages) { + // 排除加群之前的消息 + GroupMember member = groupMemberMap.get(m.getGroupId()); + if (DateUtil.compare(member.getCreatedTime(), m.getSendTime()) > 0) { + continue; + } + // 排除不需要接收的消息 + List recvIds = CommaTextUtils.asList(m.getRecvIds()); + if (!recvIds.isEmpty() && !recvIds.contains(session.getUserId().toString())) { + continue; + } + // 组装vo + GroupMessageVO vo = BeanUtils.copyProperties(m, GroupMessageVO.class); + // 被@用户列表 + List atIds = CommaTextUtils.asList(m.getAtUserIds()); + vo.setAtUserIds(atIds.stream().map(Long::parseLong).collect(Collectors.toList())); + // 填充状态 + vo.setStatus(readedMaxId >= m.getId() ? MessageStatus.READED.code() : MessageStatus.UNSEND.code()); + // 针对回执消息填充已读人数 + if (m.getReceipt()) { + if (Objects.isNull(maxIdMap)) { + maxIdMap = redisTemplate.opsForHash().entries(key); + } + int count = getReadedUserIds(maxIdMap, m.getId(), m.getSendId()).size(); + vo.setReadedCount(count); } - int count = getReadedUserIds(maxIdMap, m.getId(),m.getSendId()).size(); - vo.setReadedCount(count); + // 推送 + IMGroupMessage sendMessage = new IMGroupMessage<>(); + sendMessage.setSender(new IMUserInfo(m.getSendId(), IMTerminalType.WEB.code())); + sendMessage.setRecvIds(Arrays.asList(session.getUserId())); + sendMessage.setRecvTerminals(Arrays.asList(session.getTerminal())); + sendMessage.setSendResult(false); + sendMessage.setSendToSelf(false); + sendMessage.setData(vo); + imClient.sendGroupMessage(sendMessage); + sendCount.getAndIncrement(); } - // 推送 - IMGroupMessage sendMessage = new IMGroupMessage<>(); - sendMessage.setSender(new IMUserInfo(m.getSendId(), IMTerminalType.WEB.code())); - sendMessage.setRecvIds(Arrays.asList(session.getUserId())); - sendMessage.setRecvTerminals(Arrays.asList(session.getTerminal())); - sendMessage.setSendResult(false); - sendMessage.setSendToSelf(false); - sendMessage.setData(vo); - imClient.sendGroupMessage(sendMessage); - sendCount.getAndIncrement(); - } + }); + // 关闭加载中标志 + this.sendLoadingMessage(false, session); + log.info("拉取离线群聊消息,用户id:{},数量:{}", session.getUserId(), sendCount.get()); }); - // 关闭加载中标志 - this.sendLoadingMessage(false); - log.info("拉取离线群聊消息,用户id:{},数量:{}",session.getUserId(),sendCount.get()); } @Override @@ -238,10 +245,8 @@ public class GroupMessageServiceImpl extends ServiceImpl wrapper = Wrappers.lambdaQuery(); - wrapper.eq(GroupMessage::getGroupId, groupId) - .orderByDesc(GroupMessage::getId) - .last("limit 1") - .select(GroupMessage::getId); + wrapper.eq(GroupMessage::getGroupId, groupId).orderByDesc(GroupMessage::getId).last("limit 1") + .select(GroupMessage::getId); GroupMessage message = this.getOne(wrapper); if (Objects.isNull(message)) { return; @@ -276,9 +281,10 @@ public class GroupMessageServiceImpl extends ServiceImpl userIds = groupMemberService.findUserIdsByGroupId(groupId); Map maxIdMap = redisTemplate.opsForHash().entries(key); for (GroupMessage receiptMessage : receiptMessages) { - Integer readedCount = getReadedUserIds(maxIdMap, receiptMessage.getId(),receiptMessage.getSendId()).size(); + Integer readedCount = + getReadedUserIds(maxIdMap, receiptMessage.getId(), receiptMessage.getSendId()).size(); // 如果所有人都已读,记录回执消息完成标记 - if(readedCount >= userIds.size() - 1){ + if (readedCount >= userIds.size() - 1) { receiptMessage.setReceiptOk(true); this.updateById(receiptMessage); } @@ -311,12 +317,12 @@ public class GroupMessageServiceImpl extends ServiceImpl maxIdMap = redisTemplate.opsForHash().entries(key); // 返回已读用户的id集合 - return getReadedUserIds(maxIdMap, message.getId(),message.getSendId()); + return getReadedUserIds(maxIdMap, message.getId(), message.getSendId()); } @Override @@ -333,10 +339,11 @@ public class GroupMessageServiceImpl extends ServiceImpl wrapper = new QueryWrapper<>(); wrapper.lambda().eq(GroupMessage::getGroupId, groupId).gt(GroupMessage::getSendTime, member.getCreatedTime()) - .ne(GroupMessage::getStatus, MessageStatus.RECALL.code()).orderByDesc(GroupMessage::getId).last("limit " + stIdx + "," + size); + .ne(GroupMessage::getStatus, MessageStatus.RECALL.code()).orderByDesc(GroupMessage::getId) + .last("limit " + stIdx + "," + size); List messages = this.list(wrapper); List messageInfos = - messages.stream().map(m -> BeanUtils.copyProperties(m, GroupMessageVO.class)).collect(Collectors.toList()); + messages.stream().map(m -> BeanUtils.copyProperties(m, GroupMessageVO.class)).collect(Collectors.toList()); log.info("拉取群聊记录,用户id:{},群聊id:{},数量:{}", userId, groupId, messageInfos.size()); return messageInfos; } @@ -354,8 +361,7 @@ public class GroupMessageServiceImpl extends ServiceImpl wrapper = Wrappers.lambdaQuery(); // 只能拉取最近3个月的消息,移动端只拉取一个月消息 @@ -148,21 +149,25 @@ public class PrivateMessageServiceImpl extends ServiceImpl messages = this.list(wrapper); - // 推送消息 - for (PrivateMessage m : messages) { - PrivateMessageVO vo = BeanUtils.copyProperties(m, PrivateMessageVO.class); - IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setSender(new IMUserInfo(m.getSendId(), IMTerminalType.WEB.code())); - sendMessage.setRecvId(session.getUserId()); - sendMessage.setRecvTerminals(List.of(session.getTerminal())); - sendMessage.setSendToSelf(false); - sendMessage.setData(vo); - sendMessage.setSendResult(true); - imClient.sendPrivateMessage(sendMessage); - } - // 关闭加载中标志 - this.sendLoadingMessage(false); - log.info("拉取私聊消息,用户id:{},数量:{}", session.getUserId(), messages.size()); + // 异步推送消息 + EXECUTOR.execute(() -> { + // 开启加载中标志 + this.sendLoadingMessage(true, session); + for (PrivateMessage m : messages) { + PrivateMessageVO vo = BeanUtils.copyProperties(m, PrivateMessageVO.class); + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); + sendMessage.setSender(new IMUserInfo(m.getSendId(), IMTerminalType.WEB.code())); + sendMessage.setRecvId(session.getUserId()); + sendMessage.setRecvTerminals(List.of(session.getTerminal())); + sendMessage.setSendToSelf(false); + sendMessage.setData(vo); + sendMessage.setSendResult(true); + imClient.sendPrivateMessage(sendMessage); + } + // 关闭加载中标志 + this.sendLoadingMessage(false, session); + log.info("拉取私聊消息,用户id:{},数量:{}", session.getUserId(), messages.size()); + }); } @Transactional(rollbackFor = Exception.class) @@ -215,8 +220,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl
-
    -
  • - - -
  • -
+
+ + +
@@ -684,6 +681,10 @@ export default { unreadCount() { return this.chat.unreadCount; }, + showMessages() { + console.log("this.chat.messages.slice(this.showMinIdx):",this.chat.messages.slice(this.showMinIdx)) + return this.chat.messages.slice(this.showMinIdx) + }, messageSize() { if (!this.chat || !this.chat.messages) { return 0;