Browse Source

1.拉取离线消息改为异步方式,防止请求超时

2.前端消息列表优化,清除多余div
master
xsx 10 months ago
parent
commit
9be62faf38
  1. 60
      im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java
  2. 16
      im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java
  3. 17
      im-web/src/components/chat/ChatBox.vue

60
im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java

@ -14,6 +14,7 @@ import com.bx.imcommon.enums.IMTerminalType;
import com.bx.imcommon.model.IMGroupMessage; import com.bx.imcommon.model.IMGroupMessage;
import com.bx.imcommon.model.IMUserInfo; import com.bx.imcommon.model.IMUserInfo;
import com.bx.imcommon.util.CommaTextUtils; import com.bx.imcommon.util.CommaTextUtils;
import com.bx.imcommon.util.ThreadPoolExecutorFactory;
import com.bx.implatform.contant.Constant; import com.bx.implatform.contant.Constant;
import com.bx.implatform.contant.RedisKey; import com.bx.implatform.contant.RedisKey;
import com.bx.implatform.dto.GroupMessageDTO; import com.bx.implatform.dto.GroupMessageDTO;
@ -40,19 +41,21 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.*; import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, GroupMessage> implements public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, GroupMessage>
GroupMessageService { implements GroupMessageService {
private final GroupService groupService; private final GroupService groupService;
private final GroupMemberService groupMemberService; private final GroupMemberService groupMemberService;
private final RedisTemplate<String, Object> redisTemplate; private final RedisTemplate<String, Object> redisTemplate;
private final IMClient imClient; private final IMClient imClient;
private final SensitiveFilterUtil sensitiveFilterUtil; private final SensitiveFilterUtil sensitiveFilterUtil;
private static final ScheduledThreadPoolExecutor EXECUTOR = ThreadPoolExecutorFactory.getThreadPoolExecutor();
@Override @Override
public GroupMessageVO sendMessage(GroupMessageDTO dto) { public GroupMessageVO sendMessage(GroupMessageDTO dto) {
@ -67,7 +70,8 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
List<Long> userIds = groupMemberService.findUserIdsByGroupId(group.getId()); List<Long> userIds = groupMemberService.findUserIdsByGroupId(group.getId());
if (dto.getReceipt() && userIds.size() > Constant.MAX_LARGE_GROUP_MEMBER) { if (dto.getReceipt() && userIds.size() > Constant.MAX_LARGE_GROUP_MEMBER) {
// 大群的回执消息过于消耗资源,不允许发送 // 大群的回执消息过于消耗资源,不允许发送
throw new GlobalException(String.format("当前群聊大于%s人,不支持发送回执消息", Constant.MAX_LARGE_GROUP_MEMBER)); throw new GlobalException(
String.format("当前群聊大于%s人,不支持发送回执消息", Constant.MAX_LARGE_GROUP_MEMBER));
} }
// 不用发给自己 // 不用发给自己
userIds = userIds.stream().filter(id -> !session.getUserId().equals(id)).collect(Collectors.toList()); userIds = userIds.stream().filter(id -> !session.getUserId().equals(id)).collect(Collectors.toList());
@ -82,7 +86,6 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
msg.setContent(sensitiveFilterUtil.filter(dto.getContent())); msg.setContent(sensitiveFilterUtil.filter(dto.getContent()));
} }
this.save(msg); this.save(msg);
// 群发 // 群发
GroupMessageVO msgInfo = BeanUtils.copyProperties(msg, GroupMessageVO.class); GroupMessageVO msgInfo = BeanUtils.copyProperties(msg, GroupMessageVO.class);
msgInfo.setAtUserIds(dto.getAtUserIds()); msgInfo.setAtUserIds(dto.getAtUserIds());
@ -140,7 +143,6 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
return msgInfo; return msgInfo;
} }
@Override @Override
public void pullOfflineMessage(Long minId) { public void pullOfflineMessage(Long minId) {
UserSession session = SessionContext.getSession(); UserSession session = SessionContext.getSession();
@ -153,44 +155,48 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
Set<Long> groupIds = groupMemberMap.keySet(); Set<Long> groupIds = groupMemberMap.keySet();
if (CollectionUtil.isEmpty(groupIds)) { if (CollectionUtil.isEmpty(groupIds)) {
// 关闭加载中标志 // 关闭加载中标志
this.sendLoadingMessage(false); this.sendLoadingMessage(false, session);
return; return;
} }
// 开启加载中标志
this.sendLoadingMessage(true); // 只能拉取最近3个月的,移动端只拉最近一个月
// 只能拉取最近3个月的,最多拉取3000条
int months = session.getTerminal().equals(IMTerminalType.APP.code()) ? 1 : 3; int months = session.getTerminal().equals(IMTerminalType.APP.code()) ? 1 : 3;
Date minDate = DateUtils.addMonths(new Date(), -months); Date minDate = DateUtils.addMonths(new Date(), -months);
LambdaQueryWrapper<GroupMessage> wrapper = Wrappers.lambdaQuery(); LambdaQueryWrapper<GroupMessage> wrapper = Wrappers.lambdaQuery();
wrapper.gt(GroupMessage::getId, minId) wrapper.gt(GroupMessage::getId, minId).gt(GroupMessage::getSendTime, minDate)
.gt(GroupMessage::getSendTime, minDate) .in(GroupMessage::getGroupId, groupIds).orderByAsc(GroupMessage::getId);
.in(GroupMessage::getGroupId, groupIds)
.orderByAsc(GroupMessage::getId);
List<GroupMessage> messages = this.list(wrapper); List<GroupMessage> messages = this.list(wrapper);
// 通过群聊对消息进行分组 // 通过群聊对消息进行分组
Map<Long, List<GroupMessage>> messageGroupMap = messages.stream().collect(Collectors.groupingBy(GroupMessage::getGroupId)); Map<Long, List<GroupMessage>> messageGroupMap =
messages.stream().collect(Collectors.groupingBy(GroupMessage::getGroupId));
// 退群前的消息 // 退群前的消息
List<GroupMember> quitMembers = groupMemberService.findQuitInMonth(session.getUserId()); List<GroupMember> quitMembers = groupMemberService.findQuitInMonth(session.getUserId());
for (GroupMember quitMember : quitMembers) { for (GroupMember quitMember : quitMembers) {
wrapper = Wrappers.lambdaQuery(); wrapper = Wrappers.lambdaQuery();
wrapper.gt(GroupMessage::getId, minId) wrapper.gt(GroupMessage::getId, minId).between(GroupMessage::getSendTime, minDate, quitMember.getQuitTime())
.between(GroupMessage::getSendTime, minDate,quitMember.getQuitTime())
.eq(GroupMessage::getGroupId, quitMember.getGroupId()) .eq(GroupMessage::getGroupId, quitMember.getGroupId())
.ne(GroupMessage::getStatus, MessageStatus.RECALL.code()) .ne(GroupMessage::getStatus, MessageStatus.RECALL.code()).orderByAsc(GroupMessage::getId);
.orderByAsc(GroupMessage::getId);
List<GroupMessage> groupMessages = this.list(wrapper); List<GroupMessage> groupMessages = this.list(wrapper);
messageGroupMap.put(quitMember.getGroupId(), groupMessages); messageGroupMap.put(quitMember.getGroupId(), groupMessages);
groupMemberMap.put(quitMember.getGroupId(), quitMember); groupMemberMap.put(quitMember.getGroupId(), quitMember);
} }
EXECUTOR.execute(() -> {
// 开启加载中标志
this.sendLoadingMessage(true, session);
// 推送消息 // 推送消息
AtomicInteger sendCount = new AtomicInteger(); AtomicInteger sendCount = new AtomicInteger();
messageGroupMap.forEach((groupId, groupMessages) -> { messageGroupMap.forEach((groupId, groupMessages) -> {
// 第一次拉取时,一个群最多推送1w条消息,防止前端接收能力溢出导致卡顿
List<GroupMessage> sendMessages = groupMessages;
if (minId <= 0 && groupMessages.size() > 10000) {
sendMessages = groupMessages.subList(groupMessages.size() - 10000, groupMessages.size());
}
// 填充消息状态 // 填充消息状态
String key = StrUtil.join(":", RedisKey.IM_GROUP_READED_POSITION, groupId); String key = StrUtil.join(":", RedisKey.IM_GROUP_READED_POSITION, groupId);
Object o = redisTemplate.opsForHash().get(key, session.getUserId().toString()); Object o = redisTemplate.opsForHash().get(key, session.getUserId().toString());
long readedMaxId = Objects.isNull(o) ? -1 : Long.parseLong(o.toString()); long readedMaxId = Objects.isNull(o) ? -1 : Long.parseLong(o.toString());
Map<Object, Object> maxIdMap = null; Map<Object, Object> maxIdMap = null;
for(GroupMessage m:groupMessages){ for (GroupMessage m : sendMessages) {
// 排除加群之前的消息 // 排除加群之前的消息
GroupMember member = groupMemberMap.get(m.getGroupId()); GroupMember member = groupMemberMap.get(m.getGroupId());
if (DateUtil.compare(member.getCreatedTime(), m.getSendTime()) > 0) { if (DateUtil.compare(member.getCreatedTime(), m.getSendTime()) > 0) {
@ -229,8 +235,9 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
} }
}); });
// 关闭加载中标志 // 关闭加载中标志
this.sendLoadingMessage(false); this.sendLoadingMessage(false, session);
log.info("拉取离线群聊消息,用户id:{},数量:{}", session.getUserId(), sendCount.get()); log.info("拉取离线群聊消息,用户id:{},数量:{}", session.getUserId(), sendCount.get());
});
} }
@Override @Override
@ -238,9 +245,7 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
UserSession session = SessionContext.getSession(); UserSession session = SessionContext.getSession();
// 取出最后的消息id // 取出最后的消息id
LambdaQueryWrapper<GroupMessage> wrapper = Wrappers.lambdaQuery(); LambdaQueryWrapper<GroupMessage> wrapper = Wrappers.lambdaQuery();
wrapper.eq(GroupMessage::getGroupId, groupId) wrapper.eq(GroupMessage::getGroupId, groupId).orderByDesc(GroupMessage::getId).last("limit 1")
.orderByDesc(GroupMessage::getId)
.last("limit 1")
.select(GroupMessage::getId); .select(GroupMessage::getId);
GroupMessage message = this.getOne(wrapper); GroupMessage message = this.getOne(wrapper);
if (Objects.isNull(message)) { if (Objects.isNull(message)) {
@ -276,7 +281,8 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
List<Long> userIds = groupMemberService.findUserIdsByGroupId(groupId); List<Long> userIds = groupMemberService.findUserIdsByGroupId(groupId);
Map<Object, Object> maxIdMap = redisTemplate.opsForHash().entries(key); Map<Object, Object> maxIdMap = redisTemplate.opsForHash().entries(key);
for (GroupMessage receiptMessage : receiptMessages) { for (GroupMessage receiptMessage : receiptMessages) {
Integer readedCount = getReadedUserIds(maxIdMap, receiptMessage.getId(),receiptMessage.getSendId()).size(); Integer readedCount =
getReadedUserIds(maxIdMap, receiptMessage.getId(), receiptMessage.getSendId()).size();
// 如果所有人都已读,记录回执消息完成标记 // 如果所有人都已读,记录回执消息完成标记
if (readedCount >= userIds.size() - 1) { if (readedCount >= userIds.size() - 1) {
receiptMessage.setReceiptOk(true); receiptMessage.setReceiptOk(true);
@ -333,7 +339,8 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
// 查询聊天记录,只查询加入群聊时间之后的消息 // 查询聊天记录,只查询加入群聊时间之后的消息
QueryWrapper<GroupMessage> wrapper = new QueryWrapper<>(); QueryWrapper<GroupMessage> wrapper = new QueryWrapper<>();
wrapper.lambda().eq(GroupMessage::getGroupId, groupId).gt(GroupMessage::getSendTime, member.getCreatedTime()) wrapper.lambda().eq(GroupMessage::getGroupId, groupId).gt(GroupMessage::getSendTime, member.getCreatedTime())
.ne(GroupMessage::getStatus, MessageStatus.RECALL.code()).orderByDesc(GroupMessage::getId).last("limit " + stIdx + "," + size); .ne(GroupMessage::getStatus, MessageStatus.RECALL.code()).orderByDesc(GroupMessage::getId)
.last("limit " + stIdx + "," + size);
List<GroupMessage> messages = this.list(wrapper); List<GroupMessage> messages = this.list(wrapper);
List<GroupMessageVO> messageInfos = List<GroupMessageVO> messageInfos =
messages.stream().map(m -> BeanUtils.copyProperties(m, GroupMessageVO.class)).collect(Collectors.toList()); messages.stream().map(m -> BeanUtils.copyProperties(m, GroupMessageVO.class)).collect(Collectors.toList());
@ -354,8 +361,7 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
return userIds; return userIds;
} }
private void sendLoadingMessage(Boolean isLoadding){ private void sendLoadingMessage(Boolean isLoadding, UserSession session) {
UserSession session = SessionContext.getSession();
GroupMessageVO msgInfo = new GroupMessageVO(); GroupMessageVO msgInfo = new GroupMessageVO();
msgInfo.setType(MessageType.LOADING.code()); msgInfo.setType(MessageType.LOADING.code());
msgInfo.setContent(isLoadding.toString()); msgInfo.setContent(isLoadding.toString());

16
im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java

@ -10,6 +10,7 @@ import com.bx.imcommon.contant.IMConstant;
import com.bx.imcommon.enums.IMTerminalType; import com.bx.imcommon.enums.IMTerminalType;
import com.bx.imcommon.model.IMPrivateMessage; import com.bx.imcommon.model.IMPrivateMessage;
import com.bx.imcommon.model.IMUserInfo; import com.bx.imcommon.model.IMUserInfo;
import com.bx.imcommon.util.ThreadPoolExecutorFactory;
import com.bx.implatform.dto.PrivateMessageDTO; import com.bx.implatform.dto.PrivateMessageDTO;
import com.bx.implatform.entity.PrivateMessage; import com.bx.implatform.entity.PrivateMessage;
import com.bx.implatform.enums.MessageStatus; import com.bx.implatform.enums.MessageStatus;
@ -32,6 +33,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
@ -43,6 +45,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
private final FriendService friendService; private final FriendService friendService;
private final IMClient imClient; private final IMClient imClient;
private final SensitiveFilterUtil sensitiveFilterUtil; private final SensitiveFilterUtil sensitiveFilterUtil;
private static final ScheduledThreadPoolExecutor EXECUTOR = ThreadPoolExecutorFactory.getThreadPoolExecutor();
@Override @Override
public PrivateMessageVO sendMessage(PrivateMessageDTO dto) { public PrivateMessageVO sendMessage(PrivateMessageDTO dto) {
@ -135,8 +138,6 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
@Override @Override
public void pullOfflineMessage(Long minId) { public void pullOfflineMessage(Long minId) {
UserSession session = SessionContext.getSession(); UserSession session = SessionContext.getSession();
// 开启加载中标志
this.sendLoadingMessage(true);
// 获取当前用户的消息 // 获取当前用户的消息
LambdaQueryWrapper<PrivateMessage> wrapper = Wrappers.lambdaQuery(); LambdaQueryWrapper<PrivateMessage> wrapper = Wrappers.lambdaQuery();
// 只能拉取最近3个月的消息,移动端只拉取一个月消息 // 只能拉取最近3个月的消息,移动端只拉取一个月消息
@ -148,7 +149,10 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
.eq(PrivateMessage::getRecvId, session.getUserId())); .eq(PrivateMessage::getRecvId, session.getUserId()));
wrapper.orderByAsc(PrivateMessage::getId); wrapper.orderByAsc(PrivateMessage::getId);
List<PrivateMessage> messages = this.list(wrapper); List<PrivateMessage> messages = this.list(wrapper);
// 推送消息 // 异步推送消息
EXECUTOR.execute(() -> {
// 开启加载中标志
this.sendLoadingMessage(true, session);
for (PrivateMessage m : messages) { for (PrivateMessage m : messages) {
PrivateMessageVO vo = BeanUtils.copyProperties(m, PrivateMessageVO.class); PrivateMessageVO vo = BeanUtils.copyProperties(m, PrivateMessageVO.class);
IMPrivateMessage<PrivateMessageVO> sendMessage = new IMPrivateMessage<>(); IMPrivateMessage<PrivateMessageVO> sendMessage = new IMPrivateMessage<>();
@ -161,8 +165,9 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
imClient.sendPrivateMessage(sendMessage); imClient.sendPrivateMessage(sendMessage);
} }
// 关闭加载中标志 // 关闭加载中标志
this.sendLoadingMessage(false); this.sendLoadingMessage(false, session);
log.info("拉取私聊消息,用户id:{},数量:{}", session.getUserId(), messages.size()); log.info("拉取私聊消息,用户id:{},数量:{}", session.getUserId(), messages.size());
});
} }
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
@ -215,8 +220,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
return message.getId(); return message.getId();
} }
private void sendLoadingMessage(Boolean isLoadding) { private void sendLoadingMessage(Boolean isLoadding, UserSession session) {
UserSession session = SessionContext.getSession();
PrivateMessageVO msgInfo = new PrivateMessageVO(); PrivateMessageVO msgInfo = new PrivateMessageVO();
msgInfo.setType(MessageType.LOADING.code()); msgInfo.setType(MessageType.LOADING.code());
msgInfo.setContent(isLoadding.toString()); msgInfo.setContent(isLoadding.toString());

17
im-web/src/components/chat/ChatBox.vue

@ -10,16 +10,13 @@
<el-container class="content-box"> <el-container class="content-box">
<el-main class="im-chat-main" id="chatScrollBox" @scroll="onScroll"> <el-main class="im-chat-main" id="chatScrollBox" @scroll="onScroll">
<div class="im-chat-box"> <div class="im-chat-box">
<ul> <div v-for="(msgInfo, idx) in showMessages" :key="showMinIdx + idx">
<li v-for="(msgInfo, idx) in chat.messages" :key="idx"> <chat-message-item @call="onCall(msgInfo.type)"
<chat-message-item v-if="idx >= showMinIdx" @call="onCall(msgInfo.type)"
:mine="msgInfo.sendId == mine.id" :headImage="headImage(msgInfo)" :mine="msgInfo.sendId == mine.id" :headImage="headImage(msgInfo)"
:showName="showName(msgInfo)" :msgInfo="msgInfo" :showName="showName(msgInfo)" :msgInfo="msgInfo" :groupMembers="groupMembers"
:groupMembers="groupMembers" @delete="deleteMessage" @delete="deleteMessage" @recall="recallMessage">
@recall="recallMessage">
</chat-message-item> </chat-message-item>
</li> </div>
</ul>
</div> </div>
</el-main> </el-main>
<div v-if="!isInBottom" class="scroll-to-bottom" @click="scrollToBottom"> <div v-if="!isInBottom" class="scroll-to-bottom" @click="scrollToBottom">
@ -684,6 +681,10 @@ export default {
unreadCount() { unreadCount() {
return this.chat.unreadCount; return this.chat.unreadCount;
}, },
showMessages() {
console.log("this.chat.messages.slice(this.showMinIdx):",this.chat.messages.slice(this.showMinIdx))
return this.chat.messages.slice(this.showMinIdx)
},
messageSize() { messageSize() {
if (!this.chat || !this.chat.messages) { if (!this.chat || !this.chat.messages) {
return 0; return 0;

Loading…
Cancel
Save