Browse Source

离线消息拉取优化

master
xsx 9 months ago
parent
commit
d4af9d78f0
  1. 24
      im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java

24
im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java

@ -164,7 +164,11 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
wrapper.gt(GroupMessage::getId, minId); wrapper.gt(GroupMessage::getId, minId);
wrapper.gt(GroupMessage::getSendTime, minDate); wrapper.gt(GroupMessage::getSendTime, minDate);
wrapper.in(GroupMessage::getGroupId, groupIds); wrapper.in(GroupMessage::getGroupId, groupIds);
wrapper.orderByAsc(GroupMessage::getId); wrapper.orderByDesc(GroupMessage::getId);
if (minId <= 0) {
// 首次拉取限制消息数量大小,防止内存溢出
wrapper.last("limit 100000");
}
List<GroupMessage> messages = this.list(wrapper); List<GroupMessage> messages = this.list(wrapper);
// 通过群聊对消息进行分组 // 通过群聊对消息进行分组
Map<Long, List<GroupMessage>> messageGroupMap = Map<Long, List<GroupMessage>> messageGroupMap =
@ -177,7 +181,7 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
wrapper.between(GroupMessage::getSendTime, minDate, quitMember.getQuitTime()); wrapper.between(GroupMessage::getSendTime, minDate, quitMember.getQuitTime());
wrapper.eq(GroupMessage::getGroupId, quitMember.getGroupId()); wrapper.eq(GroupMessage::getGroupId, quitMember.getGroupId());
wrapper.ne(GroupMessage::getStatus, MessageStatus.RECALL.code()); wrapper.ne(GroupMessage::getStatus, MessageStatus.RECALL.code());
wrapper.orderByAsc(GroupMessage::getId); wrapper.orderByDesc(GroupMessage::getId);
List<GroupMessage> groupMessages = this.list(wrapper); List<GroupMessage> groupMessages = this.list(wrapper);
messageGroupMap.put(quitMember.getGroupId(), groupMessages); messageGroupMap.put(quitMember.getGroupId(), groupMessages);
groupMemberMap.put(quitMember.getGroupId(), quitMember); groupMemberMap.put(quitMember.getGroupId(), quitMember);
@ -186,13 +190,17 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
// 开启加载中标志 // 开启加载中标志
this.sendLoadingMessage(true, session); this.sendLoadingMessage(true, session);
// 推送消息 // 推送消息
AtomicInteger sendCount = new AtomicInteger(); int sendCount = 0;
messageGroupMap.forEach((groupId, groupMessages) -> { for (Map.Entry<Long, List<GroupMessage>> entry : messageGroupMap.entrySet()) {
Long groupId = entry.getKey();
List<GroupMessage> groupMessages = entry.getValue();
// 第一次拉取时,一个群最多推送3000条消息,防止前端接收能力溢出导致卡顿 // 第一次拉取时,一个群最多推送3000条消息,防止前端接收能力溢出导致卡顿
List<GroupMessage> sendMessages = groupMessages; List<GroupMessage> sendMessages = groupMessages;
if (minId <= 0 && groupMessages.size() > 3000) { if (minId <= 0 && groupMessages.size() > 3000) {
sendMessages = groupMessages.subList(groupMessages.size() - 3000, groupMessages.size()); sendMessages = groupMessages.subList(0, 3000);
} }
// id从小到大排序
CollectionUtil.reverse(sendMessages);
// 填充消息状态 // 填充消息状态
String key = StrUtil.join(":", RedisKey.IM_GROUP_READED_POSITION, groupId); String key = StrUtil.join(":", RedisKey.IM_GROUP_READED_POSITION, groupId);
Object o = redisTemplate.opsForHash().get(key, session.getUserId().toString()); Object o = redisTemplate.opsForHash().get(key, session.getUserId().toString());
@ -238,12 +246,12 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
sendMessage.setSendToSelf(false); sendMessage.setSendToSelf(false);
sendMessage.setData(vo); sendMessage.setData(vo);
imClient.sendGroupMessage(sendMessage); imClient.sendGroupMessage(sendMessage);
sendCount.getAndIncrement(); sendCount++;
}
} }
});
// 关闭加载中标志 // 关闭加载中标志
this.sendLoadingMessage(false, session); this.sendLoadingMessage(false, session);
log.info("拉取离线群聊消息,用户id:{},数量:{}", session.getUserId(), sendCount.get()); log.info("拉取离线群聊消息,用户id:{},数量:{}", session.getUserId(), sendCount++);
}); });
} }

Loading…
Cancel
Save