|
|
|
@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
|
|
|
import com.bx.imclient.IMClient; |
|
|
|
import com.bx.imcommon.contant.IMConstant; |
|
|
|
import com.bx.implatform.util.DateTimeUtils; |
|
|
|
import com.bx.implatform.vo.GroupMessageVO; |
|
|
|
import com.bx.imcommon.model.IMGroupMessage; |
|
|
|
import com.bx.imcommon.model.IMUserInfo; |
|
|
|
@ -33,6 +34,7 @@ import org.springframework.stereotype.Service; |
|
|
|
import java.util.Collections; |
|
|
|
import java.util.Date; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Objects; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
@ -43,7 +45,7 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro |
|
|
|
@Autowired |
|
|
|
private IGroupMemberService groupMemberService; |
|
|
|
@Autowired |
|
|
|
private RedisTemplate<String,Object> redisTemplate; |
|
|
|
private RedisTemplate<String, Object> redisTemplate; |
|
|
|
@Autowired |
|
|
|
private IMClient imClient; |
|
|
|
|
|
|
|
@ -57,16 +59,16 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro |
|
|
|
public Long sendMessage(GroupMessageDTO dto) { |
|
|
|
UserSession session = SessionContext.getSession(); |
|
|
|
Group group = groupService.getById(dto.getGroupId()); |
|
|
|
if(group == null){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"群聊不存在"); |
|
|
|
if (group == null) { |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR, "群聊不存在"); |
|
|
|
} |
|
|
|
if(group.getDeleted()){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"群聊已解散"); |
|
|
|
if (group.getDeleted()) { |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR, "群聊已解散"); |
|
|
|
} |
|
|
|
// 判断是否在群里
|
|
|
|
List<Long> userIds = groupMemberService.findUserIdsByGroupId(group.getId()); |
|
|
|
if(!userIds.contains(session.getUserId())){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您已不在群聊里面,无法发送消息"); |
|
|
|
if (!userIds.contains(session.getUserId())) { |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR, "您已不在群聊里面,无法发送消息"); |
|
|
|
} |
|
|
|
// 保存消息
|
|
|
|
GroupMessage msg = BeanUtils.copyProperties(dto, GroupMessage.class); |
|
|
|
@ -74,21 +76,19 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro |
|
|
|
msg.setSendTime(new Date()); |
|
|
|
this.save(msg); |
|
|
|
// 不用发给自己
|
|
|
|
userIds = userIds.stream().filter(id->!session.getUserId().equals(id)).collect(Collectors.toList()); |
|
|
|
userIds = userIds.stream().filter(id -> !session.getUserId().equals(id)).collect(Collectors.toList()); |
|
|
|
// 群发
|
|
|
|
GroupMessageVO msgInfo = BeanUtils.copyProperties(msg, GroupMessageVO.class); |
|
|
|
IMGroupMessage<GroupMessageVO> sendMessage = new IMGroupMessage<>(); |
|
|
|
sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); |
|
|
|
sendMessage.setSender(new IMUserInfo(session.getUserId(), session.getTerminal())); |
|
|
|
sendMessage.setRecvIds(userIds); |
|
|
|
sendMessage.setData(msgInfo); |
|
|
|
imClient.sendGroupMessage(sendMessage); |
|
|
|
log.info("发送群聊消息,发送id:{},群聊id:{},内容:{}",session.getUserId(),dto.getGroupId(),dto.getContent()); |
|
|
|
log.info("发送群聊消息,发送id:{},群聊id:{},内容:{}", session.getUserId(), dto.getGroupId(), dto.getContent()); |
|
|
|
return msg.getId(); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 撤回消息 |
|
|
|
* |
|
|
|
@ -98,19 +98,19 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro |
|
|
|
public void recallMessage(Long id) { |
|
|
|
UserSession session = SessionContext.getSession(); |
|
|
|
GroupMessage msg = this.getById(id); |
|
|
|
if(msg == null){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"消息不存在"); |
|
|
|
if (msg == null) { |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR, "消息不存在"); |
|
|
|
} |
|
|
|
if(!msg.getSendId().equals(session.getUserId())){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"这条消息不是由您发送,无法撤回"); |
|
|
|
if (!msg.getSendId().equals(session.getUserId())) { |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR, "这条消息不是由您发送,无法撤回"); |
|
|
|
} |
|
|
|
if(System.currentTimeMillis() - msg.getSendTime().getTime() > IMConstant.ALLOW_RECALL_SECOND * 1000){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"消息已发送超过5分钟,无法撤回"); |
|
|
|
if (System.currentTimeMillis() - msg.getSendTime().getTime() > IMConstant.ALLOW_RECALL_SECOND * 1000) { |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR, "消息已发送超过5分钟,无法撤回"); |
|
|
|
} |
|
|
|
// 判断是否在群里
|
|
|
|
GroupMember member = groupMemberService.findByGroupAndUserId(msg.getGroupId(),session.getUserId()); |
|
|
|
if(member == null){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您已不在群聊里面,无法撤回消息"); |
|
|
|
GroupMember member = groupMemberService.findByGroupAndUserId(msg.getGroupId(), session.getUserId()); |
|
|
|
if (member == null) { |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR, "您已不在群聊里面,无法撤回消息"); |
|
|
|
} |
|
|
|
// 修改数据库
|
|
|
|
msg.setStatus(MessageStatus.RECALL.code()); |
|
|
|
@ -118,15 +118,15 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro |
|
|
|
// 群发
|
|
|
|
List<Long> userIds = groupMemberService.findUserIdsByGroupId(msg.getGroupId()); |
|
|
|
// 不用发给自己
|
|
|
|
userIds = userIds.stream().filter(uid->!session.getUserId().equals(uid)).collect(Collectors.toList()); |
|
|
|
userIds = userIds.stream().filter(uid -> !session.getUserId().equals(uid)).collect(Collectors.toList()); |
|
|
|
GroupMessageVO msgInfo = BeanUtils.copyProperties(msg, GroupMessageVO.class); |
|
|
|
msgInfo.setType(MessageType.RECALL.code()); |
|
|
|
String content = String.format("'%s'撤回了一条消息",member.getAliasName()); |
|
|
|
String content = String.format("'%s'撤回了一条消息", member.getAliasName()); |
|
|
|
msgInfo.setContent(content); |
|
|
|
msgInfo.setSendTime(new Date()); |
|
|
|
|
|
|
|
IMGroupMessage<GroupMessageVO> sendMessage = new IMGroupMessage<>(); |
|
|
|
sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); |
|
|
|
sendMessage.setSender(new IMUserInfo(session.getUserId(), session.getTerminal())); |
|
|
|
sendMessage.setRecvIds(userIds); |
|
|
|
sendMessage.setData(msgInfo); |
|
|
|
sendMessage.setSendResult(false); |
|
|
|
@ -139,40 +139,39 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro |
|
|
|
sendMessage.setRecvIds(Collections.emptyList()); |
|
|
|
sendMessage.setRecvTerminals(Collections.emptyList()); |
|
|
|
imClient.sendGroupMessage(sendMessage); |
|
|
|
log.info("撤回群聊消息,发送id:{},群聊id:{},内容:{}",session.getUserId(),msg.getGroupId(),msg.getContent()); |
|
|
|
log.info("撤回群聊消息,发送id:{},群聊id:{},内容:{}", session.getUserId(), msg.getGroupId(), msg.getContent()); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 异步拉取群聊消息,通过websocket异步推送 |
|
|
|
* |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public void pullUnreadMessage() { |
|
|
|
UserSession session = SessionContext.getSession(); |
|
|
|
List<GroupMember> members = groupMemberService.findByUserId(session.getUserId()); |
|
|
|
for(GroupMember member:members){ |
|
|
|
for (GroupMember member : members) { |
|
|
|
// 获取群聊已读的最大消息id,只推送未读消息
|
|
|
|
String key = String.join(":",RedisKey.IM_GROUP_READED_POSITION,member.getGroupId().toString(),session.getUserId().toString()); |
|
|
|
Integer maxReadedId = (Integer)redisTemplate.opsForValue().get(key); |
|
|
|
String key = String.join(":", RedisKey.IM_GROUP_READED_POSITION, member.getGroupId().toString(), session.getUserId().toString()); |
|
|
|
Integer maxReadedId = (Integer) redisTemplate.opsForValue().get(key); |
|
|
|
LambdaQueryWrapper<GroupMessage> wrapper = Wrappers.lambdaQuery(); |
|
|
|
wrapper.eq(GroupMessage::getGroupId,member.getGroupId()) |
|
|
|
.gt(GroupMessage::getSendTime,member.getCreatedTime()) |
|
|
|
wrapper.eq(GroupMessage::getGroupId, member.getGroupId()) |
|
|
|
.gt(GroupMessage::getSendTime, member.getCreatedTime()) |
|
|
|
.ne(GroupMessage::getSendId, session.getUserId()) |
|
|
|
.ne(GroupMessage::getStatus, MessageStatus.RECALL.code()); |
|
|
|
if(maxReadedId!=null){ |
|
|
|
wrapper.gt(GroupMessage::getId,maxReadedId); |
|
|
|
if (maxReadedId != null) { |
|
|
|
wrapper.gt(GroupMessage::getId, maxReadedId); |
|
|
|
} |
|
|
|
wrapper.last("limit 100"); |
|
|
|
List<GroupMessage> messages = this.list(wrapper); |
|
|
|
if(messages.isEmpty()){ |
|
|
|
if (messages.isEmpty()) { |
|
|
|
continue; |
|
|
|
} |
|
|
|
// 推送
|
|
|
|
for (GroupMessage message:messages ){ |
|
|
|
for (GroupMessage message : messages) { |
|
|
|
GroupMessageVO msgInfo = BeanUtils.copyProperties(message, GroupMessageVO.class); |
|
|
|
IMGroupMessage<GroupMessageVO> sendMessage = new IMGroupMessage<>(); |
|
|
|
sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); |
|
|
|
sendMessage.setSender(new IMUserInfo(session.getUserId(), session.getTerminal())); |
|
|
|
// 只推给自己当前终端
|
|
|
|
sendMessage.setRecvIds(Collections.singletonList(session.getUserId())); |
|
|
|
sendMessage.setRecvTerminals(Collections.singletonList(session.getTerminal())); |
|
|
|
@ -180,41 +179,109 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro |
|
|
|
imClient.sendGroupMessage(sendMessage); |
|
|
|
} |
|
|
|
// 发送消息
|
|
|
|
log.info("拉取未读群聊消息,用户id:{},群聊id:{},数量:{}",session.getUserId(),member.getGroupId(),messages.size()); |
|
|
|
log.info("拉取未读群聊消息,用户id:{},群聊id:{},数量:{}", session.getUserId(), member.getGroupId(), messages.size()); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 拉取消息,只能拉取最近3个月的消息,一次拉取100条 |
|
|
|
* |
|
|
|
* @param minId 消息起始id |
|
|
|
* @return 聊天消息列表 |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public List<GroupMessageVO> loadMessage(Long minId) { |
|
|
|
UserSession session = SessionContext.getSession(); |
|
|
|
List<GroupMember> members = groupMemberService.findByUserId(session.getUserId()); |
|
|
|
List<Long> ids = members.stream().map(GroupMember::getGroupId).collect(Collectors.toList()); |
|
|
|
// 只能拉取最近3个月的
|
|
|
|
Date minDate = DateTimeUtils.addMonths(new Date(), -1); |
|
|
|
LambdaQueryWrapper<GroupMessage> wrapper = Wrappers.lambdaQuery(); |
|
|
|
wrapper.gt(GroupMessage::getId, minId) |
|
|
|
.gt(GroupMessage::getSendTime, minDate) |
|
|
|
.in(GroupMessage::getGroupId, ids) |
|
|
|
.ne(GroupMessage::getStatus, MessageStatus.RECALL.code()) |
|
|
|
.last("limit 100"); |
|
|
|
|
|
|
|
List<GroupMessage> messages = this.list(wrapper); |
|
|
|
// 转成vo
|
|
|
|
List<GroupMessageVO> vos = messages.stream().map(m -> BeanUtils.copyProperties(m, GroupMessageVO.class)).collect(Collectors.toList()); |
|
|
|
// 消息状态,数据库没有存群聊的消息状态,需要从redis取
|
|
|
|
List<String> keys = ids.stream() |
|
|
|
.map(id -> String.join(":", RedisKey.IM_GROUP_READED_POSITION, id.toString(), session.getUserId().toString())) |
|
|
|
.collect(Collectors.toList()); |
|
|
|
List<Object> sendPos = redisTemplate.opsForValue().multiGet(keys); |
|
|
|
int idx = 0; |
|
|
|
for (Long id : ids) { |
|
|
|
Object o = sendPos.get(idx); |
|
|
|
Integer sendMaxId = Objects.isNull(o) ? -1 : (Integer) o; |
|
|
|
vos.stream().filter(vo -> vo.getGroupId().equals(id)).forEach(vo -> { |
|
|
|
if (vo.getId() <= sendMaxId) { |
|
|
|
// 已推送过
|
|
|
|
vo.setStatus(MessageStatus.SENDED.code()); |
|
|
|
} else { |
|
|
|
// 未推送
|
|
|
|
vo.setStatus(MessageStatus.UNSEND.code()); |
|
|
|
} |
|
|
|
}); |
|
|
|
idx++; |
|
|
|
} |
|
|
|
return vos; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 消息已读,同步其他终端,清空未读数量 |
|
|
|
* |
|
|
|
* @param groupId 群聊 |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public void readedMessage(Long groupId) { |
|
|
|
UserSession session = SessionContext.getSession(); |
|
|
|
// 推送消息给自己的其他终端
|
|
|
|
GroupMessageVO msgInfo = new GroupMessageVO(); |
|
|
|
msgInfo.setType(MessageType.READED.code()); |
|
|
|
msgInfo.setSendTime(new Date()); |
|
|
|
msgInfo.setSendId(session.getUserId()); |
|
|
|
msgInfo.setGroupId(groupId); |
|
|
|
IMGroupMessage<GroupMessageVO> sendMessage = new IMGroupMessage<>(); |
|
|
|
sendMessage.setSender(new IMUserInfo(session.getUserId(), session.getTerminal())); |
|
|
|
sendMessage.setSendToSelf(true); |
|
|
|
sendMessage.setData(msgInfo); |
|
|
|
sendMessage.setSendResult(false); |
|
|
|
imClient.sendGroupMessage(sendMessage); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 拉取历史聊天记录 |
|
|
|
* |
|
|
|
* @param groupId 群聊id |
|
|
|
* @param page 页码 |
|
|
|
* @param size 页码大小 |
|
|
|
* @param page 页码 |
|
|
|
* @param size 页码大小 |
|
|
|
* @return 聊天记录列表 |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public List<GroupMessageVO> findHistoryMessage(Long groupId, Long page, Long size) { |
|
|
|
page = page > 0 ? page:1; |
|
|
|
size = size > 0 ? size:10; |
|
|
|
page = page > 0 ? page : 1; |
|
|
|
size = size > 0 ? size : 10; |
|
|
|
Long userId = SessionContext.getSession().getUserId(); |
|
|
|
long stIdx = (page-1)* size; |
|
|
|
long stIdx = (page - 1) * size; |
|
|
|
// 群聊成员信息
|
|
|
|
GroupMember member = groupMemberService.findByGroupAndUserId(groupId,userId); |
|
|
|
if(member == null || member.getQuit()){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您已不在群聊中"); |
|
|
|
GroupMember member = groupMemberService.findByGroupAndUserId(groupId, userId); |
|
|
|
if (member == null || member.getQuit()) { |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR, "您已不在群聊中"); |
|
|
|
} |
|
|
|
// 查询聊天记录,只查询加入群聊时间之后的消息
|
|
|
|
QueryWrapper<GroupMessage> wrapper = new QueryWrapper<>(); |
|
|
|
wrapper.lambda().eq(GroupMessage::getGroupId,groupId) |
|
|
|
.gt(GroupMessage::getSendTime,member.getCreatedTime()) |
|
|
|
wrapper.lambda().eq(GroupMessage::getGroupId, groupId) |
|
|
|
.gt(GroupMessage::getSendTime, member.getCreatedTime()) |
|
|
|
.ne(GroupMessage::getStatus, MessageStatus.RECALL.code()) |
|
|
|
.orderByDesc(GroupMessage::getId) |
|
|
|
.last("limit "+stIdx + ","+size); |
|
|
|
.last("limit " + stIdx + "," + size); |
|
|
|
|
|
|
|
List<GroupMessage> messages = this.list(wrapper); |
|
|
|
List<GroupMessageVO> messageInfos = messages.stream().map(m->BeanUtils.copyProperties(m, GroupMessageVO.class)).collect(Collectors.toList()); |
|
|
|
log.info("拉取群聊记录,用户id:{},群聊id:{},数量:{}",userId,groupId,messageInfos.size()); |
|
|
|
List<GroupMessageVO> messageInfos = messages.stream().map(m -> BeanUtils.copyProperties(m, GroupMessageVO.class)).collect(Collectors.toList()); |
|
|
|
log.info("拉取群聊记录,用户id:{},群聊id:{},数量:{}", userId, groupId, messageInfos.size()); |
|
|
|
return messageInfos; |
|
|
|
} |
|
|
|
|
|
|
|
|