diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java index 8221d17..a76e2ad 100644 --- a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -5,7 +5,6 @@ import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.enums.IMSendCode; -import com.bx.imcommon.enums.IMTerminalType; import com.bx.imcommon.model.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -28,10 +27,11 @@ public class IMSender { @Autowired private MessageListenerMulticaster listenerMulticaster; - public void sendPrivateMessage(IMPrivateMessage message) { - for (IMTerminalType terminal : IMTerminalType.values()) { + public void sendPrivateMessage(IMPrivateMessage message) { + List terminals = message.getRecvTerminals(); + for (Integer terminal : terminals) { // 获取对方连接的channelId - String key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.code().toString()); + String key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.toString()); Integer serverId = (Integer) redisTemplate.opsForValue().get(key); // 如果对方在线,将数据存储至redis,等待拉取推送 if (serverId != null) { @@ -40,22 +40,21 @@ public class IMSender { for (int i = 0; i < message.getDatas().size(); i++) { IMRecvInfo recvInfo = new IMRecvInfo(); recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); - recvInfo.setRecvTerminal(terminal.code()); - recvInfo.setNeedSendResult(true); - List recvIds = new LinkedList(); + recvInfo.setRecvTerminal(terminal); + recvInfo.setSendResult(message.getSendResult()); + List recvIds = new LinkedList<>(); recvIds.add(message.getRecvId()); recvInfo.setRecvIds(recvIds); recvInfo.setData(message.getDatas().get(i)); recvInfos[i]=recvInfo; } redisTemplate.opsForList().rightPushAll(sendKey, recvInfos); - - } else { + } else if(message.getSendResult()){ // 回复消息状态 for (int i = 0; i < message.getDatas().size(); i++) { SendResult result = new SendResult(); result.setRecvId(message.getRecvId()); - result.setRecvTerminal(terminal.code()); + result.setRecvTerminal(terminal); result.setCode(IMSendCode.NOT_ONLINE.code()); result.setData(message.getDatas().get(i)); listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); @@ -63,9 +62,9 @@ public class IMSender { } // 推送给自己的其他终端 - if (message.getSendToSelf() && !message.getSendTerminal().equals(terminal.code())) { + if (message.getSendToSelf() && !message.getSendTerminal().equals(terminal)) { // 获取终端连接的channelId - key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getSendId().toString(), terminal.code().toString()); + key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getSendId().toString(), terminal.toString()); serverId = (Integer) redisTemplate.opsForValue().get(key); // 如果终端在线,将数据存储至redis,等待拉取推送 if (serverId != null) { @@ -74,10 +73,10 @@ public class IMSender { for (int i = 0; i < message.getDatas().size(); i++) { IMRecvInfo recvInfo = new IMRecvInfo(); recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); - recvInfo.setRecvTerminal(terminal.code()); + recvInfo.setRecvTerminal(terminal); // 自己的消息不需要回推消息结果 - recvInfo.setNeedSendResult(false); - List recvIds = new LinkedList(); + recvInfo.setSendResult(false); + LinkedList recvIds = new LinkedList<>(); recvIds.add(message.getSendId()); recvInfo.setRecvIds(recvIds); recvInfo.setData(message.getDatas().get(i)); diff --git a/im-commom/src/main/java/com/bx/imcommon/enums/IMTerminalType.java b/im-commom/src/main/java/com/bx/imcommon/enums/IMTerminalType.java index c9330a3..7ca76f1 100644 --- a/im-commom/src/main/java/com/bx/imcommon/enums/IMTerminalType.java +++ b/im-commom/src/main/java/com/bx/imcommon/enums/IMTerminalType.java @@ -1,5 +1,8 @@ package com.bx.imcommon.enums; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; public enum IMTerminalType { @@ -24,6 +27,9 @@ public enum IMTerminalType { return null; } + public static List codes(){ + return Arrays.stream(values()).map(IMTerminalType::code).collect(Collectors.toList()); + } public String description() { return desc; diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java b/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java index 9b5e4b9..a40a43c 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java @@ -3,7 +3,6 @@ package com.bx.imcommon.model; import com.bx.imcommon.enums.IMTerminalType; import lombok.Data; -import java.util.LinkedList; import java.util.List; @@ -21,17 +20,28 @@ public class IMPrivateMessage { private Integer sendTerminal; /** - * 是否发送给自己的其他终端 + * 接收者id */ - private Boolean sendToSelf ; + private Long recvId; + /** - * 接收者id + * 接收者终端类型,默认全部 */ - private Long recvId; + private List recvTerminals = IMTerminalType.codes(); + + /** + * 是否发送给自己的其他终端,默认true + */ + private Boolean sendToSelf = true; + + /** + * 是否需要回推发送结果,默认true + */ + private Boolean sendResult = true; /** - * 消息内容(可一次性发送多条) + * 消息内容 */ private List datas; diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java b/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java index baf8633..73a51a3 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java @@ -30,7 +30,7 @@ public class IMRecvInfo { /* * 是否需要回调发送结果 */ - private Boolean needSendResult = true; + private Boolean sendResult; /* * 推送消息体 diff --git a/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java b/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java index 9d25026..a22b9ea 100644 --- a/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java +++ b/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java @@ -4,6 +4,8 @@ public class RedisKey { // 已读群聊消息位置(已读最大id) public final static String IM_GROUP_READED_POSITION = "im:readed:group:position:"; + // webrtc 会话信息 + public final static String IM_WEBRTC_SESSION = "im:webrtc:session"; // 缓存前缀 public final static String IM_CACHE = "im:cache:"; // 缓存是否好友:bool diff --git a/im-platform/src/main/java/com/bx/implatform/controller/WebrtcController.java b/im-platform/src/main/java/com/bx/implatform/controller/WebrtcController.java index 0cc8310..9fca97c 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/WebrtcController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/WebrtcController.java @@ -1,26 +1,15 @@ package com.bx.implatform.controller; - -import cn.hutool.core.util.ArrayUtil; -import com.bx.imclient.IMClient; -import com.bx.imcommon.enums.IMTerminalType; -import com.bx.imcommon.model.IMPrivateMessage; -import com.bx.imcommon.model.PrivateMessageInfo; -import com.bx.implatform.config.ICEServerConfig; -import com.bx.implatform.enums.MessageType; -import com.bx.implatform.exception.GlobalException; +import com.bx.implatform.config.ICEServer; import com.bx.implatform.result.Result; import com.bx.implatform.result.ResultUtils; -import com.bx.implatform.session.SessionContext; -import com.bx.implatform.session.UserSession; +import com.bx.implatform.service.IWebrtcService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; -import org.apache.commons.lang.ArrayUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; -import java.lang.reflect.Array; -import java.util.Arrays; +import java.util.List; @Api(tags = "webrtc视频单人通话") @RestController @@ -28,25 +17,19 @@ import java.util.Arrays; public class WebrtcController { @Autowired - private IMClient imClient; - - @Autowired - private ICEServerConfig iceServerConfig; + private IWebrtcService webrtcService; @ApiOperation(httpMethod = "POST", value = "呼叫视频通话") @PostMapping("/call") public Result call(@RequestParam Long uid, @RequestBody String offer) { - if(!imClient.isOnline(uid)){ - throw new GlobalException("对方目前不在线"); - } - imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_CALL,uid,offer)); + webrtcService.call(uid,offer); return ResultUtils.success(); } @ApiOperation(httpMethod = "POST", value = "接受视频通话") @PostMapping("/accept") public Result accept(@RequestParam Long uid,@RequestBody String answer) { - imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_ACCEPT,uid,answer)); + webrtcService.accept(uid,answer); return ResultUtils.success(); } @@ -54,59 +37,43 @@ public class WebrtcController { @ApiOperation(httpMethod = "POST", value = "拒绝视频通话") @PostMapping("/reject") public Result reject(@RequestParam Long uid) { - imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_REJECT,uid,null)); + webrtcService.reject(uid); return ResultUtils.success(); } @ApiOperation(httpMethod = "POST", value = "取消呼叫") @PostMapping("/cancel") public Result cancel(@RequestParam Long uid) { - imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_CANCEL,uid,null)); + webrtcService.cancel(uid); return ResultUtils.success(); } @ApiOperation(httpMethod = "POST", value = "呼叫失败") @PostMapping("/failed") public Result failed(@RequestParam Long uid,@RequestParam String reason) { - imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_FAILED,uid,reason)); + webrtcService.failed(uid,reason); return ResultUtils.success(); } @ApiOperation(httpMethod = "POST", value = "挂断") @PostMapping("/handup") public Result leave(@RequestParam Long uid) { - imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_HANDUP,uid,null)); + webrtcService.leave(uid); return ResultUtils.success(); } @PostMapping("/candidate") @ApiOperation(httpMethod = "POST", value = "同步candidate") - public Result candidate(@RequestParam Long uid,@RequestBody String candidate ) { - imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_CANDIDATE,uid,candidate)); + public Result forwardCandidate(@RequestParam Long uid,@RequestBody String candidate ) { + webrtcService.candidate(uid,candidate); return ResultUtils.success(); } + @GetMapping("/iceservers") @ApiOperation(httpMethod = "GET", value = "获取iceservers") - public Result iceservers() { - return ResultUtils.success(iceServerConfig.getIceServers()); - } - - private IMPrivateMessage buildSendMessage(MessageType messageType,Long uid,String content){ - UserSession session = SessionContext.getSession(); - PrivateMessageInfo messageInfo = new PrivateMessageInfo(); - messageInfo.setType(messageType.code()); - messageInfo.setRecvId(uid); - messageInfo.setSendId(session.getUserId()); - messageInfo.setContent(content); - - IMPrivateMessage sendMessage = new IMPrivateMessage(); - sendMessage.setSendId(session.getUserId()); - sendMessage.setSendTerminal(session.getTerminal()); - sendMessage.setSendToSelf(false); - sendMessage.setRecvId(uid); - sendMessage.setDatas(Arrays.asList(messageInfo)); - return sendMessage; + public Result> iceservers() { + return ResultUtils.success(webrtcService.getIceServers()); } } diff --git a/im-platform/src/main/java/com/bx/implatform/service/IWebrtcService.java b/im-platform/src/main/java/com/bx/implatform/service/IWebrtcService.java new file mode 100644 index 0000000..0b2e345 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/service/IWebrtcService.java @@ -0,0 +1,32 @@ +package com.bx.implatform.service; + +import com.bx.implatform.config.ICEServer; +import org.springframework.web.bind.annotation.RequestBody; +import java.util.List; + + +/** + * webrtc 通信服务 + * @author + */ +public interface IWebrtcService { + + void call(Long uid, String offer); + + void accept( Long uid,@RequestBody String answer); + + void reject( Long uid); + + void cancel( Long uid); + + void failed( Long uid, String reason); + + void leave( Long uid) ; + + void candidate( Long uid, String candidate); + + List getIceServers(); + + + +} diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java index 23d3a55..09f9ac7 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java @@ -19,13 +19,10 @@ import com.bx.implatform.session.SessionContext; import com.bx.implatform.session.UserSession; import com.bx.implatform.util.BeanUtils; import com.bx.implatform.vo.PrivateMessageVO; -import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; - -import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.stream.Collectors; @@ -60,12 +57,12 @@ public class PrivateMessageServiceImpl extends ServiceImpl sendMessage = new IMPrivateMessage(); + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); sendMessage.setSendId(msgInfo.getSendId()); sendMessage.setRecvId(msgInfo.getRecvId()); sendMessage.setSendTerminal(session.getTerminal()); sendMessage.setSendToSelf(true); - sendMessage.setDatas(Arrays.asList(msgInfo)); + sendMessage.setDatas(Collections.singletonList(msgInfo)); imClient.sendPrivateMessage(sendMessage); log.info("发送私聊消息,发送id:{},接收id:{},内容:{}", session.getUserId(), vo.getRecvId(), vo.getContent()); return msg.getId(); @@ -98,12 +95,12 @@ public class PrivateMessageServiceImpl extends ServiceImpl sendMessage = new IMPrivateMessage(); + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); sendMessage.setSendId(msgInfo.getSendId()); sendMessage.setRecvId(msgInfo.getRecvId()); sendMessage.setSendTerminal(session.getTerminal()); sendMessage.setSendToSelf(true); - sendMessage.setDatas(Arrays.asList(msgInfo)); + sendMessage.setDatas(Collections.singletonList(msgInfo)); imClient.sendPrivateMessage(sendMessage); log.info("撤回私聊消息,发送id:{},接收id:{},内容:{}", msg.getSendId(), msg.getRecvId(), msg.getContent()); } @@ -122,7 +119,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl 0 ? page : 1; size = size > 0 ? size : 10; Long userId = SessionContext.getSession().getUserId(); - Long stIdx = (page - 1) * size; + long stIdx = (page - 1) * size; QueryWrapper wrapper = new QueryWrapper<>(); wrapper.lambda().and(wrap -> wrap.and( wp -> wp.eq(PrivateMessage::getSendId, userId) @@ -159,7 +156,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl messageInfos = messages.stream().map(m -> BeanUtils.copyProperties(m, PrivateMessageInfo.class)).collect(Collectors.toList()); // 推送消息 - IMPrivateMessage sendMessage = new IMPrivateMessage(); + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); sendMessage.setRecvId(userId); sendMessage.setSendToSelf(false); sendMessage.setDatas(messageInfos); diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/WebrtcServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/WebrtcServiceImpl.java new file mode 100644 index 0000000..561efe6 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/WebrtcServiceImpl.java @@ -0,0 +1,252 @@ +package com.bx.implatform.service.impl; + +import com.bx.imclient.IMClient; +import com.bx.imcommon.model.IMPrivateMessage; +import com.bx.imcommon.model.PrivateMessageInfo; +import com.bx.implatform.config.ICEServer; +import com.bx.implatform.config.ICEServerConfig; +import com.bx.implatform.contant.RedisKey; +import com.bx.implatform.enums.MessageType; +import com.bx.implatform.exception.GlobalException; +import com.bx.implatform.service.IWebrtcService; +import com.bx.implatform.session.SessionContext; +import com.bx.implatform.session.UserSession; +import com.bx.implatform.session.WebrtcSession; +import io.swagger.models.auth.In; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; +import org.springframework.web.bind.annotation.RequestBody; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Service +public class WebrtcServiceImpl implements IWebrtcService { + + @Autowired + private IMClient imClient; + @Autowired + private RedisTemplate redisTemplate; + @Autowired + private ICEServerConfig iceServerConfig; + + @Override + public void call(Long uid, String offer) { + UserSession session = SessionContext.getSession(); + if (!imClient.isOnline(uid)) { + throw new GlobalException("对方目前不在线"); + } + // 创建webrtc会话 + WebrtcSession webrtcSession = new WebrtcSession(); + webrtcSession.setCallerId(session.getUserId()); + webrtcSession.setCallerTerminal(session.getTerminal()); + String key = getSessionKey(session.getUserId(), uid); + redisTemplate.opsForValue().set(key, webrtcSession, 12, TimeUnit.HOURS); + // 向对方所有终端发起呼叫 + PrivateMessageInfo messageInfo = new PrivateMessageInfo(); + messageInfo.setType(MessageType.RTC_CALL.code()); + messageInfo.setRecvId(uid); + messageInfo.setSendId(session.getUserId()); + messageInfo.setContent(offer); + + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); + sendMessage.setSendId(session.getUserId()); + sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setRecvId(uid); + sendMessage.setSendToSelf(false); + sendMessage.setSendResult(false); + sendMessage.setDatas(Collections.singletonList(messageInfo)); + imClient.sendPrivateMessage(sendMessage); + + } + + @Override + public void accept(Long uid, @RequestBody String answer) { + UserSession session = SessionContext.getSession(); + // 查询webrtc会话 + WebrtcSession webrtcSession = getWebrtcSession(session.getUserId(), uid); + // 更新接受者信息 + webrtcSession.setAcceptorId(session.getUserId()); + webrtcSession.setAcceptorTerminal(session.getTerminal()); + String key = getSessionKey(session.getUserId(), uid); + redisTemplate.opsForValue().set(key, webrtcSession, 12, TimeUnit.HOURS); + // 向发起人推送接受通话信令 + PrivateMessageInfo messageInfo = new PrivateMessageInfo(); + messageInfo.setType(MessageType.RTC_ACCEPT.code()); + messageInfo.setRecvId(uid); + messageInfo.setSendId(session.getUserId()); + messageInfo.setContent(answer); + + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); + sendMessage.setSendId(session.getUserId()); + sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setRecvId(uid); + // 告知其他终端已经接受会话,中止呼叫 + sendMessage.setSendToSelf(true); + sendMessage.setSendResult(false); + sendMessage.setRecvTerminals((Collections.singletonList(webrtcSession.getCallerTerminal()))); + sendMessage.setDatas(Collections.singletonList(messageInfo)); + imClient.sendPrivateMessage(sendMessage); + } + + @Override + public void reject(Long uid) { + UserSession session = SessionContext.getSession(); + // 查询webrtc会话 + WebrtcSession webrtcSession = getWebrtcSession(session.getUserId(), uid); + // 删除会话信息 + removeWebrtcSession(uid, session.getUserId()); + // 向发起人推送拒绝通话信令 + PrivateMessageInfo messageInfo = new PrivateMessageInfo(); + messageInfo.setType(MessageType.RTC_REJECT.code()); + messageInfo.setRecvId(uid); + messageInfo.setSendId(session.getUserId()); + + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); + sendMessage.setSendId(session.getUserId()); + sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setRecvId(uid); + // 告知其他终端已经拒绝会话,中止呼叫 + sendMessage.setSendToSelf(true); + sendMessage.setSendResult(false); + sendMessage.setRecvTerminals(Collections.singletonList(webrtcSession.getCallerTerminal())); + sendMessage.setDatas(Collections.singletonList(messageInfo)); + imClient.sendPrivateMessage(sendMessage); + } + + @Override + public void cancel(Long uid) { + UserSession session = SessionContext.getSession(); + // 删除会话信息 + removeWebrtcSession(session.getUserId(), uid); + // 向对方所有终端推送取消通话信令 + PrivateMessageInfo messageInfo = new PrivateMessageInfo(); + messageInfo.setType(MessageType.RTC_ACCEPT.code()); + messageInfo.setRecvId(uid); + messageInfo.setSendId(session.getUserId()); + + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); + sendMessage.setSendId(session.getUserId()); + sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setRecvId(uid); + sendMessage.setSendToSelf(false); + sendMessage.setSendResult(false); + sendMessage.setDatas(Collections.singletonList(messageInfo)); + // 通知对方取消会话 + imClient.sendPrivateMessage(sendMessage); + } + + @Override + public void failed(Long uid, String reason) { + UserSession session = SessionContext.getSession(); + // 查询webrtc会话 + WebrtcSession webrtcSession = getWebrtcSession(session.getUserId(), uid); + // 删除会话信息 + removeWebrtcSession(uid, session.getUserId()); + // 向发起方推送通话失败信令 + PrivateMessageInfo messageInfo = new PrivateMessageInfo(); + messageInfo.setType(MessageType.RTC_FAILED.code()); + messageInfo.setRecvId(uid); + messageInfo.setSendId(session.getUserId()); + + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); + sendMessage.setSendId(session.getUserId()); + sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setRecvId(uid); + // 告知其他终端已经会话失败,中止呼叫 + sendMessage.setSendToSelf(true); + sendMessage.setSendResult(false); + sendMessage.setRecvTerminals(Collections.singletonList(webrtcSession.getCallerTerminal())); + sendMessage.setDatas(Collections.singletonList(messageInfo)); + // 通知对方取消会话 + imClient.sendPrivateMessage(sendMessage); + + } + + @Override + public void leave(Long uid) { + UserSession session = SessionContext.getSession(); + // 查询webrtc会话 + WebrtcSession webrtcSession = getWebrtcSession(session.getUserId(), uid); + // 删除会话信息 + removeWebrtcSession(uid, session.getUserId()); + // 向对方推送挂断通话信令 + PrivateMessageInfo messageInfo = new PrivateMessageInfo(); + messageInfo.setType(MessageType.RTC_HANDUP.code()); + messageInfo.setRecvId(uid); + messageInfo.setSendId(session.getUserId()); + + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); + sendMessage.setSendId(session.getUserId()); + sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setRecvId(uid); + sendMessage.setSendToSelf(false); + sendMessage.setSendResult(false); + Integer terminal = getTerminalType(uid, webrtcSession); + sendMessage.setRecvTerminals(Collections.singletonList(terminal)); + sendMessage.setDatas(Collections.singletonList(messageInfo)); + // 通知对方取消会话 + imClient.sendPrivateMessage(sendMessage); + } + + @Override + public void candidate(Long uid, String candidate) { + UserSession session = SessionContext.getSession(); + // 查询webrtc会话 + WebrtcSession webrtcSession = getWebrtcSession(session.getUserId(), uid); + // 向发起方推送同步candidate信令 + PrivateMessageInfo messageInfo = new PrivateMessageInfo(); + messageInfo.setType(MessageType.RTC_CANDIDATE.code()); + messageInfo.setRecvId(uid); + messageInfo.setSendId(session.getUserId()); + messageInfo.setContent(candidate); + + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); + sendMessage.setSendId(session.getUserId()); + sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setRecvId(uid); + sendMessage.setSendToSelf(false); + sendMessage.setSendResult(false); + Integer terminal = getTerminalType(uid, webrtcSession); + sendMessage.setRecvTerminals(Collections.singletonList(terminal)); + sendMessage.setDatas(Collections.singletonList(messageInfo)); + imClient.sendPrivateMessage(sendMessage); + } + + @Override + public List getIceServers() { + return iceServerConfig.getIceServers(); + } + + private WebrtcSession getWebrtcSession(Long userId, Long uid) { + String key = getSessionKey(userId, uid); + WebrtcSession webrtcSession = (WebrtcSession)redisTemplate.opsForValue().get(key); + if (webrtcSession == null) { + throw new GlobalException("视频通话已结束"); + } + return webrtcSession; + } + + private void removeWebrtcSession(Long userId, Long uid) { + String key = getSessionKey(userId, uid); + redisTemplate.delete(key); + } + + private String getSessionKey(Long id1, Long id2) { + Long minId = id1 > id2 ? id2 : id1; + Long maxId = id1 > id2 ? id1 : id2; + return String.join(":", RedisKey.IM_WEBRTC_SESSION, minId.toString(), maxId.toString()); + } + + private Integer getTerminalType(Long uid, WebrtcSession webrtcSession) { + if (uid.equals(webrtcSession.getCallerId())) { + return webrtcSession.getCallerTerminal(); + } + return webrtcSession.getAcceptorTerminal(); + } + +} diff --git a/im-platform/src/main/java/com/bx/implatform/session/WebrtcSession.java b/im-platform/src/main/java/com/bx/implatform/session/WebrtcSession.java new file mode 100644 index 0000000..fe4e08f --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/session/WebrtcSession.java @@ -0,0 +1,32 @@ +package com.bx.implatform.session; + +import com.bx.imcommon.enums.IMTerminalType; +import io.swagger.models.auth.In; +import lombok.Data; + +/* + * webrtc 会话信息 + * @Author Blue + * @Date 2022/10/21 + */ +@Data +public class WebrtcSession { + /** + * 发起者id + */ + private Long callerId; + /** + * 发起者终端类型 + */ + private Integer callerTerminal; + + /** + * 接受者id + */ + private Long acceptorId; + + /** + * 接受者终端类型 + */ + private Integer acceptorTerminal; +} diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java index c1c3a55..8f652e0 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java @@ -48,7 +48,7 @@ public class PrivateMessageProcessor extends AbstractMessageProcessor { let initWebSocket = () => { try { console.log("初始化WebSocket"); + closeWebSocket(); hasLogin = false; - websock = new WebSocket(wsurl); + websock = new WebSocket(wsurl); websock.onmessage = function(e) { let sendInfo = JSON.parse(e.data) if (sendInfo.cmd == 0) { @@ -39,6 +40,7 @@ let initWebSocket = () => { websock.onclose = function(e) { console.log('WebSocket连接关闭') isConnect = false; //断开后修改标识 + reConnect(); } websock.onopen = function() { console.log("WebSocket连接成功"); @@ -77,7 +79,7 @@ let reConnect = () => { }; //设置关闭连接 let closeWebSocket = () => { - websock.close(); + websock && websock.close(); }; @@ -147,4 +149,4 @@ export { sendMessage, onmessage, onopen -} +} diff --git a/im-ui/src/components/chat/ChatPrivateVideo.vue b/im-ui/src/components/chat/ChatPrivateVideo.vue index 2e28baf..f70d3e6 100644 --- a/im-ui/src/components/chat/ChatPrivateVideo.vue +++ b/im-ui/src/components/chat/ChatPrivateVideo.vue @@ -1,24 +1,23 @@