From bf5bb70751217df8fe16d696412fb7b6aa52c603 Mon Sep 17 00:00:00 2001 From: xsx <825657193@qq.com> Date: Sun, 24 Sep 2023 12:41:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=9A=E7=BB=88=E7=AB=AF=E5=90=8C=E6=97=B6?= =?UTF-8?q?=E5=9C=A8=E7=BA=BF=E5=90=8E=E7=AB=AF=E6=94=B9=E9=80=A0=EF=BC=88?= =?UTF-8?q?=E5=BC=80=E5=8F=91=E4=B8=AD=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/bx/imclient/IMClient.java | 15 +- .../java/com/bx/imclient/sender/IMSender.java | 162 ++++++++++-------- .../com/bx/imcommon/contant/RedisKey.java | 5 +- .../com/bx/imcommon/model/IMGroupMessage.java | 43 +++++ .../bx/imcommon/model/IMPrivateMessage.java | 9 +- .../com/bx/imcommon/model/IMRecvInfo.java | 13 +- .../com/bx/imcommon/model/IMUserInfo.java | 30 ++++ .../com/bx/imcommon/model/SendResult.java | 18 +- .../com/bx/implatform/contant/RedisKey.java | 2 +- .../listener/GroupMessageListener.java | 9 +- .../listener/PrivateMessageListener.java | 28 +-- .../service/impl/GroupMessageServiceImpl.java | 67 +++++--- .../impl/PrivateMessageServiceImpl.java | 21 +-- .../service/impl/WebrtcServiceImpl.java | 22 +-- .../processor/GroupMessageProcessor.java | 53 +++--- .../processor/PrivateMessageProcessor.java | 18 +- .../task/AbstractPullMessageTask.java | 1 - .../task/PullUnreadGroupMessageTask.java | 13 +- .../task/PullUnreadPrivateMessageTask.java | 13 +- .../src/components/chat/ChatPrivateVideo.vue | 10 +- .../src/components/chat/ChatVideoAcceptor.vue | 64 +++---- im-ui/src/view/Home.vue | 5 +- 22 files changed, 331 insertions(+), 290 deletions(-) create mode 100644 im-commom/src/main/java/com/bx/imcommon/model/IMGroupMessage.java create mode 100644 im-commom/src/main/java/com/bx/imcommon/model/IMUserInfo.java diff --git a/im-client/src/main/java/com/bx/imclient/IMClient.java b/im-client/src/main/java/com/bx/imclient/IMClient.java index a004550..d47b082 100644 --- a/im-client/src/main/java/com/bx/imclient/IMClient.java +++ b/im-client/src/main/java/com/bx/imclient/IMClient.java @@ -1,14 +1,12 @@ package com.bx.imclient; -import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imclient.sender.IMSender; -import com.bx.imcommon.model.GroupMessageInfo; +import com.bx.imcommon.model.IMGroupMessage; import com.bx.imcommon.model.IMPrivateMessage; -import com.bx.imcommon.model.PrivateMessageInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; -import java.util.List; + @Configuration public class IMClient { @@ -30,18 +28,17 @@ public class IMClient { * * @param message 私有消息 */ - public void sendPrivateMessage(IMPrivateMessage message){ + public void sendPrivateMessage(IMPrivateMessage message){ imSender.sendPrivateMessage(message); } /** * 发送群聊消息(发送结果通过MessageListener接收) * - * @param recvIds 群聊用户id列表 - * @param messageInfo 消息体,将转成json发送到客户端 + * @param message 群聊消息 */ - public void sendGroupMessage(List recvIds, GroupMessageInfo... messageInfo){ - imSender.sendGroupMessage(recvIds,messageInfo); + public void sendGroupMessage(IMGroupMessage message){ + imSender.sendGroupMessage(message); } diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java index a76e2ad..785df96 100644 --- a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -22,29 +22,26 @@ public class IMSender { @Autowired @Qualifier("IMRedisTemplate") - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private MessageListenerMulticaster listenerMulticaster; public void sendPrivateMessage(IMPrivateMessage message) { - List terminals = message.getRecvTerminals(); - for (Integer terminal : terminals) { + for (Integer terminal : message.getRecvTerminals()) { // 获取对方连接的channelId String key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.toString()); Integer serverId = (Integer) redisTemplate.opsForValue().get(key); // 如果对方在线,将数据存储至redis,等待拉取推送 if (serverId != null) { IMRecvInfo[] recvInfos = new IMRecvInfo[message.getDatas().size()]; - String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId; + String sendKey = String.join(":",RedisKey.IM_UNREAD_PRIVATE_QUEUE,serverId.toString()); for (int i = 0; i < message.getDatas().size(); i++) { IMRecvInfo recvInfo = new IMRecvInfo(); recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); - recvInfo.setRecvTerminal(terminal); recvInfo.setSendResult(message.getSendResult()); - List recvIds = new LinkedList<>(); - recvIds.add(message.getRecvId()); - recvInfo.setRecvIds(recvIds); + recvInfo.setSender(message.getSender()); + recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getRecvId(), terminal))); recvInfo.setData(message.getDatas().get(i)); recvInfos[i]=recvInfo; } @@ -53,8 +50,8 @@ public class IMSender { // 回复消息状态 for (int i = 0; i < message.getDatas().size(); i++) { SendResult result = new SendResult(); - result.setRecvId(message.getRecvId()); - result.setRecvTerminal(terminal); + result.setSender(message.getSender()); + result.setReceiver(new IMUserInfo(message.getRecvId(), terminal)); result.setCode(IMSendCode.NOT_ONLINE.code()); result.setData(message.getDatas().get(i)); listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); @@ -62,78 +59,95 @@ public class IMSender { } // 推送给自己的其他终端 - if (message.getSendToSelf() && !message.getSendTerminal().equals(terminal)) { - // 获取终端连接的channelId - key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getSendId().toString(), terminal.toString()); - serverId = (Integer) redisTemplate.opsForValue().get(key); - // 如果终端在线,将数据存储至redis,等待拉取推送 - if (serverId != null) { - String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId; - IMRecvInfo[] recvInfos = new IMRecvInfo[message.getDatas().size()]; - for (int i = 0; i < message.getDatas().size(); i++) { - IMRecvInfo recvInfo = new IMRecvInfo(); - recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); - recvInfo.setRecvTerminal(terminal); - // 自己的消息不需要回推消息结果 - recvInfo.setSendResult(false); - LinkedList recvIds = new LinkedList<>(); - recvIds.add(message.getSendId()); - recvInfo.setRecvIds(recvIds); - recvInfo.setData(message.getDatas().get(i)); - recvInfos[i]=recvInfo; + if (message.getSendToSelf() && !message.getSender().getTerminal().equals(terminal)) { + // 获取终端连接的channelId + key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getSender().getId().toString(), terminal.toString()); + serverId = (Integer) redisTemplate.opsForValue().get(key); + // 如果终端在线,将数据存储至redis,等待拉取推送 + if (serverId != null) { + String sendKey = String.join(":",RedisKey.IM_UNREAD_PRIVATE_QUEUE, serverId.toString()); + IMRecvInfo[] recvInfos = new IMRecvInfo[message.getDatas().size()]; + for (int i = 0; i < message.getDatas().size(); i++) { + IMRecvInfo recvInfo = new IMRecvInfo(); + // 自己的消息不需要回推消息结果 + recvInfo.setSendResult(false); + recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); + recvInfo.setSender(message.getSender()); + recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getSender().getId(),terminal))); + recvInfo.setData(message.getDatas().get(i)); + recvInfos[i]=recvInfo; + } + redisTemplate.opsForList().rightPushAll(sendKey, recvInfos); } - redisTemplate.opsForList().rightPushAll(sendKey, recvInfos); } - } } } - public void sendGroupMessage(List recvIds, GroupMessageInfo... messageInfos) { + public void sendGroupMessage(IMGroupMessage message) { // 根据群聊每个成员所连的IM-server,进行分组 - List offLineIds = Collections.synchronizedList(new LinkedList()); - Map> serverMap = new ConcurrentHashMap<>(); - recvIds.parallelStream().forEach(id -> { - String key = RedisKey.IM_USER_SERVER_ID + id; - Integer serverId = (Integer) redisTemplate.opsForValue().get(key); - if (serverId != null) { - // 此处需要加锁,否则list可以会被覆盖 - synchronized (serverMap) { - if (serverMap.containsKey(serverId)) { - serverMap.get(serverId).add(id); - } else { - List list = Collections.synchronizedList(new LinkedList()); - list.add(id); - serverMap.put(serverId, list); - } + List offLineUsers = Collections.synchronizedList(new LinkedList<>()); + // 格式:map<服务器id,list<接收方>> + Map> serverMap = new ConcurrentHashMap<>(); + for (Integer terminal : message.getRecvTerminals()) { + message.getRecvIds().parallelStream().forEach(id -> { + String key = String.join(":",RedisKey.IM_USER_SERVER_ID, id.toString(),terminal.toString()); + Integer serverId = (Integer)redisTemplate.opsForValue().get(key); + if (serverId != null) { + List list = serverMap.computeIfAbsent(serverId,o->Collections.synchronizedList(new LinkedList<>())); + list.add(new IMUserInfo(id,terminal)); + } else { + // 加入离线列表 + offLineUsers.add(new IMUserInfo(id,terminal)); } - } else { - offLineIds.add(id); - } - }); + }); + } // 逐个server发送 -// for (Map.Entry> entry : serverMap.entrySet()) { -// IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length]; -// for (int i = 0; i < messageInfos.length; i++) { -// IMRecvInfo recvInfo = new IMRecvInfo<>(); -// recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); -// recvInfo.setRecvIds(new LinkedList<>(entry.getValue())); -// recvInfo.setData(messageInfos[i]); -// recvInfos[i] = recvInfo; -// } -// String key = RedisKey.IM_UNREAD_GROUP_QUEUE + entry.getKey(); -// redisTemplate.opsForList().rightPushAll(key, recvInfos); -// } -// // 不在线的用户,回复消息状态 -// for (GroupMessageInfo messageInfo : messageInfos) { -// for (Long id : offLineIds) { -// // 回复消息状态 -// SendResult result = new SendResult(); -// result.setMessageInfo(messageInfo); -// result.setRecvId(id); -// result.setCode(IMSendCode.NOT_ONLINE); -// listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, result); -// } -// } + for (Map.Entry> entry : serverMap.entrySet()) { + IMRecvInfo recvInfo = new IMRecvInfo(); + recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); + recvInfo.setReceivers(new LinkedList<>(entry.getValue())); + recvInfo.setSender(message.getSender()); + recvInfo.setSendResult(message.getSendResult()); + recvInfo.setData(message.getData()); + // 推送至队列 + String key = String.join(":",RedisKey.IM_UNREAD_GROUP_QUEUE,entry.getKey().toString()); + redisTemplate.opsForList().rightPush(key, recvInfo); + } + // 对离线用户回复消息状态 + if(message.getSendResult()){ + for (IMUserInfo offLineUser : offLineUsers) { + SendResult result = new SendResult(); + result.setSender(message.getSender()); + result.setReceiver(offLineUser); + result.setCode(IMSendCode.NOT_ONLINE.code()); + result.setData(message.getData()); + listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, result); + } + } + + // 推送给自己的其他终端 + if (message.getSendToSelf()) { + for (Integer terminal : message.getRecvTerminals()) { + if(terminal.equals(message.getSender().getTerminal())){ + continue; + } + // 获取终端连接的channelId + String key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getSender().getId().toString(), terminal.toString()); + Integer serverId = (Integer) redisTemplate.opsForValue().get(key); + // 如果终端在线,将数据存储至redis,等待拉取推送 + if (serverId != null) { + IMRecvInfo recvInfo = new IMRecvInfo(); + recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); + recvInfo.setSender(message.getSender()); + recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getSender().getId(),terminal))); + // 自己的消息不需要回推消息结果 + recvInfo.setSendResult(false); + recvInfo.setData(message.getData()); + String sendKey = String.join(":",RedisKey.IM_UNREAD_GROUP_QUEUE,serverId.toString()); + redisTemplate.opsForList().rightPush(sendKey, recvInfo); + } + } + } } public Boolean isOnline(Long userId) { diff --git a/im-commom/src/main/java/com/bx/imcommon/contant/RedisKey.java b/im-commom/src/main/java/com/bx/imcommon/contant/RedisKey.java index 9831953..1e02da4 100644 --- a/im-commom/src/main/java/com/bx/imcommon/contant/RedisKey.java +++ b/im-commom/src/main/java/com/bx/imcommon/contant/RedisKey.java @@ -7,13 +7,12 @@ public class RedisKey { // 用户ID所连接的IM-server的ID public final static String IM_USER_SERVER_ID = "im:user:server_id"; // 未读私聊消息队列 - public final static String IM_UNREAD_PRIVATE_QUEUE = "im:unread:private:"; + public final static String IM_UNREAD_PRIVATE_QUEUE = "im:unread:private"; // 未读群聊消息队列 - public final static String IM_UNREAD_GROUP_QUEUE = "im:unread:group:"; + public final static String IM_UNREAD_GROUP_QUEUE = "im:unread:group"; // 私聊消息发送结果队列 public final static String IM_RESULT_PRIVATE_QUEUE = "im:result:private"; // 群聊消息发送结果队列 public final static String IM_RESULT_GROUP_QUEUE = "im:result:group"; - } diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMGroupMessage.java b/im-commom/src/main/java/com/bx/imcommon/model/IMGroupMessage.java new file mode 100644 index 0000000..9bbc905 --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMGroupMessage.java @@ -0,0 +1,43 @@ +package com.bx.imcommon.model; + +import com.bx.imcommon.enums.IMTerminalType; +import lombok.Data; + +import java.util.List; + +@Data +public class IMGroupMessage { + + /** + * 发送方 + */ + private IMUserInfo sender; + + /** + * 接收者id列表(群成员列表) + */ + private List recvIds; + + + /** + * 接收者终端类型,默认全部 + */ + private List recvTerminals = IMTerminalType.codes(); + + /** + * 是否发送给自己的其他终端,默认true + */ + private Boolean sendToSelf = true; + + /** + * 是否需要回推发送结果,默认true + */ + private Boolean sendResult = true; + + /** + * 消息内容 + */ + private T data; + + +} diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java b/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java index a40a43c..f2b373e 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java @@ -10,14 +10,9 @@ import java.util.List; public class IMPrivateMessage { /** - * 发送者id + * 发送方 */ - private Long sendId; - - /** - * 发送者终端类型 IMTerminalType - */ - private Integer sendTerminal; + private IMUserInfo sender; /** * 接收者id diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java b/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java index 73a51a3..b0939aa 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java @@ -13,19 +13,14 @@ public class IMRecvInfo { private Integer cmd; /* - * 发送者id + * 发送方 */ - private Long sendId; + private IMUserInfo sender; /* - * 接收终端类型 IMTerminalType + * 接收方用户列表 */ - private Integer recvTerminal; - - /* - * 接收者id列表 - */ - private List recvIds; + List receivers; /* * 是否需要回调发送结果 diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMUserInfo.java b/im-commom/src/main/java/com/bx/imcommon/model/IMUserInfo.java new file mode 100644 index 0000000..72007ea --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMUserInfo.java @@ -0,0 +1,30 @@ +package com.bx.imcommon.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author: 谢绍许 + * @date: 2023-09-24 09:23:11 + * @version: 1.0 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class IMUserInfo { + + /** + * 用户id + */ + private Long id; + + /** + * 用户终端类型 IMTerminalType + */ + private Integer terminal; + + +} diff --git a/im-commom/src/main/java/com/bx/imcommon/model/SendResult.java b/im-commom/src/main/java/com/bx/imcommon/model/SendResult.java index b4d2b9e..e303512 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/SendResult.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/SendResult.java @@ -1,21 +1,19 @@ package com.bx.imcommon.model; -import com.bx.imcommon.enums.IMSendCode; -import com.bx.imcommon.enums.IMTerminalType; import lombok.Data; @Data -public class SendResult { +public class SendResult { - /* - * 接收者id + /** + * 发送方 */ - private Long recvId; + private IMUserInfo sender; - /* - * 接收者终端类型 IMTerminalType + /** + * 接收方 */ - private Integer recvTerminal; + private IMUserInfo receiver; /* * 发送状态 IMCmdType @@ -25,6 +23,6 @@ public class SendResult { /* * 消息内容 */ - private T data; + private Object data; } diff --git a/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java b/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java index a22b9ea..f66091d 100644 --- a/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java +++ b/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java @@ -3,7 +3,7 @@ package com.bx.implatform.contant; public class RedisKey { // 已读群聊消息位置(已读最大id) - public final static String IM_GROUP_READED_POSITION = "im:readed:group:position:"; + public final static String IM_GROUP_READED_POSITION = "im:readed:group:position"; // webrtc 会话信息 public final static String IM_WEBRTC_SESSION = "im:webrtc:session"; // 缓存前缀 diff --git a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java index e2c3fc5..43202ae 100644 --- a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java +++ b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java @@ -23,14 +23,9 @@ public class GroupMessageListener implements MessageListener { @Override public void process(SendResult result){ GroupMessageInfo messageInfo = (GroupMessageInfo) result.getData(); - // 提示类数据不记录 - if(messageInfo.getType().equals(MessageType.TIP)){ - return; - } - // 保存该用户已拉取的最大消息id - if(result.getCode().equals(IMSendCode.SUCCESS)) { - String key = RedisKey.IM_GROUP_READED_POSITION + messageInfo.getGroupId() + ":" + result.getRecvId(); + if(result.getCode().equals(IMSendCode.SUCCESS.code())) { + String key = String.join(":",RedisKey.IM_GROUP_READED_POSITION,messageInfo.getGroupId().toString(),result.getReceiver().getId().toString()); redisTemplate.opsForValue().set(key, messageInfo.getId()); } } diff --git a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java index db277a0..e8f9eed 100644 --- a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java +++ b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java @@ -35,32 +35,6 @@ public class PrivateMessageListener implements MessageListener { public void process(SendResult result){ PrivateMessageInfo messageInfo = (PrivateMessageInfo) result.getData(); IMSendCode resultCode = IMSendCode.fromCode(result.getCode()); - // 提示类数据不记录 - if(messageInfo.getType().equals(MessageType.TIP.code())){ - return; - } - // 视频通话信令不记录 - if(messageInfo.getType() >= MessageType.RTC_CALL.code() && messageInfo.getType()< MessageType.RTC_CANDIDATE.code()){ -// // 通知用户呼叫失败了 -// if(messageInfo.getType().equals(MessageType.RTC_CALL.code()) -// && !resultCode.equals(IMSendCode.SUCCESS)){ -// PrivateMessageInfo msgInfo = new PrivateMessageInfo(); -// msgInfo.setRecvId(messageInfo.getSendId()); -// msgInfo.setSendId(messageInfo.getRecvId()); -// msgInfo.setType(MessageType.RTC_FAILED.code()); -// msgInfo.setContent(resultCode.description()); -// msgInfo.setSendTime(new Date()); -// -// IMPrivateMessage sendMessage = new IMPrivateMessage(); -// sendMessage.setSendId(messageInfo.getSendId()); -// sendMessage.setRecvId(messageInfo.getRecvId()); -// sendMessage.setSendTerminal(result.getRecvTerminal()); -// sendMessage.setSendToSelf(false); -// sendMessage.setDatas(Arrays.asList(messageInfo)); -// imClient.sendPrivateMessage(sendMessage); -// } - return; - } // 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态 if(resultCode.equals(IMSendCode.SUCCESS)){ UpdateWrapper updateWrapper = new UpdateWrapper<>(); @@ -68,7 +42,7 @@ public class PrivateMessageListener implements MessageListener { .eq(PrivateMessage::getStatus, MessageStatus.UNREAD.code()) .set(PrivateMessage::getStatus, MessageStatus.ALREADY_READ.code()); privateMessageService.update(updateWrapper); - log.info("消息已读,消息id:{},发送者:{},接收者:{}",messageInfo.getId(),messageInfo.getSendId(),messageInfo.getRecvId()); + log.info("消息已读,消息id:{},发送者:{},接收者:{},终端:{}",messageInfo.getId(),result.getSender().getId(),result.getReceiver().getId(),result.getReceiver().getTerminal()); } } diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java index 8666b6a..71886b2 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java @@ -5,6 +5,8 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.bx.imclient.IMClient; import com.bx.imcommon.contant.Constant; import com.bx.imcommon.model.GroupMessageInfo; +import com.bx.imcommon.model.IMGroupMessage; +import com.bx.imcommon.model.IMUserInfo; import com.bx.implatform.contant.RedisKey; import com.bx.implatform.entity.Group; import com.bx.implatform.entity.GroupMember; @@ -18,6 +20,7 @@ import com.bx.implatform.service.IGroupMemberService; import com.bx.implatform.service.IGroupMessageService; import com.bx.implatform.service.IGroupService; import com.bx.implatform.session.SessionContext; +import com.bx.implatform.session.UserSession; import com.bx.implatform.util.BeanUtils; import com.bx.implatform.vo.GroupMessageVO; import lombok.extern.slf4j.Slf4j; @@ -56,7 +59,7 @@ public class GroupMessageServiceImpl extends ServiceImpl userIds = groupMemberService.findUserIdsByGroupId(group.getId()); - if(!userIds.contains(userId)){ + if(!userIds.contains(session.getUserId())){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"您已不在群聊里面,无法发送消息"); } // 保存消息 GroupMessage msg = BeanUtils.copyProperties(vo, GroupMessage.class); - msg.setSendId(userId); + msg.setSendId(session.getUserId()); msg.setSendTime(new Date()); this.save(msg); // 不用发给自己 - userIds = userIds.stream().filter(id->userId.equals(id)).collect(Collectors.toList()); + userIds = userIds.stream().filter(id->!session.getUserId().equals(id)).collect(Collectors.toList()); // 群发 GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class); - imClient.sendGroupMessage(userIds,msgInfo); - log.info("发送群聊消息,发送id:{},群聊id:{},内容:{}",userId,vo.getGroupId(),vo.getContent()); + IMGroupMessage sendMessage = new IMGroupMessage(); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); + sendMessage.setRecvIds(userIds); + sendMessage.setData(msgInfo); + imClient.sendGroupMessage(sendMessage); + log.info("发送群聊消息,发送id:{},群聊id:{},内容:{}",session.getUserId(),vo.getGroupId(),vo.getContent()); return msg.getId(); } @@ -93,19 +100,19 @@ public class GroupMessageServiceImpl extends ServiceImpl Constant.ALLOW_RECALL_SECOND * 1000){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"消息已发送超过5分钟,无法撤回"); } // 判断是否在群里 - GroupMember member = groupMemberService.findByGroupAndUserId(msg.getGroupId(),userId); + GroupMember member = groupMemberService.findByGroupAndUserId(msg.getGroupId(),session.getUserId()); if(member == null){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"您已不在群聊里面,无法撤回消息"); } @@ -115,14 +122,20 @@ public class GroupMessageServiceImpl extends ServiceImpl userIds = groupMemberService.findUserIdsByGroupId(msg.getGroupId()); // 不用发给自己 - userIds = userIds.stream().filter(uid->userId.equals(uid)).collect(Collectors.toList()); + userIds = userIds.stream().filter(uid->!session.getUserId().equals(uid)).collect(Collectors.toList()); GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class); msgInfo.setType(MessageType.TIP.code()); String content = String.format("'%s'撤回了一条消息",member.getAliasName()); msgInfo.setContent(content); msgInfo.setSendTime(new Date()); - imClient.sendGroupMessage(userIds,msgInfo); - log.info("撤回群聊消息,发送id:{},群聊id:{},内容:{}",userId,msg.getGroupId(),msg.getContent()); + + IMGroupMessage sendMessage = new IMGroupMessage(); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); + sendMessage.setRecvIds(userIds); + sendMessage.setData(msgInfo); + sendMessage.setSendResult(false); + imClient.sendGroupMessage(sendMessage); + log.info("撤回群聊消息,发送id:{},群聊id:{},内容:{}",session.getUserId(),msg.getGroupId(),msg.getContent()); } @@ -133,18 +146,16 @@ public class GroupMessageServiceImpl extends ServiceImpl recvIds = new LinkedList(); - recvIds.add(userId); - List members = groupMemberService.findByUserId(userId); + UserSession session = SessionContext.getSession(); + List members = groupMemberService.findByUserId(session.getUserId()); for(GroupMember member:members){ // 获取群聊已读的最大消息id,只推送未读消息 - String key = RedisKey.IM_GROUP_READED_POSITION + member.getGroupId()+":"+userId; + String key = String.join(":",RedisKey.IM_GROUP_READED_POSITION,member.getGroupId().toString(),session.getUserId().toString()); Integer maxReadedId = (Integer)redisTemplate.opsForValue().get(key); QueryWrapper wrapper = new QueryWrapper(); wrapper.lambda().eq(GroupMessage::getGroupId,member.getGroupId()) .gt(GroupMessage::getSendTime,member.getCreatedTime()) - .ne(GroupMessage::getSendId, userId) + .ne(GroupMessage::getSendId, session.getUserId()) .ne(GroupMessage::getStatus, MessageStatus.RECALL.code()); if(maxReadedId!=null){ wrapper.lambda().gt(GroupMessage::getId,maxReadedId); @@ -154,15 +165,19 @@ public class GroupMessageServiceImpl extends ServiceImpl messageInfos = messages.stream().map(m->{ - GroupMessageInfo msgInfo = BeanUtils.copyProperties(m, GroupMessageInfo.class); - return msgInfo; - }).collect(Collectors.toList()); + // 推送 + for (GroupMessage message:messages ){ + GroupMessageInfo msgInfo = BeanUtils.copyProperties(message, GroupMessageInfo.class); + IMGroupMessage sendMessage = new IMGroupMessage(); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); + // 只推给自己当前终端 + sendMessage.setRecvIds(Collections.singletonList(session.getUserId())); + sendMessage.setRecvTerminals(Collections.singletonList(session.getTerminal())); + sendMessage.setData(msgInfo); + imClient.sendGroupMessage(sendMessage); + } // 发送消息 - GroupMessageInfo[] infoArr = messageInfos.toArray(new GroupMessageInfo[messageInfos.size()]); - imClient.sendGroupMessage(Collections.singletonList(userId), infoArr); - log.info("拉取未读群聊消息,用户id:{},群聊id:{},数量:{}",userId,member.getGroupId(),messageInfos.size()); + log.info("拉取未读群聊消息,用户id:{},群聊id:{},数量:{}",session.getUserId(),member.getGroupId(),messages.size()); } } diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java index 09f9ac7..1109368 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java @@ -4,8 +4,8 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.bx.imclient.IMClient; import com.bx.imcommon.contant.Constant; -import com.bx.imcommon.enums.IMTerminalType; import com.bx.imcommon.model.IMPrivateMessage; +import com.bx.imcommon.model.IMUserInfo; import com.bx.imcommon.model.PrivateMessageInfo; import com.bx.implatform.entity.PrivateMessage; import com.bx.implatform.enums.MessageStatus; @@ -22,6 +22,7 @@ import com.bx.implatform.vo.PrivateMessageVO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; + import java.util.Collections; import java.util.Date; import java.util.List; @@ -58,9 +59,8 @@ public class PrivateMessageServiceImpl extends ServiceImpl sendMessage = new IMPrivateMessage<>(); - sendMessage.setSendId(msgInfo.getSendId()); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); sendMessage.setRecvId(msgInfo.getRecvId()); - sendMessage.setSendTerminal(session.getTerminal()); sendMessage.setSendToSelf(true); sendMessage.setDatas(Collections.singletonList(msgInfo)); imClient.sendPrivateMessage(sendMessage); @@ -96,9 +96,8 @@ public class PrivateMessageServiceImpl extends ServiceImpl sendMessage = new IMPrivateMessage<>(); - sendMessage.setSendId(msgInfo.getSendId()); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); sendMessage.setRecvId(msgInfo.getRecvId()); - sendMessage.setSendTerminal(session.getTerminal()); sendMessage.setSendToSelf(true); sendMessage.setDatas(Collections.singletonList(msgInfo)); imClient.sendPrivateMessage(sendMessage); @@ -142,14 +141,14 @@ public class PrivateMessageServiceImpl extends ServiceImpl queryWrapper = new QueryWrapper<>(); - queryWrapper.lambda().eq(PrivateMessage::getRecvId, userId) + queryWrapper.lambda().eq(PrivateMessage::getRecvId, session.getUserId()) .eq(PrivateMessage::getStatus, MessageStatus.UNREAD); List messages = this.list(queryWrapper); // 上传至redis,等待推送 @@ -157,11 +156,13 @@ public class PrivateMessageServiceImpl extends ServiceImpl messageInfos = messages.stream().map(m -> BeanUtils.copyProperties(m, PrivateMessageInfo.class)).collect(Collectors.toList()); // 推送消息 IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setRecvId(userId); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); + sendMessage.setRecvId(session.getUserId()); + sendMessage.setRecvTerminals(Collections.singletonList(session.getTerminal())); sendMessage.setSendToSelf(false); sendMessage.setDatas(messageInfos); imClient.sendPrivateMessage(sendMessage); - log.info("拉取未读私聊消息,用户id:{},数量:{}", userId, messageInfos.size()); + log.info("拉取未读私聊消息,用户id:{},数量:{}", session.getUserId(), messageInfos.size()); } } } diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/WebrtcServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/WebrtcServiceImpl.java index 561efe6..910c097 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/WebrtcServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/WebrtcServiceImpl.java @@ -2,6 +2,7 @@ package com.bx.implatform.service.impl; import com.bx.imclient.IMClient; import com.bx.imcommon.model.IMPrivateMessage; +import com.bx.imcommon.model.IMUserInfo; import com.bx.imcommon.model.PrivateMessageInfo; import com.bx.implatform.config.ICEServer; import com.bx.implatform.config.ICEServerConfig; @@ -54,8 +55,7 @@ public class WebrtcServiceImpl implements IWebrtcService { messageInfo.setContent(offer); IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setSendId(session.getUserId()); - sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); sendMessage.setRecvId(uid); sendMessage.setSendToSelf(false); sendMessage.setSendResult(false); @@ -82,8 +82,7 @@ public class WebrtcServiceImpl implements IWebrtcService { messageInfo.setContent(answer); IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setSendId(session.getUserId()); - sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); sendMessage.setRecvId(uid); // 告知其他终端已经接受会话,中止呼叫 sendMessage.setSendToSelf(true); @@ -107,8 +106,7 @@ public class WebrtcServiceImpl implements IWebrtcService { messageInfo.setSendId(session.getUserId()); IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setSendId(session.getUserId()); - sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); sendMessage.setRecvId(uid); // 告知其他终端已经拒绝会话,中止呼叫 sendMessage.setSendToSelf(true); @@ -130,8 +128,7 @@ public class WebrtcServiceImpl implements IWebrtcService { messageInfo.setSendId(session.getUserId()); IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setSendId(session.getUserId()); - sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); sendMessage.setRecvId(uid); sendMessage.setSendToSelf(false); sendMessage.setSendResult(false); @@ -154,8 +151,7 @@ public class WebrtcServiceImpl implements IWebrtcService { messageInfo.setSendId(session.getUserId()); IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setSendId(session.getUserId()); - sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); sendMessage.setRecvId(uid); // 告知其他终端已经会话失败,中止呼叫 sendMessage.setSendToSelf(true); @@ -181,8 +177,7 @@ public class WebrtcServiceImpl implements IWebrtcService { messageInfo.setSendId(session.getUserId()); IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setSendId(session.getUserId()); - sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); sendMessage.setRecvId(uid); sendMessage.setSendToSelf(false); sendMessage.setSendResult(false); @@ -206,8 +201,7 @@ public class WebrtcServiceImpl implements IWebrtcService { messageInfo.setContent(candidate); IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setSendId(session.getUserId()); - sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setSender(new IMUserInfo(session.getUserId(),session.getTerminal())); sendMessage.setRecvId(uid); sendMessage.setSendToSelf(false); sendMessage.setSendResult(false); diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java index b48628b..fc63f87 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java @@ -5,6 +5,7 @@ import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMSendCode; import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMSendInfo; +import com.bx.imcommon.model.IMUserInfo; import com.bx.imcommon.model.SendResult; import com.bx.imserver.netty.UserChannelCtxMap; import io.netty.channel.ChannelHandlerContext; @@ -26,47 +27,45 @@ public class GroupMessageProcessor extends AbstractMessageProcessor @Async @Override public void process(IMRecvInfo recvInfo) { - Object data = recvInfo.getData(); - List recvIds = recvInfo.getRecvIds(); - log.info("接收到群消息,发送者:{},接收id:{},内容:{}",recvInfo.getSendId(),recvIds,data); - for(Long recvId:recvIds){ + IMUserInfo sender = recvInfo.getSender(); + List receivers = recvInfo.getReceivers(); + log.info("接收到群消息,发送者:{},接收用户数量:{},内容:{}",sender.getId(),receivers.size(),recvInfo.getData()); + for(IMUserInfo receiver:receivers){ try { - ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId,recvInfo.getRecvTerminal()); + ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(receiver.getId(),receiver.getTerminal()); if(channelCtx != null){ // 推送消息到用户 IMSendInfo sendInfo = new IMSendInfo(); sendInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); - sendInfo.setData(data); + sendInfo.setData(recvInfo.getData()); channelCtx.channel().writeAndFlush(sendInfo); // 消息发送成功确认 - String key = RedisKey.IM_RESULT_GROUP_QUEUE; - SendResult sendResult = new SendResult(); - sendResult.setRecvId(recvId); - sendResult.setCode(IMSendCode.SUCCESS.code()); - sendResult.setData(data); - redisTemplate.opsForList().rightPush(key,sendResult); + sendResult(recvInfo,receiver,IMSendCode.SUCCESS); }else { - // 消息发送失败确认 - String key = RedisKey.IM_RESULT_GROUP_QUEUE; - SendResult sendResult = new SendResult(); - sendResult.setRecvId(recvId); - sendResult.setCode(IMSendCode.NOT_FIND_CHANNEL.code()); - sendResult.setData(data); - redisTemplate.opsForList().rightPush(key,sendResult); - log.error("未找到WS连接,发送者:{},接收id:{},内容:{}",recvInfo.getSendId(),recvId,data); + // 消息发送成功确认 + sendResult(recvInfo,receiver,IMSendCode.NOT_FIND_CHANNEL); + log.error("未找到channel,发送者:{},接收id:{},内容:{}",sender.getId(),receiver.getId(),recvInfo.getData()); } }catch (Exception e){ // 消息发送失败确认 - String key = RedisKey.IM_RESULT_GROUP_QUEUE; - SendResult sendResult = new SendResult(); - sendResult.setRecvId(recvId); - sendResult.setCode(IMSendCode.UNKONW_ERROR.code()); - sendResult.setData(data); - redisTemplate.opsForList().rightPush(key,sendResult); - log.error("发送消息异常,发送者:{},接收id:{},内容:{}",recvInfo.getSendId(),recvId,data); + sendResult(recvInfo,receiver,IMSendCode.UNKONW_ERROR); + log.error("发送消息异常,发送者:{},接收id:{},内容:{}",sender.getId(),receiver.getId(),recvInfo.getData()); } } } + + private void sendResult(IMRecvInfo recvInfo,IMUserInfo receiver,IMSendCode sendCode){ + if(recvInfo.getSendResult()) { + SendResult result = new SendResult(); + result.setSender(recvInfo.getSender()); + result.setReceiver(receiver); + result.setCode(sendCode.code()); + result.setData(recvInfo.getData()); + // 推送到结果队列 + String key = RedisKey.IM_RESULT_GROUP_QUEUE; + redisTemplate.opsForList().rightPush(key, result); + } + } } diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java index 8f652e0..78044a1 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java @@ -5,6 +5,7 @@ import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMSendCode; import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMSendInfo; +import com.bx.imcommon.model.IMUserInfo; import com.bx.imcommon.model.SendResult; import com.bx.imserver.netty.UserChannelCtxMap; import io.netty.channel.ChannelHandlerContext; @@ -22,10 +23,11 @@ public class PrivateMessageProcessor extends AbstractMessageProcessor { + }) + },(error) => { this.$message.error(error); }); diff --git a/im-ui/src/components/chat/ChatVideoAcceptor.vue b/im-ui/src/components/chat/ChatVideoAcceptor.vue index 8b035e4..0796c70 100644 --- a/im-ui/src/components/chat/ChatVideoAcceptor.vue +++ b/im-ui/src/components/chat/ChatVideoAcceptor.vue @@ -7,87 +7,86 @@ {{friend.nickName}} 请求和您进行视频通话...
-
-
+
+
+ \ No newline at end of file diff --git a/im-ui/src/view/Home.vue b/im-ui/src/view/Home.vue index 99bf056..cb10629 100644 --- a/im-ui/src/view/Home.vue +++ b/im-ui/src/view/Home.vue @@ -87,6 +87,8 @@ this.pullUnreadMessage(); }); this.$wsApi.onmessage((cmd, msgInfo) => { + // 标记这条消息是不是自己发的 + msgInfo.selfSend = msgInfo.sendId==this.$store.state.userStore.userInfo.id; if (cmd == 2) { // 异地登录,强制下线 this.$message.error("您已在其他地方登陆,将被强制下线"); @@ -116,7 +118,6 @@ }, handlePrivateMessage(msg) { // 好友列表存在好友信息,直接插入私聊消息 - msg.selfSend = msg.sendId==this.$store.state.userStore.userInfo.id; let friendId = msg.selfSend?msg.recvId:msg.sendId; let friend = this.$store.state.friendStore.friends.find((f) => f.id == friendId); if (friend) { @@ -191,7 +192,7 @@ // 插入消息 this.$store.commit("insertMessage", msg); // 播放提示音 - this.playAudioTip(); + !msg.selfSend && this.playAudioTip(); }, handleExit() { this.$wsApi.closeWebSocket();