diff --git a/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java b/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java index 108bb68..521f603 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java @@ -34,12 +34,6 @@ public class GroupMessageController { return ResultUtils.success(groupMessageService.recallMessage(id)); } - @GetMapping("/pullOfflineMessage") - @Operation(summary = "拉取离线消息(已废弃)", description = "拉取离线消息,消息将通过webscoket异步推送") - public Result pullOfflineMessage(@RequestParam Long minId) { - groupMessageService.pullOfflineMessage(minId); - return ResultUtils.success(); - } @GetMapping(value = "/loadOfflineMessage") @Operation(summary = "拉取离线消息", description = "拉取离线消息") diff --git a/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java b/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java index 4fea71d..cf3c0fd 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java @@ -34,13 +34,6 @@ public class PrivateMessageController { return ResultUtils.success( privateMessageService.recallMessage(id)); } - @GetMapping("/pullOfflineMessage") - @Operation(summary = "拉取离线消息(已废弃)", description = "拉取离线消息,消息将通过webscoket异步推送") - public Result pullOfflineMessage(@RequestParam Long minId) { - privateMessageService.pullOfflineMessage(minId); - return ResultUtils.success(); - } - @GetMapping(value = "/loadOfflineMessage") @Operation(summary = "拉取离线消息", description = "拉取离线消息") public Result> loadOfflineMessage(@RequestParam Long minId) { diff --git a/im-platform/src/main/java/com/bx/implatform/service/GroupMessageService.java b/im-platform/src/main/java/com/bx/implatform/service/GroupMessageService.java index 5b526b6..ede6cce 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/GroupMessageService.java +++ b/im-platform/src/main/java/com/bx/implatform/service/GroupMessageService.java @@ -24,12 +24,6 @@ public interface GroupMessageService extends IService { */ GroupMessageVO recallMessage(Long id); - /** - * 拉取离线消息,只能拉取最近1个月的消息,最多拉取1000条 - * - * @param minId 消息起始id - */ - void pullOfflineMessage(Long minId); /** * 拉取离线消息,只能拉取最近1个月的消息 diff --git a/im-platform/src/main/java/com/bx/implatform/service/PrivateMessageService.java b/im-platform/src/main/java/com/bx/implatform/service/PrivateMessageService.java index ed2a269..00192d1 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/PrivateMessageService.java +++ b/im-platform/src/main/java/com/bx/implatform/service/PrivateMessageService.java @@ -36,13 +36,6 @@ public interface PrivateMessageService extends IService { List findHistoryMessage(Long friendId, Long page, Long size); - /** - * 拉取离线消息,只能拉取最近1个月的消息,最多拉取1000条 - * - * @param minId 消息起始id - */ - void pullOfflineMessage(Long minId); - /** * 拉取离线消息,只能拉取最近1个月的消息 * diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java index 81c91d5..e56817e 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java @@ -143,115 +143,6 @@ public class GroupMessageServiceImpl extends ServiceImpl members = groupMemberService.findByUserId(session.getUserId()); - Map groupMemberMap = CollStreamUtil.toIdentityMap(members, GroupMember::getGroupId); - Set groupIds = groupMemberMap.keySet(); - if (CollectionUtil.isEmpty(groupIds)) { - // 关闭加载中标志 - this.sendLoadingMessage(false, session); - return; - } - // 只拉最近一个月 - Date minDate = DateUtils.addMonths(new Date(), -1); - LambdaQueryWrapper wrapper = Wrappers.lambdaQuery(); - wrapper.gt(GroupMessage::getId, minId); - wrapper.gt(GroupMessage::getSendTime, minDate); - wrapper.in(GroupMessage::getGroupId, groupIds); - wrapper.orderByDesc(GroupMessage::getId); - wrapper.last("limit 50000"); - List messages = this.list(wrapper); - // 通过群聊对消息进行分组 - Map> messageGroupMap = - messages.stream().collect(Collectors.groupingBy(GroupMessage::getGroupId)); - // 退群前的消息 - List quitMembers = groupMemberService.findQuitInMonth(session.getUserId()); - for (GroupMember quitMember : quitMembers) { - wrapper = Wrappers.lambdaQuery(); - wrapper.gt(GroupMessage::getId, minId); - wrapper.between(GroupMessage::getSendTime, minDate, quitMember.getQuitTime()); - wrapper.eq(GroupMessage::getGroupId, quitMember.getGroupId()); - wrapper.ne(GroupMessage::getStatus, MessageStatus.RECALL.code()); - wrapper.orderByDesc(GroupMessage::getId); - List groupMessages = this.list(wrapper); - messageGroupMap.put(quitMember.getGroupId(), groupMessages); - groupMemberMap.put(quitMember.getGroupId(), quitMember); - } - EXECUTOR.execute(() -> { - // 开启加载中标志 - this.sendLoadingMessage(true, session); - // 推送消息 - int sendCount = 0; - for (Map.Entry> entry : messageGroupMap.entrySet()) { - Long groupId = entry.getKey(); - List groupMessages = entry.getValue(); - // 第一次拉取时,一个群最多推送3000条消息,防止前端接收能力溢出导致卡顿 - List sendMessages = groupMessages; - if (minId <= 0 && groupMessages.size() > 3000) { - sendMessages = groupMessages.subList(0, 3000); - } - // id从小到大排序 - CollectionUtil.reverse(sendMessages); - // 填充消息状态 - String key = StrUtil.join(":", RedisKey.IM_GROUP_READED_POSITION, groupId); - Object o = redisTemplate.opsForHash().get(key, session.getUserId().toString()); - long readedMaxId = Objects.isNull(o) ? -1 : Long.parseLong(o.toString()); - Map maxIdMap = null; - for (GroupMessage m : sendMessages) { - // 推送过程如果用户下线了,则不再推送 - if (!imClient.isOnline(session.getUserId(), IMTerminalType.fromCode(session.getTerminal()))) { - log.info("用户已下线,停止推送离线群聊消息,用户id:{}", session.getUserId()); - return; - } - // 排除加群之前的消息 - GroupMember member = groupMemberMap.get(m.getGroupId()); - if (DateUtil.compare(member.getCreatedTime(), m.getSendTime()) > 0) { - continue; - } - // 排除不需要接收的消息 - List recvIds = CommaTextUtils.asList(m.getRecvIds()); - if (!recvIds.isEmpty() && !recvIds.contains(session.getUserId().toString())) { - continue; - } - // 组装vo - GroupMessageVO vo = BeanUtils.copyProperties(m, GroupMessageVO.class); - // 被@用户列表 - List atIds = CommaTextUtils.asList(m.getAtUserIds()); - vo.setAtUserIds(atIds.stream().map(Long::parseLong).collect(Collectors.toList())); - // 填充状态 - vo.setStatus(readedMaxId >= m.getId() ? MessageStatus.READED.code() : MessageStatus.PENDING.code()); - // 针对回执消息填充已读人数 - if (m.getReceipt()) { - if (Objects.isNull(maxIdMap)) { - maxIdMap = redisTemplate.opsForHash().entries(key); - } - int count = getReadedUserIds(maxIdMap, m.getId(), m.getSendId()).size(); - vo.setReadedCount(count); - } - // 推送 - IMGroupMessage sendMessage = new IMGroupMessage<>(); - sendMessage.setSender(new IMUserInfo(m.getSendId(), IMTerminalType.WEB.code())); - sendMessage.setRecvIds(Arrays.asList(session.getUserId())); - sendMessage.setRecvTerminals(Arrays.asList(session.getTerminal())); - sendMessage.setSendResult(false); - sendMessage.setSendToSelf(false); - sendMessage.setData(vo); - imClient.sendGroupMessage(sendMessage); - sendCount++; - } - } - // 关闭加载中标志 - this.sendLoadingMessage(false, session); - log.info("拉取离线群聊消息,用户id:{},数量:{}", session.getUserId(), sendCount++); - }); - } - @Override public List loadOffineMessage(Long minId) { UserSession session = SessionContext.getSession(); diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java index 3401f8a..b642090 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java @@ -136,45 +136,6 @@ public class PrivateMessageServiceImpl extends ServiceImpl wrapper = Wrappers.lambdaQuery(); - // 只能拉取最近1个月的消息 - Date minDate = DateUtils.addMonths(new Date(), -1); - wrapper.gt(PrivateMessage::getId, minId); - wrapper.ge(PrivateMessage::getSendTime, minDate); - wrapper.and(wp -> wp.eq(PrivateMessage::getSendId, session.getUserId()).or() - .eq(PrivateMessage::getRecvId, session.getUserId())); - wrapper.orderByAsc(PrivateMessage::getId); - List messages = this.list(wrapper); - // 异步推送消息 - EXECUTOR.execute(() -> { - // 开启加载中标志 - this.sendLoadingMessage(true, session); - for (PrivateMessage m : messages) { - // 推送过程如果用户下线了,则不再推送 - if (!imClient.isOnline(session.getUserId(), IMTerminalType.fromCode(session.getTerminal()))) { - log.info("用户已下线,停止推送离线私聊消息,用户id:{}", session.getUserId()); - return; - } - PrivateMessageVO vo = BeanUtils.copyProperties(m, PrivateMessageVO.class); - IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setSender(new IMUserInfo(m.getSendId(), IMTerminalType.WEB.code())); - sendMessage.setRecvId(session.getUserId()); - sendMessage.setRecvTerminals(List.of(session.getTerminal())); - sendMessage.setSendToSelf(false); - sendMessage.setData(vo); - sendMessage.setSendResult(true); - imClient.sendPrivateMessage(sendMessage); - } - // 关闭加载中标志 - this.sendLoadingMessage(false, session); - log.info("拉取私聊消息,用户id:{},数量:{}", session.getUserId(), messages.size()); - }); - } - @Override public List loadOfflineMessage(Long minId) { UserSession session = SessionContext.getSession();