Browse Source

!187 群离线消息拉取优化

Merge pull request !187 from blue/v_3.0.0
master
blue 2 months ago
committed by Gitee
parent
commit
165f066741
No known key found for this signature in database GPG Key ID: 173E9B9CA92EEF8F
  1. 5
      im-platform/src/main/java/com/bx/implatform/contant/Constant.java
  2. 6
      im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java
  3. 7
      im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java
  4. 10
      im-platform/src/main/java/com/bx/implatform/service/GroupMemberService.java
  5. 6
      im-platform/src/main/java/com/bx/implatform/service/GroupMessageService.java
  6. 7
      im-platform/src/main/java/com/bx/implatform/service/PrivateMessageService.java
  7. 15
      im-platform/src/main/java/com/bx/implatform/service/impl/GroupMemberServiceImpl.java
  8. 126
      im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java
  9. 4
      im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java
  10. 46
      im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java
  11. 1
      im-platform/src/main/java/com/bx/implatform/thirdparty/MinioService.java

5
im-platform/src/main/java/com/bx/implatform/contant/Constant.java

@ -30,4 +30,9 @@ public final class Constant {
*/
public static final Long MAX_NORMAL_GROUP_MEMBER = 500L;
/**
* 离线消息最大拉取时间()
*/
public static final Long MAX_OFFLINE_MESSAGE_DAYS = 30L;
}

6
im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java

@ -34,12 +34,6 @@ public class GroupMessageController {
return ResultUtils.success(groupMessageService.recallMessage(id));
}
@GetMapping("/pullOfflineMessage")
@Operation(summary = "拉取离线消息(已废弃)", description = "拉取离线消息,消息将通过webscoket异步推送")
public Result pullOfflineMessage(@RequestParam Long minId) {
groupMessageService.pullOfflineMessage(minId);
return ResultUtils.success();
}
@GetMapping(value = "/loadOfflineMessage")
@Operation(summary = "拉取离线消息", description = "拉取离线消息")

7
im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java

@ -34,13 +34,6 @@ public class PrivateMessageController {
return ResultUtils.success( privateMessageService.recallMessage(id));
}
@GetMapping("/pullOfflineMessage")
@Operation(summary = "拉取离线消息(已废弃)", description = "拉取离线消息,消息将通过webscoket异步推送")
public Result pullOfflineMessage(@RequestParam Long minId) {
privateMessageService.pullOfflineMessage(minId);
return ResultUtils.success();
}
@GetMapping(value = "/loadOfflineMessage")
@Operation(summary = "拉取离线消息", description = "拉取离线消息")
public Result<List<PrivateMessageVO>> loadOfflineMessage(@RequestParam Long minId) {

10
im-platform/src/main/java/com/bx/implatform/service/GroupMemberService.java

@ -1,9 +1,9 @@
package com.bx.implatform.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.bx.implatform.dto.GroupDndDTO;
import com.bx.implatform.entity.GroupMember;
import java.util.Date;
import java.util.List;
public interface GroupMemberService extends IService<GroupMember> {
@ -26,13 +26,15 @@ public interface GroupMemberService extends IService<GroupMember> {
*/
List<GroupMember> findByUserId(Long userId);
/**
* 根据用户id查询一个月内退的群
* 根据用户id查询某段时间内退的群
*
* @param userId 用户id
* @param userId 用户id
* @param minQuitTime 退群时间
* @return 成员列表
*/
List<GroupMember> findQuitInMonth(Long userId);
public List<GroupMember> findQuitMembers(Long userId, Date minQuitTime);
/**
* 根据群聊id查询群聊成员包括已退出

6
im-platform/src/main/java/com/bx/implatform/service/GroupMessageService.java

@ -24,12 +24,6 @@ public interface GroupMessageService extends IService<GroupMessage> {
*/
GroupMessageVO recallMessage(Long id);
/**
* 拉取离线消息只能拉取最近1个月的消息最多拉取1000条
*
* @param minId 消息起始id
*/
void pullOfflineMessage(Long minId);
/**
* 拉取离线消息只能拉取最近1个月的消息

7
im-platform/src/main/java/com/bx/implatform/service/PrivateMessageService.java

@ -36,13 +36,6 @@ public interface PrivateMessageService extends IService<PrivateMessage> {
List<PrivateMessageVO> findHistoryMessage(Long friendId, Long page, Long size);
/**
* 拉取离线消息只能拉取最近1个月的消息最多拉取1000条
*
* @param minId 消息起始id
*/
void pullOfflineMessage(Long minId);
/**
* 拉取离线消息只能拉取最近1个月的消息
*

15
im-platform/src/main/java/com/bx/implatform/service/impl/GroupMemberServiceImpl.java

@ -9,7 +9,6 @@ import com.bx.implatform.contant.RedisKey;
import com.bx.implatform.entity.GroupMember;
import com.bx.implatform.mapper.GroupMemberMapper;
import com.bx.implatform.service.GroupMemberService;
import com.bx.implatform.util.DateTimeUtils;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@ -42,7 +41,6 @@ public class GroupMemberServiceImpl extends ServiceImpl<GroupMemberMapper, Group
return this.getOne(wrapper);
}
@Override
public List<GroupMember> findByUserId(Long userId) {
LambdaQueryWrapper<GroupMember> memberWrapper = Wrappers.lambdaQuery();
@ -51,12 +49,12 @@ public class GroupMemberServiceImpl extends ServiceImpl<GroupMemberMapper, Group
}
@Override
public List<GroupMember> findQuitInMonth(Long userId) {
Date monthTime = DateTimeUtils.addMonths(new Date(), -1);
LambdaQueryWrapper<GroupMember> memberWrapper = Wrappers.lambdaQuery();
memberWrapper.eq(GroupMember::getUserId, userId).eq(GroupMember::getQuit, true)
.ge(GroupMember::getQuitTime, monthTime);
return this.list(memberWrapper);
public List<GroupMember> findQuitMembers(Long userId, Date minQuitTime) {
LambdaQueryWrapper<GroupMember> wrapper = Wrappers.lambdaQuery();
wrapper.eq(GroupMember::getUserId, userId);
wrapper.eq(GroupMember::getQuit, true);
wrapper.ge(GroupMember::getQuitTime, minQuitTime);
return this.list(wrapper);
}
@Override
@ -107,7 +105,6 @@ public class GroupMemberServiceImpl extends ServiceImpl<GroupMemberMapper, Group
this.update(wrapper);
}
@Override
public Boolean isInGroup(Long groupId, List<Long> userIds) {
if (CollectionUtils.isEmpty(userIds)) {

126
im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java

@ -10,7 +10,6 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bx.imclient.IMClient;
import com.bx.imcommon.contant.IMConstant;
import com.bx.imcommon.enums.IMTerminalType;
import com.bx.imcommon.model.IMGroupMessage;
import com.bx.imcommon.model.IMUserInfo;
import com.bx.imcommon.util.CommaTextUtils;
@ -143,115 +142,6 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
return msgInfo;
}
@Override
public void pullOfflineMessage(Long minId) {
UserSession session = SessionContext.getSession();
if (!imClient.isOnline(session.getUserId())) {
throw new GlobalException("网络连接失败,无法拉取离线消息");
}
// 查询用户加入的群组
List<GroupMember> members = groupMemberService.findByUserId(session.getUserId());
Map<Long, GroupMember> groupMemberMap = CollStreamUtil.toIdentityMap(members, GroupMember::getGroupId);
Set<Long> groupIds = groupMemberMap.keySet();
if (CollectionUtil.isEmpty(groupIds)) {
// 关闭加载中标志
this.sendLoadingMessage(false, session);
return;
}
// 只拉最近一个月
Date minDate = DateUtils.addMonths(new Date(), -1);
LambdaQueryWrapper<GroupMessage> wrapper = Wrappers.lambdaQuery();
wrapper.gt(GroupMessage::getId, minId);
wrapper.gt(GroupMessage::getSendTime, minDate);
wrapper.in(GroupMessage::getGroupId, groupIds);
wrapper.orderByDesc(GroupMessage::getId);
wrapper.last("limit 50000");
List<GroupMessage> messages = this.list(wrapper);
// 通过群聊对消息进行分组
Map<Long, List<GroupMessage>> messageGroupMap =
messages.stream().collect(Collectors.groupingBy(GroupMessage::getGroupId));
// 退群前的消息
List<GroupMember> quitMembers = groupMemberService.findQuitInMonth(session.getUserId());
for (GroupMember quitMember : quitMembers) {
wrapper = Wrappers.lambdaQuery();
wrapper.gt(GroupMessage::getId, minId);
wrapper.between(GroupMessage::getSendTime, minDate, quitMember.getQuitTime());
wrapper.eq(GroupMessage::getGroupId, quitMember.getGroupId());
wrapper.ne(GroupMessage::getStatus, MessageStatus.RECALL.code());
wrapper.orderByDesc(GroupMessage::getId);
List<GroupMessage> groupMessages = this.list(wrapper);
messageGroupMap.put(quitMember.getGroupId(), groupMessages);
groupMemberMap.put(quitMember.getGroupId(), quitMember);
}
EXECUTOR.execute(() -> {
// 开启加载中标志
this.sendLoadingMessage(true, session);
// 推送消息
int sendCount = 0;
for (Map.Entry<Long, List<GroupMessage>> entry : messageGroupMap.entrySet()) {
Long groupId = entry.getKey();
List<GroupMessage> groupMessages = entry.getValue();
// 第一次拉取时,一个群最多推送3000条消息,防止前端接收能力溢出导致卡顿
List<GroupMessage> sendMessages = groupMessages;
if (minId <= 0 && groupMessages.size() > 3000) {
sendMessages = groupMessages.subList(0, 3000);
}
// id从小到大排序
CollectionUtil.reverse(sendMessages);
// 填充消息状态
String key = StrUtil.join(":", RedisKey.IM_GROUP_READED_POSITION, groupId);
Object o = redisTemplate.opsForHash().get(key, session.getUserId().toString());
long readedMaxId = Objects.isNull(o) ? -1 : Long.parseLong(o.toString());
Map<Object, Object> maxIdMap = null;
for (GroupMessage m : sendMessages) {
// 推送过程如果用户下线了,则不再推送
if (!imClient.isOnline(session.getUserId(), IMTerminalType.fromCode(session.getTerminal()))) {
log.info("用户已下线,停止推送离线群聊消息,用户id:{}", session.getUserId());
return;
}
// 排除加群之前的消息
GroupMember member = groupMemberMap.get(m.getGroupId());
if (DateUtil.compare(member.getCreatedTime(), m.getSendTime()) > 0) {
continue;
}
// 排除不需要接收的消息
List<String> recvIds = CommaTextUtils.asList(m.getRecvIds());
if (!recvIds.isEmpty() && !recvIds.contains(session.getUserId().toString())) {
continue;
}
// 组装vo
GroupMessageVO vo = BeanUtils.copyProperties(m, GroupMessageVO.class);
// 被@用户列表
List<String> atIds = CommaTextUtils.asList(m.getAtUserIds());
vo.setAtUserIds(atIds.stream().map(Long::parseLong).collect(Collectors.toList()));
// 填充状态
vo.setStatus(readedMaxId >= m.getId() ? MessageStatus.READED.code() : MessageStatus.PENDING.code());
// 针对回执消息填充已读人数
if (m.getReceipt()) {
if (Objects.isNull(maxIdMap)) {
maxIdMap = redisTemplate.opsForHash().entries(key);
}
int count = getReadedUserIds(maxIdMap, m.getId(), m.getSendId()).size();
vo.setReadedCount(count);
}
// 推送
IMGroupMessage<GroupMessageVO> sendMessage = new IMGroupMessage<>();
sendMessage.setSender(new IMUserInfo(m.getSendId(), IMTerminalType.WEB.code()));
sendMessage.setRecvIds(Arrays.asList(session.getUserId()));
sendMessage.setRecvTerminals(Arrays.asList(session.getTerminal()));
sendMessage.setSendResult(false);
sendMessage.setSendToSelf(false);
sendMessage.setData(vo);
imClient.sendGroupMessage(sendMessage);
sendCount++;
}
}
// 关闭加载中标志
this.sendLoadingMessage(false, session);
log.info("拉取离线群聊消息,用户id:{},数量:{}", session.getUserId(), sendCount++);
});
}
@Override
public List<GroupMessageVO> loadOffineMessage(Long minId) {
UserSession session = SessionContext.getSession();
@ -262,8 +152,8 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
if (groupIds.isEmpty()) {
return Collections.EMPTY_LIST;
}
// 只能拉取最近1个月的消息
Date minDate = DateUtils.addMonths(new Date(), -1);
// 只能拉取最近30天的消息
Date minDate = DateUtils.addDays(new Date(), Math.toIntExact(-Constant.MAX_OFFLINE_MESSAGE_DAYS));
LambdaQueryWrapper<GroupMessage> wrapper = Wrappers.lambdaQuery();
wrapper.gt(GroupMessage::getId, minId);
wrapper.gt(GroupMessage::getSendTime, minDate);
@ -271,8 +161,16 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
wrapper.orderByDesc(GroupMessage::getId);
wrapper.last("limit 50000");
List<GroupMessage> messages = this.list(wrapper);
// 退群前的消息
List<GroupMember> quitMembers = groupMemberService.findQuitInMonth(session.getUserId());
// 查询退群前的消息
Date minQuitTime = minDate;
if (minId > 0) {
// 如果某个群的退群时间大于起始消息的发送时间,那消息是不用推送的,过滤掉
GroupMessage message = this.getById(minId);
if (!Objects.isNull(message) && message.getSendTime().compareTo(minDate) > 0) {
minQuitTime = message.getSendTime();
}
}
List<GroupMember> quitMembers = groupMemberService.findQuitMembers(session.getUserId(), minQuitTime);
for (GroupMember quitMember : quitMembers) {
wrapper = Wrappers.lambdaQuery();
wrapper.gt(GroupMessage::getId, minId);

4
im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java

@ -35,6 +35,7 @@ import com.bx.implatform.vo.GroupVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@ -235,7 +236,8 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
// 查询当前用户的群id列表
List<GroupMember> groupMembers = groupMemberService.findByUserId(session.getUserId());
// 一个月内退的群可能存在退群前的离线消息,一并返回作为前端缓存
groupMembers.addAll(groupMemberService.findQuitInMonth(session.getUserId()));
Date minDate = DateUtils.addDays(new Date(), Math.toIntExact(-Constant.MAX_OFFLINE_MESSAGE_DAYS));
groupMembers.addAll(groupMemberService.findQuitMembers(session.getUserId(),minDate));
if (groupMembers.isEmpty()) {
return new LinkedList<>();
}

46
im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java

@ -7,10 +7,10 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bx.imclient.IMClient;
import com.bx.imcommon.contant.IMConstant;
import com.bx.imcommon.enums.IMTerminalType;
import com.bx.imcommon.model.IMPrivateMessage;
import com.bx.imcommon.model.IMUserInfo;
import com.bx.imcommon.util.ThreadPoolExecutorFactory;
import com.bx.implatform.contant.Constant;
import com.bx.implatform.dto.PrivateMessageDTO;
import com.bx.implatform.entity.PrivateMessage;
import com.bx.implatform.enums.MessageStatus;
@ -32,7 +32,6 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Collectors;
@ -136,52 +135,13 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
return messageInfos;
}
@Override
public void pullOfflineMessage(Long minId) {
UserSession session = SessionContext.getSession();
// 获取当前用户的消息
LambdaQueryWrapper<PrivateMessage> wrapper = Wrappers.lambdaQuery();
// 只能拉取最近1个月的消息
Date minDate = DateUtils.addMonths(new Date(), -1);
wrapper.gt(PrivateMessage::getId, minId);
wrapper.ge(PrivateMessage::getSendTime, minDate);
wrapper.and(wp -> wp.eq(PrivateMessage::getSendId, session.getUserId()).or()
.eq(PrivateMessage::getRecvId, session.getUserId()));
wrapper.orderByAsc(PrivateMessage::getId);
List<PrivateMessage> messages = this.list(wrapper);
// 异步推送消息
EXECUTOR.execute(() -> {
// 开启加载中标志
this.sendLoadingMessage(true, session);
for (PrivateMessage m : messages) {
// 推送过程如果用户下线了,则不再推送
if (!imClient.isOnline(session.getUserId(), IMTerminalType.fromCode(session.getTerminal()))) {
log.info("用户已下线,停止推送离线私聊消息,用户id:{}", session.getUserId());
return;
}
PrivateMessageVO vo = BeanUtils.copyProperties(m, PrivateMessageVO.class);
IMPrivateMessage<PrivateMessageVO> sendMessage = new IMPrivateMessage<>();
sendMessage.setSender(new IMUserInfo(m.getSendId(), IMTerminalType.WEB.code()));
sendMessage.setRecvId(session.getUserId());
sendMessage.setRecvTerminals(List.of(session.getTerminal()));
sendMessage.setSendToSelf(false);
sendMessage.setData(vo);
sendMessage.setSendResult(true);
imClient.sendPrivateMessage(sendMessage);
}
// 关闭加载中标志
this.sendLoadingMessage(false, session);
log.info("拉取私聊消息,用户id:{},数量:{}", session.getUserId(), messages.size());
});
}
@Override
public List<PrivateMessageVO> loadOfflineMessage(Long minId) {
UserSession session = SessionContext.getSession();
// 获取当前用户的消息
LambdaQueryWrapper<PrivateMessage> wrapper = Wrappers.lambdaQuery();
// 只能拉取最近1个月的消息
Date minDate = DateUtils.addMonths(new Date(), -1);
// 只能拉取最近30天的消息
Date minDate = DateUtils.addDays(new Date(), Math.toIntExact(-Constant.MAX_OFFLINE_MESSAGE_DAYS));
wrapper.gt(PrivateMessage::getId, minId);
wrapper.ge(PrivateMessage::getSendTime, minDate);
wrapper.and(wp -> wp.eq(PrivateMessage::getSendId, session.getUserId()).or()

1
im-platform/src/main/java/com/bx/implatform/thirdparty/MinioService.java

@ -1,7 +1,6 @@
package com.bx.implatform.thirdparty;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import com.bx.implatform.util.DateTimeUtils;
import com.bx.implatform.util.FileUtil;
import io.minio.*;

Loading…
Cancel
Save