Browse Source

支持多端同时在线后端改造中 (未完成)

master
xie.bx 3 years ago
parent
commit
fb18a8801e
  1. 8
      im-client/src/main/java/com/bx/imclient/IMClient.java
  2. 157
      im-client/src/main/java/com/bx/imclient/sender/IMSender.java
  3. 2
      im-commom/src/main/java/com/bx/imcommon/contant/RedisKey.java
  4. 1
      im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java
  5. 12
      im-commom/src/main/java/com/bx/imcommon/enums/IMSendCode.java
  6. 36
      im-commom/src/main/java/com/bx/imcommon/enums/IMTerminalType.java
  7. 39
      im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java
  8. 21
      im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java
  9. 18
      im-commom/src/main/java/com/bx/imcommon/model/IMSessionInfo.java
  10. 14
      im-commom/src/main/java/com/bx/imcommon/model/SendResult.java
  11. 4
      im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java
  12. 2
      im-platform/src/main/java/com/bx/implatform/controller/FriendController.java
  13. 2
      im-platform/src/main/java/com/bx/implatform/controller/UserController.java
  14. 82
      im-platform/src/main/java/com/bx/implatform/controller/WebrtcController.java
  15. 13
      im-platform/src/main/java/com/bx/implatform/dto/LoginDTO.java
  16. 9
      im-platform/src/main/java/com/bx/implatform/enums/MessageType.java
  17. 2
      im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java
  18. 33
      im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java
  19. 8
      im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java
  20. 8
      im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java
  21. 20
      im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java
  22. 48
      im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java
  23. 9
      im-platform/src/main/java/com/bx/implatform/service/impl/UserServiceImpl.java
  24. 4
      im-platform/src/main/java/com/bx/implatform/service/thirdparty/FileService.java
  25. 13
      im-platform/src/main/java/com/bx/implatform/session/UserSession.java
  26. 12
      im-server/src/main/java/com/bx/imserver/constant/ChannelAttrKey.java
  27. 19
      im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java
  28. 30
      im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java
  29. 28
      im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java
  30. 15
      im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java
  31. 29
      im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java
  32. 48
      im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java
  33. 4
      im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java
  34. 2
      im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java
  35. 10
      im-ui/src/view/Home.vue
  36. 7
      im-ui/src/view/Login.vue

8
im-client/src/main/java/com/bx/imclient/IMClient.java

@ -3,6 +3,7 @@ package com.bx.imclient;
import com.bx.imclient.listener.MessageListenerMulticaster;
import com.bx.imclient.sender.IMSender;
import com.bx.imcommon.model.GroupMessageInfo;
import com.bx.imcommon.model.IMPrivateMessage;
import com.bx.imcommon.model.PrivateMessageInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@ -27,11 +28,10 @@ public class IMClient {
/**
* 发送私聊消息发送结果通过MessageListener接收
*
* @param recvId 接收用户id
* @param messageInfo 消息体将转成json发送到客户端
* @param message 私有消息
*/
public void sendPrivateMessage(Long recvId, PrivateMessageInfo... messageInfo){
imSender.sendPrivateMessage(recvId,messageInfo);
public void sendPrivateMessage(IMPrivateMessage message){
imSender.sendPrivateMessage(message);
}
/**

157
im-client/src/main/java/com/bx/imclient/sender/IMSender.java

@ -5,10 +5,8 @@ import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.enums.IMListenerType;
import com.bx.imcommon.enums.IMSendCode;
import com.bx.imcommon.model.GroupMessageInfo;
import com.bx.imcommon.model.IMRecvInfo;
import com.bx.imcommon.model.PrivateMessageInfo;
import com.bx.imcommon.model.SendResult;
import com.bx.imcommon.enums.IMTerminalType;
import com.bx.imcommon.model.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
@ -30,86 +28,117 @@ public class IMSender {
@Autowired
private MessageListenerMulticaster listenerMulticaster;
public void sendPrivateMessage(Long recvId, PrivateMessageInfo... messageInfos){
// 获取对方连接的channelId
String key = RedisKey.IM_USER_SERVER_ID + recvId;
Integer serverId = (Integer) redisTemplate.opsForValue().get(key);
// 如果对方在线,将数据存储至redis,等待拉取推送
if (serverId != null) {
String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId;
IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length];
for (int i=0;i<messageInfos.length;i++){
IMRecvInfo<PrivateMessageInfo> recvInfo = new IMRecvInfo<>();
recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code());
List recvIds = new LinkedList();
recvIds.add(recvId);
recvInfo.setRecvIds(recvIds);
recvInfo.setData(messageInfos[i]);
recvInfos[i] = recvInfo;
public void sendPrivateMessage(IMPrivateMessage message) {
for (IMTerminalType terminal : IMTerminalType.values()) {
// 获取对方连接的channelId
String key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.code().toString());
Integer serverId = (Integer) redisTemplate.opsForValue().get(key);
// 如果对方在线,将数据存储至redis,等待拉取推送
if (serverId != null) {
IMRecvInfo[] recvInfos = new IMRecvInfo[message.getDatas().size()];
String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId;
for (int i = 0; i < message.getDatas().size(); i++) {
IMRecvInfo recvInfo = new IMRecvInfo();
recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code());
recvInfo.setRecvTerminal(terminal.code());
recvInfo.setNeedSendResult(true);
List recvIds = new LinkedList();
recvIds.add(message.getRecvId());
recvInfo.setRecvIds(recvIds);
recvInfo.setData(message.getDatas().get(i));
recvInfos[i]=recvInfo;
}
redisTemplate.opsForList().rightPushAll(sendKey, recvInfos);
} else {
// 回复消息状态
for (int i = 0; i < message.getDatas().size(); i++) {
SendResult result = new SendResult();
result.setRecvId(message.getRecvId());
result.setRecvTerminal(terminal.code());
result.setCode(IMSendCode.NOT_ONLINE.code());
result.setData(message.getDatas().get(i));
listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result);
}
}
redisTemplate.opsForList().rightPushAll(sendKey, recvInfos);
}else{
// 回复消息状态
for(PrivateMessageInfo messageInfo : messageInfos ) {
SendResult result = new SendResult();
result.setMessageInfo(messageInfo);
result.setRecvId(recvId);
result.setCode(IMSendCode.NOT_ONLINE);
listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result);
// 推送给自己的其他终端
if (message.getSendToSelf() && !message.getSendTerminal().equals(terminal.code())) {
// 获取终端连接的channelId
key = String.join(":",RedisKey.IM_USER_SERVER_ID, message.getSendId().toString(), terminal.code().toString());
serverId = (Integer) redisTemplate.opsForValue().get(key);
// 如果终端在线,将数据存储至redis,等待拉取推送
if (serverId != null) {
String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId;
IMRecvInfo[] recvInfos = new IMRecvInfo[message.getDatas().size()];
for (int i = 0; i < message.getDatas().size(); i++) {
IMRecvInfo recvInfo = new IMRecvInfo();
recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code());
recvInfo.setRecvTerminal(terminal.code());
// 自己的消息不需要回推消息结果
recvInfo.setNeedSendResult(false);
List recvIds = new LinkedList();
recvIds.add(message.getSendId());
recvInfo.setRecvIds(recvIds);
recvInfo.setData(message.getDatas().get(i));
recvInfos[i]=recvInfo;
}
redisTemplate.opsForList().rightPushAll(sendKey, recvInfos);
}
}
}
}
public void sendGroupMessage(List<Long> recvIds, GroupMessageInfo... messageInfos){
public void sendGroupMessage(List<Long> recvIds, GroupMessageInfo... messageInfos) {
// 根据群聊每个成员所连的IM-server,进行分组
List<Long> offLineIds = Collections.synchronizedList(new LinkedList<Long>());
Map<Integer, List<Long>> serverMap = new ConcurrentHashMap<>();
recvIds.parallelStream().forEach(id->{
recvIds.parallelStream().forEach(id -> {
String key = RedisKey.IM_USER_SERVER_ID + id;
Integer serverId = (Integer)redisTemplate.opsForValue().get(key);
if(serverId != null){
Integer serverId = (Integer) redisTemplate.opsForValue().get(key);
if (serverId != null) {
// 此处需要加锁,否则list可以会被覆盖
synchronized(serverMap){
if(serverMap.containsKey(serverId)){
synchronized (serverMap) {
if (serverMap.containsKey(serverId)) {
serverMap.get(serverId).add(id);
}else {
} else {
List<Long> list = Collections.synchronizedList(new LinkedList<Long>());
list.add(id);
serverMap.put(serverId,list);
serverMap.put(serverId, list);
}
}
}else{
} else {
offLineIds.add(id);
}
});
// 逐个server发送
for (Map.Entry<Integer,List<Long>> entry : serverMap.entrySet()) {
IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length];
for (int i=0;i<messageInfos.length;i++){
IMRecvInfo<GroupMessageInfo> recvInfo = new IMRecvInfo<>();
recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code());
recvInfo.setRecvIds(new LinkedList<>(entry.getValue()));
recvInfo.setData(messageInfos[i]);
recvInfos[i] = recvInfo;
}
String key = RedisKey.IM_UNREAD_GROUP_QUEUE +entry.getKey();
redisTemplate.opsForList().rightPushAll(key,recvInfos);
}
// 不在线的用户,回复消息状态
for(GroupMessageInfo messageInfo:messageInfos ){
for(Long id : offLineIds){
// 回复消息状态
SendResult result = new SendResult();
result.setMessageInfo(messageInfo);
result.setRecvId(id);
result.setCode(IMSendCode.NOT_ONLINE);
listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE,result);
}
}
// for (Map.Entry<Integer, List<Long>> entry : serverMap.entrySet()) {
// IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length];
// for (int i = 0; i < messageInfos.length; i++) {
// IMRecvInfo<GroupMessageInfo> recvInfo = new IMRecvInfo<>();
// recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code());
// recvInfo.setRecvIds(new LinkedList<>(entry.getValue()));
// recvInfo.setData(messageInfos[i]);
// recvInfos[i] = recvInfo;
// }
// String key = RedisKey.IM_UNREAD_GROUP_QUEUE + entry.getKey();
// redisTemplate.opsForList().rightPushAll(key, recvInfos);
// }
// // 不在线的用户,回复消息状态
// for (GroupMessageInfo messageInfo : messageInfos) {
// for (Long id : offLineIds) {
// // 回复消息状态
// SendResult result = new SendResult();
// result.setMessageInfo(messageInfo);
// result.setRecvId(id);
// result.setCode(IMSendCode.NOT_ONLINE);
// listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, result);
// }
// }
}
public Boolean isOnline(Long userId){
String key = RedisKey.IM_USER_SERVER_ID + userId;
return redisTemplate.hasKey(key);
public Boolean isOnline(Long userId) {
String key = String.join(":",RedisKey.IM_USER_SERVER_ID,userId.toString(),"*");
return !redisTemplate.keys(key).isEmpty();
}
}

2
im-commom/src/main/java/com/bx/imcommon/contant/RedisKey.java

@ -5,7 +5,7 @@ public class RedisKey {
// im-server最大id,从0开始递增
public final static String IM_MAX_SERVER_ID = "im:max_server_id";
// 用户ID所连接的IM-server的ID
public final static String IM_USER_SERVER_ID = "im:user:server_id:";
public final static String IM_USER_SERVER_ID = "im:user:server_id";
// 未读私聊消息队列
public final static String IM_UNREAD_PRIVATE_QUEUE = "im:unread:private:";
// 未读群聊消息队列

1
im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java

@ -11,6 +11,7 @@ public enum IMCmdType {
GROUP_MESSAGE(4,"群发消息");
private Integer code;
private String desc;

12
im-commom/src/main/java/com/bx/imcommon/enums/IMSendCode.java

@ -8,7 +8,7 @@ public enum IMSendCode {
NOT_FIND_CHANNEL(2,"未找到对方的channel"),
UNKONW_ERROR(9999,"未知异常");
private int code;
private Integer code;
private String desc;
// 构造方法
@ -17,6 +17,16 @@ public enum IMSendCode {
this.desc = desc;
}
public static IMSendCode fromCode(Integer code){
for (IMSendCode typeEnum:values()) {
if (typeEnum.code.equals(code)) {
return typeEnum;
}
}
return null;
}
public String description() {
return desc;
}

36
im-commom/src/main/java/com/bx/imcommon/enums/IMTerminalType.java

@ -0,0 +1,36 @@
package com.bx.imcommon.enums;
public enum IMTerminalType {
WEB(0,"web"),
APP(1,"app");
private Integer code;
private String desc;
IMTerminalType(Integer index, String desc) {
this.code =index;
this.desc=desc;
}
public static IMTerminalType fromCode(Integer code){
for (IMTerminalType typeEnum:values()) {
if (typeEnum.code.equals(code)) {
return typeEnum;
}
}
return null;
}
public String description() {
return desc;
}
public Integer code(){
return this.code;
}
}

39
im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java

@ -0,0 +1,39 @@
package com.bx.imcommon.model;
import com.bx.imcommon.enums.IMTerminalType;
import lombok.Data;
import java.util.LinkedList;
import java.util.List;
@Data
public class IMPrivateMessage<T> {
/**
* 发送者id
*/
private Long sendId;
/**
* 发送者终端类型 IMTerminalType
*/
private Integer sendTerminal;
/**
* 是否发送给自己的其他终端
*/
private Boolean sendToSelf ;
/**
* 接收者id
*/
private Long recvId;
/**
* 消息内容(可一次性发送多条)
*/
private List<T> datas;
}

21
im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java

@ -5,22 +5,37 @@ import lombok.Data;
import java.util.List;
@Data
public class IMRecvInfo<T> {
public class IMRecvInfo {
/*
* 命令类型
* 命令类型 IMCmdType
*/
private Integer cmd;
/*
* 发送者id
*/
private Long sendId;
/*
* 接收终端类型 IMTerminalType
*/
private Integer recvTerminal;
/*
* 接收者id列表
*/
private List<Long> recvIds;
/*
* 是否需要回调发送结果
*/
private Boolean needSendResult = true;
/*
* 推送消息体
*/
private T data;
private Object data;
}

18
im-commom/src/main/java/com/bx/imcommon/model/IMSessionInfo.java

@ -0,0 +1,18 @@
package com.bx.imcommon.model;
import com.bx.imcommon.enums.IMTerminalType;
import lombok.Data;
@Data
public class IMSessionInfo {
/*
* 用户id
*/
private Long userId;
/*
* 终端类型
*/
private Integer terminal;
}

14
im-commom/src/main/java/com/bx/imcommon/model/SendResult.java

@ -1,6 +1,7 @@
package com.bx.imcommon.model;
import com.bx.imcommon.enums.IMSendCode;
import com.bx.imcommon.enums.IMTerminalType;
import lombok.Data;
@Data
@ -12,13 +13,18 @@ public class SendResult<T> {
private Long recvId;
/*
* 发送状态
* 接收者终端类型 IMTerminalType
*/
private IMSendCode code;
private Integer recvTerminal;
/*
* 消息体(透传)
* 发送状态 IMCmdType
*/
private T messageInfo;
private Integer code;
/*
* 消息内容
*/
private T data;
}

4
im-platform/src/main/java/com/bx/implatform/ImplatformApp.java → im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java

@ -12,9 +12,9 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy;
@EnableAspectJAutoProxy(exposeProxy = true)
@MapperScan(basePackages = {"com.bx.implatform.mapper"})
@SpringBootApplication(exclude= {SecurityAutoConfiguration.class })// 禁用secrity
public class ImplatformApp {
public class IMPlatformApp {
public static void main(String[] args) {
SpringApplication.run(ImplatformApp.class,args);
SpringApplication.run(IMPlatformApp.class,args);
}
}

2
im-platform/src/main/java/com/bx/implatform/controller/FriendController.java

@ -27,7 +27,7 @@ public class FriendController {
@GetMapping("/list")
@ApiOperation(value = "好友列表",notes="获取好友列表")
public Result< List<FriendVO>> findFriends(){
List<Friend> friends = friendService.findFriendByUserId(SessionContext.getSession().getId());
List<Friend> friends = friendService.findFriendByUserId(SessionContext.getSession().getUserId());
List<FriendVO> vos = friends.stream().map(f->{
FriendVO vo = new FriendVO();
vo.setId(f.getFriendId());

2
im-platform/src/main/java/com/bx/implatform/controller/UserController.java

@ -38,7 +38,7 @@ public class UserController {
@ApiOperation(value = "获取当前用户信息",notes="获取当前用户信息")
public Result findSelfInfo(){
UserSession session = SessionContext.getSession();
User user = userService.getById(session.getId());
User user = userService.getById(session.getUserId());
UserVO userVO = BeanUtils.copyProperties(user,UserVO.class);
return ResultUtils.success(userVO);
}

82
im-platform/src/main/java/com/bx/implatform/controller/WebrtcController.java

@ -1,18 +1,26 @@
package com.bx.implatform.controller;
import cn.hutool.core.util.ArrayUtil;
import com.bx.imclient.IMClient;
import com.bx.imcommon.enums.IMTerminalType;
import com.bx.imcommon.model.IMPrivateMessage;
import com.bx.imcommon.model.PrivateMessageInfo;
import com.bx.implatform.config.ICEServerConfig;
import com.bx.implatform.enums.MessageType;
import com.bx.implatform.result.Result;
import com.bx.implatform.result.ResultUtils;
import com.bx.implatform.session.SessionContext;
import com.bx.implatform.session.UserSession;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang.ArrayUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.lang.reflect.Array;
import java.util.Arrays;
@Api(tags = "webrtc视频单人通话")
@RestController
@RequestMapping("/webrtc/private")
@ -27,28 +35,14 @@ public class WebrtcController {
@ApiOperation(httpMethod = "POST", value = "呼叫视频通话")
@PostMapping("/call")
public Result call(@RequestParam Long uid, @RequestBody String offer) {
Long userId = SessionContext.getSession().getId();
PrivateMessageInfo message = new PrivateMessageInfo();
message.setType(MessageType.RTC_CALL.code());
message.setRecvId(uid);
message.setSendId(userId);
message.setContent(offer);
imClient.sendPrivateMessage(uid,message);
imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_CALL,uid,offer));
return ResultUtils.success();
}
@ApiOperation(httpMethod = "POST", value = "接受视频通话")
@PostMapping("/accept")
public Result accept(@RequestParam Long uid,@RequestBody String answer) {
Long userId = SessionContext.getSession().getId();
PrivateMessageInfo message = new PrivateMessageInfo();
message.setType(MessageType.RTC_ACCEPT.code());
message.setRecvId(uid);
message.setSendId(userId);
message.setContent(answer);
imClient.sendPrivateMessage(uid,message);
imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_ACCEPT,uid,answer));
return ResultUtils.success();
}
@ -56,51 +50,28 @@ public class WebrtcController {
@ApiOperation(httpMethod = "POST", value = "拒绝视频通话")
@PostMapping("/reject")
public Result reject(@RequestParam Long uid) {
Long userId = SessionContext.getSession().getId();
PrivateMessageInfo message = new PrivateMessageInfo();
message.setType(MessageType.RTC_REJECT.code());
message.setRecvId(uid);
message.setSendId(userId);
imClient.sendPrivateMessage(uid,message);
imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_REJECT,uid,null));
return ResultUtils.success();
}
@ApiOperation(httpMethod = "POST", value = "取消呼叫")
@PostMapping("/cancel")
public Result cancel(@RequestParam Long uid) {
Long userId = SessionContext.getSession().getId();
PrivateMessageInfo message = new PrivateMessageInfo();
message.setType(MessageType.RTC_CANCEL.code());
message.setRecvId(uid);
message.setSendId(userId);
imClient.sendPrivateMessage(uid,message);
imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_CANCEL,uid,null));
return ResultUtils.success();
}
@ApiOperation(httpMethod = "POST", value = "呼叫失败")
@PostMapping("/failed")
public Result failed(@RequestParam Long uid,@RequestParam String reason) {
Long userId = SessionContext.getSession().getId();
PrivateMessageInfo message = new PrivateMessageInfo();
message.setType(MessageType.RTC_FAILED.code());
message.setRecvId(uid);
message.setSendId(userId);
message.setContent(reason);
imClient.sendPrivateMessage(uid,message);
imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_FAILED,uid,reason));
return ResultUtils.success();
}
@ApiOperation(httpMethod = "POST", value = "挂断")
@PostMapping("/handup")
public Result leave(@RequestParam Long uid) {
Long userId = SessionContext.getSession().getId();
PrivateMessageInfo message = new PrivateMessageInfo();
message.setType(MessageType.RTC_HANDUP.code());
message.setRecvId(uid);
message.setSendId(userId);
imClient.sendPrivateMessage(uid,message);
imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_HANDUP,uid,null));
return ResultUtils.success();
}
@ -108,13 +79,7 @@ public class WebrtcController {
@PostMapping("/candidate")
@ApiOperation(httpMethod = "POST", value = "同步candidate")
public Result candidate(@RequestParam Long uid,@RequestBody String candidate ) {
Long userId = SessionContext.getSession().getId();
PrivateMessageInfo message = new PrivateMessageInfo();
message.setType(MessageType.RTC_CANDIDATE.code());
message.setRecvId(uid);
message.setSendId(userId);
message.setContent(candidate);
imClient.sendPrivateMessage(uid,message);
imClient.sendPrivateMessage(buildSendMessage(MessageType.RTC_CANDIDATE,uid,candidate));
return ResultUtils.success();
}
@ -123,4 +88,21 @@ public class WebrtcController {
public Result iceservers() {
return ResultUtils.success(iceServerConfig.getIceServers());
}
private IMPrivateMessage buildSendMessage(MessageType messageType,Long uid,String content){
UserSession session = SessionContext.getSession();
PrivateMessageInfo messageInfo = new PrivateMessageInfo();
messageInfo.setType(messageType.code());
messageInfo.setRecvId(uid);
messageInfo.setSendId(session.getUserId());
messageInfo.setContent(content);
IMPrivateMessage sendMessage = new IMPrivateMessage();
sendMessage.setSendId(session.getUserId());
sendMessage.setSendTerminal(session.getTerminal());
sendMessage.setSendToSelf(false);
sendMessage.setRecvId(uid);
sendMessage.setDatas(Arrays.asList(messageInfo));
return sendMessage;
}
}

13
im-platform/src/main/java/com/bx/implatform/dto/LoginDTO.java

@ -5,17 +5,26 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.hibernate.validator.constraints.Length;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
@Data
@ApiModel("用户登录VO")
public class LoginDTO {
//@NotEmpty(message="用户名不可为空")
@Max(value = 1,message = "登录终端类型取值范围:0,1")
@Min(value = 0,message = "登录终端类型取值范围:0,1")
@NotNull(message="登录终端类型不可为空")
@ApiModelProperty(value = "登录终端 0:web 1:app")
private Integer terminal;
@NotEmpty(message="用户名不可为空")
@ApiModelProperty(value = "用户名")
private String userName;
// @NotEmpty(message="用户密码不可为空")
@NotEmpty(message="用户密码不可为空")
@ApiModelProperty(value = "用户密码")
private String password;

9
im-platform/src/main/java/com/bx/implatform/enums/MessageType.java

@ -26,6 +26,15 @@ public enum MessageType {
this.desc=desc;
}
public static MessageType fromCode(Integer code){
for (MessageType typeEnum:values()) {
if (typeEnum.code.equals(code)) {
return typeEnum;
}
}
return null;
}
public String description() {
return desc;

2
im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java

@ -22,7 +22,7 @@ public class GroupMessageListener implements MessageListener {
@Override
public void process(SendResult result){
GroupMessageInfo messageInfo = (GroupMessageInfo) result.getMessageInfo();
GroupMessageInfo messageInfo = (GroupMessageInfo) result.getData();
// 提示类数据不记录
if(messageInfo.getType().equals(MessageType.TIP)){
return;

33
im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java

@ -4,8 +4,10 @@ import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.bx.imclient.IMClient;
import com.bx.imclient.annotation.IMListener;
import com.bx.imclient.listener.MessageListener;
import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.enums.IMListenerType;
import com.bx.imcommon.enums.IMSendCode;
import com.bx.imcommon.model.IMPrivateMessage;
import com.bx.imcommon.model.PrivateMessageInfo;
import com.bx.imcommon.model.SendResult;
import com.bx.implatform.entity.PrivateMessage;
@ -15,6 +17,7 @@ import com.bx.implatform.service.IPrivateMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Arrays;
import java.util.Date;
@ -30,7 +33,8 @@ public class PrivateMessageListener implements MessageListener {
@Override
public void process(SendResult result){
PrivateMessageInfo messageInfo = (PrivateMessageInfo) result.getMessageInfo();
PrivateMessageInfo messageInfo = (PrivateMessageInfo) result.getData();
IMSendCode resultCode = IMSendCode.fromCode(result.getCode());
// 提示类数据不记录
if(messageInfo.getType().equals(MessageType.TIP.code())){
return;
@ -39,19 +43,26 @@ public class PrivateMessageListener implements MessageListener {
if(messageInfo.getType() >= MessageType.RTC_CALL.code() && messageInfo.getType()< MessageType.RTC_CANDIDATE.code()){
// 通知用户呼叫失败了
if(messageInfo.getType().equals(MessageType.RTC_CALL.code())
&& !result.getCode().equals(IMSendCode.SUCCESS)){
PrivateMessageInfo sendMessage = new PrivateMessageInfo();
sendMessage.setRecvId(messageInfo.getSendId());
sendMessage.setSendId(messageInfo.getRecvId());
sendMessage.setType(MessageType.RTC_FAILED.code());
sendMessage.setContent(result.getCode().description());
sendMessage.setSendTime(new Date());
imClient.sendPrivateMessage(sendMessage.getRecvId(),sendMessage);
&& !resultCode.equals(IMSendCode.SUCCESS)){
PrivateMessageInfo msgInfo = new PrivateMessageInfo();
msgInfo.setRecvId(messageInfo.getSendId());
msgInfo.setSendId(messageInfo.getRecvId());
msgInfo.setType(MessageType.RTC_FAILED.code());
msgInfo.setContent(resultCode.description());
msgInfo.setSendTime(new Date());
IMPrivateMessage sendMessage = new IMPrivateMessage();
sendMessage.setSendId(messageInfo.getSendId());
sendMessage.setRecvId(messageInfo.getRecvId());
sendMessage.setSendTerminal(result.getRecvTerminal());
sendMessage.setSendToSelf(false);
sendMessage.setDatas(Arrays.asList(messageInfo));
imClient.sendPrivateMessage(sendMessage);
}
return;
}
// 更新消息状态
if(result.getCode().equals(IMSendCode.SUCCESS)){
// 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态
if(resultCode.equals(IMSendCode.SUCCESS)){
UpdateWrapper<PrivateMessage> updateWrapper = new UpdateWrapper<>();
updateWrapper.lambda().eq(PrivateMessage::getId,messageInfo.getId())
.eq(PrivateMessage::getStatus, MessageStatus.UNREAD.code())

8
im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java

@ -57,7 +57,7 @@ public class FriendServiceImpl extends ServiceImpl<FriendMapper, Friend> impleme
@Transactional
@Override
public void addFriend(Long friendId) {
long userId = SessionContext.getSession().getId();
long userId = SessionContext.getSession().getUserId();
if(userId == friendId){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"不允许添加自己为好友");
}
@ -78,7 +78,7 @@ public class FriendServiceImpl extends ServiceImpl<FriendMapper, Friend> impleme
@Transactional
@Override
public void delFriend(Long friendId) {
long userId = SessionContext.getSession().getId();
long userId = SessionContext.getSession().getUserId();
// 互相解除好友关系
FriendServiceImpl proxy = (FriendServiceImpl)AopContext.currentProxy();
proxy.unbindFriend(userId,friendId);
@ -113,7 +113,7 @@ public class FriendServiceImpl extends ServiceImpl<FriendMapper, Friend> impleme
*/
@Override
public void update(FriendVO vo) {
long userId = SessionContext.getSession().getId();
long userId = SessionContext.getSession().getUserId();
QueryWrapper<Friend> queryWrapper = new QueryWrapper<>();
queryWrapper.lambda()
.eq(Friend::getUserId,userId)
@ -186,7 +186,7 @@ public class FriendServiceImpl extends ServiceImpl<FriendMapper, Friend> impleme
UserSession session = SessionContext.getSession();
QueryWrapper<Friend> wrapper = new QueryWrapper<>();
wrapper.lambda()
.eq(Friend::getUserId,session.getId())
.eq(Friend::getUserId,session.getUserId())
.eq(Friend::getFriendId,friendId);
Friend friend = this.getOne(wrapper);
if(friend == null){

8
im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java

@ -56,7 +56,7 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
*/
@Override
public Long sendMessage(GroupMessageVO vo) {
Long userId = SessionContext.getSession().getId();
Long userId = SessionContext.getSession().getUserId();
Group group = groupService.getById(vo.getGroupId());
if(group == null){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"群聊不存在");
@ -93,7 +93,7 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
*/
@Override
public void recallMessage(Long id) {
Long userId = SessionContext.getSession().getId();
Long userId = SessionContext.getSession().getUserId();
GroupMessage msg = this.getById(id);
if(msg == null){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"消息不存在");
@ -133,7 +133,7 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
*/
@Override
public void pullUnreadMessage() {
Long userId = SessionContext.getSession().getId();
Long userId = SessionContext.getSession().getUserId();
List<Long> recvIds = new LinkedList();
recvIds.add(userId);
List<GroupMember> members = groupMemberService.findByUserId(userId);
@ -179,7 +179,7 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
public List<GroupMessageInfo> findHistoryMessage(Long groupId, Long page, Long size) {
page = page > 0 ? page:1;
size = size > 0 ? size:10;
Long userId = SessionContext.getSession().getId();
Long userId = SessionContext.getSession().getUserId();
Long stIdx = (page-1)* size;
// 群聊成员信息
GroupMember member = groupMemberService.findByGroupAndUserId(groupId,userId);

20
im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java

@ -62,7 +62,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
@Override
public GroupVO createGroup(String groupName) {
UserSession session = SessionContext.getSession();
User user = userService.getById(session.getId());
User user = userService.getById(session.getUserId());
// 保存群组数据
Group group = new Group();
group.setName(groupName);
@ -100,12 +100,12 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
// 校验是不是群主,只有群主能改信息
Group group = this.getById(vo.getId());
// 群主有权修改群基本信息
if(group.getOwnerId() == session.getId()){
if(group.getOwnerId() == session.getUserId()){
group = BeanUtils.copyProperties(vo,Group.class);
this.updateById(group);
}
// 更新成员信息
GroupMember member = groupMemberService.findByGroupAndUserId(vo.getId(),session.getId());
GroupMember member = groupMemberService.findByGroupAndUserId(vo.getId(),session.getUserId());
if(member == null){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您不是群聊的成员");
}
@ -129,7 +129,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
public void deleteGroup(Long groupId) {
UserSession session = SessionContext.getSession();
Group group = this.getById(groupId);
if(group.getOwnerId() != session.getId()){
if(group.getOwnerId() != session.getUserId()){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"只有群主才有权限解除群聊");
}
// 逻辑删除群数据
@ -149,7 +149,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
*/
@Override
public void quitGroup(Long groupId) {
Long userId = SessionContext.getSession().getId();
Long userId = SessionContext.getSession().getUserId();
Group group = this.getById(groupId);
if(group.getOwnerId() == userId){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您是群主,不可退出群聊");
@ -171,10 +171,10 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
public void kickGroup(Long groupId, Long userId) {
UserSession session = SessionContext.getSession();
Group group = this.getById(groupId);
if(group.getOwnerId() != session.getId()){
if(group.getOwnerId() != session.getUserId()){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您不是群主,没有权限踢人");
}
if(userId == session.getId()){
if(userId == session.getUserId()){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"亲,不能自己踢自己哟");
}
// 删除群聊成员
@ -186,7 +186,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
public GroupVO findById(Long groupId) {
UserSession session = SessionContext.getSession();
Group group = this.getById(groupId);
GroupMember member = groupMemberService.findByGroupAndUserId(groupId,session.getId());
GroupMember member = groupMemberService.findByGroupAndUserId(groupId,session.getUserId());
if(member == null){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您未加入群聊");
}
@ -226,7 +226,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
public List<GroupVO> findGroups() {
UserSession session = SessionContext.getSession();
// 查询当前用户的群id列表
List<GroupMember> groupMembers = groupMemberService.findByUserId(session.getId());
List<GroupMember> groupMembers = groupMemberService.findByUserId(session.getUserId());
if(groupMembers.isEmpty()){
return Collections.EMPTY_LIST;
}
@ -267,7 +267,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
}
// 找出好友信息
List<Friend> friends = friendsService.findFriendByUserId(session.getId());
List<Friend> friends = friendsService.findFriendByUserId(session.getUserId());
List<Friend> friendsList = vo.getFriendIds().stream().map(id ->
friends.stream().filter(f -> f.getFriendId().equals(id)).findFirst().get()).collect(Collectors.toList());
if (friendsList.size() != vo.getFriendIds().size()) {

48
im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java

@ -4,7 +4,8 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bx.imclient.IMClient;
import com.bx.imcommon.contant.Constant;
import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMTerminalType;
import com.bx.imcommon.model.IMPrivateMessage;
import com.bx.imcommon.model.PrivateMessageInfo;
import com.bx.implatform.entity.PrivateMessage;
import com.bx.implatform.enums.MessageStatus;
@ -15,13 +16,16 @@ import com.bx.implatform.mapper.PrivateMessageMapper;
import com.bx.implatform.service.IFriendService;
import com.bx.implatform.service.IPrivateMessageService;
import com.bx.implatform.session.SessionContext;
import com.bx.implatform.session.UserSession;
import com.bx.implatform.util.BeanUtils;
import com.bx.implatform.vo.PrivateMessageVO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
@ -44,21 +48,27 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
*/
@Override
public Long sendMessage(PrivateMessageVO vo) {
Long userId = SessionContext.getSession().getId();
Boolean isFriends = friendService.isFriend(userId, vo.getRecvId());
UserSession session = SessionContext.getSession();
Boolean isFriends = friendService.isFriend(session.getUserId(), vo.getRecvId());
if (!isFriends) {
throw new GlobalException(ResultCode.PROGRAM_ERROR, "您已不是对方好友,无法发送消息");
}
// 保存消息
PrivateMessage msg = BeanUtils.copyProperties(vo, PrivateMessage.class);
msg.setSendId(userId);
msg.setSendId(session.getUserId());
msg.setStatus(MessageStatus.UNREAD.code());
msg.setSendTime(new Date());
this.save(msg);
// 推送消息
PrivateMessageInfo msgInfo = BeanUtils.copyProperties(msg, PrivateMessageInfo.class);
imClient.sendPrivateMessage(vo.getRecvId(),msgInfo);
log.info("发送私聊消息,发送id:{},接收id:{},内容:{}", userId, vo.getRecvId(), vo.getContent());
IMPrivateMessage sendMessage = new IMPrivateMessage();
sendMessage.setSendId(msgInfo.getSendId());
sendMessage.setRecvId(msgInfo.getRecvId());
sendMessage.setSendTerminal(session.getTerminal());
sendMessage.setSendToSelf(true);
sendMessage.setDatas(Arrays.asList(msgInfo));
imClient.sendPrivateMessage(sendMessage);
log.info("发送私聊消息,发送id:{},接收id:{},内容:{}", session.getUserId(), vo.getRecvId(), vo.getContent());
return msg.getId();
}
@ -69,12 +79,12 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
*/
@Override
public void recallMessage(Long id) {
Long userId = SessionContext.getSession().getId();
UserSession session = SessionContext.getSession();
PrivateMessage msg = this.getById(id);
if (msg == null) {
throw new GlobalException(ResultCode.PROGRAM_ERROR, "消息不存在");
}
if (!msg.getSendId().equals(userId)) {
if (!msg.getSendId().equals(session.getUserId())) {
throw new GlobalException(ResultCode.PROGRAM_ERROR, "这条消息不是由您发送,无法撤回");
}
if (System.currentTimeMillis() - msg.getSendTime().getTime() > Constant.ALLOW_RECALL_SECOND * 1000) {
@ -88,7 +98,14 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
msgInfo.setType(MessageType.TIP.code());
msgInfo.setSendTime(new Date());
msgInfo.setContent("对方撤回了一条消息");
imClient.sendPrivateMessage(msgInfo.getRecvId(),msgInfo);
IMPrivateMessage sendMessage = new IMPrivateMessage();
sendMessage.setSendId(msgInfo.getSendId());
sendMessage.setRecvId(msgInfo.getRecvId());
sendMessage.setSendTerminal(session.getTerminal());
sendMessage.setSendToSelf(true);
sendMessage.setDatas(Arrays.asList(msgInfo));
imClient.sendPrivateMessage(sendMessage);
log.info("撤回私聊消息,发送id:{},接收id:{},内容:{}", msg.getSendId(), msg.getRecvId(), msg.getContent());
}
@ -105,7 +122,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
public List<PrivateMessageInfo> findHistoryMessage(Long friendId, Long page, Long size) {
page = page > 0 ? page : 1;
size = size > 0 ? size : 10;
Long userId = SessionContext.getSession().getId();
Long userId = SessionContext.getSession().getUserId();
Long stIdx = (page - 1) * size;
QueryWrapper<PrivateMessage> wrapper = new QueryWrapper<>();
wrapper.lambda().and(wrap -> wrap.and(
@ -134,7 +151,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
@Override
public void pullUnreadMessage() {
// 获取当前连接的channelId
Long userId = SessionContext.getSession().getId();
Long userId = SessionContext.getSession().getUserId();
if (!imClient.isOnline(userId)) {
throw new GlobalException(ResultCode.PROGRAM_ERROR, "用户未建立连接");
}
@ -150,9 +167,12 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
return msgInfo;
}).collect(Collectors.toList());
// 推送消息
PrivateMessageInfo[] infoArr = messageInfos.toArray(new PrivateMessageInfo[messageInfos.size()]);
imClient.sendPrivateMessage(userId,infoArr);
log.info("拉取未读私聊消息,用户id:{},数量:{}", userId, infoArr.length);
IMPrivateMessage<PrivateMessageInfo> sendMessage = new IMPrivateMessage();
sendMessage.setRecvId(userId);
sendMessage.setSendToSelf(false);
sendMessage.setDatas(messageInfos);
imClient.sendPrivateMessage(sendMessage);
log.info("拉取未读私聊消息,用户id:{},数量:{}", userId, messageInfos.size());
}
}
}

9
im-platform/src/main/java/com/bx/implatform/service/impl/UserServiceImpl.java

@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bx.imclient.IMClient;
import com.bx.imcommon.contant.RedisKey;
import com.bx.implatform.config.JwtProperties;
import com.bx.implatform.entity.Friend;
import com.bx.implatform.entity.GroupMember;
@ -74,6 +73,8 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IU
}
// 生成token
UserSession session = BeanUtils.copyProperties(user,UserSession.class);
session.setUserId(user.getId());
session.setTerminal(dto.getTerminal());
String strJson = JSON.toJSONString(session);
String accessToken = JwtUtil.sign(user.getId(),strJson,jwtProperties.getAccessTokenExpireIn(),jwtProperties.getAccessTokenSecret());
String refreshToken = JwtUtil.sign(user.getId(),strJson,jwtProperties.getAccessTokenExpireIn(),jwtProperties.getAccessTokenSecret());
@ -150,7 +151,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IU
@Override
public void update(UserVO vo) {
UserSession session = SessionContext.getSession();
if(!session.getId().equals(vo.getId()) ){
if(!session.getUserId().equals(vo.getId()) ){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"不允许修改其他用户的信息!");
}
User user = this.getById(vo.getId());
@ -160,7 +161,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IU
// 更新好友昵称和头像
if(!user.getNickName().equals(vo.getNickName()) || !user.getHeadImageThumb().equals(vo.getHeadImageThumb())){
QueryWrapper<Friend> queryWrapper = new QueryWrapper<>();
queryWrapper.lambda().eq(Friend::getFriendId,session.getId());
queryWrapper.lambda().eq(Friend::getFriendId,session.getUserId());
List<Friend> friends = friendService.list(queryWrapper);
for(Friend friend: friends){
friend.setFriendNickName(vo.getNickName());
@ -170,7 +171,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IU
}
// 更新群聊中的头像
if(!user.getHeadImageThumb().equals(vo.getHeadImageThumb())){
List<GroupMember> members = groupMemberService.findByUserId(session.getId());
List<GroupMember> members = groupMemberService.findByUserId(session.getUserId());
for(GroupMember member:members){
member.setHeadImage(vo.getHeadImageThumb());
}

4
im-platform/src/main/java/com/bx/implatform/service/thirdparty/FileService.java

@ -54,7 +54,7 @@ public class FileService {
public String uploadFile(MultipartFile file){
Long userId = SessionContext.getSession().getId();
Long userId = SessionContext.getSession().getUserId();
// 大小校验
if(file.getSize() > Constant.MAX_FILE_SIZE){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"文件大小不能超过10M");
@ -71,7 +71,7 @@ public class FileService {
public UploadImageVO uploadImage(MultipartFile file){
try {
Long userId = SessionContext.getSession().getId();
Long userId = SessionContext.getSession().getUserId();
// 大小校验
if(file.getSize() > Constant.MAX_IMAGE_SIZE){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"图片大小不能超过5M");

13
im-platform/src/main/java/com/bx/implatform/session/UserSession.java

@ -1,11 +1,20 @@
package com.bx.implatform.session;
import com.bx.imcommon.model.IMSessionInfo;
import lombok.Data;
@Data
public class UserSession {
public class UserSession extends IMSessionInfo {
private Long id;
/*
* 用户名称
*/
private String userName;
/*
* 用户昵称
*/
private String nickName;
}

12
im-server/src/main/java/com/bx/imserver/constant/ChannelAttrKey.java

@ -0,0 +1,12 @@
package com.bx.imserver.constant;
public class ChannelAttrKey {
// 用户ID
public static final String USER_ID = "USER_ID";
// 终端类型
public static final String TERMINAL_TYPE = "TERMINAL_TYPE";
// 心跳次数
public static final String HEARTBEAt_TIMES = "HEARTBEAt_TIMES";
}

19
im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java

@ -3,6 +3,7 @@ package com.bx.imserver.netty;
import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.IMSendInfo;
import com.bx.imserver.constant.ChannelAttrKey;
import com.bx.imserver.netty.processor.MessageProcessor;
import com.bx.imserver.netty.processor.ProcessorFactory;
import com.bx.imserver.util.SpringContextHolder;
@ -64,18 +65,20 @@ public class IMChannelHandler extends SimpleChannelInboundHandler<IMSendInfo> {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
AttributeKey<Long> attr = AttributeKey.valueOf("USER_ID");
Long userId = ctx.channel().attr(attr).get();
ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(userId);
AttributeKey<Long> userIdAttr = AttributeKey.valueOf(ChannelAttrKey.USER_ID);
Long userId = ctx.channel().attr(userIdAttr).get();
AttributeKey<Integer> terminalAttr = AttributeKey.valueOf(ChannelAttrKey.TERMINAL_TYPE);
Integer terminal = ctx.channel().attr(terminalAttr).get();
ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(userId,terminal);
// 判断一下,避免异地登录导致的误删
if(context != null && ctx.channel().id().equals(context.channel().id())){
// 移除channel
UserChannelCtxMap.removeChannelCtx(userId);
UserChannelCtxMap.removeChannelCtx(userId,terminal);
// 用户下线
RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate");
String key = RedisKey.IM_USER_SERVER_ID + userId;
String key = String.join(":",RedisKey.IM_USER_SERVER_ID,userId.toString(), terminal.toString());
redisTemplate.delete(key);
log.info("断开连接,userId:{}",userId);
log.info("断开连接,userId:{},终端类型:{}",userId,terminal);
}
}
@ -87,7 +90,9 @@ public class IMChannelHandler extends SimpleChannelInboundHandler<IMSendInfo> {
// 在规定时间内没有收到客户端的上行数据, 主动断开连接
AttributeKey<Long> attr = AttributeKey.valueOf("USER_ID");
Long userId = ctx.channel().attr(attr).get();
log.info("心跳超时,即将断开连接,用户id:{} ",userId);
AttributeKey<Integer> terminalAttr = AttributeKey.valueOf(ChannelAttrKey.TERMINAL_TYPE);
Integer ternimal = ctx.channel().attr(terminalAttr).get();
log.info("心跳超时,即将断开连接,用户id:{},终端类型:{} ",userId,ternimal);
ctx.channel().close();
}
} else {

30
im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java

@ -2,6 +2,7 @@ package com.bx.imserver.netty;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -9,21 +10,34 @@ import java.util.concurrent.ConcurrentHashMap;
public class UserChannelCtxMap {
/*
* 维护userId和ctx的关联关系格式:Map<userId,ctx>
* 维护userId和ctx的关联关系格式:Map<userId,map<terminalctx>>
*/
private static Map<Long, ChannelHandlerContext> channelMap = new ConcurrentHashMap();
private static Map<Long, Map<Integer,ChannelHandlerContext>> channelMap = new ConcurrentHashMap();
public static void addChannelCtx(Long userId,ChannelHandlerContext ctx){
channelMap.put(userId,ctx);
public static void addChannelCtx(Long userId,Integer channel,ChannelHandlerContext ctx){
channelMap.computeIfAbsent(userId,key -> new ConcurrentHashMap()).put(channel,ctx);
}
public static void removeChannelCtx(Long userId){
if(userId != null){
channelMap.remove(userId);
public static void removeChannelCtx(Long userId,Integer terminal){
if(userId != null && terminal != null && channelMap.containsKey(userId)){
Map<Integer,ChannelHandlerContext> userChannelMap = channelMap.get(userId);
if(userChannelMap.containsKey(terminal)){
userChannelMap.remove(terminal);
}
}
}
public static ChannelHandlerContext getChannelCtx(Long userId){
public static ChannelHandlerContext getChannelCtx(Long userId,Integer terminal){
if(userId != null && terminal != null && channelMap.containsKey(userId)){
Map<Integer,ChannelHandlerContext> userChannelMap = channelMap.get(userId);
if(userChannelMap.containsKey(terminal)){
return userChannelMap.get(terminal);
}
}
return null;
}
public static Map<Integer,ChannelHandlerContext> getChannelCtx(Long userId){
if(userId == null){
return null;
}

28
im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java

@ -19,32 +19,32 @@ import java.util.List;
@Slf4j
@Component
public class GroupMessageProcessor extends MessageProcessor<IMRecvInfo<GroupMessageInfo>> {
public class GroupMessageProcessor extends MessageProcessor<IMRecvInfo> {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@Async
@Override
public void process(IMRecvInfo<GroupMessageInfo> recvInfo) {
GroupMessageInfo messageInfo = recvInfo.getData();
public void process(IMRecvInfo recvInfo) {
Object data = recvInfo.getData();
List<Long> recvIds = recvInfo.getRecvIds();
log.info("接收到群消息,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent());
log.info("接收到群消息,发送者:{},接收id:{},内容:{}",recvInfo.getSendId(),recvIds,data);
for(Long recvId:recvIds){
try {
ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId);
ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId,recvInfo.getRecvTerminal());
if(channelCtx != null){
// 推送消息到用户
IMSendInfo sendInfo = new IMSendInfo();
sendInfo.setCmd(IMCmdType.GROUP_MESSAGE.code());
sendInfo.setData(messageInfo);
sendInfo.setData(data);
channelCtx.channel().writeAndFlush(sendInfo);
// 消息发送成功确认
String key = RedisKey.IM_RESULT_GROUP_QUEUE;
SendResult sendResult = new SendResult();
sendResult.setRecvId(recvId);
sendResult.setCode(IMSendCode.SUCCESS);
sendResult.setMessageInfo(messageInfo);
sendResult.setCode(IMSendCode.SUCCESS.code());
sendResult.setData(data);
redisTemplate.opsForList().rightPush(key,sendResult);
}else {
@ -52,20 +52,20 @@ public class GroupMessageProcessor extends MessageProcessor<IMRecvInfo<GroupMes
String key = RedisKey.IM_RESULT_GROUP_QUEUE;
SendResult sendResult = new SendResult();
sendResult.setRecvId(recvId);
sendResult.setCode(IMSendCode.NOT_FIND_CHANNEL);
sendResult.setMessageInfo(messageInfo);
sendResult.setCode(IMSendCode.NOT_FIND_CHANNEL.code());
sendResult.setData(data);
redisTemplate.opsForList().rightPush(key,sendResult);
log.error("未找到WS连接,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent());
log.error("未找到WS连接,发送者:{},接收id:{},内容:{}",recvInfo.getSendId(),recvId,data);
}
}catch (Exception e){
// 消息发送失败确认
String key = RedisKey.IM_RESULT_GROUP_QUEUE;
SendResult sendResult = new SendResult();
sendResult.setRecvId(recvId);
sendResult.setCode(IMSendCode.UNKONW_ERROR);
sendResult.setMessageInfo(messageInfo);
sendResult.setCode(IMSendCode.UNKONW_ERROR.code());
sendResult.setData(data);
redisTemplate.opsForList().rightPush(key,sendResult);
log.error("发送消息异常,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent());
log.error("发送消息异常,发送者:{},接收id:{},内容:{}",recvInfo.getSendId(),recvId,data);
}
}
}

15
im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java

@ -6,6 +6,7 @@ import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.HeartbeatInfo;
import com.bx.imcommon.model.IMSendInfo;
import com.bx.imserver.constant.ChannelAttrKey;
import com.bx.imserver.netty.ws.WebSocketServer;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
@ -36,14 +37,16 @@ public class HeartbeatProcessor extends MessageProcessor<HeartbeatInfo> {
ctx.channel().writeAndFlush(sendInfo);
// 设置属性
AttributeKey<Long> attr = AttributeKey.valueOf("HEARTBEAt_TIMES");
Long heartbeatTimes = ctx.channel().attr(attr).get();
ctx.channel().attr(attr).set(++heartbeatTimes);
AttributeKey<Long> heartBeatAttr = AttributeKey.valueOf(ChannelAttrKey.HEARTBEAt_TIMES);
Long heartbeatTimes = ctx.channel().attr(heartBeatAttr).get();
ctx.channel().attr(heartBeatAttr).set(++heartbeatTimes);
if(heartbeatTimes%10 == 0){
// 每心跳10次,用户在线状态续一次命
attr = AttributeKey.valueOf("USER_ID");
Long userId = ctx.channel().attr(attr).get();
String key = RedisKey.IM_USER_SERVER_ID+userId;
AttributeKey<Long> userIdAttr = AttributeKey.valueOf(ChannelAttrKey.USER_ID);
Long userId = ctx.channel().attr(userIdAttr).get();
AttributeKey<Integer> terminalAttr = AttributeKey.valueOf(ChannelAttrKey.TERMINAL_TYPE);
Integer ternimal = ctx.channel().attr(terminalAttr).get();
String key = String.join(":",RedisKey.IM_USER_SERVER_ID,userId.toString(),ternimal.toString());
redisTemplate.expire(key, Constant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS);
}
}

29
im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java

@ -1,12 +1,16 @@
package com.bx.imserver.netty.processor;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bx.imcommon.contant.Constant;
import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.IMSendInfo;
import com.bx.imcommon.model.IMSessionInfo;
import com.bx.imcommon.model.LoginInfo;
import com.bx.imcommon.util.JwtUtil;
import com.bx.imserver.constant.ChannelAttrKey;
import com.bx.imserver.netty.IMServerGroup;
import com.bx.imserver.netty.UserChannelCtxMap;
import com.bx.imserver.netty.ws.WebSocketServer;
@ -19,6 +23,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
@ -41,9 +46,12 @@ public class LoginProcessor extends MessageProcessor<LoginInfo> {
ctx.channel().close();
log.warn("用户token校验不通过,强制下线,token:{}",loginInfo.getAccessToken());
}
Long userId = JwtUtil.getUserId(loginInfo.getAccessToken());
String strInfo = JwtUtil.getInfo(loginInfo.getAccessToken());
IMSessionInfo sessionInfo = JSON.parseObject(strInfo,IMSessionInfo.class);
Long userId = sessionInfo.getUserId();
Integer terminal = sessionInfo.getTerminal();
log.info("用户登录,userId:{}",userId);
ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(userId);
ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(userId,terminal);
if(context != null && !ctx.channel().id().equals(context.channel().id())){
// 不允许多地登录,强制下线
IMSendInfo sendInfo = new IMSendInfo();
@ -53,15 +61,18 @@ public class LoginProcessor extends MessageProcessor<LoginInfo> {
log.info("异地登录,强制下线,userId:{}",userId);
}
// 绑定用户和channel
UserChannelCtxMap.addChannelCtx(userId,ctx);
UserChannelCtxMap.addChannelCtx(userId,terminal,ctx);
// 设置用户id属性
AttributeKey<Long> attr = AttributeKey.valueOf("USER_ID");
ctx.channel().attr(attr).set(userId);
// 心跳次数
attr = AttributeKey.valueOf("HEARTBEAt_TIMES");
ctx.channel().attr(attr).set(0L);
AttributeKey<Long> userIdAttr = AttributeKey.valueOf(ChannelAttrKey.USER_ID);
ctx.channel().attr(userIdAttr).set(userId);
// 设置用户终端类型
AttributeKey<Integer> terminalAttr = AttributeKey.valueOf(ChannelAttrKey.TERMINAL_TYPE);
ctx.channel().attr(terminalAttr).set(terminal);
// 初始化心跳次数
AttributeKey<Long> heartBeatAttr = AttributeKey.valueOf("HEARTBEAt_TIMES");
ctx.channel().attr(heartBeatAttr).set(0L);
// 在redis上记录每个user的channelId,15秒没有心跳,则自动过期
String key = RedisKey.IM_USER_SERVER_ID+userId;
String key = String.join(":",RedisKey.IM_USER_SERVER_ID,userId.toString(), terminal.toString());
redisTemplate.opsForValue().set(key, IMServerGroup.serverId, Constant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS);
// 响应ws
IMSendInfo sendInfo = new IMSendInfo();

48
im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java

@ -14,54 +14,50 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Map;
@Slf4j
@Component
public class PrivateMessageProcessor extends MessageProcessor<IMRecvInfo<PrivateMessageInfo>> {
public class PrivateMessageProcessor extends MessageProcessor<IMRecvInfo> {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@Override
public void process(IMRecvInfo<PrivateMessageInfo> recvInfo) {
PrivateMessageInfo messageInfo = recvInfo.getData();
public void process(IMRecvInfo recvInfo) {
Long recvId = recvInfo.getRecvIds().get(0);
log.info("接收到消息,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent());
log.info("接收到消息,发送者:{},接收者:{},内容:{}",recvInfo.getSendId(),recvId,recvInfo.getData());
try{
ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId);
ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId,recvInfo.getRecvTerminal());
if(channelCtx != null ){
// 推送消息到用户
IMSendInfo sendInfo = new IMSendInfo();
sendInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code());
sendInfo.setData(messageInfo);
sendInfo.setData(recvInfo.getData());
channelCtx.channel().writeAndFlush(sendInfo);
// 消息发送成功确认
String key = RedisKey.IM_RESULT_PRIVATE_QUEUE;
SendResult sendResult = new SendResult();
sendResult.setRecvId(recvId);
sendResult.setCode(IMSendCode.SUCCESS);
sendResult.setMessageInfo(messageInfo);
redisTemplate.opsForList().rightPush(key,sendResult);
sendResult(recvInfo,IMSendCode.SUCCESS);
}else{
// 消息推送失败确认
String key = RedisKey.IM_RESULT_PRIVATE_QUEUE;
SendResult sendResult = new SendResult();
sendResult.setRecvId(recvId);
sendResult.setCode(IMSendCode.NOT_FIND_CHANNEL);
sendResult.setMessageInfo(messageInfo);
redisTemplate.opsForList().rightPush(key,sendResult);
log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent());
sendResult(recvInfo,IMSendCode.NOT_FIND_CHANNEL);
log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",recvInfo.getSendId(),recvId,recvInfo.getData());
}
}catch (Exception e){
// 消息推送失败确认
String key = RedisKey.IM_RESULT_PRIVATE_QUEUE;
SendResult sendResult = new SendResult();
sendResult.setRecvId(recvId);
sendResult.setCode(IMSendCode.UNKONW_ERROR);
sendResult.setMessageInfo(messageInfo);
redisTemplate.opsForList().rightPush(key,sendResult);
log.error("发送异常,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent(),e);
sendResult(recvInfo,IMSendCode.UNKONW_ERROR);
log.error("发送异常,发送者:{},接收者:{},内容:{}",recvInfo.getSendId(),recvId,recvInfo.getData(),e);
}
}
private void sendResult(IMRecvInfo recvInfo,IMSendCode sendCode){
if(recvInfo.getNeedSendResult()) {
String key = RedisKey.IM_RESULT_PRIVATE_QUEUE;
SendResult result = new SendResult();
result.setRecvId(recvInfo.getRecvIds().get(0));
result.setCode(sendCode.code());
result.setData(recvInfo.getData());
redisTemplate.opsForList().rightPush(key, result);
}
}
}

4
im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java

@ -25,8 +25,6 @@ public class PullUnreadGroupMessageTask extends AbstractPullMessageTask {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@Override
public void pullMessage() {
// 从redis拉取未读消息
@ -34,7 +32,7 @@ public class PullUnreadGroupMessageTask extends AbstractPullMessageTask {
List messageInfos = redisTemplate.opsForList().range(key,0,-1);
for(Object o: messageInfos){
redisTemplate.opsForList().leftPop(key);
IMRecvInfo<GroupMessageInfo> recvInfo = (IMRecvInfo)o;
IMRecvInfo recvInfo = (IMRecvInfo)o;
MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE);
processor.process(recvInfo);
}

2
im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java

@ -34,7 +34,7 @@ public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask {
List messageInfos = redisTemplate.opsForList().range(key,0,-1);
for(Object o: messageInfos){
redisTemplate.opsForList().leftPop(key);
IMRecvInfo<PrivateMessageInfo> recvInfo = (IMRecvInfo)o;
IMRecvInfo recvInfo = (IMRecvInfo)o;
MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE);
processor.process(recvInfo);

10
im-ui/src/view/Home.vue

@ -116,7 +116,9 @@
},
handlePrivateMessage(msg) {
//
let friend = this.$store.state.friendStore.friends.find((f) => f.id == msg.sendId);
msg.selfSend = msg.sendId==this.$store.state.userStore.userInfo.id;
let friendId = msg.selfSend?msg.recvId:msg.sendId;
let friend = this.$store.state.friendStore.friends.find((f) => f.id == friendId);
if (friend) {
this.insertPrivateMessage(friend, msg);
return;
@ -135,7 +137,6 @@
if (msg.type >= this.$enums.MESSAGE_TYPE.RTC_CALL &&
msg.type <= this.$enums.MESSAGE_TYPE.RTC_CANDIDATE) {
//
console.log(msg)
if (msg.type == this.$enums.MESSAGE_TYPE.RTC_CALL ||
msg.type == this.$enums.MESSAGE_TYPE.RTC_CANCEL) {
this.$store.commit("showVideoAcceptorBox", friend);
@ -157,7 +158,8 @@
//
this.$store.commit("insertMessage", msg);
//
this.playAudioTip();
!msg.selfSend && this.playAudioTip();
},
handleGroupMessage(msg) {
//
@ -325,4 +327,4 @@
text-align: center;
}
</style>
</style>

7
im-ui/src/view/Login.vue

@ -2,6 +2,10 @@
<div class="login-view" >
<el-form :model="loginForm" status-icon :rules="rules" ref="loginForm" label-width="60px" class="web-ruleForm" @keyup.enter.native="submitForm('loginForm')">
<div class="login-brand">欢迎登陆</div>
<el-form-item label="终端" prop="userName">
<el-input type="terminal" v-model="loginForm.terminal" autocomplete="off"></el-input>
</el-form-item>
<el-form-item label="用户名" prop="userName">
<el-input type="userName" v-model="loginForm.userName" autocomplete="off"></el-input>
@ -26,14 +30,12 @@
name: "login",
data() {
var checkUsername = (rule, value, callback) => {
console.log("checkUsername");
if (!value) {
return callback(new Error('请输入用户名'));
}
callback();
};
var checkPassword = (rule, value, callback) => {
console.log("checkPassword");
if (value === '') {
callback(new Error('请输入密码'));
}
@ -42,6 +44,7 @@
};
return {
loginForm: {
terminal: 0,
userName: '',
password: ''
},

Loading…
Cancel
Save