diff --git a/im-client/src/main/java/com/bx/imclient/IMClient.java b/im-client/src/main/java/com/bx/imclient/IMClient.java index 05a238d..a004550 100644 --- a/im-client/src/main/java/com/bx/imclient/IMClient.java +++ b/im-client/src/main/java/com/bx/imclient/IMClient.java @@ -3,6 +3,7 @@ package com.bx.imclient; import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imclient.sender.IMSender; import com.bx.imcommon.model.GroupMessageInfo; +import com.bx.imcommon.model.IMPrivateMessage; import com.bx.imcommon.model.PrivateMessageInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -27,11 +28,10 @@ public class IMClient { /** * 发送私聊消息(发送结果通过MessageListener接收) * - * @param recvId 接收用户id - * @param messageInfo 消息体,将转成json发送到客户端 + * @param message 私有消息 */ - public void sendPrivateMessage(Long recvId, PrivateMessageInfo... messageInfo){ - imSender.sendPrivateMessage(recvId,messageInfo); + public void sendPrivateMessage(IMPrivateMessage message){ + imSender.sendPrivateMessage(message); } /** diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java index b4ce5c3..8221d17 100644 --- a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -5,10 +5,8 @@ import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.enums.IMSendCode; -import com.bx.imcommon.model.GroupMessageInfo; -import com.bx.imcommon.model.IMRecvInfo; -import com.bx.imcommon.model.PrivateMessageInfo; -import com.bx.imcommon.model.SendResult; +import com.bx.imcommon.enums.IMTerminalType; +import com.bx.imcommon.model.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.RedisTemplate; @@ -30,86 +28,117 @@ public class IMSender { @Autowired private MessageListenerMulticaster listenerMulticaster; - public void sendPrivateMessage(Long recvId, PrivateMessageInfo... messageInfos){ - // 获取对方连接的channelId - String key = RedisKey.IM_USER_SERVER_ID + recvId; - Integer serverId = (Integer) redisTemplate.opsForValue().get(key); - // 如果对方在线,将数据存储至redis,等待拉取推送 - if (serverId != null) { - String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId; - IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length]; - for (int i=0;i recvInfo = new IMRecvInfo<>(); - recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); - List recvIds = new LinkedList(); - recvIds.add(recvId); - recvInfo.setRecvIds(recvIds); - recvInfo.setData(messageInfos[i]); - recvInfos[i] = recvInfo; + public void sendPrivateMessage(IMPrivateMessage message) { + for (IMTerminalType terminal : IMTerminalType.values()) { + // 获取对方连接的channelId + String key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.code().toString()); + Integer serverId = (Integer) redisTemplate.opsForValue().get(key); + // 如果对方在线,将数据存储至redis,等待拉取推送 + if (serverId != null) { + IMRecvInfo[] recvInfos = new IMRecvInfo[message.getDatas().size()]; + String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId; + for (int i = 0; i < message.getDatas().size(); i++) { + IMRecvInfo recvInfo = new IMRecvInfo(); + recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); + recvInfo.setRecvTerminal(terminal.code()); + recvInfo.setNeedSendResult(true); + List recvIds = new LinkedList(); + recvIds.add(message.getRecvId()); + recvInfo.setRecvIds(recvIds); + recvInfo.setData(message.getDatas().get(i)); + recvInfos[i]=recvInfo; + } + redisTemplate.opsForList().rightPushAll(sendKey, recvInfos); + + } else { + // 回复消息状态 + for (int i = 0; i < message.getDatas().size(); i++) { + SendResult result = new SendResult(); + result.setRecvId(message.getRecvId()); + result.setRecvTerminal(terminal.code()); + result.setCode(IMSendCode.NOT_ONLINE.code()); + result.setData(message.getDatas().get(i)); + listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); + } + } - redisTemplate.opsForList().rightPushAll(sendKey, recvInfos); - }else{ - // 回复消息状态 - for(PrivateMessageInfo messageInfo : messageInfos ) { - SendResult result = new SendResult(); - result.setMessageInfo(messageInfo); - result.setRecvId(recvId); - result.setCode(IMSendCode.NOT_ONLINE); - listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); + // 推送给自己的其他终端 + if (message.getSendToSelf() && !message.getSendTerminal().equals(terminal.code())) { + // 获取终端连接的channelId + key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getSendId().toString(), terminal.code().toString()); + serverId = (Integer) redisTemplate.opsForValue().get(key); + // 如果终端在线,将数据存储至redis,等待拉取推送 + if (serverId != null) { + String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId; + IMRecvInfo[] recvInfos = new IMRecvInfo[message.getDatas().size()]; + for (int i = 0; i < message.getDatas().size(); i++) { + IMRecvInfo recvInfo = new IMRecvInfo(); + recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); + recvInfo.setRecvTerminal(terminal.code()); + // 自己的消息不需要回推消息结果 + recvInfo.setNeedSendResult(false); + List recvIds = new LinkedList(); + recvIds.add(message.getSendId()); + recvInfo.setRecvIds(recvIds); + recvInfo.setData(message.getDatas().get(i)); + recvInfos[i]=recvInfo; + } + redisTemplate.opsForList().rightPushAll(sendKey, recvInfos); + } } } } - public void sendGroupMessage(List recvIds, GroupMessageInfo... messageInfos){ + public void sendGroupMessage(List recvIds, GroupMessageInfo... messageInfos) { // 根据群聊每个成员所连的IM-server,进行分组 List offLineIds = Collections.synchronizedList(new LinkedList()); Map> serverMap = new ConcurrentHashMap<>(); - recvIds.parallelStream().forEach(id->{ + recvIds.parallelStream().forEach(id -> { String key = RedisKey.IM_USER_SERVER_ID + id; - Integer serverId = (Integer)redisTemplate.opsForValue().get(key); - if(serverId != null){ + Integer serverId = (Integer) redisTemplate.opsForValue().get(key); + if (serverId != null) { // 此处需要加锁,否则list可以会被覆盖 - synchronized(serverMap){ - if(serverMap.containsKey(serverId)){ + synchronized (serverMap) { + if (serverMap.containsKey(serverId)) { serverMap.get(serverId).add(id); - }else { + } else { List list = Collections.synchronizedList(new LinkedList()); list.add(id); - serverMap.put(serverId,list); + serverMap.put(serverId, list); } } - }else{ + } else { offLineIds.add(id); } }); // 逐个server发送 - for (Map.Entry> entry : serverMap.entrySet()) { - IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length]; - for (int i=0;i recvInfo = new IMRecvInfo<>(); - recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); - recvInfo.setRecvIds(new LinkedList<>(entry.getValue())); - recvInfo.setData(messageInfos[i]); - recvInfos[i] = recvInfo; - } - String key = RedisKey.IM_UNREAD_GROUP_QUEUE +entry.getKey(); - redisTemplate.opsForList().rightPushAll(key,recvInfos); - } - // 不在线的用户,回复消息状态 - for(GroupMessageInfo messageInfo:messageInfos ){ - for(Long id : offLineIds){ - // 回复消息状态 - SendResult result = new SendResult(); - result.setMessageInfo(messageInfo); - result.setRecvId(id); - result.setCode(IMSendCode.NOT_ONLINE); - listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE,result); - } - } +// for (Map.Entry> entry : serverMap.entrySet()) { +// IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length]; +// for (int i = 0; i < messageInfos.length; i++) { +// IMRecvInfo recvInfo = new IMRecvInfo<>(); +// recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); +// recvInfo.setRecvIds(new LinkedList<>(entry.getValue())); +// recvInfo.setData(messageInfos[i]); +// recvInfos[i] = recvInfo; +// } +// String key = RedisKey.IM_UNREAD_GROUP_QUEUE + entry.getKey(); +// redisTemplate.opsForList().rightPushAll(key, recvInfos); +// } +// // 不在线的用户,回复消息状态 +// for (GroupMessageInfo messageInfo : messageInfos) { +// for (Long id : offLineIds) { +// // 回复消息状态 +// SendResult result = new SendResult(); +// result.setMessageInfo(messageInfo); +// result.setRecvId(id); +// result.setCode(IMSendCode.NOT_ONLINE); +// listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, result); +// } +// } } - public Boolean isOnline(Long userId){ - String key = RedisKey.IM_USER_SERVER_ID + userId; - return redisTemplate.hasKey(key); + public Boolean isOnline(Long userId) { + String key = String.join(":",RedisKey.IM_USER_SERVER_ID,userId.toString(),"*"); + return !redisTemplate.keys(key).isEmpty(); } } diff --git a/im-commom/src/main/java/com/bx/imcommon/contant/RedisKey.java b/im-commom/src/main/java/com/bx/imcommon/contant/RedisKey.java index e8c2d7e..9831953 100644 --- a/im-commom/src/main/java/com/bx/imcommon/contant/RedisKey.java +++ b/im-commom/src/main/java/com/bx/imcommon/contant/RedisKey.java @@ -5,7 +5,7 @@ public class RedisKey { // im-server最大id,从0开始递增 public final static String IM_MAX_SERVER_ID = "im:max_server_id"; // 用户ID所连接的IM-server的ID - public final static String IM_USER_SERVER_ID = "im:user:server_id:"; + public final static String IM_USER_SERVER_ID = "im:user:server_id"; // 未读私聊消息队列 public final static String IM_UNREAD_PRIVATE_QUEUE = "im:unread:private:"; // 未读群聊消息队列 diff --git a/im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java b/im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java index aedfde0..6ae35de 100644 --- a/im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java +++ b/im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java @@ -11,6 +11,7 @@ public enum IMCmdType { GROUP_MESSAGE(4,"群发消息"); + private Integer code; private String desc; diff --git a/im-commom/src/main/java/com/bx/imcommon/enums/IMSendCode.java b/im-commom/src/main/java/com/bx/imcommon/enums/IMSendCode.java index 62ad254..37de1d7 100644 --- a/im-commom/src/main/java/com/bx/imcommon/enums/IMSendCode.java +++ b/im-commom/src/main/java/com/bx/imcommon/enums/IMSendCode.java @@ -8,7 +8,7 @@ public enum IMSendCode { NOT_FIND_CHANNEL(2,"未找到对方的channel"), UNKONW_ERROR(9999,"未知异常"); - private int code; + private Integer code; private String desc; // 构造方法 @@ -17,6 +17,16 @@ public enum IMSendCode { this.desc = desc; } + public static IMSendCode fromCode(Integer code){ + for (IMSendCode typeEnum:values()) { + if (typeEnum.code.equals(code)) { + return typeEnum; + } + } + return null; + } + + public String description() { return desc; } diff --git a/im-commom/src/main/java/com/bx/imcommon/enums/IMTerminalType.java b/im-commom/src/main/java/com/bx/imcommon/enums/IMTerminalType.java new file mode 100644 index 0000000..c9330a3 --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/enums/IMTerminalType.java @@ -0,0 +1,36 @@ +package com.bx.imcommon.enums; + + +public enum IMTerminalType { + + WEB(0,"web"), + APP(1,"app"); + + private Integer code; + + private String desc; + + IMTerminalType(Integer index, String desc) { + this.code =index; + this.desc=desc; + } + + public static IMTerminalType fromCode(Integer code){ + for (IMTerminalType typeEnum:values()) { + if (typeEnum.code.equals(code)) { + return typeEnum; + } + } + return null; + } + + + public String description() { + return desc; + } + + public Integer code(){ + return this.code; + } + +} diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java b/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java new file mode 100644 index 0000000..9b5e4b9 --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java @@ -0,0 +1,39 @@ +package com.bx.imcommon.model; + +import com.bx.imcommon.enums.IMTerminalType; +import lombok.Data; + +import java.util.LinkedList; +import java.util.List; + + +@Data +public class IMPrivateMessage { + + /** + * 发送者id + */ + private Long sendId; + + /** + * 发送者终端类型 IMTerminalType + */ + private Integer sendTerminal; + + /** + * 是否发送给自己的其他终端 + */ + private Boolean sendToSelf ; + + /** + * 接收者id + */ + private Long recvId; + + /** + * 消息内容(可一次性发送多条) + */ + private List datas; + + +} diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java b/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java index 289e2fc..baf8633 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java @@ -5,22 +5,37 @@ import lombok.Data; import java.util.List; @Data -public class IMRecvInfo { +public class IMRecvInfo { /* - * 命令类型 + * 命令类型 IMCmdType */ private Integer cmd; + /* + * 发送者id + */ + private Long sendId; + + /* + * 接收终端类型 IMTerminalType + */ + private Integer recvTerminal; + /* * 接收者id列表 */ private List recvIds; + /* + * 是否需要回调发送结果 + */ + private Boolean needSendResult = true; + /* * 推送消息体 */ - private T data; + private Object data; } diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMSessionInfo.java b/im-commom/src/main/java/com/bx/imcommon/model/IMSessionInfo.java new file mode 100644 index 0000000..d97256e --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMSessionInfo.java @@ -0,0 +1,18 @@ +package com.bx.imcommon.model; + +import com.bx.imcommon.enums.IMTerminalType; +import lombok.Data; + +@Data +public class IMSessionInfo { + /* + * 用户id + */ + private Long userId; + + /* + * 终端类型 + */ + private Integer terminal; + +} diff --git a/im-commom/src/main/java/com/bx/imcommon/model/SendResult.java b/im-commom/src/main/java/com/bx/imcommon/model/SendResult.java index 2a65b82..b4d2b9e 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/SendResult.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/SendResult.java @@ -1,6 +1,7 @@ package com.bx.imcommon.model; import com.bx.imcommon.enums.IMSendCode; +import com.bx.imcommon.enums.IMTerminalType; import lombok.Data; @Data @@ -12,13 +13,18 @@ public class SendResult { private Long recvId; /* - * 发送状态 + * 接收者终端类型 IMTerminalType */ - private IMSendCode code; + private Integer recvTerminal; /* - * 消息体(透传) + * 发送状态 IMCmdType */ - private T messageInfo; + private Integer code; + + /* + * 消息内容 + */ + private T data; } diff --git a/im-platform/src/main/java/com/bx/implatform/ImplatformApp.java b/im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java similarity index 88% rename from im-platform/src/main/java/com/bx/implatform/ImplatformApp.java rename to im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java index baf7338..7e2f59b 100644 --- a/im-platform/src/main/java/com/bx/implatform/ImplatformApp.java +++ b/im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java @@ -12,9 +12,9 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy; @EnableAspectJAutoProxy(exposeProxy = true) @MapperScan(basePackages = {"com.bx.implatform.mapper"}) @SpringBootApplication(exclude= {SecurityAutoConfiguration.class })// 禁用secrity -public class ImplatformApp { +public class IMPlatformApp { public static void main(String[] args) { - SpringApplication.run(ImplatformApp.class,args); + SpringApplication.run(IMPlatformApp.class,args); } } diff --git a/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java b/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java index 9d97450..fc4d568 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java @@ -27,7 +27,7 @@ public class FriendController { @GetMapping("/list") @ApiOperation(value = "好友列表",notes="获取好友列表") public Result< List> findFriends(){ - List friends = friendService.findFriendByUserId(SessionContext.getSession().getId()); + List friends = friendService.findFriendByUserId(SessionContext.getSession().getUserId()); List vos = friends.stream().map(f->{ FriendVO vo = new FriendVO(); vo.setId(f.getFriendId()); diff --git a/im-platform/src/main/java/com/bx/implatform/controller/UserController.java b/im-platform/src/main/java/com/bx/implatform/controller/UserController.java index 3cc5cdf..4d6e06d 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/UserController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/UserController.java @@ -38,7 +38,7 @@ public class UserController { @ApiOperation(value = "获取当前用户信息",notes="获取当前用户信息") public Result findSelfInfo(){ UserSession session = SessionContext.getSession(); - User user = userService.getById(session.getId()); + User user = userService.getById(session.getUserId()); UserVO userVO = BeanUtils.copyProperties(user,UserVO.class); return ResultUtils.success(userVO); } diff --git a/im-platform/src/main/java/com/bx/implatform/controller/WebrtcController.java b/im-platform/src/main/java/com/bx/implatform/controller/WebrtcController.java index b1fccd4..29c90b3 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/WebrtcController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/WebrtcController.java @@ -1,18 +1,26 @@ package com.bx.implatform.controller; +import cn.hutool.core.util.ArrayUtil; import com.bx.imclient.IMClient; +import com.bx.imcommon.enums.IMTerminalType; +import com.bx.imcommon.model.IMPrivateMessage; import com.bx.imcommon.model.PrivateMessageInfo; import com.bx.implatform.config.ICEServerConfig; import com.bx.implatform.enums.MessageType; import com.bx.implatform.result.Result; import com.bx.implatform.result.ResultUtils; import com.bx.implatform.session.SessionContext; +import com.bx.implatform.session.UserSession; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; +import org.apache.commons.lang.ArrayUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; +import java.lang.reflect.Array; +import java.util.Arrays; + @Api(tags = "webrtc视频单人通话") @RestController @RequestMapping("/webrtc/private") @@ -27,28 +35,14 @@ public class WebrtcController { @ApiOperation(httpMethod = "POST", value = "呼叫视频通话") @PostMapping("/call") public Result call(@RequestParam Long uid, @RequestBody String offer) { - Long userId = SessionContext.getSession().getId(); - - PrivateMessageInfo message = new PrivateMessageInfo(); - message.setType(MessageType.RTC_CALL.code()); - message.setRecvId(uid); - message.setSendId(userId); - message.setContent(offer); - imClient.sendPrivateMessage(uid,message); + imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_CALL,uid,offer)); return ResultUtils.success(); } @ApiOperation(httpMethod = "POST", value = "接受视频通话") @PostMapping("/accept") public Result accept(@RequestParam Long uid,@RequestBody String answer) { - Long userId = SessionContext.getSession().getId(); - - PrivateMessageInfo message = new PrivateMessageInfo(); - message.setType(MessageType.RTC_ACCEPT.code()); - message.setRecvId(uid); - message.setSendId(userId); - message.setContent(answer); - imClient.sendPrivateMessage(uid,message); + imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_ACCEPT,uid,answer)); return ResultUtils.success(); } @@ -56,51 +50,28 @@ public class WebrtcController { @ApiOperation(httpMethod = "POST", value = "拒绝视频通话") @PostMapping("/reject") public Result reject(@RequestParam Long uid) { - Long userId = SessionContext.getSession().getId(); - PrivateMessageInfo message = new PrivateMessageInfo(); - message.setType(MessageType.RTC_REJECT.code()); - message.setRecvId(uid); - message.setSendId(userId); - imClient.sendPrivateMessage(uid,message); + imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_REJECT,uid,null)); return ResultUtils.success(); } @ApiOperation(httpMethod = "POST", value = "取消呼叫") @PostMapping("/cancel") public Result cancel(@RequestParam Long uid) { - Long userId = SessionContext.getSession().getId(); - PrivateMessageInfo message = new PrivateMessageInfo(); - message.setType(MessageType.RTC_CANCEL.code()); - message.setRecvId(uid); - message.setSendId(userId); - imClient.sendPrivateMessage(uid,message); + imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_CANCEL,uid,null)); return ResultUtils.success(); } @ApiOperation(httpMethod = "POST", value = "呼叫失败") @PostMapping("/failed") public Result failed(@RequestParam Long uid,@RequestParam String reason) { - Long userId = SessionContext.getSession().getId(); - - PrivateMessageInfo message = new PrivateMessageInfo(); - message.setType(MessageType.RTC_FAILED.code()); - message.setRecvId(uid); - message.setSendId(userId); - message.setContent(reason); - imClient.sendPrivateMessage(uid,message); + imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_FAILED,uid,reason)); return ResultUtils.success(); } @ApiOperation(httpMethod = "POST", value = "挂断") @PostMapping("/handup") public Result leave(@RequestParam Long uid) { - Long userId = SessionContext.getSession().getId(); - - PrivateMessageInfo message = new PrivateMessageInfo(); - message.setType(MessageType.RTC_HANDUP.code()); - message.setRecvId(uid); - message.setSendId(userId); - imClient.sendPrivateMessage(uid,message); + imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_HANDUP,uid,null)); return ResultUtils.success(); } @@ -108,13 +79,7 @@ public class WebrtcController { @PostMapping("/candidate") @ApiOperation(httpMethod = "POST", value = "同步candidate") public Result candidate(@RequestParam Long uid,@RequestBody String candidate ) { - Long userId = SessionContext.getSession().getId(); - PrivateMessageInfo message = new PrivateMessageInfo(); - message.setType(MessageType.RTC_CANDIDATE.code()); - message.setRecvId(uid); - message.setSendId(userId); - message.setContent(candidate); - imClient.sendPrivateMessage(uid,message); + imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_CANDIDATE,uid,candidate)); return ResultUtils.success(); } @@ -123,4 +88,21 @@ public class WebrtcController { public Result iceservers() { return ResultUtils.success(iceServerConfig.getIceServers()); } + + private IMPrivateMessage buildSendMessage(MessageType messageType,Long uid,String content){ + UserSession session = SessionContext.getSession(); + PrivateMessageInfo messageInfo = new PrivateMessageInfo(); + messageInfo.setType(messageType.code()); + messageInfo.setRecvId(uid); + messageInfo.setSendId(session.getUserId()); + messageInfo.setContent(content); + + IMPrivateMessage sendMessage = new IMPrivateMessage(); + sendMessage.setSendId(session.getUserId()); + sendMessage.setSendTerminal(session.getTerminal()); + sendMessage.setSendToSelf(false); + sendMessage.setRecvId(uid); + sendMessage.setDatas(Arrays.asList(messageInfo)); + return sendMessage; + } } diff --git a/im-platform/src/main/java/com/bx/implatform/dto/LoginDTO.java b/im-platform/src/main/java/com/bx/implatform/dto/LoginDTO.java index 05a638d..aebc58e 100644 --- a/im-platform/src/main/java/com/bx/implatform/dto/LoginDTO.java +++ b/im-platform/src/main/java/com/bx/implatform/dto/LoginDTO.java @@ -5,17 +5,26 @@ import io.swagger.annotations.ApiModelProperty; import lombok.Data; import org.hibernate.validator.constraints.Length; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; @Data @ApiModel("用户登录VO") public class LoginDTO { - //@NotEmpty(message="用户名不可为空") + @Max(value = 1,message = "登录终端类型取值范围:0,1") + @Min(value = 0,message = "登录终端类型取值范围:0,1") + @NotNull(message="登录终端类型不可为空") + @ApiModelProperty(value = "登录终端 0:web 1:app") + private Integer terminal; + + @NotEmpty(message="用户名不可为空") @ApiModelProperty(value = "用户名") private String userName; - // @NotEmpty(message="用户密码不可为空") + @NotEmpty(message="用户密码不可为空") @ApiModelProperty(value = "用户密码") private String password; diff --git a/im-platform/src/main/java/com/bx/implatform/enums/MessageType.java b/im-platform/src/main/java/com/bx/implatform/enums/MessageType.java index cc7a07c..0b44028 100644 --- a/im-platform/src/main/java/com/bx/implatform/enums/MessageType.java +++ b/im-platform/src/main/java/com/bx/implatform/enums/MessageType.java @@ -26,6 +26,15 @@ public enum MessageType { this.desc=desc; } + public static MessageType fromCode(Integer code){ + for (MessageType typeEnum:values()) { + if (typeEnum.code.equals(code)) { + return typeEnum; + } + } + return null; + } + public String description() { return desc; diff --git a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java index f9f5be2..e2c3fc5 100644 --- a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java +++ b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java @@ -22,7 +22,7 @@ public class GroupMessageListener implements MessageListener { @Override public void process(SendResult result){ - GroupMessageInfo messageInfo = (GroupMessageInfo) result.getMessageInfo(); + GroupMessageInfo messageInfo = (GroupMessageInfo) result.getData(); // 提示类数据不记录 if(messageInfo.getType().equals(MessageType.TIP)){ return; diff --git a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java index 06fa83d..eaa3f44 100644 --- a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java +++ b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java @@ -4,8 +4,10 @@ import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.bx.imclient.IMClient; import com.bx.imclient.annotation.IMListener; import com.bx.imclient.listener.MessageListener; +import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.enums.IMSendCode; +import com.bx.imcommon.model.IMPrivateMessage; import com.bx.imcommon.model.PrivateMessageInfo; import com.bx.imcommon.model.SendResult; import com.bx.implatform.entity.PrivateMessage; @@ -15,6 +17,7 @@ import com.bx.implatform.service.IPrivateMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import java.util.Arrays; import java.util.Date; @@ -30,7 +33,8 @@ public class PrivateMessageListener implements MessageListener { @Override public void process(SendResult result){ - PrivateMessageInfo messageInfo = (PrivateMessageInfo) result.getMessageInfo(); + PrivateMessageInfo messageInfo = (PrivateMessageInfo) result.getData(); + IMSendCode resultCode = IMSendCode.fromCode(result.getCode()); // 提示类数据不记录 if(messageInfo.getType().equals(MessageType.TIP.code())){ return; @@ -39,19 +43,26 @@ public class PrivateMessageListener implements MessageListener { if(messageInfo.getType() >= MessageType.RTC_CALL.code() && messageInfo.getType()< MessageType.RTC_CANDIDATE.code()){ // 通知用户呼叫失败了 if(messageInfo.getType().equals(MessageType.RTC_CALL.code()) - && !result.getCode().equals(IMSendCode.SUCCESS)){ - PrivateMessageInfo sendMessage = new PrivateMessageInfo(); - sendMessage.setRecvId(messageInfo.getSendId()); - sendMessage.setSendId(messageInfo.getRecvId()); - sendMessage.setType(MessageType.RTC_FAILED.code()); - sendMessage.setContent(result.getCode().description()); - sendMessage.setSendTime(new Date()); - imClient.sendPrivateMessage(sendMessage.getRecvId(),sendMessage); + && !resultCode.equals(IMSendCode.SUCCESS)){ + PrivateMessageInfo msgInfo = new PrivateMessageInfo(); + msgInfo.setRecvId(messageInfo.getSendId()); + msgInfo.setSendId(messageInfo.getRecvId()); + msgInfo.setType(MessageType.RTC_FAILED.code()); + msgInfo.setContent(resultCode.description()); + msgInfo.setSendTime(new Date()); + + IMPrivateMessage sendMessage = new IMPrivateMessage(); + sendMessage.setSendId(messageInfo.getSendId()); + sendMessage.setRecvId(messageInfo.getRecvId()); + sendMessage.setSendTerminal(result.getRecvTerminal()); + sendMessage.setSendToSelf(false); + sendMessage.setDatas(Arrays.asList(messageInfo)); + imClient.sendPrivateMessage(sendMessage); } return; } - // 更新消息状态 - if(result.getCode().equals(IMSendCode.SUCCESS)){ + // 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态 + if(resultCode.equals(IMSendCode.SUCCESS)){ UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.lambda().eq(PrivateMessage::getId,messageInfo.getId()) .eq(PrivateMessage::getStatus, MessageStatus.UNREAD.code()) diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java index f074d76..baf7fa5 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java @@ -57,7 +57,7 @@ public class FriendServiceImpl extends ServiceImpl impleme @Transactional @Override public void addFriend(Long friendId) { - long userId = SessionContext.getSession().getId(); + long userId = SessionContext.getSession().getUserId(); if(userId == friendId){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"不允许添加自己为好友"); } @@ -78,7 +78,7 @@ public class FriendServiceImpl extends ServiceImpl impleme @Transactional @Override public void delFriend(Long friendId) { - long userId = SessionContext.getSession().getId(); + long userId = SessionContext.getSession().getUserId(); // 互相解除好友关系 FriendServiceImpl proxy = (FriendServiceImpl)AopContext.currentProxy(); proxy.unbindFriend(userId,friendId); @@ -113,7 +113,7 @@ public class FriendServiceImpl extends ServiceImpl impleme */ @Override public void update(FriendVO vo) { - long userId = SessionContext.getSession().getId(); + long userId = SessionContext.getSession().getUserId(); QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.lambda() .eq(Friend::getUserId,userId) @@ -186,7 +186,7 @@ public class FriendServiceImpl extends ServiceImpl impleme UserSession session = SessionContext.getSession(); QueryWrapper wrapper = new QueryWrapper<>(); wrapper.lambda() - .eq(Friend::getUserId,session.getId()) + .eq(Friend::getUserId,session.getUserId()) .eq(Friend::getFriendId,friendId); Friend friend = this.getOne(wrapper); if(friend == null){ diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java index c358e61..90a5d1e 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java @@ -56,7 +56,7 @@ public class GroupMessageServiceImpl extends ServiceImpl recvIds = new LinkedList(); recvIds.add(userId); List members = groupMemberService.findByUserId(userId); @@ -179,7 +179,7 @@ public class GroupMessageServiceImpl extends ServiceImpl findHistoryMessage(Long groupId, Long page, Long size) { page = page > 0 ? page:1; size = size > 0 ? size:10; - Long userId = SessionContext.getSession().getId(); + Long userId = SessionContext.getSession().getUserId(); Long stIdx = (page-1)* size; // 群聊成员信息 GroupMember member = groupMemberService.findByGroupAndUserId(groupId,userId); diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java index 00c6b34..1391e4d 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java @@ -62,7 +62,7 @@ public class GroupServiceImpl extends ServiceImpl implements @Override public GroupVO createGroup(String groupName) { UserSession session = SessionContext.getSession(); - User user = userService.getById(session.getId()); + User user = userService.getById(session.getUserId()); // 保存群组数据 Group group = new Group(); group.setName(groupName); @@ -100,12 +100,12 @@ public class GroupServiceImpl extends ServiceImpl implements // 校验是不是群主,只有群主能改信息 Group group = this.getById(vo.getId()); // 群主有权修改群基本信息 - if(group.getOwnerId() == session.getId()){ + if(group.getOwnerId() == session.getUserId()){ group = BeanUtils.copyProperties(vo,Group.class); this.updateById(group); } // 更新成员信息 - GroupMember member = groupMemberService.findByGroupAndUserId(vo.getId(),session.getId()); + GroupMember member = groupMemberService.findByGroupAndUserId(vo.getId(),session.getUserId()); if(member == null){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"您不是群聊的成员"); } @@ -129,7 +129,7 @@ public class GroupServiceImpl extends ServiceImpl implements public void deleteGroup(Long groupId) { UserSession session = SessionContext.getSession(); Group group = this.getById(groupId); - if(group.getOwnerId() != session.getId()){ + if(group.getOwnerId() != session.getUserId()){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"只有群主才有权限解除群聊"); } // 逻辑删除群数据 @@ -149,7 +149,7 @@ public class GroupServiceImpl extends ServiceImpl implements */ @Override public void quitGroup(Long groupId) { - Long userId = SessionContext.getSession().getId(); + Long userId = SessionContext.getSession().getUserId(); Group group = this.getById(groupId); if(group.getOwnerId() == userId){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"您是群主,不可退出群聊"); @@ -171,10 +171,10 @@ public class GroupServiceImpl extends ServiceImpl implements public void kickGroup(Long groupId, Long userId) { UserSession session = SessionContext.getSession(); Group group = this.getById(groupId); - if(group.getOwnerId() != session.getId()){ + if(group.getOwnerId() != session.getUserId()){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"您不是群主,没有权限踢人"); } - if(userId == session.getId()){ + if(userId == session.getUserId()){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"亲,不能自己踢自己哟"); } // 删除群聊成员 @@ -186,7 +186,7 @@ public class GroupServiceImpl extends ServiceImpl implements public GroupVO findById(Long groupId) { UserSession session = SessionContext.getSession(); Group group = this.getById(groupId); - GroupMember member = groupMemberService.findByGroupAndUserId(groupId,session.getId()); + GroupMember member = groupMemberService.findByGroupAndUserId(groupId,session.getUserId()); if(member == null){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"您未加入群聊"); } @@ -226,7 +226,7 @@ public class GroupServiceImpl extends ServiceImpl implements public List findGroups() { UserSession session = SessionContext.getSession(); // 查询当前用户的群id列表 - List groupMembers = groupMemberService.findByUserId(session.getId()); + List groupMembers = groupMemberService.findByUserId(session.getUserId()); if(groupMembers.isEmpty()){ return Collections.EMPTY_LIST; } @@ -267,7 +267,7 @@ public class GroupServiceImpl extends ServiceImpl implements } // 找出好友信息 - List friends = friendsService.findFriendByUserId(session.getId()); + List friends = friendsService.findFriendByUserId(session.getUserId()); List friendsList = vo.getFriendIds().stream().map(id -> friends.stream().filter(f -> f.getFriendId().equals(id)).findFirst().get()).collect(Collectors.toList()); if (friendsList.size() != vo.getFriendIds().size()) { diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java index 83444e6..fa95ada 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java @@ -4,7 +4,8 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.bx.imclient.IMClient; import com.bx.imcommon.contant.Constant; -import com.bx.imcommon.contant.RedisKey; +import com.bx.imcommon.enums.IMTerminalType; +import com.bx.imcommon.model.IMPrivateMessage; import com.bx.imcommon.model.PrivateMessageInfo; import com.bx.implatform.entity.PrivateMessage; import com.bx.implatform.enums.MessageStatus; @@ -15,13 +16,16 @@ import com.bx.implatform.mapper.PrivateMessageMapper; import com.bx.implatform.service.IFriendService; import com.bx.implatform.service.IPrivateMessageService; import com.bx.implatform.session.SessionContext; +import com.bx.implatform.session.UserSession; import com.bx.implatform.util.BeanUtils; import com.bx.implatform.vo.PrivateMessageVO; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; +import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.stream.Collectors; @@ -44,21 +48,27 @@ public class PrivateMessageServiceImpl extends ServiceImpl Constant.ALLOW_RECALL_SECOND * 1000) { @@ -88,7 +98,14 @@ public class PrivateMessageServiceImpl extends ServiceImpl findHistoryMessage(Long friendId, Long page, Long size) { page = page > 0 ? page : 1; size = size > 0 ? size : 10; - Long userId = SessionContext.getSession().getId(); + Long userId = SessionContext.getSession().getUserId(); Long stIdx = (page - 1) * size; QueryWrapper wrapper = new QueryWrapper<>(); wrapper.lambda().and(wrap -> wrap.and( @@ -134,7 +151,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl sendMessage = new IMPrivateMessage(); + sendMessage.setRecvId(userId); + sendMessage.setSendToSelf(false); + sendMessage.setDatas(messageInfos); + imClient.sendPrivateMessage(sendMessage); + log.info("拉取未读私聊消息,用户id:{},数量:{}", userId, messageInfos.size()); } } } diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/UserServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/UserServiceImpl.java index 90d1051..33cfd59 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/UserServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/UserServiceImpl.java @@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.bx.imclient.IMClient; -import com.bx.imcommon.contant.RedisKey; import com.bx.implatform.config.JwtProperties; import com.bx.implatform.entity.Friend; import com.bx.implatform.entity.GroupMember; @@ -74,6 +73,8 @@ public class UserServiceImpl extends ServiceImpl implements IU } // 生成token UserSession session = BeanUtils.copyProperties(user,UserSession.class); + session.setUserId(user.getId()); + session.setTerminal(dto.getTerminal()); String strJson = JSON.toJSONString(session); String accessToken = JwtUtil.sign(user.getId(),strJson,jwtProperties.getAccessTokenExpireIn(),jwtProperties.getAccessTokenSecret()); String refreshToken = JwtUtil.sign(user.getId(),strJson,jwtProperties.getAccessTokenExpireIn(),jwtProperties.getAccessTokenSecret()); @@ -150,7 +151,7 @@ public class UserServiceImpl extends ServiceImpl implements IU @Override public void update(UserVO vo) { UserSession session = SessionContext.getSession(); - if(!session.getId().equals(vo.getId()) ){ + if(!session.getUserId().equals(vo.getId()) ){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"不允许修改其他用户的信息!"); } User user = this.getById(vo.getId()); @@ -160,7 +161,7 @@ public class UserServiceImpl extends ServiceImpl implements IU // 更新好友昵称和头像 if(!user.getNickName().equals(vo.getNickName()) || !user.getHeadImageThumb().equals(vo.getHeadImageThumb())){ QueryWrapper queryWrapper = new QueryWrapper<>(); - queryWrapper.lambda().eq(Friend::getFriendId,session.getId()); + queryWrapper.lambda().eq(Friend::getFriendId,session.getUserId()); List friends = friendService.list(queryWrapper); for(Friend friend: friends){ friend.setFriendNickName(vo.getNickName()); @@ -170,7 +171,7 @@ public class UserServiceImpl extends ServiceImpl implements IU } // 更新群聊中的头像 if(!user.getHeadImageThumb().equals(vo.getHeadImageThumb())){ - List members = groupMemberService.findByUserId(session.getId()); + List members = groupMemberService.findByUserId(session.getUserId()); for(GroupMember member:members){ member.setHeadImage(vo.getHeadImageThumb()); } diff --git a/im-platform/src/main/java/com/bx/implatform/service/thirdparty/FileService.java b/im-platform/src/main/java/com/bx/implatform/service/thirdparty/FileService.java index ada35c6..7fdb640 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/thirdparty/FileService.java +++ b/im-platform/src/main/java/com/bx/implatform/service/thirdparty/FileService.java @@ -54,7 +54,7 @@ public class FileService { public String uploadFile(MultipartFile file){ - Long userId = SessionContext.getSession().getId(); + Long userId = SessionContext.getSession().getUserId(); // 大小校验 if(file.getSize() > Constant.MAX_FILE_SIZE){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"文件大小不能超过10M"); @@ -71,7 +71,7 @@ public class FileService { public UploadImageVO uploadImage(MultipartFile file){ try { - Long userId = SessionContext.getSession().getId(); + Long userId = SessionContext.getSession().getUserId(); // 大小校验 if(file.getSize() > Constant.MAX_IMAGE_SIZE){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"图片大小不能超过5M"); diff --git a/im-platform/src/main/java/com/bx/implatform/session/UserSession.java b/im-platform/src/main/java/com/bx/implatform/session/UserSession.java index 0e7c81f..bf408da 100644 --- a/im-platform/src/main/java/com/bx/implatform/session/UserSession.java +++ b/im-platform/src/main/java/com/bx/implatform/session/UserSession.java @@ -1,11 +1,20 @@ package com.bx.implatform.session; +import com.bx.imcommon.model.IMSessionInfo; import lombok.Data; + + @Data -public class UserSession { +public class UserSession extends IMSessionInfo { - private Long id; + /* + * 用户名称 + */ private String userName; + + /* + * 用户昵称 + */ private String nickName; } diff --git a/im-server/src/main/java/com/bx/imserver/constant/ChannelAttrKey.java b/im-server/src/main/java/com/bx/imserver/constant/ChannelAttrKey.java new file mode 100644 index 0000000..0b9adcd --- /dev/null +++ b/im-server/src/main/java/com/bx/imserver/constant/ChannelAttrKey.java @@ -0,0 +1,12 @@ +package com.bx.imserver.constant; + +public class ChannelAttrKey { + + // 用户ID + public static final String USER_ID = "USER_ID"; + // 终端类型 + public static final String TERMINAL_TYPE = "TERMINAL_TYPE"; + // 心跳次数 + public static final String HEARTBEAt_TIMES = "HEARTBEAt_TIMES"; + +} diff --git a/im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java b/im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java index 9a13f62..8c94e1a 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java +++ b/im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java @@ -3,6 +3,7 @@ package com.bx.imserver.netty; import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMSendInfo; +import com.bx.imserver.constant.ChannelAttrKey; import com.bx.imserver.netty.processor.MessageProcessor; import com.bx.imserver.netty.processor.ProcessorFactory; import com.bx.imserver.util.SpringContextHolder; @@ -64,18 +65,20 @@ public class IMChannelHandler extends SimpleChannelInboundHandler { @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - AttributeKey attr = AttributeKey.valueOf("USER_ID"); - Long userId = ctx.channel().attr(attr).get(); - ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(userId); + AttributeKey userIdAttr = AttributeKey.valueOf(ChannelAttrKey.USER_ID); + Long userId = ctx.channel().attr(userIdAttr).get(); + AttributeKey terminalAttr = AttributeKey.valueOf(ChannelAttrKey.TERMINAL_TYPE); + Integer terminal = ctx.channel().attr(terminalAttr).get(); + ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(userId,terminal); // 判断一下,避免异地登录导致的误删 if(context != null && ctx.channel().id().equals(context.channel().id())){ // 移除channel - UserChannelCtxMap.removeChannelCtx(userId); + UserChannelCtxMap.removeChannelCtx(userId,terminal); // 用户下线 RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate"); - String key = RedisKey.IM_USER_SERVER_ID + userId; + String key = String.join(":",RedisKey.IM_USER_SERVER_ID,userId.toString(), terminal.toString()); redisTemplate.delete(key); - log.info("断开连接,userId:{}",userId); + log.info("断开连接,userId:{},终端类型:{}",userId,terminal); } } @@ -87,7 +90,9 @@ public class IMChannelHandler extends SimpleChannelInboundHandler { // 在规定时间内没有收到客户端的上行数据, 主动断开连接 AttributeKey attr = AttributeKey.valueOf("USER_ID"); Long userId = ctx.channel().attr(attr).get(); - log.info("心跳超时,即将断开连接,用户id:{} ",userId); + AttributeKey terminalAttr = AttributeKey.valueOf(ChannelAttrKey.TERMINAL_TYPE); + Integer ternimal = ctx.channel().attr(terminalAttr).get(); + log.info("心跳超时,即将断开连接,用户id:{},终端类型:{} ",userId,ternimal); ctx.channel().close(); } } else { diff --git a/im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java b/im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java index 8f5de0a..7bd3fcc 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java +++ b/im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java @@ -2,6 +2,7 @@ package com.bx.imserver.netty; import io.netty.channel.ChannelHandlerContext; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -9,21 +10,34 @@ import java.util.concurrent.ConcurrentHashMap; public class UserChannelCtxMap { /* - * 维护userId和ctx的关联关系,格式:Map + * 维护userId和ctx的关联关系,格式:Map> */ - private static Map channelMap = new ConcurrentHashMap(); + private static Map> channelMap = new ConcurrentHashMap(); - public static void addChannelCtx(Long userId,ChannelHandlerContext ctx){ - channelMap.put(userId,ctx); + public static void addChannelCtx(Long userId,Integer channel,ChannelHandlerContext ctx){ + channelMap.computeIfAbsent(userId,key -> new ConcurrentHashMap()).put(channel,ctx); } - public static void removeChannelCtx(Long userId){ - if(userId != null){ - channelMap.remove(userId); + public static void removeChannelCtx(Long userId,Integer terminal){ + if(userId != null && terminal != null && channelMap.containsKey(userId)){ + Map userChannelMap = channelMap.get(userId); + if(userChannelMap.containsKey(terminal)){ + userChannelMap.remove(terminal); + } } } - public static ChannelHandlerContext getChannelCtx(Long userId){ + public static ChannelHandlerContext getChannelCtx(Long userId,Integer terminal){ + if(userId != null && terminal != null && channelMap.containsKey(userId)){ + Map userChannelMap = channelMap.get(userId); + if(userChannelMap.containsKey(terminal)){ + return userChannelMap.get(terminal); + } + } + return null; + } + + public static Map getChannelCtx(Long userId){ if(userId == null){ return null; } diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java index 3b589b9..3d7eecb 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java @@ -19,32 +19,32 @@ import java.util.List; @Slf4j @Component -public class GroupMessageProcessor extends MessageProcessor> { +public class GroupMessageProcessor extends MessageProcessor { @Autowired private RedisTemplate redisTemplate; @Async @Override - public void process(IMRecvInfo recvInfo) { - GroupMessageInfo messageInfo = recvInfo.getData(); + public void process(IMRecvInfo recvInfo) { + Object data = recvInfo.getData(); List recvIds = recvInfo.getRecvIds(); - log.info("接收到群消息,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent()); + log.info("接收到群消息,发送者:{},接收id:{},内容:{}",recvInfo.getSendId(),recvIds,data); for(Long recvId:recvIds){ try { - ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId); + ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId,recvInfo.getRecvTerminal()); if(channelCtx != null){ // 推送消息到用户 IMSendInfo sendInfo = new IMSendInfo(); sendInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); - sendInfo.setData(messageInfo); + sendInfo.setData(data); channelCtx.channel().writeAndFlush(sendInfo); // 消息发送成功确认 String key = RedisKey.IM_RESULT_GROUP_QUEUE; SendResult sendResult = new SendResult(); sendResult.setRecvId(recvId); - sendResult.setCode(IMSendCode.SUCCESS); - sendResult.setMessageInfo(messageInfo); + sendResult.setCode(IMSendCode.SUCCESS.code()); + sendResult.setData(data); redisTemplate.opsForList().rightPush(key,sendResult); }else { @@ -52,20 +52,20 @@ public class GroupMessageProcessor extends MessageProcessor { ctx.channel().writeAndFlush(sendInfo); // 设置属性 - AttributeKey attr = AttributeKey.valueOf("HEARTBEAt_TIMES"); - Long heartbeatTimes = ctx.channel().attr(attr).get(); - ctx.channel().attr(attr).set(++heartbeatTimes); + AttributeKey heartBeatAttr = AttributeKey.valueOf(ChannelAttrKey.HEARTBEAt_TIMES); + Long heartbeatTimes = ctx.channel().attr(heartBeatAttr).get(); + ctx.channel().attr(heartBeatAttr).set(++heartbeatTimes); if(heartbeatTimes%10 == 0){ // 每心跳10次,用户在线状态续一次命 - attr = AttributeKey.valueOf("USER_ID"); - Long userId = ctx.channel().attr(attr).get(); - String key = RedisKey.IM_USER_SERVER_ID+userId; + AttributeKey userIdAttr = AttributeKey.valueOf(ChannelAttrKey.USER_ID); + Long userId = ctx.channel().attr(userIdAttr).get(); + AttributeKey terminalAttr = AttributeKey.valueOf(ChannelAttrKey.TERMINAL_TYPE); + Integer ternimal = ctx.channel().attr(terminalAttr).get(); + String key = String.join(":",RedisKey.IM_USER_SERVER_ID,userId.toString(),ternimal.toString()); redisTemplate.expire(key, Constant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS); } } diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java index 73b3a6e..017e176 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java @@ -1,12 +1,16 @@ package com.bx.imserver.netty.processor; import cn.hutool.core.bean.BeanUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.bx.imcommon.contant.Constant; import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMSendInfo; +import com.bx.imcommon.model.IMSessionInfo; import com.bx.imcommon.model.LoginInfo; import com.bx.imcommon.util.JwtUtil; +import com.bx.imserver.constant.ChannelAttrKey; import com.bx.imserver.netty.IMServerGroup; import com.bx.imserver.netty.UserChannelCtxMap; import com.bx.imserver.netty.ws.WebSocketServer; @@ -19,6 +23,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; @Slf4j @@ -41,9 +46,12 @@ public class LoginProcessor extends MessageProcessor { ctx.channel().close(); log.warn("用户token校验不通过,强制下线,token:{}",loginInfo.getAccessToken()); } - Long userId = JwtUtil.getUserId(loginInfo.getAccessToken()); + String strInfo = JwtUtil.getInfo(loginInfo.getAccessToken()); + IMSessionInfo sessionInfo = JSON.parseObject(strInfo,IMSessionInfo.class); + Long userId = sessionInfo.getUserId(); + Integer terminal = sessionInfo.getTerminal(); log.info("用户登录,userId:{}",userId); - ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(userId); + ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(userId,terminal); if(context != null && !ctx.channel().id().equals(context.channel().id())){ // 不允许多地登录,强制下线 IMSendInfo sendInfo = new IMSendInfo(); @@ -53,15 +61,18 @@ public class LoginProcessor extends MessageProcessor { log.info("异地登录,强制下线,userId:{}",userId); } // 绑定用户和channel - UserChannelCtxMap.addChannelCtx(userId,ctx); + UserChannelCtxMap.addChannelCtx(userId,terminal,ctx); // 设置用户id属性 - AttributeKey attr = AttributeKey.valueOf("USER_ID"); - ctx.channel().attr(attr).set(userId); - // 心跳次数 - attr = AttributeKey.valueOf("HEARTBEAt_TIMES"); - ctx.channel().attr(attr).set(0L); + AttributeKey userIdAttr = AttributeKey.valueOf(ChannelAttrKey.USER_ID); + ctx.channel().attr(userIdAttr).set(userId); + // 设置用户终端类型 + AttributeKey terminalAttr = AttributeKey.valueOf(ChannelAttrKey.TERMINAL_TYPE); + ctx.channel().attr(terminalAttr).set(terminal); + // 初始化心跳次数 + AttributeKey heartBeatAttr = AttributeKey.valueOf("HEARTBEAt_TIMES"); + ctx.channel().attr(heartBeatAttr).set(0L); // 在redis上记录每个user的channelId,15秒没有心跳,则自动过期 - String key = RedisKey.IM_USER_SERVER_ID+userId; + String key = String.join(":",RedisKey.IM_USER_SERVER_ID,userId.toString(), terminal.toString()); redisTemplate.opsForValue().set(key, IMServerGroup.serverId, Constant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS); // 响应ws IMSendInfo sendInfo = new IMSendInfo(); diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java index 659412c..4f55abd 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java @@ -14,54 +14,50 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import java.util.Map; + @Slf4j @Component -public class PrivateMessageProcessor extends MessageProcessor> { +public class PrivateMessageProcessor extends MessageProcessor { @Autowired private RedisTemplate redisTemplate; @Override - public void process(IMRecvInfo recvInfo) { - PrivateMessageInfo messageInfo = recvInfo.getData(); + public void process(IMRecvInfo recvInfo) { Long recvId = recvInfo.getRecvIds().get(0); - log.info("接收到消息,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent()); + log.info("接收到消息,发送者:{},接收者:{},内容:{}",recvInfo.getSendId(),recvId,recvInfo.getData()); try{ - ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId); + ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId,recvInfo.getRecvTerminal()); if(channelCtx != null ){ // 推送消息到用户 IMSendInfo sendInfo = new IMSendInfo(); sendInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); - sendInfo.setData(messageInfo); + sendInfo.setData(recvInfo.getData()); channelCtx.channel().writeAndFlush(sendInfo); // 消息发送成功确认 - String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; - SendResult sendResult = new SendResult(); - sendResult.setRecvId(recvId); - sendResult.setCode(IMSendCode.SUCCESS); - sendResult.setMessageInfo(messageInfo); - redisTemplate.opsForList().rightPush(key,sendResult); + sendResult(recvInfo,IMSendCode.SUCCESS); }else{ // 消息推送失败确认 - String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; - SendResult sendResult = new SendResult(); - sendResult.setRecvId(recvId); - sendResult.setCode(IMSendCode.NOT_FIND_CHANNEL); - sendResult.setMessageInfo(messageInfo); - redisTemplate.opsForList().rightPush(key,sendResult); - log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent()); + sendResult(recvInfo,IMSendCode.NOT_FIND_CHANNEL); + log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",recvInfo.getSendId(),recvId,recvInfo.getData()); } }catch (Exception e){ // 消息推送失败确认 - String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; - SendResult sendResult = new SendResult(); - sendResult.setRecvId(recvId); - sendResult.setCode(IMSendCode.UNKONW_ERROR); - sendResult.setMessageInfo(messageInfo); - redisTemplate.opsForList().rightPush(key,sendResult); - log.error("发送异常,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent(),e); + sendResult(recvInfo,IMSendCode.UNKONW_ERROR); + log.error("发送异常,发送者:{},接收者:{},内容:{}",recvInfo.getSendId(),recvId,recvInfo.getData(),e); } } + private void sendResult(IMRecvInfo recvInfo,IMSendCode sendCode){ + if(recvInfo.getNeedSendResult()) { + String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; + SendResult result = new SendResult(); + result.setRecvId(recvInfo.getRecvIds().get(0)); + result.setCode(sendCode.code()); + result.setData(recvInfo.getData()); + redisTemplate.opsForList().rightPush(key, result); + } + } } diff --git a/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java index 3681c47..88427a4 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java @@ -25,8 +25,6 @@ public class PullUnreadGroupMessageTask extends AbstractPullMessageTask { @Autowired private RedisTemplate redisTemplate; - - @Override public void pullMessage() { // 从redis拉取未读消息 @@ -34,7 +32,7 @@ public class PullUnreadGroupMessageTask extends AbstractPullMessageTask { List messageInfos = redisTemplate.opsForList().range(key,0,-1); for(Object o: messageInfos){ redisTemplate.opsForList().leftPop(key); - IMRecvInfo recvInfo = (IMRecvInfo)o; + IMRecvInfo recvInfo = (IMRecvInfo)o; MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE); processor.process(recvInfo); } diff --git a/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java index 50bb8b2..dd61d52 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java @@ -34,7 +34,7 @@ public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask { List messageInfos = redisTemplate.opsForList().range(key,0,-1); for(Object o: messageInfos){ redisTemplate.opsForList().leftPop(key); - IMRecvInfo recvInfo = (IMRecvInfo)o; + IMRecvInfo recvInfo = (IMRecvInfo)o; MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE); processor.process(recvInfo); diff --git a/im-ui/src/view/Home.vue b/im-ui/src/view/Home.vue index 3a1b7c8..5876fa7 100644 --- a/im-ui/src/view/Home.vue +++ b/im-ui/src/view/Home.vue @@ -116,7 +116,9 @@ }, handlePrivateMessage(msg) { // 好友列表存在好友信息,直接插入私聊消息 - let friend = this.$store.state.friendStore.friends.find((f) => f.id == msg.sendId); + msg.selfSend = msg.sendId==this.$store.state.userStore.userInfo.id; + let friendId = msg.selfSend?msg.recvId:msg.sendId; + let friend = this.$store.state.friendStore.friends.find((f) => f.id == friendId); if (friend) { this.insertPrivateMessage(friend, msg); return; @@ -135,7 +137,6 @@ if (msg.type >= this.$enums.MESSAGE_TYPE.RTC_CALL && msg.type <= this.$enums.MESSAGE_TYPE.RTC_CANDIDATE) { // 呼叫 - console.log(msg) if (msg.type == this.$enums.MESSAGE_TYPE.RTC_CALL || msg.type == this.$enums.MESSAGE_TYPE.RTC_CANCEL) { this.$store.commit("showVideoAcceptorBox", friend); @@ -157,7 +158,8 @@ // 插入消息 this.$store.commit("insertMessage", msg); // 播放提示音 - this.playAudioTip(); + !msg.selfSend && this.playAudioTip(); + }, handleGroupMessage(msg) { // 群聊缓存存在,直接插入群聊消息 @@ -325,4 +327,4 @@ text-align: center; } - + diff --git a/im-ui/src/view/Login.vue b/im-ui/src/view/Login.vue index 620bb49..26df584 100644 --- a/im-ui/src/view/Login.vue +++ b/im-ui/src/view/Login.vue @@ -2,6 +2,10 @@