|
|
@ -12,10 +12,7 @@ import org.springframework.beans.factory.annotation.Qualifier; |
|
|
import org.springframework.data.redis.core.RedisTemplate; |
|
|
import org.springframework.data.redis.core.RedisTemplate; |
|
|
import org.springframework.stereotype.Service; |
|
|
import org.springframework.stereotype.Service; |
|
|
|
|
|
|
|
|
import java.util.Collections; |
|
|
import java.util.*; |
|
|
import java.util.LinkedList; |
|
|
|
|
|
import java.util.List; |
|
|
|
|
|
import java.util.Map; |
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
|
|
|
|
@Service |
|
|
@Service |
|
|
@ -81,22 +78,29 @@ public class IMSender { |
|
|
|
|
|
|
|
|
public<T> void sendGroupMessage(IMGroupMessage<T> message) { |
|
|
public<T> void sendGroupMessage(IMGroupMessage<T> message) { |
|
|
// 根据群聊每个成员所连的IM-server,进行分组
|
|
|
// 根据群聊每个成员所连的IM-server,进行分组
|
|
|
List<IMUserInfo> offLineUsers = Collections.synchronizedList(new LinkedList<>()); |
|
|
Map<String, IMUserInfo> sendMap = new HashMap<>(); |
|
|
// 格式:map<服务器id,list<接收方>>
|
|
|
|
|
|
Map<Integer, List<IMUserInfo>> serverMap = new ConcurrentHashMap<>(); |
|
|
|
|
|
for (Integer terminal : message.getRecvTerminals()) { |
|
|
for (Integer terminal : message.getRecvTerminals()) { |
|
|
message.getRecvIds().parallelStream().forEach(id -> { |
|
|
message.getRecvIds().stream().forEach(id -> { |
|
|
String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, id.toString(), terminal.toString()); |
|
|
String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, id.toString(), terminal.toString()); |
|
|
Integer serverId = (Integer)redisTemplate.opsForValue().get(key); |
|
|
sendMap.put(key,new IMUserInfo(id, terminal)); |
|
|
if (serverId != null) { |
|
|
|
|
|
List<IMUserInfo> list = serverMap.computeIfAbsent(serverId, o -> Collections.synchronizedList(new LinkedList<>())); |
|
|
|
|
|
list.add(new IMUserInfo(id, terminal)); |
|
|
|
|
|
} else { |
|
|
|
|
|
// 加入离线列表
|
|
|
|
|
|
offLineUsers.add(new IMUserInfo(id, terminal)); |
|
|
|
|
|
} |
|
|
|
|
|
}); |
|
|
}); |
|
|
} |
|
|
} |
|
|
|
|
|
// 批量拉取
|
|
|
|
|
|
List<Object> serverIds = redisTemplate.opsForValue().multiGet(sendMap.keySet()); |
|
|
|
|
|
// 格式:map<服务器id,list<接收方>>
|
|
|
|
|
|
Map<Integer, List<IMUserInfo>> serverMap = new HashMap<>(); |
|
|
|
|
|
List<IMUserInfo> offLineUsers = Collections.synchronizedList(new LinkedList<>()); |
|
|
|
|
|
int idx = 0; |
|
|
|
|
|
for (Map.Entry<String,IMUserInfo> entry : sendMap.entrySet()) { |
|
|
|
|
|
Integer serverId = (Integer)serverIds.get(idx++); |
|
|
|
|
|
if (serverId != null) { |
|
|
|
|
|
List<IMUserInfo> list = serverMap.computeIfAbsent(serverId, o -> Collections.synchronizedList(new LinkedList<>())); |
|
|
|
|
|
list.add(entry.getValue()); |
|
|
|
|
|
} else { |
|
|
|
|
|
// 加入离线列表
|
|
|
|
|
|
offLineUsers.add(entry.getValue()); |
|
|
|
|
|
} |
|
|
|
|
|
}; |
|
|
// 逐个server发送
|
|
|
// 逐个server发送
|
|
|
for (Map.Entry<Integer, List<IMUserInfo>> entry : serverMap.entrySet()) { |
|
|
for (Map.Entry<Integer, List<IMUserInfo>> entry : serverMap.entrySet()) { |
|
|
IMRecvInfo recvInfo = new IMRecvInfo(); |
|
|
IMRecvInfo recvInfo = new IMRecvInfo(); |
|
|
|