From 62e9c89121cc59803eb9f0cab1de3397b8710190 Mon Sep 17 00:00:00 2001 From: "xie.bx" Date: Fri, 4 Nov 2022 00:23:22 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BE=A4=E8=81=8A=E5=8A=9F=E8=83=BD=E5=BC=80?= =?UTF-8?q?=E5=8F=91=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/lx/common/contant/RedisKey.java | 2 +- .../java/com/lx/common/enums/WSCmdEnum.java | 8 +- .../com/lx/common/model/im/LoginInfo.java | 9 ++ .../controller/FriendController.java | 11 +- .../controller/GroupController.java | 10 ++ .../controller/GroupMessageController.java | 10 +- .../java/com/lx/implatform/entity/Group.java | 5 + .../com/lx/implatform/entity/GroupMember.java | 7 ++ .../lx/implatform/entity/GroupMessage.java | 2 +- .../lx/implatform/service/IFriendService.java | 1 + .../service/IGroupMemberService.java | 2 +- .../lx/implatform/service/IGroupService.java | 4 +- .../service/impl/FriendServiceImpl.java | 24 +++- .../service/impl/GroupMemberServiceImpl.java | 42 ++++--- .../service/impl/GroupMessageServiceImpl.java | 50 ++++++++- .../service/impl/GroupServiceImpl.java | 48 +++++--- .../com/lx/implatform/vo/GroupMemberVO.java | 3 + .../java/com/lx/implatform/vo/GroupVO.java | 5 + im-platform/src/main/resources/db/db.sql | 4 +- .../task/PullUnreadGroupMessageTask.java | 2 +- .../task/PullUnreadPrivateMessageTask.java | 8 +- .../imserver/websocket/WebSocketHandler.java | 23 ++-- .../websocket/WebsocketChannelCtxHloder.java | 6 +- .../processor/GroupMessageProcessor.java | 9 +- .../processor/HeartbeatProcessor.java | 27 ++--- .../websocket/processor/LoginProcessor.java | 64 +++++++++++ .../websocket/processor/MessageProcessor.java | 12 +- .../processor/PrivateMessageProcessor.java | 28 +++-- .../websocket/processor/ProcessorFactory.java | 12 +- im-ui/src/api/wssocket.js | 34 +++--- im-ui/src/components/chat/ChatItem.vue | 7 +- im-ui/src/components/friend/FriendItem.vue | 2 +- im-ui/src/components/group/GroupItem.vue | 2 +- im-ui/src/store/chatStore.js | 103 ++++++++++++------ im-ui/src/view/Chat.vue | 26 +++-- im-ui/src/view/Friend.vue | 22 ++-- im-ui/src/view/Group.vue | 16 +-- im-ui/src/view/Home.vue | 83 ++++++++++++-- im-ui/src/view/Login.vue | 9 +- 39 files changed, 544 insertions(+), 198 deletions(-) create mode 100644 commom/src/main/java/com/lx/common/model/im/LoginInfo.java create mode 100644 im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/LoginProcessor.java diff --git a/commom/src/main/java/com/lx/common/contant/RedisKey.java b/commom/src/main/java/com/lx/common/contant/RedisKey.java index 97756c6..1fe64fc 100644 --- a/commom/src/main/java/com/lx/common/contant/RedisKey.java +++ b/commom/src/main/java/com/lx/common/contant/RedisKey.java @@ -13,7 +13,7 @@ public class RedisKey { // 已读私聊消息id队列 public final static String IM_READED_PRIVATE_MESSAGE_ID = "im:readed:private:id"; // 已读群聊消息位置(已读最大id) - public final static String IM_GROUP_READED_POSITION = "im:readed:group:position"; + public final static String IM_GROUP_READED_POSITION = "im:readed:group:position:"; // 缓存前缀 public final static String IM_CACHE = "im:cache:"; // 缓存是否好友:bool diff --git a/commom/src/main/java/com/lx/common/enums/WSCmdEnum.java b/commom/src/main/java/com/lx/common/enums/WSCmdEnum.java index ccac251..a5fc25f 100644 --- a/commom/src/main/java/com/lx/common/enums/WSCmdEnum.java +++ b/commom/src/main/java/com/lx/common/enums/WSCmdEnum.java @@ -2,9 +2,11 @@ package com.lx.common.enums; public enum WSCmdEnum { - HEARTBEAT(0,"心跳"), - PRIVATE_MESSAGE(1,"私聊消息"), - GROUP_MESSAGE(2,"群发消息"); + LOGIN(0,"登陆"), + HEART_BEAT(1,"心跳"), + FORCE_LOGUT(2,"强制下线"), + PRIVATE_MESSAGE(3,"私聊消息"), + GROUP_MESSAGE(4,"群发消息"); private Integer code; diff --git a/commom/src/main/java/com/lx/common/model/im/LoginInfo.java b/commom/src/main/java/com/lx/common/model/im/LoginInfo.java new file mode 100644 index 0000000..b493b32 --- /dev/null +++ b/commom/src/main/java/com/lx/common/model/im/LoginInfo.java @@ -0,0 +1,9 @@ +package com.lx.common.model.im; + +import lombok.Data; + +@Data +public class LoginInfo { + + private long userId; +} diff --git a/im-platform/src/main/java/com/lx/implatform/controller/FriendController.java b/im-platform/src/main/java/com/lx/implatform/controller/FriendController.java index 2bbd2f2..792702a 100644 --- a/im-platform/src/main/java/com/lx/implatform/controller/FriendController.java +++ b/im-platform/src/main/java/com/lx/implatform/controller/FriendController.java @@ -49,9 +49,16 @@ public class FriendController { return ResultUtils.success(); } - @DeleteMapping("/delete") + @GetMapping("/find/{friendId}") + @ApiOperation(value = "查找好友信息",notes="查找好友信息") + public Result findFriend(@NotEmpty(message = "好友id不可为空") @PathVariable("friendId") Long friendId){ + return ResultUtils.success(friendService.findFriend(friendId)); + } + + + @DeleteMapping("/delete/{friendId}") @ApiOperation(value = "删除好友",notes="解除好友关系") - public Result delFriend(@NotEmpty(message = "好友id不可为空") @RequestParam("friendId") Long friendId){ + public Result delFriend(@NotEmpty(message = "好友id不可为空") @PathVariable("friendId") Long friendId){ friendService.delFriend(friendId); return ResultUtils.success(); } diff --git a/im-platform/src/main/java/com/lx/implatform/controller/GroupController.java b/im-platform/src/main/java/com/lx/implatform/controller/GroupController.java index 0a0a211..f1e6015 100644 --- a/im-platform/src/main/java/com/lx/implatform/controller/GroupController.java +++ b/im-platform/src/main/java/com/lx/implatform/controller/GroupController.java @@ -3,10 +3,13 @@ package com.lx.implatform.controller; import com.lx.common.result.Result; import com.lx.common.result.ResultUtils; +import com.lx.common.util.BeanUtils; +import com.lx.implatform.entity.Group; import com.lx.implatform.service.IGroupService; import com.lx.implatform.vo.GroupInviteVO; import com.lx.implatform.vo.GroupMemberVO; import com.lx.implatform.vo.GroupVO; +import com.lx.implatform.vo.UserVO; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -15,6 +18,7 @@ import javax.validation.Valid; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; @RestController @@ -43,6 +47,12 @@ public class GroupController { return ResultUtils.success(); } + @ApiOperation(value = "查询群聊",notes="查询单个群聊信息") + @GetMapping("/find/{groupId}") + public Result findGroup(@NotNull(message = "群聊id不能为空") @PathVariable Long groupId){ + return ResultUtils.success(groupService.findById(groupId)); + } + @ApiOperation(value = "查询群聊列表",notes="查询群聊列表") @GetMapping("/list") public Result> findGroups(){ diff --git a/im-platform/src/main/java/com/lx/implatform/controller/GroupMessageController.java b/im-platform/src/main/java/com/lx/implatform/controller/GroupMessageController.java index e872ea3..ce113f7 100644 --- a/im-platform/src/main/java/com/lx/implatform/controller/GroupMessageController.java +++ b/im-platform/src/main/java/com/lx/implatform/controller/GroupMessageController.java @@ -19,7 +19,7 @@ import javax.validation.Valid; @RestController -@RequestMapping("/group/message") +@RequestMapping("/message/group") public class GroupMessageController { @Autowired @@ -33,5 +33,13 @@ public class GroupMessageController { return ResultUtils.success(); } + + @PostMapping("/pullUnreadMessage") + @ApiOperation(value = "拉取未读消息",notes="拉取未读消息") + public Result pullUnreadMessage(){ + groupMessageService.pullUnreadMessage(); + return ResultUtils.success(); + } + } diff --git a/im-platform/src/main/java/com/lx/implatform/entity/Group.java b/im-platform/src/main/java/com/lx/implatform/entity/Group.java index e0ca164..7749dd9 100644 --- a/im-platform/src/main/java/com/lx/implatform/entity/Group.java +++ b/im-platform/src/main/java/com/lx/implatform/entity/Group.java @@ -63,6 +63,11 @@ public class Group extends Model { @TableField("notice") private String notice; + /** + * 是否已删除 + */ + @TableField("deleted") + private Boolean deleted; /** * 创建时间 diff --git a/im-platform/src/main/java/com/lx/implatform/entity/GroupMember.java b/im-platform/src/main/java/com/lx/implatform/entity/GroupMember.java index 8ea3360..6c8d7fb 100644 --- a/im-platform/src/main/java/com/lx/implatform/entity/GroupMember.java +++ b/im-platform/src/main/java/com/lx/implatform/entity/GroupMember.java @@ -68,6 +68,13 @@ public class GroupMember extends Model { @TableField("remark") private String remark; + /** + * 是否已离开群聊 + */ + @TableField("quit") + private Boolean quit; + + /** * 创建时间 */ diff --git a/im-platform/src/main/java/com/lx/implatform/entity/GroupMessage.java b/im-platform/src/main/java/com/lx/implatform/entity/GroupMessage.java index 93d1542..138d0f9 100644 --- a/im-platform/src/main/java/com/lx/implatform/entity/GroupMessage.java +++ b/im-platform/src/main/java/com/lx/implatform/entity/GroupMessage.java @@ -58,7 +58,7 @@ public class GroupMessage extends Model { * 消息类型 0:文字 1:图片 2:文件 */ @TableField("type") - private Boolean type; + private Integer type; /** * 发送时间 diff --git a/im-platform/src/main/java/com/lx/implatform/service/IFriendService.java b/im-platform/src/main/java/com/lx/implatform/service/IFriendService.java index 4ec4966..3d16fb4 100644 --- a/im-platform/src/main/java/com/lx/implatform/service/IFriendService.java +++ b/im-platform/src/main/java/com/lx/implatform/service/IFriendService.java @@ -26,4 +26,5 @@ public interface IFriendService extends IService { void update(FriendVO vo); + FriendVO findFriend(Long friendId); } diff --git a/im-platform/src/main/java/com/lx/implatform/service/IGroupMemberService.java b/im-platform/src/main/java/com/lx/implatform/service/IGroupMemberService.java index dec2d41..71bc429 100644 --- a/im-platform/src/main/java/com/lx/implatform/service/IGroupMemberService.java +++ b/im-platform/src/main/java/com/lx/implatform/service/IGroupMemberService.java @@ -28,7 +28,7 @@ public interface IGroupMemberService extends IService { boolean save(GroupMember member); - boolean saveBatch(Long groupId,List members); + boolean saveOrUpdateBatch(Long groupId,List members); void removeByGroupId(Long groupId); diff --git a/im-platform/src/main/java/com/lx/implatform/service/IGroupService.java b/im-platform/src/main/java/com/lx/implatform/service/IGroupService.java index 0bfda38..9fb1a77 100644 --- a/im-platform/src/main/java/com/lx/implatform/service/IGroupService.java +++ b/im-platform/src/main/java/com/lx/implatform/service/IGroupService.java @@ -31,7 +31,9 @@ public interface IGroupService extends IService { void invite(GroupInviteVO vo); - Group findById(Long id); + Group GetById(Long groupId); + + GroupVO findById(Long groupId); List findGroupMembers(Long groupId); } diff --git a/im-platform/src/main/java/com/lx/implatform/service/impl/FriendServiceImpl.java b/im-platform/src/main/java/com/lx/implatform/service/impl/FriendServiceImpl.java index a7b9ac5..448da16 100644 --- a/im-platform/src/main/java/com/lx/implatform/service/impl/FriendServiceImpl.java +++ b/im-platform/src/main/java/com/lx/implatform/service/impl/FriendServiceImpl.java @@ -11,6 +11,7 @@ import com.lx.implatform.mapper.FriendMapper; import com.lx.implatform.service.IFriendService; import com.lx.implatform.service.IUserService; import com.lx.implatform.session.SessionContext; +import com.lx.implatform.session.UserSession; import com.lx.implatform.vo.FriendVO; import org.springframework.aop.framework.AopContext; import org.springframework.beans.factory.annotation.Autowired; @@ -54,8 +55,9 @@ public class FriendServiceImpl extends ServiceImpl impleme throw new GlobalException(ResultCode.PROGRAM_ERROR,"不允许添加自己为好友"); } // 互相绑定好友关系 - bindFriend(userId,friendId); - bindFriend(friendId,userId); + FriendServiceImpl proxy = (FriendServiceImpl)AopContext.currentProxy(); + proxy.bindFriend(userId,friendId); + proxy.bindFriend(friendId,userId); } @@ -99,6 +101,7 @@ public class FriendServiceImpl extends ServiceImpl impleme this.updateById(f); } + @CacheEvict(key="#userId+':'+#friendId") public void bindFriend(Long userId, Long friendId) { QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.lambda() @@ -128,4 +131,21 @@ public class FriendServiceImpl extends ServiceImpl impleme } + @Override + public FriendVO findFriend(Long friendId) { + UserSession session = SessionContext.getSession(); + QueryWrapper wrapper = new QueryWrapper<>(); + wrapper.lambda() + .eq(Friend::getUserId,session.getId()) + .eq(Friend::getFriendId,friendId); + Friend friend = this.getOne(wrapper); + if(friend == null){ + throw new GlobalException(ResultCode.PROGRAM_ERROR,"对方不是您的好友"); + } + FriendVO vo = new FriendVO(); + vo.setId(friend.getFriendId()); + vo.setHeadImage(friend.getFriendHeadImage()); + vo.setNickName(friend.getFriendNickName()); + return vo; + } } diff --git a/im-platform/src/main/java/com/lx/implatform/service/impl/GroupMemberServiceImpl.java b/im-platform/src/main/java/com/lx/implatform/service/impl/GroupMemberServiceImpl.java index 7d745c4..86b2292 100644 --- a/im-platform/src/main/java/com/lx/implatform/service/impl/GroupMemberServiceImpl.java +++ b/im-platform/src/main/java/com/lx/implatform/service/impl/GroupMemberServiceImpl.java @@ -1,6 +1,7 @@ package com.lx.implatform.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.lx.common.contant.RedisKey; import com.lx.implatform.entity.GroupMember; import com.lx.implatform.mapper.GroupMemberMapper; @@ -42,8 +43,8 @@ public class GroupMemberServiceImpl extends ServiceImpl members) { - return super.saveBatch(members); + public boolean saveOrUpdateBatch(Long groupId,List members) { + return super.saveOrUpdateBatch(members); } /** @@ -70,12 +71,13 @@ public class GroupMemberServiceImpl extends ServiceImpl findByUserId(Long userId) { QueryWrapper memberWrapper = new QueryWrapper(); - memberWrapper.lambda().eq(GroupMember::getUserId, userId); + memberWrapper.lambda().eq(GroupMember::getUserId, userId) + .eq(GroupMember::getQuit,false); return this.list(memberWrapper); } /** - * 根据群聊id查询群聊成员 + * 根据群聊id查询群聊成员(包括已退出) * * @param groupId 群聊id * @return @@ -87,16 +89,26 @@ public class GroupMemberServiceImpl extends ServiceImpl findUserIdsByGroupId(Long groupId) { - List members = this.findByGroupId(groupId); + QueryWrapper memberWrapper = new QueryWrapper(); + memberWrapper.lambda().eq(GroupMember::getGroupId, groupId) + .eq(GroupMember::getQuit,false); + List members = this.list(memberWrapper); return members.stream().map(m->m.getUserId()).collect(Collectors.toList()); } + + /** - *根据群聊id删除成员信息 + *根据群聊id删除移除成员 * * @param groupId 群聊id * @return @@ -104,13 +116,14 @@ public class GroupMemberServiceImpl extends ServiceImpl wrapper = new QueryWrapper(); - wrapper.lambda().eq(GroupMember::getGroupId,groupId); - this.remove(wrapper); + UpdateWrapper wrapper = new UpdateWrapper(); + wrapper.lambda().eq(GroupMember::getGroupId,groupId) + .set(GroupMember::getQuit,true); + this.update(wrapper); } /** - *根据群聊id和用户id删除成员信息 + *根据群聊id和用户id移除成员 * * @param groupId 群聊id * @param userId 用户id @@ -119,9 +132,10 @@ public class GroupMemberServiceImpl extends ServiceImpl wrapper = new QueryWrapper<>(); + UpdateWrapper wrapper = new UpdateWrapper<>(); wrapper.lambda().eq(GroupMember::getGroupId,groupId) - .eq(GroupMember::getUserId,userId); - this.remove(wrapper); + .eq(GroupMember::getUserId,userId) + .set(GroupMember::getQuit,true); + this.update(wrapper); } } diff --git a/im-platform/src/main/java/com/lx/implatform/service/impl/GroupMessageServiceImpl.java b/im-platform/src/main/java/com/lx/implatform/service/impl/GroupMessageServiceImpl.java index e9a8ad4..5e4284d 100644 --- a/im-platform/src/main/java/com/lx/implatform/service/impl/GroupMessageServiceImpl.java +++ b/im-platform/src/main/java/com/lx/implatform/service/impl/GroupMessageServiceImpl.java @@ -1,10 +1,12 @@ package com.lx.implatform.service.impl; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.lx.common.contant.RedisKey; import com.lx.common.enums.ResultCode; import com.lx.common.model.im.GroupMessageInfo; import com.lx.common.util.BeanUtils; import com.lx.implatform.entity.Group; +import com.lx.implatform.entity.GroupMember; import com.lx.implatform.entity.GroupMessage; import com.lx.implatform.exception.GlobalException; import com.lx.implatform.mapper.GroupMessageMapper; @@ -21,6 +23,7 @@ import org.springframework.stereotype.Service; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; @Service @@ -45,7 +48,7 @@ public class GroupMessageServiceImpl extends ServiceImpl> serverMap = new ConcurrentHashMap<>(); List userIds = groupMemberService.findUserIdsByGroupId(group.getId()); + if(!userIds.contains(userId)){ + throw new GlobalException(ResultCode.PROGRAM_ERROR,"您已不在群聊里面,无法发送消息"); + } + userIds.parallelStream().forEach(id->{ + if(id == userId){ + // 自己不需要推送给自己 + return; + } String key = RedisKey.IM_USER_SERVER_ID + id; Integer serverId = (Integer)redisTemplate.opsForValue().get(key); if(serverId != null){ @@ -73,8 +84,8 @@ public class GroupMessageServiceImpl extends ServiceImpl> entry : serverMap.entrySet()) { GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class); - msgInfo.setRecvIds(entry.getValue()); - String key = RedisKey.IM_UNREAD_PRIVATE_MESSAGE +entry.getKey(); + msgInfo.setRecvIds(new LinkedList<>(entry.getValue())); + String key = RedisKey.IM_UNREAD_GROUP_MESSAGE +entry.getKey(); redisTemplate.opsForList().rightPush(key,msgInfo); } } @@ -86,6 +97,37 @@ public class GroupMessageServiceImpl extends ServiceImpl recvIds = new LinkedList(); + recvIds.add(userId); + List members = groupMemberService.findByUserId(userId); + for(GroupMember member:members){ + // 获取群聊已读的最大消息id,只推送未读消息 + key = RedisKey.IM_GROUP_READED_POSITION + member.getGroupId()+":"+userId; + Integer maxReadedId = (Integer)redisTemplate.opsForValue().get(key); + QueryWrapper wrapper = new QueryWrapper(); + wrapper.lambda().eq(GroupMessage::getGroupId,member.getGroupId()); + if(maxReadedId!=null){ + wrapper.lambda().gt(GroupMessage::getId,maxReadedId); + } + wrapper.last("limit 100"); + List messages = this.list(wrapper); + if(messages.isEmpty()){ + continue; + } + // 组装消息,准备推送 + List messageInfos = messages.stream().map(m->{ + GroupMessageInfo msgInfo = BeanUtils.copyProperties(m, GroupMessageInfo.class); + msgInfo.setRecvIds(recvIds); + return msgInfo; + }).collect(Collectors.toList()); + key = RedisKey.IM_UNREAD_GROUP_MESSAGE + serverId; + redisTemplate.opsForList().rightPushAll(key,messageInfos.toArray()); + } } } diff --git a/im-platform/src/main/java/com/lx/implatform/service/impl/GroupServiceImpl.java b/im-platform/src/main/java/com/lx/implatform/service/impl/GroupServiceImpl.java index 138f5e3..e930c6c 100644 --- a/im-platform/src/main/java/com/lx/implatform/service/impl/GroupServiceImpl.java +++ b/im-platform/src/main/java/com/lx/implatform/service/impl/GroupServiceImpl.java @@ -33,6 +33,7 @@ import org.springframework.transaction.annotation.Transactional; import java.lang.reflect.Member; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; @@ -126,12 +127,12 @@ public class GroupServiceImpl extends ServiceImpl implements if(group.getOwnerId() != session.getId()){ throw new GlobalException(ResultCode.PROGRAM_ERROR,"只有群主才有权限解除群聊"); } - // 删除群数据 - this.removeById(groupId); - // 删除成员数据 - groupMemberService.removeByGroupId(groupId); + // 逻辑删除群数据 + group.setDeleted(true); + this.updateById(group); } + /** *退出群聊 * @@ -152,6 +153,24 @@ public class GroupServiceImpl extends ServiceImpl implements groupMemberService.removeByGroupAndUserId(groupId,session.getId()); } + + @Override + public GroupVO findById(Long groupId) { + UserSession session = SessionContext.getSession(); + Group group = super.getById(groupId); + if(group == null){ + throw new GlobalException(ResultCode.PROGRAM_ERROR,"群聊不存在"); + } + GroupMember member = groupMemberService.findByGroupAndUserId(groupId,session.getId()); + if(member == null){ + throw new GlobalException(ResultCode.PROGRAM_ERROR,"您未加入群聊"); + } + GroupVO vo = BeanUtils.copyProperties(group,GroupVO.class); + vo.setAliasName(member.getAliasName()); + vo.setRemark(member.getRemark()); + return vo; + } + /** *根据id查找群聊,并进行缓存 * @@ -160,10 +179,12 @@ public class GroupServiceImpl extends ServiceImpl implements */ @Cacheable(value = "#groupId") @Override - public Group findById(Long groupId){ + public Group GetById(Long groupId){ return super.getById(groupId); } + + /** * 查询当前用户的所有群聊 * @@ -208,16 +229,11 @@ public class GroupServiceImpl extends ServiceImpl implements } // 群聊人数校验 List members = groupMemberService.findByGroupId(vo.getGroupId()); - if(vo.getFriendIds().size() + members.size() > Constant.MAX_GROUP_MEMBER){ + long size = members.stream().filter(m->!m.getQuit()).count(); + if(vo.getFriendIds().size() + size > Constant.MAX_GROUP_MEMBER){ throw new GlobalException(ResultCode.PROGRAM_ERROR, "群聊人数不能大于"+Constant.MAX_GROUP_MEMBER+"人"); } - // 已经在群里面用户,不可重复加入 - Boolean flag = vo.getFriendIds().stream().anyMatch(id->{ - return members.stream().anyMatch(m->m.getUserId()==id); - }); - if(flag){ - throw new GlobalException(ResultCode.PROGRAM_ERROR, "部分用户已经在群中,邀请失败"); - } + // 找出好友信息 List friends = friendsService.findFriendByUserId(session.getId()); List friendsList = vo.getFriendIds().stream().map(id -> @@ -228,16 +244,18 @@ public class GroupServiceImpl extends ServiceImpl implements // 批量保存成员数据 List groupMembers = friendsList.stream() .map(f -> { - GroupMember groupMember = new GroupMember(); + Optional optional = members.stream().filter(m->m.getUserId()==f.getFriendId()).findFirst(); + GroupMember groupMember = optional.isPresent()? optional.get():new GroupMember(); groupMember.setGroupId(vo.getGroupId()); groupMember.setUserId(f.getFriendId()); groupMember.setAliasName(f.getFriendNickName()); groupMember.setRemark(group.getName()); groupMember.setHeadImage(f.getFriendHeadImage()); + groupMember.setQuit(false); return groupMember; }).collect(Collectors.toList()); if(!groupMembers.isEmpty()) { - groupMemberService.saveBatch(group.getId(),groupMembers); + groupMemberService.saveOrUpdateBatch(group.getId(),groupMembers); } } diff --git a/im-platform/src/main/java/com/lx/implatform/vo/GroupMemberVO.java b/im-platform/src/main/java/com/lx/implatform/vo/GroupMemberVO.java index da3cd3a..59d913f 100644 --- a/im-platform/src/main/java/com/lx/implatform/vo/GroupMemberVO.java +++ b/im-platform/src/main/java/com/lx/implatform/vo/GroupMemberVO.java @@ -18,6 +18,9 @@ public class GroupMemberVO { @ApiModelProperty("头像") private String headImage; + @ApiModelProperty("是否已退出") + private Boolean quit; + @ApiModelProperty("备注") private String remark; diff --git a/im-platform/src/main/java/com/lx/implatform/vo/GroupVO.java b/im-platform/src/main/java/com/lx/implatform/vo/GroupVO.java index 7c7d3d2..102fbbe 100644 --- a/im-platform/src/main/java/com/lx/implatform/vo/GroupVO.java +++ b/im-platform/src/main/java/com/lx/implatform/vo/GroupVO.java @@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.annotation.TableId; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; +import org.hibernate.validator.constraints.Length; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; @@ -19,6 +20,7 @@ public class GroupVO { @ApiModelProperty(value = "群id") private Long id; + @Length(max=20,message = "群名称长度不能大于20") @NotEmpty(message = "群名称不可为空") @ApiModelProperty(value = "群名称") private String name; @@ -33,12 +35,15 @@ public class GroupVO { @ApiModelProperty(value = "头像缩略图") private String headImageThumb; + @Length(max=1024,message = "群聊显示长度不能大于1024") @ApiModelProperty(value = "群公告") private String notice; + @Length(max=20,message = "群聊显示长度不能大于20") @ApiModelProperty(value = "用户在群显示昵称") private String aliasName; + @Length(max=20,message = "群聊显示长度不能大于20") @ApiModelProperty(value = "群聊显示备注") private String remark; diff --git a/im-platform/src/main/resources/db/db.sql b/im-platform/src/main/resources/db/db.sql index 5563217..226b258 100644 --- a/im-platform/src/main/resources/db/db.sql +++ b/im-platform/src/main/resources/db/db.sql @@ -45,6 +45,7 @@ create table `im_group`( `head_image_thumb` varchar(255) default '' comment '群头像缩略图', `notice` varchar(1024) default '' comment '群公告', `remark` varchar(255) default '' comment '群备注', + `deleted` tinyint(1) DEFAULT 0 comment '是否已删除', `created_time` datetime DEFAULT CURRENT_TIMESTAMP comment '创建时间' )ENGINE=InnoDB CHARSET=utf8mb3 comment '群'; @@ -55,6 +56,7 @@ create table `im_group_member`( `alias_name` varchar(255) DEFAULT '' comment '组内显示名称', `head_image` varchar(255) default '' comment '用户头像', `remark` varchar(255) DEFAULT '' comment '备注', + `quit` tinyint(1) DEFAULT 0 comment '是否已退出', `created_time` datetime DEFAULT CURRENT_TIMESTAMP comment '创建时间', key `idx_group_id`(`group_id`), key `idx_user_id`(`user_id`) @@ -63,7 +65,7 @@ create table `im_group_member`( create table `im_group_message`( `id` bigint not null auto_increment primary key comment 'id', `group_id` bigint not null comment '群id', - `send_user_id` bigint not null comment '发送用户id', + `send_id` bigint not null comment '发送用户id', `content` text comment '发送内容', `type` tinyint(1) NOT NULL comment '消息类型 0:文字 1:图片 2:文件', `send_time` datetime DEFAULT CURRENT_TIMESTAMP comment '发送时间', diff --git a/im-server/src/main/java/com/lx/implatform/imserver/task/PullUnreadGroupMessageTask.java b/im-server/src/main/java/com/lx/implatform/imserver/task/PullUnreadGroupMessageTask.java index 3138487..43a7943 100644 --- a/im-server/src/main/java/com/lx/implatform/imserver/task/PullUnreadGroupMessageTask.java +++ b/im-server/src/main/java/com/lx/implatform/imserver/task/PullUnreadGroupMessageTask.java @@ -36,7 +36,7 @@ public class PullUnreadGroupMessageTask extends AbstractPullMessageTask { for(Object o: messageInfos){ redisTemplate.opsForList().leftPop(key); GroupMessageInfo messageInfo = (GroupMessageInfo)o; - MessageProcessor processor = ProcessorFactory.createProcessor(WSCmdEnum.PRIVATE_MESSAGE); + MessageProcessor processor = ProcessorFactory.createProcessor(WSCmdEnum.GROUP_MESSAGE); processor.process(null,messageInfo); } } diff --git a/im-server/src/main/java/com/lx/implatform/imserver/task/PullUnreadPrivateMessageTask.java b/im-server/src/main/java/com/lx/implatform/imserver/task/PullUnreadPrivateMessageTask.java index f802282..ae2a2e8 100644 --- a/im-server/src/main/java/com/lx/implatform/imserver/task/PullUnreadPrivateMessageTask.java +++ b/im-server/src/main/java/com/lx/implatform/imserver/task/PullUnreadPrivateMessageTask.java @@ -30,18 +30,16 @@ public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask { @Override public void pullMessage() { - log.info(Thread.currentThread().getName()); // 从redis拉取未读消息 String key = RedisKey.IM_UNREAD_PRIVATE_MESSAGE + WSServer.getServerId(); List messageInfos = redisTemplate.opsForList().range(key,0,-1); for(Object o: messageInfos){ redisTemplate.opsForList().leftPop(key); PrivateMessageInfo messageInfo = (PrivateMessageInfo)o; - ChannelHandlerContext ctx = WebsocketChannelCtxHloder.getChannelCtx(messageInfo.getRecvId()); - if(ctx != null){ + MessageProcessor processor = ProcessorFactory.createProcessor(WSCmdEnum.PRIVATE_MESSAGE); - processor.process(ctx,messageInfo); - } + processor.process(null,messageInfo); + } } diff --git a/im-server/src/main/java/com/lx/implatform/imserver/websocket/WebSocketHandler.java b/im-server/src/main/java/com/lx/implatform/imserver/websocket/WebSocketHandler.java index 65ec25b..4f757cc 100644 --- a/im-server/src/main/java/com/lx/implatform/imserver/websocket/WebSocketHandler.java +++ b/im-server/src/main/java/com/lx/implatform/imserver/websocket/WebSocketHandler.java @@ -31,10 +31,8 @@ public class WebSocketHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, SendInfo sendInfo) throws Exception { // 创建处理器进行处理 - HashMap map = (HashMap)sendInfo.getData(); - HeartbeatInfo beatInfo = BeanUtil.fillBeanWithMap(map, new HeartbeatInfo(), false); MessageProcessor processor = ProcessorFactory.createProcessor(WSCmdEnum.fromCode(sendInfo.getCmd())); - processor.process(ctx,beatInfo); + processor.process(ctx,processor.transForm(sendInfo.getData())); } /** @@ -64,16 +62,19 @@ public class WebSocketHandler extends SimpleChannelInboundHandler { @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - AttributeKey attr = AttributeKey.valueOf("USER_ID"); Long userId = ctx.channel().attr(attr).get(); - // 移除channel - WebsocketChannelCtxHloder.removeChannelCtx(userId); - // 用户下线 - RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate"); - String key = RedisKey.IM_USER_SERVER_ID + userId; - redisTemplate.delete(key); - log.info(ctx.channel().id().asLongText() + "断开连接"); + ChannelHandlerContext context = WebsocketChannelCtxHloder.getChannelCtx(userId); + // 判断一下,避免异地登录导致的误删 + if(context != null && ctx.channel().id().equals(context.channel().id())){ + // 移除channel + WebsocketChannelCtxHloder.removeChannelCtx(userId); + // 用户下线 + RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate"); + String key = RedisKey.IM_USER_SERVER_ID + userId; + redisTemplate.delete(key); + log.info("断开连接,userId:{}",userId); + } } @Override diff --git a/im-server/src/main/java/com/lx/implatform/imserver/websocket/WebsocketChannelCtxHloder.java b/im-server/src/main/java/com/lx/implatform/imserver/websocket/WebsocketChannelCtxHloder.java index c2c0789..dad9360 100644 --- a/im-server/src/main/java/com/lx/implatform/imserver/websocket/WebsocketChannelCtxHloder.java +++ b/im-server/src/main/java/com/lx/implatform/imserver/websocket/WebsocketChannelCtxHloder.java @@ -2,8 +2,7 @@ package com.lx.implatform.imserver.websocket; import io.netty.channel.ChannelHandlerContext; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; public class WebsocketChannelCtxHloder { @@ -19,6 +18,9 @@ public class WebsocketChannelCtxHloder { channelMap.remove(userId); } + + + public static ChannelHandlerContext getChannelCtx(Long userId){ return channelMap.get(userId); } diff --git a/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/GroupMessageProcessor.java b/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/GroupMessageProcessor.java index 4efa1db..b331501 100644 --- a/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/GroupMessageProcessor.java +++ b/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/GroupMessageProcessor.java @@ -16,7 +16,7 @@ import java.util.List; @Slf4j @Component -public class GroupMessageProcessor implements MessageProcessor { +public class GroupMessageProcessor extends MessageProcessor { @Autowired private RedisTemplate redisTemplate; @@ -24,7 +24,7 @@ public class GroupMessageProcessor implements MessageProcessor recvIds = data.getRecvIds(); // 接收者id列表不需要传输,节省带宽 data.setRecvIds(null); @@ -37,9 +37,12 @@ public class GroupMessageProcessor implements MessageProcessor { +public class HeartbeatProcessor extends MessageProcessor { @Autowired @@ -30,22 +26,17 @@ public class HeartbeatProcessor implements MessageProcessor { @Override public void process(ChannelHandlerContext ctx, HeartbeatInfo beatInfo) { - log.info("接收到心跳,userId:{}",beatInfo.getUserId()); - - // 绑定用户和channel - WebsocketChannelCtxHloder.addChannelCtx(beatInfo.getUserId(),ctx); - // 设置属性 - AttributeKey attr = AttributeKey.valueOf("USER_ID"); - ctx.channel().attr(attr).set(beatInfo.getUserId()); - - // 在redis上记录每个user的channelId,15秒没有心跳,则自动过期 - String key = RedisKey.IM_USER_SERVER_ID+beatInfo.getUserId(); - redisTemplate.opsForValue().set(key, WSServer.getServerId(),15, TimeUnit.SECONDS); - // 响应ws SendInfo sendInfo = new SendInfo(); - sendInfo.setCmd(WSCmdEnum.HEARTBEAT.getCode()); + sendInfo.setCmd(WSCmdEnum.HEART_BEAT.getCode()); ctx.channel().writeAndFlush(sendInfo); } + + @Override + public HeartbeatInfo transForm(Object o) { + HashMap map = (HashMap)o; + HeartbeatInfo heartbeatInfo = BeanUtil.fillBeanWithMap(map, new HeartbeatInfo(), false); + return heartbeatInfo; + } } diff --git a/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/LoginProcessor.java b/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/LoginProcessor.java new file mode 100644 index 0000000..06e0427 --- /dev/null +++ b/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/LoginProcessor.java @@ -0,0 +1,64 @@ +package com.lx.implatform.imserver.websocket.processor; + +import cn.hutool.core.bean.BeanUtil; +import com.lx.common.contant.RedisKey; +import com.lx.common.enums.WSCmdEnum; +import com.lx.common.model.im.HeartbeatInfo; +import com.lx.common.model.im.LoginInfo; +import com.lx.common.model.im.SendInfo; +import com.lx.implatform.imserver.websocket.WebsocketChannelCtxHloder; +import com.lx.implatform.imserver.websocket.WebsocketServer; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.AttributeKey; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +public class LoginProcessor extends MessageProcessor { + + + @Autowired + private WebsocketServer WSServer; + + @Autowired + RedisTemplate redisTemplate; + + @Override + synchronized public void process(ChannelHandlerContext ctx, LoginInfo loginInfo) { + log.info("用户登录,userId:{}",loginInfo.getUserId()); + ChannelHandlerContext context = WebsocketChannelCtxHloder.getChannelCtx(loginInfo.getUserId()); + if(context != null){ + // 不允许多地登录,强制下线 + SendInfo sendInfo = new SendInfo(); + sendInfo.setCmd(WSCmdEnum.FORCE_LOGUT.getCode()); + context.channel().writeAndFlush(sendInfo); + } + + // 绑定用户和channel + WebsocketChannelCtxHloder.addChannelCtx(loginInfo.getUserId(),ctx); + // 设置属性 + AttributeKey attr = AttributeKey.valueOf("USER_ID"); + ctx.channel().attr(attr).set(loginInfo.getUserId()); + // 在redis上记录每个user的channelId,15秒没有心跳,则自动过期 + String key = RedisKey.IM_USER_SERVER_ID+loginInfo.getUserId(); + redisTemplate.opsForValue().set(key, WSServer.getServerId()); + // 响应ws + SendInfo sendInfo = new SendInfo(); + sendInfo.setCmd(WSCmdEnum.LOGIN.getCode()); + ctx.channel().writeAndFlush(sendInfo); + } + + + @Override + public LoginInfo transForm(Object o) { + HashMap map = (HashMap)o; + LoginInfo loginInfo = BeanUtil.fillBeanWithMap(map, new LoginInfo(), false); + return loginInfo; + } +} diff --git a/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/MessageProcessor.java b/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/MessageProcessor.java index 71f4ac9..120441a 100644 --- a/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/MessageProcessor.java +++ b/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/MessageProcessor.java @@ -1,9 +1,17 @@ package com.lx.implatform.imserver.websocket.processor; + import io.netty.channel.ChannelHandlerContext; -public interface MessageProcessor { +public abstract class MessageProcessor { + + public void process(ChannelHandlerContext ctx,T data){} + + public void process(T data){} + + public T transForm(Object o){ + return (T)o; + } - void process(ChannelHandlerContext ctx,T data); } diff --git a/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/PrivateMessageProcessor.java b/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/PrivateMessageProcessor.java index 6450172..8923aac 100644 --- a/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/PrivateMessageProcessor.java +++ b/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/PrivateMessageProcessor.java @@ -4,6 +4,7 @@ import com.lx.common.contant.RedisKey; import com.lx.common.enums.WSCmdEnum; import com.lx.common.model.im.SendInfo; import com.lx.common.model.im.PrivateMessageInfo; +import com.lx.implatform.imserver.websocket.WebsocketChannelCtxHloder; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -11,26 +12,33 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; +import java.util.List; + @Slf4j @Component -public class PrivateMessageProcessor implements MessageProcessor { +public class PrivateMessageProcessor extends MessageProcessor { @Autowired private RedisTemplate redisTemplate; - @Async @Override public void process(ChannelHandlerContext ctx, PrivateMessageInfo data) { log.info("接收到消息,发送者:{},接收者:{},内容:{}",data.getSendId(),data.getRecvId(),data.getContent()); - // 推送消息到用户 - SendInfo sendInfo = new SendInfo(); - sendInfo.setCmd(WSCmdEnum.PRIVATE_MESSAGE.getCode()); - sendInfo.setData(data); - ctx.channel().writeAndFlush(sendInfo); + // 一个用户可以同时登陆,所以有多个channel + ChannelHandlerContext channelCtx = WebsocketChannelCtxHloder.getChannelCtx(data.getRecvId()); + if(channelCtx != null ){ + // 推送消息到用户 + SendInfo sendInfo = new SendInfo(); + sendInfo.setCmd(WSCmdEnum.PRIVATE_MESSAGE.getCode()); + sendInfo.setData(data); + channelCtx.channel().writeAndFlush(sendInfo); + // 已读消息推送至redis,等待更新数据库 + String key = RedisKey.IM_READED_PRIVATE_MESSAGE_ID; + redisTemplate.opsForList().rightPush(key,data.getId()); + }else{ + log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",data.getSendId(),data.getRecvId(),data.getContent()); + } - // 已读消息推送至redis,等待更新数据库 - String key = RedisKey.IM_READED_PRIVATE_MESSAGE_ID; - redisTemplate.opsForList().rightPush(key,data.getId()); } } diff --git a/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/ProcessorFactory.java b/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/ProcessorFactory.java index 8574abf..0dfa2a6 100644 --- a/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/ProcessorFactory.java +++ b/im-server/src/main/java/com/lx/implatform/imserver/websocket/processor/ProcessorFactory.java @@ -8,11 +8,17 @@ public class ProcessorFactory { public static MessageProcessor createProcessor(WSCmdEnum cmd){ MessageProcessor processor = null; switch (cmd){ - case HEARTBEAT: - processor = (MessageProcessor) SpringContextHolder.getApplicationContext().getBean("heartbeatProcessor"); + case LOGIN: + processor = (MessageProcessor) SpringContextHolder.getApplicationContext().getBean(LoginProcessor.class); + break; + case HEART_BEAT: + processor = (MessageProcessor) SpringContextHolder.getApplicationContext().getBean(HeartbeatProcessor.class); break; case PRIVATE_MESSAGE: - processor = (MessageProcessor)SpringContextHolder.getApplicationContext().getBean("privateMessageProcessor"); + processor = (MessageProcessor)SpringContextHolder.getApplicationContext().getBean(PrivateMessageProcessor.class); + break; + case GROUP_MESSAGE: + processor = (MessageProcessor)SpringContextHolder.getApplicationContext().getBean(GroupMessageProcessor.class); break; default: break; diff --git a/im-ui/src/api/wssocket.js b/im-ui/src/api/wssocket.js index bd1b3b1..b365ef6 100644 --- a/im-ui/src/api/wssocket.js +++ b/im-ui/src/api/wssocket.js @@ -1,12 +1,11 @@ var websock = null; let rec; //断线重连后,延迟5秒重新创建WebSocket连接 rec用来存储延迟请求的代码 let isConnect = false; //连接标识 避免重复连接 -let isCompleteConnect = false; //完全连接标识(接收到心跳) let wsurl = ""; let $store = null; let messageCallBack = null; let openCallBack = null; - +let hasLogin = false; let createWebSocket = (url, store) => { $store = store; @@ -17,16 +16,19 @@ let createWebSocket = (url, store) => { let initWebSocket = () => { try { console.log("初始化WebSocket"); - isCompleteConnect = false; + hasLogin = false; websock = new WebSocket(wsurl); websock.onmessage = function(e) { let msg = JSON.parse(e.data) if (msg.cmd == 0) { - if(!isCompleteConnect){ - // 第一次上传心跳成功才算连接完成 - isCompleteConnect = true; - openCallBack && openCallBack(); - } + hasLogin = true; + heartCheck.start() + console.log('WebSocket登录成功') + // 登录成功才算连接完成 + openCallBack && openCallBack(); + } + else if(msg.cmd==1){ + // 重新开启心跳定时 heartCheck.reset(); } else { // 其他消息转发出去 @@ -36,12 +38,17 @@ let initWebSocket = () => { websock.onclose = function(e) { console.log('WebSocket连接关闭') isConnect = false; //断开后修改标识 - reConnect(); } websock.onopen = function() { console.log("WebSocket连接成功"); isConnect = true; - heartCheck.start() + // 发送登录命令 + let loginInfo = { + cmd: 0, + data: {userId: $store.state.userStore.userInfo.id} + }; + websock.send(JSON.stringify(loginInfo)); + } // 连接发生错误的回调方法 @@ -69,6 +76,8 @@ let reConnect = () => { let closeWebSocket = () => { websock.close(); }; + + //心跳设置 var heartCheck = { timeout: 5000, //每段时间发送一次心跳包 这里设置为20s @@ -77,14 +86,13 @@ var heartCheck = { if(isConnect){ console.log('发送WebSocket心跳') let heartBeat = { - cmd: 0, + cmd: 1, data: { userId: $store.state.userStore.userInfo.id } }; websock.send(JSON.stringify(heartBeat)) } - }, reset: function(){ @@ -125,7 +133,7 @@ function onmessage(callback) { function onopen(callback) { openCallBack = callback; - if (isCompleteConnect) { + if (hasLogin) { openCallBack(); } } diff --git a/im-ui/src/components/chat/ChatItem.vue b/im-ui/src/components/chat/ChatItem.vue index 49f62e5..31b8628 100644 --- a/im-ui/src/components/chat/ChatItem.vue +++ b/im-ui/src/components/chat/ChatItem.vue @@ -1,6 +1,6 @@