From 8cfeffae5018b3990c3f1e68155a4a1a9b89ad47 Mon Sep 17 00:00:00 2001 From: "xie.bx" Date: Sat, 19 Nov 2022 17:22:46 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=81=E8=A3=85im-client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- commom/pom.xml | 20 +--- .../java/com/bx/common/contant/Constant.java | 7 -- .../java/com/bx/common/contant/RedisKey.java | 21 ++-- .../{FileTypeEnum.java => FileType.java} | 8 +- .../enums/{WSCmdEnum.java => IMCmdType.java} | 8 +- .../com/bx/common/enums/ListenerType.java | 24 ++++ ...sageStatusEnum.java => MessageStatus.java} | 8 +- ...{MessageTypeEnum.java => MessageType.java} | 13 +- .../com/bx/common/enums/SendResultType.java | 25 ++++ .../bx/common/model/im/GroupMessageInfo.java | 3 - .../com/bx/common/model/im/IMRecvInfo.java | 17 +++ .../com/bx/common/model/im/SendResult.java | 17 +++ im-client/pom.xml | 26 ++++ .../com/bx/imclient/IMAutoConfiguration.java | 12 ++ .../main/java/com/bx/imclient/IMClient.java | 30 +++++ .../bx/imclient/annotation/IMListener.java | 18 +++ .../com/bx/imclient/config/RedisConfig.java | 49 ++++++++ .../bx/imclient/listener/MessageListener.java | 10 ++ .../listener/MessageListenerMulticaster.java | 28 +++++ .../java/com/bx/imclient/sender/IMSender.java | 112 ++++++++++++++++++ .../task/AbstractPullMessageTask.java | 47 ++++++++ .../task/PullSendResultGroupMessageTask.java | 33 ++++++ .../PullSendResultPrivateMessageTask.java | 38 ++++++ .../main/resources/META-INF/spring.factories | 2 + im-platform/pom.xml | 7 +- .../java/com/bx/implatform/ImplatformApp.java | 2 - .../implatform/config/WebSecurityConfg.java | 6 +- .../com/bx/implatform/contant/Constant.java | 12 ++ .../com/bx/implatform/contant/RedisKey.java | 16 +++ .../implatform/controller/FileController.java | 4 +- .../controller/FriendController.java | 5 +- .../controller/GroupController.java | 4 +- .../controller/GroupMessageController.java | 4 +- .../controller/PrivateMessageController.java | 4 +- .../controller/RegisterController.java | 4 +- .../implatform/controller/UserController.java | 7 +- .../com/bx/implatform}/enums/ResultCode.java | 2 +- .../implatform/exception/GlobalException.java | 2 +- .../exception/GlobalExceptionHandler.java | 6 +- .../implatform}/generator/CodeGenerator.java | 2 +- .../listener/GroupMessageListener.java | 34 ++++++ .../listener/PrivateMessageListener.java | 40 +++++++ .../com/bx/implatform}/result/Result.java | 2 +- .../bx/implatform}/result/ResultUtils.java | 6 +- .../service/impl/FriendServiceImpl.java | 4 +- .../service/impl/GroupMemberServiceImpl.java | 2 +- .../service/impl/GroupMessageServiceImpl.java | 66 ++++------- .../service/impl/GroupServiceImpl.java | 8 +- .../impl/PrivateMessageServiceImpl.java | 54 ++++----- .../impl/SecurityUserDetailsServiceImpl.java | 2 +- .../service/impl/UserServiceImpl.java | 4 +- .../service/thirdparty/FileService.java | 14 +-- .../task/PullAlreadyReadMessageTask.java | 74 ------------ .../com/bx/implatform}/util/BeanUtils.java | 3 +- .../bx/implatform}/util/DateTimeUtils.java | 2 +- .../com/bx/implatform/util/MinioUtil.java | 1 - .../src/main/resources/application.yml | 2 +- .../task/PullUnreadGroupMessageTask.java | 11 +- .../task/PullUnreadPrivateMessageTask.java | 11 +- .../imserver/websocket/WebSocketHandler.java | 4 +- .../processor/GroupMessageProcessor.java | 72 +++++++---- .../processor/HeartbeatProcessor.java | 4 +- .../websocket/processor/LoginProcessor.java | 6 +- .../processor/PrivateMessageProcessor.java | 64 ++++++---- .../websocket/processor/ProcessorFactory.java | 4 +- pom.xml | 1 + 66 files changed, 827 insertions(+), 331 deletions(-) rename commom/src/main/java/com/bx/common/enums/{FileTypeEnum.java => FileType.java} (73%) rename commom/src/main/java/com/bx/common/enums/{WSCmdEnum.java => IMCmdType.java} (77%) create mode 100644 commom/src/main/java/com/bx/common/enums/ListenerType.java rename commom/src/main/java/com/bx/common/enums/{MessageStatusEnum.java => MessageStatus.java} (70%) rename commom/src/main/java/com/bx/common/enums/{MessageTypeEnum.java => MessageType.java} (54%) create mode 100644 commom/src/main/java/com/bx/common/enums/SendResultType.java create mode 100644 commom/src/main/java/com/bx/common/model/im/IMRecvInfo.java create mode 100644 commom/src/main/java/com/bx/common/model/im/SendResult.java create mode 100644 im-client/pom.xml create mode 100644 im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java create mode 100644 im-client/src/main/java/com/bx/imclient/IMClient.java create mode 100644 im-client/src/main/java/com/bx/imclient/annotation/IMListener.java create mode 100644 im-client/src/main/java/com/bx/imclient/config/RedisConfig.java create mode 100644 im-client/src/main/java/com/bx/imclient/listener/MessageListener.java create mode 100644 im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java create mode 100644 im-client/src/main/java/com/bx/imclient/sender/IMSender.java create mode 100644 im-client/src/main/java/com/bx/imclient/task/AbstractPullMessageTask.java create mode 100644 im-client/src/main/java/com/bx/imclient/task/PullSendResultGroupMessageTask.java create mode 100644 im-client/src/main/java/com/bx/imclient/task/PullSendResultPrivateMessageTask.java create mode 100644 im-client/src/main/resources/META-INF/spring.factories create mode 100644 im-platform/src/main/java/com/bx/implatform/contant/Constant.java create mode 100644 im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java rename {commom/src/main/java/com/bx/common => im-platform/src/main/java/com/bx/implatform}/enums/ResultCode.java (96%) rename {commom/src/main/java/com/bx/common => im-platform/src/main/java/com/bx/implatform}/generator/CodeGenerator.java (99%) create mode 100644 im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java create mode 100644 im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java rename {commom/src/main/java/com/bx/common => im-platform/src/main/java/com/bx/implatform}/result/Result.java (79%) rename {commom/src/main/java/com/bx/common => im-platform/src/main/java/com/bx/implatform}/result/ResultUtils.java (94%) delete mode 100644 im-platform/src/main/java/com/bx/implatform/task/PullAlreadyReadMessageTask.java rename {commom/src/main/java/com/bx/common => im-platform/src/main/java/com/bx/implatform}/util/BeanUtils.java (99%) rename {commom/src/main/java/com/bx/common => im-platform/src/main/java/com/bx/implatform}/util/DateTimeUtils.java (99%) diff --git a/commom/pom.xml b/commom/pom.xml index bec5ea0..6641d62 100644 --- a/commom/pom.xml +++ b/commom/pom.xml @@ -34,25 +34,11 @@ org.apache.commons commons-lang3 - - com.baomidou - mybatis-plus-generator - 3.3.2 - - - mysql - mysql-connector-java - compile - + org.springframework spring-beans - - - org.springframework.boot - spring-boot-starter-data-redis - org.apache.velocity velocity @@ -63,5 +49,9 @@ jackson-datatype-joda 2.9.10 + + org.springframework + spring-context + \ No newline at end of file diff --git a/commom/src/main/java/com/bx/common/contant/Constant.java b/commom/src/main/java/com/bx/common/contant/Constant.java index 6c5f130..296e054 100644 --- a/commom/src/main/java/com/bx/common/contant/Constant.java +++ b/commom/src/main/java/com/bx/common/contant/Constant.java @@ -1,15 +1,8 @@ package com.bx.common.contant; - public class Constant { - // 最大图片上传大小 - public static final long MAX_IMAGE_SIZE = 5*1024*1024; - // 最大上传文件大小 - public static final long MAX_FILE_SIZE = 10*1024*1024; - // 群聊最大人数 - public static final long MAX_GROUP_MEMBER = 500; // 在线状态过期时间 600s public static final long ONLINE_TIMEOUT_SECOND = 600; // 消息允许撤回时间 300s diff --git a/commom/src/main/java/com/bx/common/contant/RedisKey.java b/commom/src/main/java/com/bx/common/contant/RedisKey.java index ff0c04d..2a68dd7 100644 --- a/commom/src/main/java/com/bx/common/contant/RedisKey.java +++ b/commom/src/main/java/com/bx/common/contant/RedisKey.java @@ -7,20 +7,13 @@ public class RedisKey { // 用户ID所连接的IM-server的ID public final static String IM_USER_SERVER_ID = "im:user:server_id:"; // 未读私聊消息队列 - public final static String IM_UNREAD_PRIVATE_MESSAGE = "im:unread:private:"; + public final static String IM_UNREAD_PRIVATE_QUEUE = "im:unread:private:"; // 未读群聊消息队列 - public final static String IM_UNREAD_GROUP_MESSAGE = "im:unread:group:"; - // 已读私聊消息id队列 - public final static String IM_READED_PRIVATE_MESSAGE_ID = "im:readed:private:id"; - // 已读群聊消息位置(已读最大id) - public final static String IM_GROUP_READED_POSITION = "im:readed:group:position:"; - // 缓存前缀 - public final static String IM_CACHE = "im:cache:"; - // 缓存是否好友:bool - public final static String IM_CACHE_FRIEND = IM_CACHE+"friend"; - // 缓存群聊信息 - public final static String IM_CACHE_GROUP = IM_CACHE+"group"; - // 缓存群聊成员id - public final static String IM_CACHE_GROUP_MEMBER_ID = IM_CACHE+"group_member_ids"; + public final static String IM_UNREAD_GROUP_QUEUE = "im:unread:group:"; + // 私聊消息发送结果队列 + public final static String IM_RESULT_PRIVATE_QUEUE = "im:result:private"; + // 群聊消息发送结果队列 + public final static String IM_RESULT_GROUP_QUEUE = "im:result:group"; + } diff --git a/commom/src/main/java/com/bx/common/enums/FileTypeEnum.java b/commom/src/main/java/com/bx/common/enums/FileType.java similarity index 73% rename from commom/src/main/java/com/bx/common/enums/FileTypeEnum.java rename to commom/src/main/java/com/bx/common/enums/FileType.java index a41ac1f..afe3e5b 100644 --- a/commom/src/main/java/com/bx/common/enums/FileTypeEnum.java +++ b/commom/src/main/java/com/bx/common/enums/FileType.java @@ -1,6 +1,6 @@ package com.bx.common.enums; -public enum FileTypeEnum { +public enum FileType { FILE(0,"文件"), IMAGE(1,"图片"), @@ -13,13 +13,13 @@ public enum FileTypeEnum { private String desc; - FileTypeEnum(Integer index, String desc) { + FileType(Integer index, String desc) { this.code =index; this.desc=desc; } - public static FileTypeEnum fromCode(Integer code){ - for (FileTypeEnum typeEnum:values()) { + public static FileType fromCode(Integer code){ + for (FileType typeEnum:values()) { if (typeEnum.code.equals(code)) { return typeEnum; } diff --git a/commom/src/main/java/com/bx/common/enums/WSCmdEnum.java b/commom/src/main/java/com/bx/common/enums/IMCmdType.java similarity index 77% rename from commom/src/main/java/com/bx/common/enums/WSCmdEnum.java rename to commom/src/main/java/com/bx/common/enums/IMCmdType.java index 587ae04..f0233db 100644 --- a/commom/src/main/java/com/bx/common/enums/WSCmdEnum.java +++ b/commom/src/main/java/com/bx/common/enums/IMCmdType.java @@ -1,6 +1,6 @@ package com.bx.common.enums; -public enum WSCmdEnum { +public enum IMCmdType { LOGIN(0,"登陆"), HEART_BEAT(1,"心跳"), @@ -13,13 +13,13 @@ public enum WSCmdEnum { private String desc; - WSCmdEnum(Integer index, String desc) { + IMCmdType(Integer index, String desc) { this.code =index; this.desc=desc; } - public static WSCmdEnum fromCode(Integer code){ - for (WSCmdEnum typeEnum:values()) { + public static IMCmdType fromCode(Integer code){ + for (IMCmdType typeEnum:values()) { if (typeEnum.code.equals(code)) { return typeEnum; } diff --git a/commom/src/main/java/com/bx/common/enums/ListenerType.java b/commom/src/main/java/com/bx/common/enums/ListenerType.java new file mode 100644 index 0000000..d0cbb64 --- /dev/null +++ b/commom/src/main/java/com/bx/common/enums/ListenerType.java @@ -0,0 +1,24 @@ +package com.bx.common.enums; + +public enum ListenerType { + + PRIVATE_MESSAGE(0,"私聊消息"), + GROUP_MESSAGE(1,"群聊消息"); + + private Integer code; + + private String desc; + + ListenerType(Integer index, String desc) { + this.code =index; + this.desc=desc; + } + + public String getDesc() { + return desc; + } + + public Integer getCode(){ + return this.code; + } +} diff --git a/commom/src/main/java/com/bx/common/enums/MessageStatusEnum.java b/commom/src/main/java/com/bx/common/enums/MessageStatus.java similarity index 70% rename from commom/src/main/java/com/bx/common/enums/MessageStatusEnum.java rename to commom/src/main/java/com/bx/common/enums/MessageStatus.java index be6997b..f36c303 100644 --- a/commom/src/main/java/com/bx/common/enums/MessageStatusEnum.java +++ b/commom/src/main/java/com/bx/common/enums/MessageStatus.java @@ -1,7 +1,7 @@ package com.bx.common.enums; -public enum MessageStatusEnum { +public enum MessageStatus { UNREAD(0,"未读"), ALREADY_READ(1,"已读"), @@ -11,13 +11,13 @@ public enum MessageStatusEnum { private String desc; - MessageStatusEnum(Integer index, String desc) { + MessageStatus(Integer index, String desc) { this.code =index; this.desc=desc; } - public static MessageStatusEnum fromCode(Integer code){ - for (MessageStatusEnum typeEnum:values()) { + public static MessageStatus fromCode(Integer code){ + for (MessageStatus typeEnum:values()) { if (typeEnum.code.equals(code)) { return typeEnum; } diff --git a/commom/src/main/java/com/bx/common/enums/MessageTypeEnum.java b/commom/src/main/java/com/bx/common/enums/MessageType.java similarity index 54% rename from commom/src/main/java/com/bx/common/enums/MessageTypeEnum.java rename to commom/src/main/java/com/bx/common/enums/MessageType.java index de53313..9060809 100644 --- a/commom/src/main/java/com/bx/common/enums/MessageTypeEnum.java +++ b/commom/src/main/java/com/bx/common/enums/MessageType.java @@ -1,7 +1,7 @@ package com.bx.common.enums; -public enum MessageTypeEnum { +public enum MessageType { TEXT(0,"文字"), FILE(1,"文件"), @@ -13,20 +13,11 @@ public enum MessageTypeEnum { private String desc; - MessageTypeEnum(Integer index, String desc) { + MessageType(Integer index, String desc) { this.code =index; this.desc=desc; } - public static MessageTypeEnum fromCode(Integer code){ - for (MessageTypeEnum typeEnum:values()) { - if (typeEnum.code.equals(code)) { - return typeEnum; - } - } - return null; - } - public String getDesc() { return desc; diff --git a/commom/src/main/java/com/bx/common/enums/SendResultType.java b/commom/src/main/java/com/bx/common/enums/SendResultType.java new file mode 100644 index 0000000..cfbf124 --- /dev/null +++ b/commom/src/main/java/com/bx/common/enums/SendResultType.java @@ -0,0 +1,25 @@ +package com.bx.common.enums; + + +public enum SendResultType { + + SUCCESS(0,"发送成功"), + FAIL(1,"发送失败"); + + private int code; + private String msg; + + // 构造方法 + SendResultType(int code, String msg) { + this.code = code; + this.msg = msg; + } + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + +} diff --git a/commom/src/main/java/com/bx/common/model/im/GroupMessageInfo.java b/commom/src/main/java/com/bx/common/model/im/GroupMessageInfo.java index 67f08a7..b471fe7 100644 --- a/commom/src/main/java/com/bx/common/model/im/GroupMessageInfo.java +++ b/commom/src/main/java/com/bx/common/model/im/GroupMessageInfo.java @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import lombok.Data; import java.util.Date; -import java.util.List; @Data public class GroupMessageInfo { @@ -16,8 +15,6 @@ public class GroupMessageInfo { private Long sendId; - private List recvIds; - private String content; private Integer type; diff --git a/commom/src/main/java/com/bx/common/model/im/IMRecvInfo.java b/commom/src/main/java/com/bx/common/model/im/IMRecvInfo.java new file mode 100644 index 0000000..e0d8e89 --- /dev/null +++ b/commom/src/main/java/com/bx/common/model/im/IMRecvInfo.java @@ -0,0 +1,17 @@ +package com.bx.common.model.im; + +import lombok.Data; + +import java.util.List; + +@Data +public class IMRecvInfo { + + private Integer cmd; + + private List recvIds; + + private T data; +} + + diff --git a/commom/src/main/java/com/bx/common/model/im/SendResult.java b/commom/src/main/java/com/bx/common/model/im/SendResult.java new file mode 100644 index 0000000..c655b2f --- /dev/null +++ b/commom/src/main/java/com/bx/common/model/im/SendResult.java @@ -0,0 +1,17 @@ +package com.bx.common.model.im; + +import com.bx.common.enums.SendResultType; +import lombok.Data; + +@Data +public class SendResult { + + private Long recvId; + + private SendResultType result; + + private String failReason=""; + + private T messageInfo; + +} diff --git a/im-client/pom.xml b/im-client/pom.xml new file mode 100644 index 0000000..73f33b8 --- /dev/null +++ b/im-client/pom.xml @@ -0,0 +1,26 @@ + + + + box-im + com.bx + 1.0.0 + + 4.0.0 + + im-client + + + + com.bx + commom + 1.0.0 + + + + org.springframework.boot + spring-boot-starter-data-redis + + + \ No newline at end of file diff --git a/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java b/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java new file mode 100644 index 0000000..b3a93fc --- /dev/null +++ b/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java @@ -0,0 +1,12 @@ +package com.bx.imclient; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + + +@Slf4j +@Configuration +@ComponentScan("com.bx.imclient") +public class IMAutoConfiguration { +} diff --git a/im-client/src/main/java/com/bx/imclient/IMClient.java b/im-client/src/main/java/com/bx/imclient/IMClient.java new file mode 100644 index 0000000..dbe03a3 --- /dev/null +++ b/im-client/src/main/java/com/bx/imclient/IMClient.java @@ -0,0 +1,30 @@ +package com.bx.imclient; + +import com.bx.common.model.im.GroupMessageInfo; +import com.bx.common.model.im.PrivateMessageInfo; +import com.bx.imclient.listener.MessageListenerMulticaster; +import com.bx.imclient.sender.IMSender; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +import java.util.List; + +@Configuration +public class IMClient { + + @Autowired + private MessageListenerMulticaster listenerMulticaster; + + @Autowired + private IMSender imSender; + + public void sendPrivateMessage(Long userId, PrivateMessageInfo... messageInfo){ + imSender.sendPrivateMessage(userId,messageInfo); + } + + public void sendGroupMessage(List userTokens, GroupMessageInfo... messageInfo){ + imSender.sendGroupMessage(userTokens,messageInfo); + } + + +} diff --git a/im-client/src/main/java/com/bx/imclient/annotation/IMListener.java b/im-client/src/main/java/com/bx/imclient/annotation/IMListener.java new file mode 100644 index 0000000..8e86edf --- /dev/null +++ b/im-client/src/main/java/com/bx/imclient/annotation/IMListener.java @@ -0,0 +1,18 @@ +package com.bx.imclient.annotation; + +import com.bx.common.enums.ListenerType; +import org.springframework.stereotype.Component; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ElementType.TYPE,ElementType.FIELD}) +@Retention(RetentionPolicy.RUNTIME) +@Component +public @interface IMListener { + + ListenerType type(); + +} diff --git a/im-client/src/main/java/com/bx/imclient/config/RedisConfig.java b/im-client/src/main/java/com/bx/imclient/config/RedisConfig.java new file mode 100644 index 0000000..23fac7c --- /dev/null +++ b/im-client/src/main/java/com/bx/imclient/config/RedisConfig.java @@ -0,0 +1,49 @@ +package com.bx.imclient.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +import javax.annotation.Resource; + +@Configuration("IMRedisConfig") +public class RedisConfig { + + @Resource + private RedisConnectionFactory factory; + + @Bean("IMRedisTemplate") + public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { + RedisTemplate redisTemplate = new RedisTemplate(); + redisTemplate.setConnectionFactory(redisConnectionFactory); + // 设置值(value)的序列化采用jackson2JsonRedisSerializer + redisTemplate.setValueSerializer(jackson2JsonRedisSerializer()); + redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer()); + // 设置键(key)的序列化采用StringRedisSerializer。 + redisTemplate.setKeySerializer(new StringRedisSerializer()); + redisTemplate.setHashKeySerializer(new StringRedisSerializer()); + redisTemplate.afterPropertiesSet(); + return redisTemplate; + } + + @Bean + public Jackson2JsonRedisSerializer jackson2JsonRedisSerializer(){ + Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); + ObjectMapper om = new ObjectMapper(); + om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + // 解决jackson2无法反序列化LocalDateTime的问题 + om.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); + jackson2JsonRedisSerializer.setObjectMapper(om); + return jackson2JsonRedisSerializer; + } + +} diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java new file mode 100644 index 0000000..a223f34 --- /dev/null +++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java @@ -0,0 +1,10 @@ +package com.bx.imclient.listener; + + +import com.bx.common.model.im.SendResult; + +public interface MessageListener { + + void process(SendResult result); + +} diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java new file mode 100644 index 0000000..0cf4516 --- /dev/null +++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java @@ -0,0 +1,28 @@ +package com.bx.imclient.listener; + + +import com.bx.common.enums.ListenerType; +import com.bx.common.model.im.SendResult; +import com.bx.imclient.annotation.IMListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.List; + +@Component +public class MessageListenerMulticaster { + + + @Autowired(required = false) + private List messageListeners = Collections.emptyList(); + + public void multicast(ListenerType type, SendResult result){ + for(MessageListener listener:messageListeners){ + IMListener annotation = listener.getClass().getAnnotation(IMListener.class); + if(annotation.type().equals(type)){ + listener.process(result); + } + } + } +} diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java new file mode 100644 index 0000000..5c5c665 --- /dev/null +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -0,0 +1,112 @@ +package com.bx.imclient.sender; + +import com.bx.common.contant.RedisKey; +import com.bx.common.enums.IMCmdType; +import com.bx.common.enums.ListenerType; +import com.bx.common.enums.SendResultType; +import com.bx.common.model.im.GroupMessageInfo; +import com.bx.common.model.im.IMRecvInfo; +import com.bx.common.model.im.PrivateMessageInfo; +import com.bx.common.model.im.SendResult; +import com.bx.imclient.listener.MessageListenerMulticaster; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Service +public class IMSender { + + @Autowired + @Qualifier("IMRedisTemplate") + private RedisTemplate redisTemplate; + + @Autowired + private MessageListenerMulticaster listenerMulticaster; + + public void sendPrivateMessage(Long recvId, PrivateMessageInfo... messageInfos){ + // 获取对方连接的channelId + String key = RedisKey.IM_USER_SERVER_ID + recvId; + Integer serverId = (Integer) redisTemplate.opsForValue().get(key); + // 如果对方在线,将数据存储至redis,等待拉取推送 + if (serverId != null) { + String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId; + IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length]; + for (int i=0;i recvInfo = new IMRecvInfo<>(); + recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.getCode()); + List recvIds = new LinkedList(); + recvIds.add(recvId); + recvInfo.setRecvIds(recvIds); + recvInfo.setData(messageInfos[i]); + recvInfos[i] = recvInfo; + } + redisTemplate.opsForList().rightPushAll(sendKey, recvInfos); + }else{ + // 回复消息状态 + for(PrivateMessageInfo messageInfo : messageInfos ) { + SendResult result = new SendResult(); + result.setMessageInfo(messageInfo); + result.setRecvId(recvId); + result.setResult(SendResultType.FAIL); + result.setFailReason("用户不在线"); + listenerMulticaster.multicast(ListenerType.PRIVATE_MESSAGE, result); + } + } + } + + public void sendGroupMessage(List recvIds, GroupMessageInfo... messageInfos){ + // 根据群聊每个成员所连的IM-server,进行分组 + List offLineIds = Collections.synchronizedList(new LinkedList()); + Map> serverMap = new ConcurrentHashMap<>(); + recvIds.parallelStream().forEach(id->{ + String key = RedisKey.IM_USER_SERVER_ID + id; + Integer serverId = (Integer)redisTemplate.opsForValue().get(key); + if(serverId != null){ + if(serverMap.containsKey(serverId)){ + serverMap.get(serverId).add(id); + }else { + // 此处需要加锁,否则list可以会被覆盖 + synchronized(serverMap){ + List list = Collections.synchronizedList(new LinkedList()); + list.add(id); + serverMap.put(serverId,list); + } + } + }else{ + offLineIds.add(id); + } + }); + // 逐个server发送 + for (Map.Entry> entry : serverMap.entrySet()) { + IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length]; + for (int i=0;i recvInfo = new IMRecvInfo<>(); + recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.getCode()); + recvInfo.setRecvIds(new LinkedList<>(entry.getValue())); + recvInfo.setData(messageInfos[i]); + recvInfos[i] = recvInfo; + } + String key = RedisKey.IM_UNREAD_GROUP_QUEUE +entry.getKey(); + redisTemplate.opsForList().rightPushAll(key,recvInfos); + } + // 不在线的用户,回复消息状态 + for(GroupMessageInfo messageInfo:messageInfos ){ + for(Long id : offLineIds){ + // 回复消息状态 + SendResult result = new SendResult(); + result.setMessageInfo(messageInfo); + result.setRecvId(id); + result.setResult(SendResultType.FAIL); + result.setFailReason("用户不在线"); + listenerMulticaster.multicast(ListenerType.GROUP_MESSAGE,result); + } + } + } +} diff --git a/im-client/src/main/java/com/bx/imclient/task/AbstractPullMessageTask.java b/im-client/src/main/java/com/bx/imclient/task/AbstractPullMessageTask.java new file mode 100644 index 0000000..53b368f --- /dev/null +++ b/im-client/src/main/java/com/bx/imclient/task/AbstractPullMessageTask.java @@ -0,0 +1,47 @@ +package com.bx.imclient.task; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +public abstract class AbstractPullMessageTask { + + private int threadNum = 8; + + private ExecutorService executorService = Executors.newFixedThreadPool(threadNum); + + @PostConstruct + public void init(){ + // 初始化定时器 + for(int i=0;i redisTemplate; + + @Autowired + private MessageListenerMulticaster listenerMulticaster; + + @Override + public void pullMessage() { + String key = RedisKey.IM_RESULT_GROUP_QUEUE; + SendResult result = (SendResult)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); + if(result != null) { + listenerMulticaster.multicast(ListenerType.GROUP_MESSAGE,result); + } + } + +} diff --git a/im-client/src/main/java/com/bx/imclient/task/PullSendResultPrivateMessageTask.java b/im-client/src/main/java/com/bx/imclient/task/PullSendResultPrivateMessageTask.java new file mode 100644 index 0000000..6f3042d --- /dev/null +++ b/im-client/src/main/java/com/bx/imclient/task/PullSendResultPrivateMessageTask.java @@ -0,0 +1,38 @@ +package com.bx.imclient.task; + +import com.bx.common.contant.RedisKey; +import com.bx.common.enums.ListenerType; +import com.bx.common.model.im.SendResult; +import com.bx.imclient.listener.MessageListenerMulticaster; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +; + +@Slf4j +@Component +public class PullSendResultPrivateMessageTask extends AbstractPullMessageTask{ + + + @Qualifier("IMRedisTemplate") + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private MessageListenerMulticaster listenerMulticaster; + + @Override + public void pullMessage() { + String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; + SendResult result = (SendResult)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); + if(result != null) { + listenerMulticaster.multicast(ListenerType.PRIVATE_MESSAGE, result); + } + } + +} diff --git a/im-client/src/main/resources/META-INF/spring.factories b/im-client/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..801458e --- /dev/null +++ b/im-client/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +com.bx.imclient.IMAutoConfiguration \ No newline at end of file diff --git a/im-platform/pom.xml b/im-platform/pom.xml index c27f121..a963aee 100644 --- a/im-platform/pom.xml +++ b/im-platform/pom.xml @@ -15,7 +15,7 @@ com.bx - commom + im-client 1.0.0 @@ -101,6 +101,11 @@ thumbnailator 0.4.8 + + com.baomidou + mybatis-plus-generator + 3.3.2 + diff --git a/im-platform/src/main/java/com/bx/implatform/ImplatformApp.java b/im-platform/src/main/java/com/bx/implatform/ImplatformApp.java index 3743c93..15480c2 100644 --- a/im-platform/src/main/java/com/bx/implatform/ImplatformApp.java +++ b/im-platform/src/main/java/com/bx/implatform/ImplatformApp.java @@ -4,14 +4,12 @@ import lombok.extern.slf4j.Slf4j; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.EnableAspectJAutoProxy; @Slf4j @EnableAspectJAutoProxy(exposeProxy = true) @MapperScan(basePackages = {"com.bx.implatform.mapper"}) -@ComponentScan(basePackages={"com.bx"}) @SpringBootApplication public class ImplatformApp { diff --git a/im-platform/src/main/java/com/bx/implatform/config/WebSecurityConfg.java b/im-platform/src/main/java/com/bx/implatform/config/WebSecurityConfg.java index 9a3a218..7ef0d62 100644 --- a/im-platform/src/main/java/com/bx/implatform/config/WebSecurityConfg.java +++ b/im-platform/src/main/java/com/bx/implatform/config/WebSecurityConfg.java @@ -1,9 +1,9 @@ package com.bx.implatform.config; import com.alibaba.fastjson.JSON; -import com.bx.common.enums.ResultCode; -import com.bx.common.result.Result; -import com.bx.common.result.ResultUtils; +import com.bx.implatform.enums.ResultCode; +import com.bx.implatform.result.Result; +import com.bx.implatform.result.ResultUtils; import com.bx.implatform.service.IUserService; import com.bx.implatform.session.UserSession; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/im-platform/src/main/java/com/bx/implatform/contant/Constant.java b/im-platform/src/main/java/com/bx/implatform/contant/Constant.java new file mode 100644 index 0000000..77fab01 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/contant/Constant.java @@ -0,0 +1,12 @@ +package com.bx.implatform.contant; + + +public class Constant { + // 最大图片上传大小 + public static final long MAX_IMAGE_SIZE = 5*1024*1024; + // 最大上传文件大小 + public static final long MAX_FILE_SIZE = 10*1024*1024; + // 群聊最大人数 + public static final long MAX_GROUP_MEMBER = 500; + +} diff --git a/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java b/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java new file mode 100644 index 0000000..9d25026 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java @@ -0,0 +1,16 @@ +package com.bx.implatform.contant; + +public class RedisKey { + + // 已读群聊消息位置(已读最大id) + public final static String IM_GROUP_READED_POSITION = "im:readed:group:position:"; + // 缓存前缀 + public final static String IM_CACHE = "im:cache:"; + // 缓存是否好友:bool + public final static String IM_CACHE_FRIEND = IM_CACHE+"friend"; + // 缓存群聊信息 + public final static String IM_CACHE_GROUP = IM_CACHE+"group"; + // 缓存群聊成员id + public final static String IM_CACHE_GROUP_MEMBER_ID = IM_CACHE+"group_member_ids"; + +} diff --git a/im-platform/src/main/java/com/bx/implatform/controller/FileController.java b/im-platform/src/main/java/com/bx/implatform/controller/FileController.java index 901083f..0d50738 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/FileController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/FileController.java @@ -1,7 +1,7 @@ package com.bx.implatform.controller; -import com.bx.common.result.Result; -import com.bx.common.result.ResultUtils; +import com.bx.implatform.result.Result; +import com.bx.implatform.result.ResultUtils; import com.bx.implatform.service.thirdparty.FileService; import com.bx.implatform.vo.UploadImageVO; import io.swagger.annotations.Api; diff --git a/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java b/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java index e1d718e..9d97450 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java @@ -1,9 +1,8 @@ package com.bx.implatform.controller; - -import com.bx.common.result.Result; -import com.bx.common.result.ResultUtils; import com.bx.implatform.entity.Friend; +import com.bx.implatform.result.Result; +import com.bx.implatform.result.ResultUtils; import com.bx.implatform.service.IFriendService; import com.bx.implatform.session.SessionContext; import com.bx.implatform.vo.FriendVO; diff --git a/im-platform/src/main/java/com/bx/implatform/controller/GroupController.java b/im-platform/src/main/java/com/bx/implatform/controller/GroupController.java index aac8764..250ebbb 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/GroupController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/GroupController.java @@ -1,8 +1,8 @@ package com.bx.implatform.controller; -import com.bx.common.result.Result; -import com.bx.common.result.ResultUtils; +import com.bx.implatform.result.Result; +import com.bx.implatform.result.ResultUtils; import com.bx.implatform.service.IGroupService; import com.bx.implatform.vo.GroupInviteVO; import com.bx.implatform.vo.GroupMemberVO; diff --git a/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java b/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java index 68b882b..540b2ac 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java @@ -2,8 +2,8 @@ package com.bx.implatform.controller; import com.bx.common.model.im.GroupMessageInfo; -import com.bx.common.result.Result; -import com.bx.common.result.ResultUtils; +import com.bx.implatform.result.Result; +import com.bx.implatform.result.ResultUtils; import com.bx.implatform.service.IGroupMessageService; import com.bx.implatform.vo.GroupMessageVO; import io.swagger.annotations.Api; diff --git a/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java b/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java index 662c2d8..e07da66 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java @@ -2,8 +2,8 @@ package com.bx.implatform.controller; import com.bx.common.model.im.PrivateMessageInfo; -import com.bx.common.result.Result; -import com.bx.common.result.ResultUtils; +import com.bx.implatform.result.Result; +import com.bx.implatform.result.ResultUtils; import com.bx.implatform.service.IPrivateMessageService; import com.bx.implatform.vo.PrivateMessageVO; import io.swagger.annotations.Api; diff --git a/im-platform/src/main/java/com/bx/implatform/controller/RegisterController.java b/im-platform/src/main/java/com/bx/implatform/controller/RegisterController.java index 9a3e044..a1250b8 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/RegisterController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/RegisterController.java @@ -1,8 +1,8 @@ package com.bx.implatform.controller; -import com.bx.common.result.Result; -import com.bx.common.result.ResultUtils; +import com.bx.implatform.result.Result; +import com.bx.implatform.result.ResultUtils; import com.bx.implatform.service.IUserService; import com.bx.implatform.vo.RegisterVO; import io.swagger.annotations.Api; diff --git a/im-platform/src/main/java/com/bx/implatform/controller/UserController.java b/im-platform/src/main/java/com/bx/implatform/controller/UserController.java index 603c8ef..3cc5cdf 100644 --- a/im-platform/src/main/java/com/bx/implatform/controller/UserController.java +++ b/im-platform/src/main/java/com/bx/implatform/controller/UserController.java @@ -1,13 +1,12 @@ package com.bx.implatform.controller; - -import com.bx.common.result.Result; -import com.bx.common.result.ResultUtils; -import com.bx.common.util.BeanUtils; import com.bx.implatform.entity.User; +import com.bx.implatform.result.Result; +import com.bx.implatform.result.ResultUtils; import com.bx.implatform.service.IUserService; import com.bx.implatform.session.SessionContext; import com.bx.implatform.session.UserSession; +import com.bx.implatform.util.BeanUtils; import com.bx.implatform.vo.UserVO; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; diff --git a/commom/src/main/java/com/bx/common/enums/ResultCode.java b/im-platform/src/main/java/com/bx/implatform/enums/ResultCode.java similarity index 96% rename from commom/src/main/java/com/bx/common/enums/ResultCode.java rename to im-platform/src/main/java/com/bx/implatform/enums/ResultCode.java index ec41f5c..bc6f96c 100644 --- a/commom/src/main/java/com/bx/common/enums/ResultCode.java +++ b/im-platform/src/main/java/com/bx/implatform/enums/ResultCode.java @@ -1,4 +1,4 @@ -package com.bx.common.enums; +package com.bx.implatform.enums; /** * 响应码枚举 diff --git a/im-platform/src/main/java/com/bx/implatform/exception/GlobalException.java b/im-platform/src/main/java/com/bx/implatform/exception/GlobalException.java index b3deac0..fdd189d 100644 --- a/im-platform/src/main/java/com/bx/implatform/exception/GlobalException.java +++ b/im-platform/src/main/java/com/bx/implatform/exception/GlobalException.java @@ -1,6 +1,6 @@ package com.bx.implatform.exception; -import com.bx.common.enums.ResultCode; +import com.bx.implatform.enums.ResultCode; import lombok.Data; import java.io.Serializable; diff --git a/im-platform/src/main/java/com/bx/implatform/exception/GlobalExceptionHandler.java b/im-platform/src/main/java/com/bx/implatform/exception/GlobalExceptionHandler.java index 693ff39..0c44af8 100644 --- a/im-platform/src/main/java/com/bx/implatform/exception/GlobalExceptionHandler.java +++ b/im-platform/src/main/java/com/bx/implatform/exception/GlobalExceptionHandler.java @@ -1,9 +1,9 @@ package com.bx.implatform.exception; import cn.hutool.json.JSONException; -import com.bx.common.enums.ResultCode; -import com.bx.common.result.Result; -import com.bx.common.result.ResultUtils; +import com.bx.implatform.enums.ResultCode; +import com.bx.implatform.result.Result; +import com.bx.implatform.result.ResultUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.http.converter.HttpMessageNotReadableException; diff --git a/commom/src/main/java/com/bx/common/generator/CodeGenerator.java b/im-platform/src/main/java/com/bx/implatform/generator/CodeGenerator.java similarity index 99% rename from commom/src/main/java/com/bx/common/generator/CodeGenerator.java rename to im-platform/src/main/java/com/bx/implatform/generator/CodeGenerator.java index 7e94971..d0f423f 100644 --- a/commom/src/main/java/com/bx/common/generator/CodeGenerator.java +++ b/im-platform/src/main/java/com/bx/implatform/generator/CodeGenerator.java @@ -1,4 +1,4 @@ -package com.bx.common.generator; +package com.bx.implatform.generator; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.core.toolkit.StringPool; diff --git a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java new file mode 100644 index 0000000..44b7ae6 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java @@ -0,0 +1,34 @@ +package com.bx.implatform.listener; + +import com.bx.common.enums.ListenerType; +import com.bx.common.enums.MessageType; +import com.bx.common.model.im.GroupMessageInfo; +import com.bx.common.model.im.SendResult; +import com.bx.imclient.annotation.IMListener; +import com.bx.imclient.listener.MessageListener; +import com.bx.implatform.contant.RedisKey; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; + + +@Slf4j +@IMListener(type = ListenerType.GROUP_MESSAGE) +public class GroupMessageListener implements MessageListener { + + @Autowired + private RedisTemplate redisTemplate; + + @Override + public void process(SendResult result){ + GroupMessageInfo messageInfo = (GroupMessageInfo) result.getMessageInfo(); + if(messageInfo.getType().equals(MessageType.TIP)){ + // 提示类数据不记录 + return; + } + // 保存该用户已拉取的最大消息id + String key = RedisKey.IM_GROUP_READED_POSITION + messageInfo.getGroupId()+":"+result.getRecvId(); + redisTemplate.opsForValue().set(key,messageInfo.getId()); + } + +} diff --git a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java new file mode 100644 index 0000000..49ad576 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java @@ -0,0 +1,40 @@ +package com.bx.implatform.listener; + +import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; +import com.bx.common.enums.ListenerType; +import com.bx.common.enums.MessageStatus; +import com.bx.common.enums.MessageType; +import com.bx.common.model.im.PrivateMessageInfo; +import com.bx.common.model.im.SendResult; +import com.bx.imclient.annotation.IMListener; +import com.bx.imclient.listener.MessageListener; +import com.bx.implatform.entity.PrivateMessage; +import com.bx.implatform.service.IPrivateMessageService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; + + +@Slf4j +@IMListener(type = ListenerType.PRIVATE_MESSAGE) +public class PrivateMessageListener implements MessageListener { + + @Autowired + private IPrivateMessageService privateMessageService; + + @Override + public void process(SendResult result){ + PrivateMessageInfo messageInfo = (PrivateMessageInfo) result.getMessageInfo(); + if(messageInfo.getType().equals(MessageType.TIP)){ + // 提示类数据不记录 + return; + } + // 更新消息状态 + UpdateWrapper updateWrapper = new UpdateWrapper<>(); + updateWrapper.lambda().eq(PrivateMessage::getId,messageInfo.getId()) + .eq(PrivateMessage::getStatus, MessageStatus.UNREAD.getCode()) + .set(PrivateMessage::getStatus, MessageStatus.ALREADY_READ.getCode()); + privateMessageService.update(updateWrapper); + log.info("消息已读,消息id:{},发送者:{},接收者:{}",messageInfo.getId(),messageInfo.getSendId(),messageInfo.getRecvId()); + } + +} diff --git a/commom/src/main/java/com/bx/common/result/Result.java b/im-platform/src/main/java/com/bx/implatform/result/Result.java similarity index 79% rename from commom/src/main/java/com/bx/common/result/Result.java rename to im-platform/src/main/java/com/bx/implatform/result/Result.java index 08006a4..16e630b 100644 --- a/commom/src/main/java/com/bx/common/result/Result.java +++ b/im-platform/src/main/java/com/bx/implatform/result/Result.java @@ -1,4 +1,4 @@ -package com.bx.common.result; +package com.bx.implatform.result; import lombok.Data; diff --git a/commom/src/main/java/com/bx/common/result/ResultUtils.java b/im-platform/src/main/java/com/bx/implatform/result/ResultUtils.java similarity index 94% rename from commom/src/main/java/com/bx/common/result/ResultUtils.java rename to im-platform/src/main/java/com/bx/implatform/result/ResultUtils.java index 0accdca..50326fd 100644 --- a/commom/src/main/java/com/bx/common/result/ResultUtils.java +++ b/im-platform/src/main/java/com/bx/implatform/result/ResultUtils.java @@ -1,7 +1,7 @@ -package com.bx.common.result; +package com.bx.implatform.result; -import com.bx.common.enums.ResultCode; +import com.bx.implatform.enums.ResultCode; public class ResultUtils { @@ -50,7 +50,7 @@ public class ResultUtils { return result; } - public static final Result error(ResultCode resultCode, String messsage,T data){ + public static final Result error(ResultCode resultCode, String messsage, T data){ Result result=new Result(); result.setCode(resultCode.getCode()); result.setMessage(messsage); diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java index f5df55a..f074d76 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java @@ -2,10 +2,10 @@ package com.bx.implatform.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.bx.common.contant.RedisKey; -import com.bx.common.enums.ResultCode; +import com.bx.implatform.contant.RedisKey; import com.bx.implatform.entity.Friend; import com.bx.implatform.entity.User; +import com.bx.implatform.enums.ResultCode; import com.bx.implatform.exception.GlobalException; import com.bx.implatform.mapper.FriendMapper; import com.bx.implatform.service.IFriendService; diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMemberServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMemberServiceImpl.java index 69616b1..e62de44 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMemberServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMemberServiceImpl.java @@ -3,7 +3,7 @@ package com.bx.implatform.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.bx.common.contant.RedisKey; +import com.bx.implatform.contant.RedisKey; import com.bx.implatform.entity.GroupMember; import com.bx.implatform.mapper.GroupMemberMapper; import com.bx.implatform.service.IGroupMemberService; diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java index f2f40c6..dea178a 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java @@ -3,29 +3,32 @@ package com.bx.implatform.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.bx.common.contant.Constant; -import com.bx.common.contant.RedisKey; -import com.bx.common.enums.MessageStatusEnum; -import com.bx.common.enums.MessageTypeEnum; -import com.bx.common.enums.ResultCode; +import com.bx.common.enums.MessageStatus; +import com.bx.common.enums.MessageType; import com.bx.common.model.im.GroupMessageInfo; -import com.bx.common.util.BeanUtils; +import com.bx.imclient.IMClient; +import com.bx.implatform.contant.RedisKey; import com.bx.implatform.entity.Group; import com.bx.implatform.entity.GroupMember; import com.bx.implatform.entity.GroupMessage; +import com.bx.implatform.enums.ResultCode; import com.bx.implatform.exception.GlobalException; import com.bx.implatform.mapper.GroupMessageMapper; import com.bx.implatform.service.IGroupMemberService; import com.bx.implatform.service.IGroupMessageService; import com.bx.implatform.service.IGroupService; import com.bx.implatform.session.SessionContext; +import com.bx.implatform.util.BeanUtils; import com.bx.implatform.vo.GroupMessageVO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Collections; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; import java.util.stream.Collectors; @Slf4j @@ -42,6 +45,9 @@ public class GroupMessageServiceImpl extends ServiceImpl redisTemplate; + @Autowired + private IMClient imClient; + /** * 发送群聊消息(与mysql所有交换都要进行缓存) * @@ -102,12 +108,12 @@ public class GroupMessageServiceImpl extends ServiceImpl userIds = groupMemberService.findUserIdsByGroupId(msg.getGroupId()); GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class); - msgInfo.setType(MessageTypeEnum.TIP.getCode()); + msgInfo.setType(MessageType.TIP.getCode()); String content = String.format("'%s'撤回了一条消息",member.getAliasName()); msgInfo.setContent(content); msgInfo.setSendTime(new Date()); @@ -124,22 +130,17 @@ public class GroupMessageServiceImpl extends ServiceImpl recvIds = new LinkedList(); recvIds.add(userId); List members = groupMemberService.findByUserId(userId); for(GroupMember member:members){ // 获取群聊已读的最大消息id,只推送未读消息 - key = RedisKey.IM_GROUP_READED_POSITION + member.getGroupId()+":"+userId; + String key = RedisKey.IM_GROUP_READED_POSITION + member.getGroupId()+":"+userId; Integer maxReadedId = (Integer)redisTemplate.opsForValue().get(key); QueryWrapper wrapper = new QueryWrapper(); wrapper.lambda().eq(GroupMessage::getGroupId,member.getGroupId()) .gt(GroupMessage::getSendTime,member.getCreatedTime()) - .ne(GroupMessage::getStatus,MessageStatusEnum.RECALL.getCode()); + .ne(GroupMessage::getStatus, MessageStatus.RECALL.getCode()); if(maxReadedId!=null){ wrapper.lambda().gt(GroupMessage::getId,maxReadedId); } @@ -151,11 +152,11 @@ public class GroupMessageServiceImpl extends ServiceImpl messageInfos = messages.stream().map(m->{ GroupMessageInfo msgInfo = BeanUtils.copyProperties(m, GroupMessageInfo.class); - msgInfo.setRecvIds(recvIds); return msgInfo; }).collect(Collectors.toList()); - key = RedisKey.IM_UNREAD_GROUP_MESSAGE + serverId; - redisTemplate.opsForList().rightPushAll(key,messageInfos.toArray()); + // 发送消息 + GroupMessageInfo[] infoArr = messageInfos.toArray(new GroupMessageInfo[messageInfos.size()]); + imClient.sendGroupMessage(Collections.singletonList(userId), infoArr); log.info("拉取未读群聊消息,用户id:{},群聊id:{},数量:{}",userId,member.getGroupId(),messageInfos.size()); } @@ -184,7 +185,7 @@ public class GroupMessageServiceImpl extends ServiceImpl wrapper = new QueryWrapper<>(); wrapper.lambda().eq(GroupMessage::getGroupId,groupId) .gt(GroupMessage::getSendTime,member.getCreatedTime()) - .ne(GroupMessage::getStatus,MessageStatusEnum.RECALL.getCode()) + .ne(GroupMessage::getStatus, MessageStatus.RECALL.getCode()) .orderByDesc(GroupMessage::getId) .last("limit "+stIdx + ","+size); @@ -198,29 +199,6 @@ public class GroupMessageServiceImpl extends ServiceImpl userIds, GroupMessageInfo msgInfo){ - // 根据群聊每个成员所连的IM-server,进行分组 - Map> serverMap = new ConcurrentHashMap<>(); - userIds.parallelStream().forEach(id->{ - String key = RedisKey.IM_USER_SERVER_ID + id; - Integer serverId = (Integer)redisTemplate.opsForValue().get(key); - if(serverId != null){ - if(serverMap.containsKey(serverId)){ - serverMap.get(serverId).add(id); - }else { - // 此处需要加锁,否则list可以会被覆盖 - synchronized(serverMap){ - List list = Collections.synchronizedList(new LinkedList()); - list.add(id); - serverMap.put(serverId,list); - } - } - } - }); - // 逐个server发送 - for (Map.Entry> entry : serverMap.entrySet()) { - msgInfo.setRecvIds(new LinkedList<>(entry.getValue())); - String key = RedisKey.IM_UNREAD_GROUP_MESSAGE +entry.getKey(); - redisTemplate.opsForList().rightPush(key,msgInfo); - } + imClient.sendGroupMessage(userIds,msgInfo); } } diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java index d286a43..ab72d03 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java @@ -2,14 +2,13 @@ package com.bx.implatform.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.bx.common.contant.Constant; -import com.bx.common.contant.RedisKey; -import com.bx.common.enums.ResultCode; -import com.bx.common.util.BeanUtils; +import com.bx.implatform.contant.Constant; +import com.bx.implatform.contant.RedisKey; import com.bx.implatform.entity.Friend; import com.bx.implatform.entity.Group; import com.bx.implatform.entity.GroupMember; import com.bx.implatform.entity.User; +import com.bx.implatform.enums.ResultCode; import com.bx.implatform.exception.GlobalException; import com.bx.implatform.mapper.GroupMapper; import com.bx.implatform.service.IFriendService; @@ -18,6 +17,7 @@ import com.bx.implatform.service.IGroupService; import com.bx.implatform.service.IUserService; import com.bx.implatform.session.SessionContext; import com.bx.implatform.session.UserSession; +import com.bx.implatform.util.BeanUtils; import com.bx.implatform.vo.GroupInviteVO; import com.bx.implatform.vo.GroupMemberVO; import com.bx.implatform.vo.GroupVO; diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java index 5bfdcb1..5a4fd3b 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java @@ -4,17 +4,18 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.bx.common.contant.Constant; import com.bx.common.contant.RedisKey; -import com.bx.common.enums.MessageStatusEnum; -import com.bx.common.enums.MessageTypeEnum; -import com.bx.common.enums.ResultCode; +import com.bx.common.enums.MessageStatus; +import com.bx.common.enums.MessageType; import com.bx.common.model.im.PrivateMessageInfo; -import com.bx.common.util.BeanUtils; +import com.bx.imclient.IMClient; import com.bx.implatform.entity.PrivateMessage; +import com.bx.implatform.enums.ResultCode; import com.bx.implatform.exception.GlobalException; import com.bx.implatform.mapper.PrivateMessageMapper; import com.bx.implatform.service.IFriendService; import com.bx.implatform.service.IPrivateMessageService; import com.bx.implatform.session.SessionContext; +import com.bx.implatform.util.BeanUtils; import com.bx.implatform.vo.PrivateMessageVO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -33,7 +34,8 @@ public class PrivateMessageServiceImpl extends ServiceImpl redisTemplate; - + @Autowired + private IMClient imClient; /** * 发送私聊消息 * @@ -50,18 +52,12 @@ public class PrivateMessageServiceImpl extends ServiceImpl wp.eq(PrivateMessage::getRecvId, userId) .eq(PrivateMessage::getSendId, friendId))) - .ne(PrivateMessage::getStatus, MessageStatusEnum.RECALL.getCode()) + .ne(PrivateMessage::getStatus, MessageStatus.RECALL.getCode()) .orderByDesc(PrivateMessage::getId) .last("limit " + stIdx + "," + size); @@ -154,7 +144,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl queryWrapper = new QueryWrapper<>(); queryWrapper.lambda().eq(PrivateMessage::getRecvId, userId) - .eq(PrivateMessage::getStatus, MessageStatusEnum.UNREAD); + .eq(PrivateMessage::getStatus, MessageStatus.UNREAD); List messages = this.list(queryWrapper); // 上传至redis,等待推送 if (!messages.isEmpty()) { @@ -162,8 +152,8 @@ public class PrivateMessageServiceImpl extends ServiceImpl redisTemplate; - - @Autowired - private IPrivateMessageService privateMessageService; - - @PostConstruct - public void init(){ - for(int i=0;i updateWrapper = new UpdateWrapper<>(); - updateWrapper.lambda().eq(PrivateMessage::getId,msgId) - .eq(PrivateMessage::getStatus,MessageStatusEnum.UNREAD.getCode()) - .set(PrivateMessage::getStatus, MessageStatusEnum.ALREADY_READ.getCode()); - privateMessageService.update(updateWrapper); - log.info("消息已读,id:{}",msgId); - } - }catch (Exception e){ - log.error(e.getMessage()); - Thread.sleep(200); - }finally { - // 下一次循环 - if(!executorService.isShutdown()){ - executorService.submit(this); - } - } - } - } -} diff --git a/commom/src/main/java/com/bx/common/util/BeanUtils.java b/im-platform/src/main/java/com/bx/implatform/util/BeanUtils.java similarity index 99% rename from commom/src/main/java/com/bx/common/util/BeanUtils.java rename to im-platform/src/main/java/com/bx/implatform/util/BeanUtils.java index 5914212..9bc74be 100644 --- a/commom/src/main/java/com/bx/common/util/BeanUtils.java +++ b/im-platform/src/main/java/com/bx/implatform/util/BeanUtils.java @@ -1,4 +1,4 @@ -package com.bx.common.util; +package com.bx.implatform.util; import org.springframework.beans.BeanWrapper; import org.springframework.beans.BeanWrapperImpl; @@ -12,7 +12,6 @@ import java.util.Set; public class BeanUtils { - private static void handleReflectionException(Exception e) { ReflectionUtils.handleReflectionException(e); } diff --git a/commom/src/main/java/com/bx/common/util/DateTimeUtils.java b/im-platform/src/main/java/com/bx/implatform/util/DateTimeUtils.java similarity index 99% rename from commom/src/main/java/com/bx/common/util/DateTimeUtils.java rename to im-platform/src/main/java/com/bx/implatform/util/DateTimeUtils.java index 5d79b0f..4cad011 100644 --- a/commom/src/main/java/com/bx/common/util/DateTimeUtils.java +++ b/im-platform/src/main/java/com/bx/implatform/util/DateTimeUtils.java @@ -1,4 +1,4 @@ -package com.bx.common.util; +package com.bx.implatform.util; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; diff --git a/im-platform/src/main/java/com/bx/implatform/util/MinioUtil.java b/im-platform/src/main/java/com/bx/implatform/util/MinioUtil.java index bf4f1cd..ff62244 100644 --- a/im-platform/src/main/java/com/bx/implatform/util/MinioUtil.java +++ b/im-platform/src/main/java/com/bx/implatform/util/MinioUtil.java @@ -1,7 +1,6 @@ package com.bx.implatform.util; -import com.bx.common.util.DateTimeUtils; import io.minio.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; diff --git a/im-platform/src/main/resources/application.yml b/im-platform/src/main/resources/application.yml index 71a18c9..a75e938 100644 --- a/im-platform/src/main/resources/application.yml +++ b/im-platform/src/main/resources/application.yml @@ -23,7 +23,7 @@ mybatis-plus: configuration: # 是否开启自动驼峰命名规则(camel case)映射,即从经典数据库列名 A_COLUMN(下划线命名) 到经典 Java 属性名 aColumn(驼峰命名) 的类似映射 map-underscore-to-camel-case: false - log-impl: org.apache.ibatis.logging.stdout.StdOutImpl + #log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # mapper mapper-locations: # *.xml的具体路径 diff --git a/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java index 241cbb2..7e4b227 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java @@ -1,8 +1,9 @@ package com.bx.imserver.task; import com.bx.common.contant.RedisKey; -import com.bx.common.enums.WSCmdEnum; +import com.bx.common.enums.IMCmdType; import com.bx.common.model.im.GroupMessageInfo; +import com.bx.common.model.im.IMRecvInfo; import com.bx.imserver.websocket.WebsocketServer; import com.bx.imserver.websocket.processor.MessageProcessor; import com.bx.imserver.websocket.processor.ProcessorFactory; @@ -28,13 +29,13 @@ public class PullUnreadGroupMessageTask extends AbstractPullMessageTask { @Override public void pullMessage() { // 从redis拉取未读消息 - String key = RedisKey.IM_UNREAD_GROUP_MESSAGE + WSServer.getServerId(); + String key = RedisKey.IM_UNREAD_GROUP_QUEUE + WSServer.getServerId(); List messageInfos = redisTemplate.opsForList().range(key,0,-1); for(Object o: messageInfos){ redisTemplate.opsForList().leftPop(key); - GroupMessageInfo messageInfo = (GroupMessageInfo)o; - MessageProcessor processor = ProcessorFactory.createProcessor(WSCmdEnum.GROUP_MESSAGE); - processor.process(messageInfo); + IMRecvInfo recvInfo = (IMRecvInfo)o; + MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE); + processor.process(recvInfo); } } diff --git a/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java index bf515ef..a68038b 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java @@ -2,7 +2,8 @@ package com.bx.imserver.task; import com.bx.common.contant.RedisKey; -import com.bx.common.enums.WSCmdEnum; +import com.bx.common.enums.IMCmdType; +import com.bx.common.model.im.IMRecvInfo; import com.bx.common.model.im.PrivateMessageInfo; import com.bx.imserver.websocket.WebsocketServer; import com.bx.imserver.websocket.processor.MessageProcessor; @@ -28,13 +29,13 @@ public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask { @Override public void pullMessage() { // 从redis拉取未读消息 - String key = RedisKey.IM_UNREAD_PRIVATE_MESSAGE + WSServer.getServerId(); + String key = RedisKey.IM_UNREAD_PRIVATE_QUEUE + WSServer.getServerId(); List messageInfos = redisTemplate.opsForList().range(key,0,-1); for(Object o: messageInfos){ redisTemplate.opsForList().leftPop(key); - PrivateMessageInfo messageInfo = (PrivateMessageInfo)o; - MessageProcessor processor = ProcessorFactory.createProcessor(WSCmdEnum.PRIVATE_MESSAGE); - processor.process(messageInfo); + IMRecvInfo recvInfo = (IMRecvInfo)o; + MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE); + processor.process(recvInfo); } } diff --git a/im-server/src/main/java/com/bx/imserver/websocket/WebSocketHandler.java b/im-server/src/main/java/com/bx/imserver/websocket/WebSocketHandler.java index 8e8e9f6..7012426 100644 --- a/im-server/src/main/java/com/bx/imserver/websocket/WebSocketHandler.java +++ b/im-server/src/main/java/com/bx/imserver/websocket/WebSocketHandler.java @@ -1,7 +1,7 @@ package com.bx.imserver.websocket; import com.bx.common.contant.RedisKey; -import com.bx.common.enums.WSCmdEnum; +import com.bx.common.enums.IMCmdType; import com.bx.common.model.im.SendInfo; import com.bx.common.util.SpringContextHolder; import com.bx.imserver.websocket.processor.MessageProcessor; @@ -33,7 +33,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, SendInfo sendInfo) throws Exception { // 创建处理器进行处理 - MessageProcessor processor = ProcessorFactory.createProcessor(WSCmdEnum.fromCode(sendInfo.getCmd())); + MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.fromCode(sendInfo.getCmd())); processor.process(ctx,processor.transForm(sendInfo.getData())); } diff --git a/im-server/src/main/java/com/bx/imserver/websocket/processor/GroupMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/websocket/processor/GroupMessageProcessor.java index 67ace27..0c9f531 100644 --- a/im-server/src/main/java/com/bx/imserver/websocket/processor/GroupMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/websocket/processor/GroupMessageProcessor.java @@ -1,10 +1,12 @@ package com.bx.imserver.websocket.processor; import com.bx.common.contant.RedisKey; -import com.bx.common.enums.MessageTypeEnum; -import com.bx.common.enums.WSCmdEnum; +import com.bx.common.enums.IMCmdType; +import com.bx.common.enums.SendResultType; import com.bx.common.model.im.GroupMessageInfo; +import com.bx.common.model.im.IMRecvInfo; import com.bx.common.model.im.SendInfo; +import com.bx.common.model.im.SendResult; import com.bx.imserver.websocket.WebsocketChannelCtxHolder; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; @@ -17,38 +19,58 @@ import java.util.List; @Slf4j @Component -public class GroupMessageProcessor extends MessageProcessor { +public class GroupMessageProcessor extends MessageProcessor> { @Autowired private RedisTemplate redisTemplate; @Async @Override - public void process(GroupMessageInfo data) { - log.info("接收到群消息,发送者:{},群id:{},接收id:{},内容:{}",data.getSendId(),data.getGroupId(),data.getRecvIds(),data.getContent()); - List recvIds = data.getRecvIds(); - // 接收者id列表不需要传输,节省带宽 - data.setRecvIds(null); + public void process(IMRecvInfo recvInfo) { + GroupMessageInfo messageInfo = recvInfo.getData(); + List recvIds = recvInfo.getRecvIds(); + log.info("接收到群消息,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent()); for(Long recvId:recvIds){ - ChannelHandlerContext channelCtx = WebsocketChannelCtxHolder.getChannelCtx(recvId); - if(channelCtx != null){ - // 自己发的消息不用推送 - if(recvId != data.getSendId()){ - // 推送消息到用户 - SendInfo sendInfo = new SendInfo(); - sendInfo.setCmd(WSCmdEnum.GROUP_MESSAGE.getCode()); - sendInfo.setData(data); - channelCtx.channel().writeAndFlush(sendInfo); + try { + ChannelHandlerContext channelCtx = WebsocketChannelCtxHolder.getChannelCtx(recvId); + if(channelCtx != null){ + // 自己发的消息不用推送 + if(recvId != messageInfo.getSendId()){ + // 推送消息到用户 + SendInfo sendInfo = new SendInfo(); + sendInfo.setCmd(IMCmdType.GROUP_MESSAGE.getCode()); + sendInfo.setData(messageInfo); + channelCtx.channel().writeAndFlush(sendInfo); + // 消息发送成功确认 + String key = RedisKey.IM_RESULT_GROUP_QUEUE; + SendResult sendResult = new SendResult(); + sendResult.setRecvId(recvId); + sendResult.setResult(SendResultType.SUCCESS); + sendResult.setMessageInfo(messageInfo); + redisTemplate.opsForList().rightPush(key,sendResult); + } + }else { + // 消息发送失败确认 + String key = RedisKey.IM_RESULT_GROUP_QUEUE; + SendResult sendResult = new SendResult(); + sendResult.setRecvId(recvId); + sendResult.setResult(SendResultType.FAIL); + sendResult.setFailReason("未找到WS连接"); + sendResult.setMessageInfo(messageInfo); + redisTemplate.opsForList().rightPush(key,sendResult); + log.error("未找到WS连接,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent()); } - if(data.getType() != MessageTypeEnum.TIP.getCode()){ - // 设置已读最大id - String key = RedisKey.IM_GROUP_READED_POSITION + data.getGroupId()+":"+recvId; - redisTemplate.opsForValue().set(key,data.getId()); - } - }else { - log.error("未找到WS连接,发送者:{},群id:{},接收id:{},内容:{}",data.getSendId(),data.getGroupId(),data.getRecvIds()); + }catch (Exception e){ + // 消息发送失败确认 + String key = RedisKey.IM_RESULT_GROUP_QUEUE; + SendResult sendResult = new SendResult(); + sendResult.setRecvId(recvId); + sendResult.setResult(SendResultType.FAIL); + sendResult.setFailReason("未知异常"); + sendResult.setMessageInfo(messageInfo); + redisTemplate.opsForList().rightPush(key,sendResult); + log.error("发送消息异常,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent()); } - } } diff --git a/im-server/src/main/java/com/bx/imserver/websocket/processor/HeartbeatProcessor.java b/im-server/src/main/java/com/bx/imserver/websocket/processor/HeartbeatProcessor.java index 1701299..0bbc743 100644 --- a/im-server/src/main/java/com/bx/imserver/websocket/processor/HeartbeatProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/websocket/processor/HeartbeatProcessor.java @@ -3,7 +3,7 @@ package com.bx.imserver.websocket.processor; import cn.hutool.core.bean.BeanUtil; import com.bx.common.contant.Constant; import com.bx.common.contant.RedisKey; -import com.bx.common.enums.WSCmdEnum; +import com.bx.common.enums.IMCmdType; import com.bx.common.model.im.HeartbeatInfo; import com.bx.common.model.im.SendInfo; import com.bx.imserver.websocket.WebsocketServer; @@ -32,7 +32,7 @@ public class HeartbeatProcessor extends MessageProcessor { public void process(ChannelHandlerContext ctx, HeartbeatInfo beatInfo) { // 响应ws SendInfo sendInfo = new SendInfo(); - sendInfo.setCmd(WSCmdEnum.HEART_BEAT.getCode()); + sendInfo.setCmd(IMCmdType.HEART_BEAT.getCode()); ctx.channel().writeAndFlush(sendInfo); // 设置属性 diff --git a/im-server/src/main/java/com/bx/imserver/websocket/processor/LoginProcessor.java b/im-server/src/main/java/com/bx/imserver/websocket/processor/LoginProcessor.java index 64bd23a..a241afd 100644 --- a/im-server/src/main/java/com/bx/imserver/websocket/processor/LoginProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/websocket/processor/LoginProcessor.java @@ -3,7 +3,7 @@ package com.bx.imserver.websocket.processor; import cn.hutool.core.bean.BeanUtil; import com.bx.common.contant.Constant; import com.bx.common.contant.RedisKey; -import com.bx.common.enums.WSCmdEnum; +import com.bx.common.enums.IMCmdType; import com.bx.common.model.im.LoginInfo; import com.bx.common.model.im.SendInfo; import com.bx.imserver.websocket.WebsocketChannelCtxHolder; @@ -36,7 +36,7 @@ public class LoginProcessor extends MessageProcessor { if(context != null){ // 不允许多地登录,强制下线 SendInfo sendInfo = new SendInfo(); - sendInfo.setCmd(WSCmdEnum.FORCE_LOGUT.getCode()); + sendInfo.setCmd(IMCmdType.FORCE_LOGUT.getCode()); context.channel().writeAndFlush(sendInfo); } // 绑定用户和channel @@ -52,7 +52,7 @@ public class LoginProcessor extends MessageProcessor { redisTemplate.opsForValue().set(key, WSServer.getServerId(), Constant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS); // 响应ws SendInfo sendInfo = new SendInfo(); - sendInfo.setCmd(WSCmdEnum.LOGIN.getCode()); + sendInfo.setCmd(IMCmdType.LOGIN.getCode()); ctx.channel().writeAndFlush(sendInfo); } diff --git a/im-server/src/main/java/com/bx/imserver/websocket/processor/PrivateMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/websocket/processor/PrivateMessageProcessor.java index c8513ba..bd2b83c 100644 --- a/im-server/src/main/java/com/bx/imserver/websocket/processor/PrivateMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/websocket/processor/PrivateMessageProcessor.java @@ -1,10 +1,12 @@ package com.bx.imserver.websocket.processor; import com.bx.common.contant.RedisKey; -import com.bx.common.enums.MessageTypeEnum; -import com.bx.common.enums.WSCmdEnum; +import com.bx.common.enums.IMCmdType; +import com.bx.common.enums.SendResultType; +import com.bx.common.model.im.IMRecvInfo; import com.bx.common.model.im.PrivateMessageInfo; import com.bx.common.model.im.SendInfo; +import com.bx.common.model.im.SendResult; import com.bx.imserver.websocket.WebsocketChannelCtxHolder; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; @@ -14,30 +16,52 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class PrivateMessageProcessor extends MessageProcessor { +public class PrivateMessageProcessor extends MessageProcessor> { @Autowired private RedisTemplate redisTemplate; @Override - public void process(PrivateMessageInfo data) { - log.info("接收到消息,发送者:{},接收者:{},内容:{}",data.getSendId(),data.getRecvId(),data.getContent()); - // 一个用户可以同时登陆,所以有多个channel - ChannelHandlerContext channelCtx = WebsocketChannelCtxHolder.getChannelCtx(data.getRecvId()); - if(channelCtx != null ){ - // 推送消息到用户 - SendInfo sendInfo = new SendInfo(); - sendInfo.setCmd(WSCmdEnum.PRIVATE_MESSAGE.getCode()); - sendInfo.setData(data); - channelCtx.channel().writeAndFlush(sendInfo); - - if(data.getType() != MessageTypeEnum.TIP.getCode()) { - // 已读消息推送至redis,等待更新数据库 - String key = RedisKey.IM_READED_PRIVATE_MESSAGE_ID; - redisTemplate.opsForList().rightPush(key, data.getId()); + public void process(IMRecvInfo recvInfo) { + PrivateMessageInfo messageInfo = recvInfo.getData(); + Long recvId = recvInfo.getRecvIds().get(0); + log.info("接收到消息,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent()); + try{ + ChannelHandlerContext channelCtx = WebsocketChannelCtxHolder.getChannelCtx(recvId); + if(channelCtx != null ){ + // 推送消息到用户 + SendInfo sendInfo = new SendInfo(); + sendInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.getCode()); + sendInfo.setData(messageInfo); + channelCtx.channel().writeAndFlush(sendInfo); + // 消息发送成功确认 + String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; + SendResult sendResult = new SendResult(); + sendResult.setRecvId(recvId); + sendResult.setResult(SendResultType.SUCCESS); + sendResult.setMessageInfo(messageInfo); + redisTemplate.opsForList().rightPush(key,sendResult); + }else{ + // 消息推送失败确认 + String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; + SendResult sendResult = new SendResult(); + sendResult.setRecvId(recvId); + sendResult.setResult(SendResultType.FAIL); + sendResult.setFailReason("未找到WS连接"); + sendResult.setMessageInfo(messageInfo); + redisTemplate.opsForList().rightPush(key,sendResult); + log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent()); } - }else{ - log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",data.getSendId(),data.getRecvId(),data.getContent()); + }catch (Exception e){ + // 消息推送失败确认 + String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; + SendResult sendResult = new SendResult(); + sendResult.setRecvId(recvId); + sendResult.setResult(SendResultType.FAIL); + sendResult.setFailReason("未知异常"); + sendResult.setMessageInfo(messageInfo); + redisTemplate.opsForList().rightPush(key,sendResult); + log.error("发送异常,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent(),e); } } diff --git a/im-server/src/main/java/com/bx/imserver/websocket/processor/ProcessorFactory.java b/im-server/src/main/java/com/bx/imserver/websocket/processor/ProcessorFactory.java index 7f86cae..c15ed32 100644 --- a/im-server/src/main/java/com/bx/imserver/websocket/processor/ProcessorFactory.java +++ b/im-server/src/main/java/com/bx/imserver/websocket/processor/ProcessorFactory.java @@ -1,11 +1,11 @@ package com.bx.imserver.websocket.processor; -import com.bx.common.enums.WSCmdEnum; +import com.bx.common.enums.IMCmdType; import com.bx.common.util.SpringContextHolder; public class ProcessorFactory { - public static MessageProcessor createProcessor(WSCmdEnum cmd){ + public static MessageProcessor createProcessor(IMCmdType cmd){ MessageProcessor processor = null; switch (cmd){ case LOGIN: diff --git a/pom.xml b/pom.xml index fc15ee7..47b9492 100644 --- a/pom.xml +++ b/pom.xml @@ -13,6 +13,7 @@ im-platform im-server commom + im-client