|
|
|
@ -3,6 +3,7 @@ package com.bx.implatform.service.impl; |
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
|
|
|
import com.bx.common.contant.RedisKey; |
|
|
|
import com.bx.common.enums.MessageTypeEnum; |
|
|
|
import com.bx.common.enums.ResultCode; |
|
|
|
import com.bx.common.model.im.GroupMessageInfo; |
|
|
|
import com.bx.common.util.BeanUtils; |
|
|
|
@ -46,7 +47,7 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public void sendMessage(GroupMessageVO vo) { |
|
|
|
public Long sendMessage(GroupMessageVO vo) { |
|
|
|
Long userId = SessionContext.getSession().getId(); |
|
|
|
Group group = groupService.getById(vo.getGroupId()); |
|
|
|
if(group == null){ |
|
|
|
@ -65,31 +66,49 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro |
|
|
|
msg.setSendId(userId); |
|
|
|
msg.setSendTime(new Date()); |
|
|
|
this.save(msg); |
|
|
|
// 根据群聊每个成员所连的IM-server,进行分组
|
|
|
|
Map<Integer,List<Long>> serverMap = new ConcurrentHashMap<>(); |
|
|
|
userIds.parallelStream().forEach(id->{ |
|
|
|
String key = RedisKey.IM_USER_SERVER_ID + id; |
|
|
|
Integer serverId = (Integer)redisTemplate.opsForValue().get(key); |
|
|
|
if(serverId != null){ |
|
|
|
if(serverMap.containsKey(serverId)){ |
|
|
|
serverMap.get(serverId).add(id); |
|
|
|
}else { |
|
|
|
List<Long> list = Collections.synchronizedList(new LinkedList<Long>()); |
|
|
|
list.add(id); |
|
|
|
serverMap.put(serverId,list); |
|
|
|
// 群发
|
|
|
|
GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class); |
|
|
|
this.sendMessage(userIds,msgInfo); |
|
|
|
log.info("发送群聊消息,发送id:{},群聊id:{},内容:{}",userId,vo.getGroupId(),vo.getContent()); |
|
|
|
return msg.getId(); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 撤回消息 |
|
|
|
* |
|
|
|
* @param id 消息id |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public void recallMessage(Long id) { |
|
|
|
Long userId = SessionContext.getSession().getId(); |
|
|
|
GroupMessage msg = this.getById(id); |
|
|
|
if(msg == null){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"消息不存在"); |
|
|
|
} |
|
|
|
}); |
|
|
|
// 逐个server发送
|
|
|
|
for (Map.Entry<Integer,List<Long>> entry : serverMap.entrySet()) { |
|
|
|
GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class); |
|
|
|
msgInfo.setRecvIds(new LinkedList<>(entry.getValue())); |
|
|
|
String key = RedisKey.IM_UNREAD_GROUP_MESSAGE +entry.getKey(); |
|
|
|
redisTemplate.opsForList().rightPush(key,msgInfo); |
|
|
|
if(msg.getSendId() != userId){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"这条消息不是您发送的呢"); |
|
|
|
} |
|
|
|
log.info("发送群聊消息,发送id:{},群聊id:{}",userId,vo.getGroupId()); |
|
|
|
// 判断是否在群里
|
|
|
|
GroupMember member = groupMemberService.findByGroupAndUserId(msg.getGroupId(),userId); |
|
|
|
if(member == null){ |
|
|
|
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您已不在群聊里面,无法撤回消息"); |
|
|
|
} |
|
|
|
// 直接物理删除
|
|
|
|
this.removeById(id); |
|
|
|
// 群发
|
|
|
|
List<Long> userIds = groupMemberService.findUserIdsByGroupId(msg.getGroupId()); |
|
|
|
GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class); |
|
|
|
msgInfo.setType(MessageTypeEnum.TIP.getCode()); |
|
|
|
String content = String.format("'%s'撤回了一条消息",member.getAliasName()); |
|
|
|
msgInfo.setContent(content); |
|
|
|
this.sendMessage(userIds,msgInfo); |
|
|
|
log.info("删除群聊消息,发送id:{},群聊id:{},内容:{}",userId,msg.getGroupId(),msg.getContent()); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 异步拉取群聊消息,通过websocket异步推送 |
|
|
|
* |
|
|
|
@ -129,6 +148,35 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro |
|
|
|
}).collect(Collectors.toList()); |
|
|
|
key = RedisKey.IM_UNREAD_GROUP_MESSAGE + serverId; |
|
|
|
redisTemplate.opsForList().rightPushAll(key,messageInfos.toArray()); |
|
|
|
log.info("拉取未读群聊消息,用户id:{},群聊id:{},数量:{}",userId,member.getGroupId(),messageInfos.size()); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
private void sendMessage(List<Long> userIds, GroupMessageInfo msgInfo){ |
|
|
|
// 根据群聊每个成员所连的IM-server,进行分组
|
|
|
|
Map<Integer,List<Long>> serverMap = new ConcurrentHashMap<>(); |
|
|
|
userIds.parallelStream().forEach(id->{ |
|
|
|
String key = RedisKey.IM_USER_SERVER_ID + id; |
|
|
|
Integer serverId = (Integer)redisTemplate.opsForValue().get(key); |
|
|
|
if(serverId != null){ |
|
|
|
if(serverMap.containsKey(serverId)){ |
|
|
|
serverMap.get(serverId).add(id); |
|
|
|
}else { |
|
|
|
// 此处需要加锁,否则list可以会被覆盖
|
|
|
|
synchronized(serverMap){ |
|
|
|
List<Long> list = Collections.synchronizedList(new LinkedList<Long>()); |
|
|
|
list.add(id); |
|
|
|
serverMap.put(serverId,list); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
// 逐个server发送
|
|
|
|
for (Map.Entry<Integer,List<Long>> entry : serverMap.entrySet()) { |
|
|
|
msgInfo.setRecvIds(new LinkedList<>(entry.getValue())); |
|
|
|
String key = RedisKey.IM_UNREAD_GROUP_MESSAGE +entry.getKey(); |
|
|
|
redisTemplate.opsForList().rightPush(key,msgInfo); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|