diff --git a/commom/pom.xml b/commom/pom.xml
index bec5ea0..6641d62 100644
--- a/commom/pom.xml
+++ b/commom/pom.xml
@@ -34,25 +34,11 @@
org.apache.commons
commons-lang3
-
- com.baomidou
- mybatis-plus-generator
- 3.3.2
-
-
- mysql
- mysql-connector-java
- compile
-
+
org.springframework
spring-beans
-
-
- org.springframework.boot
- spring-boot-starter-data-redis
-
org.apache.velocity
velocity
@@ -63,5 +49,9 @@
jackson-datatype-joda
2.9.10
+
+ org.springframework
+ spring-context
+
\ No newline at end of file
diff --git a/commom/src/main/java/com/bx/common/contant/Constant.java b/commom/src/main/java/com/bx/common/contant/Constant.java
index 6c5f130..296e054 100644
--- a/commom/src/main/java/com/bx/common/contant/Constant.java
+++ b/commom/src/main/java/com/bx/common/contant/Constant.java
@@ -1,15 +1,8 @@
package com.bx.common.contant;
-
public class Constant {
- // 最大图片上传大小
- public static final long MAX_IMAGE_SIZE = 5*1024*1024;
- // 最大上传文件大小
- public static final long MAX_FILE_SIZE = 10*1024*1024;
- // 群聊最大人数
- public static final long MAX_GROUP_MEMBER = 500;
// 在线状态过期时间 600s
public static final long ONLINE_TIMEOUT_SECOND = 600;
// 消息允许撤回时间 300s
diff --git a/commom/src/main/java/com/bx/common/contant/RedisKey.java b/commom/src/main/java/com/bx/common/contant/RedisKey.java
index ff0c04d..2a68dd7 100644
--- a/commom/src/main/java/com/bx/common/contant/RedisKey.java
+++ b/commom/src/main/java/com/bx/common/contant/RedisKey.java
@@ -7,20 +7,13 @@ public class RedisKey {
// 用户ID所连接的IM-server的ID
public final static String IM_USER_SERVER_ID = "im:user:server_id:";
// 未读私聊消息队列
- public final static String IM_UNREAD_PRIVATE_MESSAGE = "im:unread:private:";
+ public final static String IM_UNREAD_PRIVATE_QUEUE = "im:unread:private:";
// 未读群聊消息队列
- public final static String IM_UNREAD_GROUP_MESSAGE = "im:unread:group:";
- // 已读私聊消息id队列
- public final static String IM_READED_PRIVATE_MESSAGE_ID = "im:readed:private:id";
- // 已读群聊消息位置(已读最大id)
- public final static String IM_GROUP_READED_POSITION = "im:readed:group:position:";
- // 缓存前缀
- public final static String IM_CACHE = "im:cache:";
- // 缓存是否好友:bool
- public final static String IM_CACHE_FRIEND = IM_CACHE+"friend";
- // 缓存群聊信息
- public final static String IM_CACHE_GROUP = IM_CACHE+"group";
- // 缓存群聊成员id
- public final static String IM_CACHE_GROUP_MEMBER_ID = IM_CACHE+"group_member_ids";
+ public final static String IM_UNREAD_GROUP_QUEUE = "im:unread:group:";
+ // 私聊消息发送结果队列
+ public final static String IM_RESULT_PRIVATE_QUEUE = "im:result:private";
+ // 群聊消息发送结果队列
+ public final static String IM_RESULT_GROUP_QUEUE = "im:result:group";
+
}
diff --git a/commom/src/main/java/com/bx/common/enums/FileTypeEnum.java b/commom/src/main/java/com/bx/common/enums/FileType.java
similarity index 73%
rename from commom/src/main/java/com/bx/common/enums/FileTypeEnum.java
rename to commom/src/main/java/com/bx/common/enums/FileType.java
index a41ac1f..afe3e5b 100644
--- a/commom/src/main/java/com/bx/common/enums/FileTypeEnum.java
+++ b/commom/src/main/java/com/bx/common/enums/FileType.java
@@ -1,6 +1,6 @@
package com.bx.common.enums;
-public enum FileTypeEnum {
+public enum FileType {
FILE(0,"文件"),
IMAGE(1,"图片"),
@@ -13,13 +13,13 @@ public enum FileTypeEnum {
private String desc;
- FileTypeEnum(Integer index, String desc) {
+ FileType(Integer index, String desc) {
this.code =index;
this.desc=desc;
}
- public static FileTypeEnum fromCode(Integer code){
- for (FileTypeEnum typeEnum:values()) {
+ public static FileType fromCode(Integer code){
+ for (FileType typeEnum:values()) {
if (typeEnum.code.equals(code)) {
return typeEnum;
}
diff --git a/commom/src/main/java/com/bx/common/enums/WSCmdEnum.java b/commom/src/main/java/com/bx/common/enums/IMCmdType.java
similarity index 77%
rename from commom/src/main/java/com/bx/common/enums/WSCmdEnum.java
rename to commom/src/main/java/com/bx/common/enums/IMCmdType.java
index 587ae04..f0233db 100644
--- a/commom/src/main/java/com/bx/common/enums/WSCmdEnum.java
+++ b/commom/src/main/java/com/bx/common/enums/IMCmdType.java
@@ -1,6 +1,6 @@
package com.bx.common.enums;
-public enum WSCmdEnum {
+public enum IMCmdType {
LOGIN(0,"登陆"),
HEART_BEAT(1,"心跳"),
@@ -13,13 +13,13 @@ public enum WSCmdEnum {
private String desc;
- WSCmdEnum(Integer index, String desc) {
+ IMCmdType(Integer index, String desc) {
this.code =index;
this.desc=desc;
}
- public static WSCmdEnum fromCode(Integer code){
- for (WSCmdEnum typeEnum:values()) {
+ public static IMCmdType fromCode(Integer code){
+ for (IMCmdType typeEnum:values()) {
if (typeEnum.code.equals(code)) {
return typeEnum;
}
diff --git a/commom/src/main/java/com/bx/common/enums/ListenerType.java b/commom/src/main/java/com/bx/common/enums/ListenerType.java
new file mode 100644
index 0000000..d0cbb64
--- /dev/null
+++ b/commom/src/main/java/com/bx/common/enums/ListenerType.java
@@ -0,0 +1,24 @@
+package com.bx.common.enums;
+
+public enum ListenerType {
+
+ PRIVATE_MESSAGE(0,"私聊消息"),
+ GROUP_MESSAGE(1,"群聊消息");
+
+ private Integer code;
+
+ private String desc;
+
+ ListenerType(Integer index, String desc) {
+ this.code =index;
+ this.desc=desc;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public Integer getCode(){
+ return this.code;
+ }
+}
diff --git a/commom/src/main/java/com/bx/common/enums/MessageStatusEnum.java b/commom/src/main/java/com/bx/common/enums/MessageStatus.java
similarity index 70%
rename from commom/src/main/java/com/bx/common/enums/MessageStatusEnum.java
rename to commom/src/main/java/com/bx/common/enums/MessageStatus.java
index be6997b..f36c303 100644
--- a/commom/src/main/java/com/bx/common/enums/MessageStatusEnum.java
+++ b/commom/src/main/java/com/bx/common/enums/MessageStatus.java
@@ -1,7 +1,7 @@
package com.bx.common.enums;
-public enum MessageStatusEnum {
+public enum MessageStatus {
UNREAD(0,"未读"),
ALREADY_READ(1,"已读"),
@@ -11,13 +11,13 @@ public enum MessageStatusEnum {
private String desc;
- MessageStatusEnum(Integer index, String desc) {
+ MessageStatus(Integer index, String desc) {
this.code =index;
this.desc=desc;
}
- public static MessageStatusEnum fromCode(Integer code){
- for (MessageStatusEnum typeEnum:values()) {
+ public static MessageStatus fromCode(Integer code){
+ for (MessageStatus typeEnum:values()) {
if (typeEnum.code.equals(code)) {
return typeEnum;
}
diff --git a/commom/src/main/java/com/bx/common/enums/MessageTypeEnum.java b/commom/src/main/java/com/bx/common/enums/MessageType.java
similarity index 54%
rename from commom/src/main/java/com/bx/common/enums/MessageTypeEnum.java
rename to commom/src/main/java/com/bx/common/enums/MessageType.java
index de53313..9060809 100644
--- a/commom/src/main/java/com/bx/common/enums/MessageTypeEnum.java
+++ b/commom/src/main/java/com/bx/common/enums/MessageType.java
@@ -1,7 +1,7 @@
package com.bx.common.enums;
-public enum MessageTypeEnum {
+public enum MessageType {
TEXT(0,"文字"),
FILE(1,"文件"),
@@ -13,20 +13,11 @@ public enum MessageTypeEnum {
private String desc;
- MessageTypeEnum(Integer index, String desc) {
+ MessageType(Integer index, String desc) {
this.code =index;
this.desc=desc;
}
- public static MessageTypeEnum fromCode(Integer code){
- for (MessageTypeEnum typeEnum:values()) {
- if (typeEnum.code.equals(code)) {
- return typeEnum;
- }
- }
- return null;
- }
-
public String getDesc() {
return desc;
diff --git a/commom/src/main/java/com/bx/common/enums/SendResultType.java b/commom/src/main/java/com/bx/common/enums/SendResultType.java
new file mode 100644
index 0000000..cfbf124
--- /dev/null
+++ b/commom/src/main/java/com/bx/common/enums/SendResultType.java
@@ -0,0 +1,25 @@
+package com.bx.common.enums;
+
+
+public enum SendResultType {
+
+ SUCCESS(0,"发送成功"),
+ FAIL(1,"发送失败");
+
+ private int code;
+ private String msg;
+
+ // 构造方法
+ SendResultType(int code, String msg) {
+ this.code = code;
+ this.msg = msg;
+ }
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+}
diff --git a/commom/src/main/java/com/bx/common/model/im/GroupMessageInfo.java b/commom/src/main/java/com/bx/common/model/im/GroupMessageInfo.java
index 67f08a7..b471fe7 100644
--- a/commom/src/main/java/com/bx/common/model/im/GroupMessageInfo.java
+++ b/commom/src/main/java/com/bx/common/model/im/GroupMessageInfo.java
@@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import lombok.Data;
import java.util.Date;
-import java.util.List;
@Data
public class GroupMessageInfo {
@@ -16,8 +15,6 @@ public class GroupMessageInfo {
private Long sendId;
- private List recvIds;
-
private String content;
private Integer type;
diff --git a/commom/src/main/java/com/bx/common/model/im/IMRecvInfo.java b/commom/src/main/java/com/bx/common/model/im/IMRecvInfo.java
new file mode 100644
index 0000000..e0d8e89
--- /dev/null
+++ b/commom/src/main/java/com/bx/common/model/im/IMRecvInfo.java
@@ -0,0 +1,17 @@
+package com.bx.common.model.im;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class IMRecvInfo {
+
+ private Integer cmd;
+
+ private List recvIds;
+
+ private T data;
+}
+
+
diff --git a/commom/src/main/java/com/bx/common/model/im/SendResult.java b/commom/src/main/java/com/bx/common/model/im/SendResult.java
new file mode 100644
index 0000000..c655b2f
--- /dev/null
+++ b/commom/src/main/java/com/bx/common/model/im/SendResult.java
@@ -0,0 +1,17 @@
+package com.bx.common.model.im;
+
+import com.bx.common.enums.SendResultType;
+import lombok.Data;
+
+@Data
+public class SendResult {
+
+ private Long recvId;
+
+ private SendResultType result;
+
+ private String failReason="";
+
+ private T messageInfo;
+
+}
diff --git a/im-client/pom.xml b/im-client/pom.xml
new file mode 100644
index 0000000..73f33b8
--- /dev/null
+++ b/im-client/pom.xml
@@ -0,0 +1,26 @@
+
+
+
+ box-im
+ com.bx
+ 1.0.0
+
+ 4.0.0
+
+ im-client
+
+
+
+ com.bx
+ commom
+ 1.0.0
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+
+
+
\ No newline at end of file
diff --git a/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java b/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java
new file mode 100644
index 0000000..b3a93fc
--- /dev/null
+++ b/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java
@@ -0,0 +1,12 @@
+package com.bx.imclient;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+
+
+@Slf4j
+@Configuration
+@ComponentScan("com.bx.imclient")
+public class IMAutoConfiguration {
+}
diff --git a/im-client/src/main/java/com/bx/imclient/IMClient.java b/im-client/src/main/java/com/bx/imclient/IMClient.java
new file mode 100644
index 0000000..dbe03a3
--- /dev/null
+++ b/im-client/src/main/java/com/bx/imclient/IMClient.java
@@ -0,0 +1,30 @@
+package com.bx.imclient;
+
+import com.bx.common.model.im.GroupMessageInfo;
+import com.bx.common.model.im.PrivateMessageInfo;
+import com.bx.imclient.listener.MessageListenerMulticaster;
+import com.bx.imclient.sender.IMSender;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.List;
+
+@Configuration
+public class IMClient {
+
+ @Autowired
+ private MessageListenerMulticaster listenerMulticaster;
+
+ @Autowired
+ private IMSender imSender;
+
+ public void sendPrivateMessage(Long userId, PrivateMessageInfo... messageInfo){
+ imSender.sendPrivateMessage(userId,messageInfo);
+ }
+
+ public void sendGroupMessage(List userTokens, GroupMessageInfo... messageInfo){
+ imSender.sendGroupMessage(userTokens,messageInfo);
+ }
+
+
+}
diff --git a/im-client/src/main/java/com/bx/imclient/annotation/IMListener.java b/im-client/src/main/java/com/bx/imclient/annotation/IMListener.java
new file mode 100644
index 0000000..8e86edf
--- /dev/null
+++ b/im-client/src/main/java/com/bx/imclient/annotation/IMListener.java
@@ -0,0 +1,18 @@
+package com.bx.imclient.annotation;
+
+import com.bx.common.enums.ListenerType;
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.TYPE,ElementType.FIELD})
+@Retention(RetentionPolicy.RUNTIME)
+@Component
+public @interface IMListener {
+
+ ListenerType type();
+
+}
diff --git a/im-client/src/main/java/com/bx/imclient/config/RedisConfig.java b/im-client/src/main/java/com/bx/imclient/config/RedisConfig.java
new file mode 100644
index 0000000..23fac7c
--- /dev/null
+++ b/im-client/src/main/java/com/bx/imclient/config/RedisConfig.java
@@ -0,0 +1,49 @@
+package com.bx.imclient.config;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+import javax.annotation.Resource;
+
+@Configuration("IMRedisConfig")
+public class RedisConfig {
+
+ @Resource
+ private RedisConnectionFactory factory;
+
+ @Bean("IMRedisTemplate")
+ public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
+ RedisTemplate redisTemplate = new RedisTemplate();
+ redisTemplate.setConnectionFactory(redisConnectionFactory);
+ // 设置值(value)的序列化采用jackson2JsonRedisSerializer
+ redisTemplate.setValueSerializer(jackson2JsonRedisSerializer());
+ redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer());
+ // 设置键(key)的序列化采用StringRedisSerializer。
+ redisTemplate.setKeySerializer(new StringRedisSerializer());
+ redisTemplate.setHashKeySerializer(new StringRedisSerializer());
+ redisTemplate.afterPropertiesSet();
+ return redisTemplate;
+ }
+
+ @Bean
+ public Jackson2JsonRedisSerializer jackson2JsonRedisSerializer(){
+ Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
+ ObjectMapper om = new ObjectMapper();
+ om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
+ // 解决jackson2无法反序列化LocalDateTime的问题
+ om.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
+ jackson2JsonRedisSerializer.setObjectMapper(om);
+ return jackson2JsonRedisSerializer;
+ }
+
+}
diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java
new file mode 100644
index 0000000..a223f34
--- /dev/null
+++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java
@@ -0,0 +1,10 @@
+package com.bx.imclient.listener;
+
+
+import com.bx.common.model.im.SendResult;
+
+public interface MessageListener {
+
+ void process(SendResult result);
+
+}
diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java
new file mode 100644
index 0000000..0cf4516
--- /dev/null
+++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java
@@ -0,0 +1,28 @@
+package com.bx.imclient.listener;
+
+
+import com.bx.common.enums.ListenerType;
+import com.bx.common.model.im.SendResult;
+import com.bx.imclient.annotation.IMListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Collections;
+import java.util.List;
+
+@Component
+public class MessageListenerMulticaster {
+
+
+ @Autowired(required = false)
+ private List messageListeners = Collections.emptyList();
+
+ public void multicast(ListenerType type, SendResult result){
+ for(MessageListener listener:messageListeners){
+ IMListener annotation = listener.getClass().getAnnotation(IMListener.class);
+ if(annotation.type().equals(type)){
+ listener.process(result);
+ }
+ }
+ }
+}
diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java
new file mode 100644
index 0000000..5c5c665
--- /dev/null
+++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java
@@ -0,0 +1,112 @@
+package com.bx.imclient.sender;
+
+import com.bx.common.contant.RedisKey;
+import com.bx.common.enums.IMCmdType;
+import com.bx.common.enums.ListenerType;
+import com.bx.common.enums.SendResultType;
+import com.bx.common.model.im.GroupMessageInfo;
+import com.bx.common.model.im.IMRecvInfo;
+import com.bx.common.model.im.PrivateMessageInfo;
+import com.bx.common.model.im.SendResult;
+import com.bx.imclient.listener.MessageListenerMulticaster;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Service
+public class IMSender {
+
+ @Autowired
+ @Qualifier("IMRedisTemplate")
+ private RedisTemplate redisTemplate;
+
+ @Autowired
+ private MessageListenerMulticaster listenerMulticaster;
+
+ public void sendPrivateMessage(Long recvId, PrivateMessageInfo... messageInfos){
+ // 获取对方连接的channelId
+ String key = RedisKey.IM_USER_SERVER_ID + recvId;
+ Integer serverId = (Integer) redisTemplate.opsForValue().get(key);
+ // 如果对方在线,将数据存储至redis,等待拉取推送
+ if (serverId != null) {
+ String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId;
+ IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length];
+ for (int i=0;i recvInfo = new IMRecvInfo<>();
+ recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.getCode());
+ List recvIds = new LinkedList();
+ recvIds.add(recvId);
+ recvInfo.setRecvIds(recvIds);
+ recvInfo.setData(messageInfos[i]);
+ recvInfos[i] = recvInfo;
+ }
+ redisTemplate.opsForList().rightPushAll(sendKey, recvInfos);
+ }else{
+ // 回复消息状态
+ for(PrivateMessageInfo messageInfo : messageInfos ) {
+ SendResult result = new SendResult();
+ result.setMessageInfo(messageInfo);
+ result.setRecvId(recvId);
+ result.setResult(SendResultType.FAIL);
+ result.setFailReason("用户不在线");
+ listenerMulticaster.multicast(ListenerType.PRIVATE_MESSAGE, result);
+ }
+ }
+ }
+
+ public void sendGroupMessage(List recvIds, GroupMessageInfo... messageInfos){
+ // 根据群聊每个成员所连的IM-server,进行分组
+ List offLineIds = Collections.synchronizedList(new LinkedList());
+ Map> serverMap = new ConcurrentHashMap<>();
+ recvIds.parallelStream().forEach(id->{
+ String key = RedisKey.IM_USER_SERVER_ID + id;
+ Integer serverId = (Integer)redisTemplate.opsForValue().get(key);
+ if(serverId != null){
+ if(serverMap.containsKey(serverId)){
+ serverMap.get(serverId).add(id);
+ }else {
+ // 此处需要加锁,否则list可以会被覆盖
+ synchronized(serverMap){
+ List list = Collections.synchronizedList(new LinkedList());
+ list.add(id);
+ serverMap.put(serverId,list);
+ }
+ }
+ }else{
+ offLineIds.add(id);
+ }
+ });
+ // 逐个server发送
+ for (Map.Entry> entry : serverMap.entrySet()) {
+ IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length];
+ for (int i=0;i recvInfo = new IMRecvInfo<>();
+ recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.getCode());
+ recvInfo.setRecvIds(new LinkedList<>(entry.getValue()));
+ recvInfo.setData(messageInfos[i]);
+ recvInfos[i] = recvInfo;
+ }
+ String key = RedisKey.IM_UNREAD_GROUP_QUEUE +entry.getKey();
+ redisTemplate.opsForList().rightPushAll(key,recvInfos);
+ }
+ // 不在线的用户,回复消息状态
+ for(GroupMessageInfo messageInfo:messageInfos ){
+ for(Long id : offLineIds){
+ // 回复消息状态
+ SendResult result = new SendResult();
+ result.setMessageInfo(messageInfo);
+ result.setRecvId(id);
+ result.setResult(SendResultType.FAIL);
+ result.setFailReason("用户不在线");
+ listenerMulticaster.multicast(ListenerType.GROUP_MESSAGE,result);
+ }
+ }
+ }
+}
diff --git a/im-client/src/main/java/com/bx/imclient/task/AbstractPullMessageTask.java b/im-client/src/main/java/com/bx/imclient/task/AbstractPullMessageTask.java
new file mode 100644
index 0000000..53b368f
--- /dev/null
+++ b/im-client/src/main/java/com/bx/imclient/task/AbstractPullMessageTask.java
@@ -0,0 +1,47 @@
+package com.bx.imclient.task;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Slf4j
+public abstract class AbstractPullMessageTask {
+
+ private int threadNum = 8;
+
+ private ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
+
+ @PostConstruct
+ public void init(){
+ // 初始化定时器
+ for(int i=0;i redisTemplate;
+
+ @Autowired
+ private MessageListenerMulticaster listenerMulticaster;
+
+ @Override
+ public void pullMessage() {
+ String key = RedisKey.IM_RESULT_GROUP_QUEUE;
+ SendResult result = (SendResult)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS);
+ if(result != null) {
+ listenerMulticaster.multicast(ListenerType.GROUP_MESSAGE,result);
+ }
+ }
+
+}
diff --git a/im-client/src/main/java/com/bx/imclient/task/PullSendResultPrivateMessageTask.java b/im-client/src/main/java/com/bx/imclient/task/PullSendResultPrivateMessageTask.java
new file mode 100644
index 0000000..6f3042d
--- /dev/null
+++ b/im-client/src/main/java/com/bx/imclient/task/PullSendResultPrivateMessageTask.java
@@ -0,0 +1,38 @@
+package com.bx.imclient.task;
+
+import com.bx.common.contant.RedisKey;
+import com.bx.common.enums.ListenerType;
+import com.bx.common.model.im.SendResult;
+import com.bx.imclient.listener.MessageListenerMulticaster;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+;
+
+@Slf4j
+@Component
+public class PullSendResultPrivateMessageTask extends AbstractPullMessageTask{
+
+
+ @Qualifier("IMRedisTemplate")
+ @Autowired
+ private RedisTemplate redisTemplate;
+
+ @Autowired
+ private MessageListenerMulticaster listenerMulticaster;
+
+ @Override
+ public void pullMessage() {
+ String key = RedisKey.IM_RESULT_PRIVATE_QUEUE;
+ SendResult result = (SendResult)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS);
+ if(result != null) {
+ listenerMulticaster.multicast(ListenerType.PRIVATE_MESSAGE, result);
+ }
+ }
+
+}
diff --git a/im-client/src/main/resources/META-INF/spring.factories b/im-client/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..801458e
--- /dev/null
+++ b/im-client/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,2 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+com.bx.imclient.IMAutoConfiguration
\ No newline at end of file
diff --git a/im-platform/pom.xml b/im-platform/pom.xml
index c27f121..a963aee 100644
--- a/im-platform/pom.xml
+++ b/im-platform/pom.xml
@@ -15,7 +15,7 @@
com.bx
- commom
+ im-client
1.0.0
@@ -101,6 +101,11 @@
thumbnailator
0.4.8
+
+ com.baomidou
+ mybatis-plus-generator
+ 3.3.2
+
diff --git a/im-platform/src/main/java/com/bx/implatform/ImplatformApp.java b/im-platform/src/main/java/com/bx/implatform/ImplatformApp.java
index 3743c93..15480c2 100644
--- a/im-platform/src/main/java/com/bx/implatform/ImplatformApp.java
+++ b/im-platform/src/main/java/com/bx/implatform/ImplatformApp.java
@@ -4,14 +4,12 @@ import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
@Slf4j
@EnableAspectJAutoProxy(exposeProxy = true)
@MapperScan(basePackages = {"com.bx.implatform.mapper"})
-@ComponentScan(basePackages={"com.bx"})
@SpringBootApplication
public class ImplatformApp {
diff --git a/im-platform/src/main/java/com/bx/implatform/config/WebSecurityConfg.java b/im-platform/src/main/java/com/bx/implatform/config/WebSecurityConfg.java
index 9a3a218..7ef0d62 100644
--- a/im-platform/src/main/java/com/bx/implatform/config/WebSecurityConfg.java
+++ b/im-platform/src/main/java/com/bx/implatform/config/WebSecurityConfg.java
@@ -1,9 +1,9 @@
package com.bx.implatform.config;
import com.alibaba.fastjson.JSON;
-import com.bx.common.enums.ResultCode;
-import com.bx.common.result.Result;
-import com.bx.common.result.ResultUtils;
+import com.bx.implatform.enums.ResultCode;
+import com.bx.implatform.result.Result;
+import com.bx.implatform.result.ResultUtils;
import com.bx.implatform.service.IUserService;
import com.bx.implatform.session.UserSession;
import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/im-platform/src/main/java/com/bx/implatform/contant/Constant.java b/im-platform/src/main/java/com/bx/implatform/contant/Constant.java
new file mode 100644
index 0000000..77fab01
--- /dev/null
+++ b/im-platform/src/main/java/com/bx/implatform/contant/Constant.java
@@ -0,0 +1,12 @@
+package com.bx.implatform.contant;
+
+
+public class Constant {
+ // 最大图片上传大小
+ public static final long MAX_IMAGE_SIZE = 5*1024*1024;
+ // 最大上传文件大小
+ public static final long MAX_FILE_SIZE = 10*1024*1024;
+ // 群聊最大人数
+ public static final long MAX_GROUP_MEMBER = 500;
+
+}
diff --git a/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java b/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java
new file mode 100644
index 0000000..9d25026
--- /dev/null
+++ b/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java
@@ -0,0 +1,16 @@
+package com.bx.implatform.contant;
+
+public class RedisKey {
+
+ // 已读群聊消息位置(已读最大id)
+ public final static String IM_GROUP_READED_POSITION = "im:readed:group:position:";
+ // 缓存前缀
+ public final static String IM_CACHE = "im:cache:";
+ // 缓存是否好友:bool
+ public final static String IM_CACHE_FRIEND = IM_CACHE+"friend";
+ // 缓存群聊信息
+ public final static String IM_CACHE_GROUP = IM_CACHE+"group";
+ // 缓存群聊成员id
+ public final static String IM_CACHE_GROUP_MEMBER_ID = IM_CACHE+"group_member_ids";
+
+}
diff --git a/im-platform/src/main/java/com/bx/implatform/controller/FileController.java b/im-platform/src/main/java/com/bx/implatform/controller/FileController.java
index 901083f..0d50738 100644
--- a/im-platform/src/main/java/com/bx/implatform/controller/FileController.java
+++ b/im-platform/src/main/java/com/bx/implatform/controller/FileController.java
@@ -1,7 +1,7 @@
package com.bx.implatform.controller;
-import com.bx.common.result.Result;
-import com.bx.common.result.ResultUtils;
+import com.bx.implatform.result.Result;
+import com.bx.implatform.result.ResultUtils;
import com.bx.implatform.service.thirdparty.FileService;
import com.bx.implatform.vo.UploadImageVO;
import io.swagger.annotations.Api;
diff --git a/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java b/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java
index e1d718e..9d97450 100644
--- a/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java
+++ b/im-platform/src/main/java/com/bx/implatform/controller/FriendController.java
@@ -1,9 +1,8 @@
package com.bx.implatform.controller;
-
-import com.bx.common.result.Result;
-import com.bx.common.result.ResultUtils;
import com.bx.implatform.entity.Friend;
+import com.bx.implatform.result.Result;
+import com.bx.implatform.result.ResultUtils;
import com.bx.implatform.service.IFriendService;
import com.bx.implatform.session.SessionContext;
import com.bx.implatform.vo.FriendVO;
diff --git a/im-platform/src/main/java/com/bx/implatform/controller/GroupController.java b/im-platform/src/main/java/com/bx/implatform/controller/GroupController.java
index aac8764..250ebbb 100644
--- a/im-platform/src/main/java/com/bx/implatform/controller/GroupController.java
+++ b/im-platform/src/main/java/com/bx/implatform/controller/GroupController.java
@@ -1,8 +1,8 @@
package com.bx.implatform.controller;
-import com.bx.common.result.Result;
-import com.bx.common.result.ResultUtils;
+import com.bx.implatform.result.Result;
+import com.bx.implatform.result.ResultUtils;
import com.bx.implatform.service.IGroupService;
import com.bx.implatform.vo.GroupInviteVO;
import com.bx.implatform.vo.GroupMemberVO;
diff --git a/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java b/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java
index 68b882b..540b2ac 100644
--- a/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java
+++ b/im-platform/src/main/java/com/bx/implatform/controller/GroupMessageController.java
@@ -2,8 +2,8 @@ package com.bx.implatform.controller;
import com.bx.common.model.im.GroupMessageInfo;
-import com.bx.common.result.Result;
-import com.bx.common.result.ResultUtils;
+import com.bx.implatform.result.Result;
+import com.bx.implatform.result.ResultUtils;
import com.bx.implatform.service.IGroupMessageService;
import com.bx.implatform.vo.GroupMessageVO;
import io.swagger.annotations.Api;
diff --git a/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java b/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java
index 662c2d8..e07da66 100644
--- a/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java
+++ b/im-platform/src/main/java/com/bx/implatform/controller/PrivateMessageController.java
@@ -2,8 +2,8 @@ package com.bx.implatform.controller;
import com.bx.common.model.im.PrivateMessageInfo;
-import com.bx.common.result.Result;
-import com.bx.common.result.ResultUtils;
+import com.bx.implatform.result.Result;
+import com.bx.implatform.result.ResultUtils;
import com.bx.implatform.service.IPrivateMessageService;
import com.bx.implatform.vo.PrivateMessageVO;
import io.swagger.annotations.Api;
diff --git a/im-platform/src/main/java/com/bx/implatform/controller/RegisterController.java b/im-platform/src/main/java/com/bx/implatform/controller/RegisterController.java
index 9a3e044..a1250b8 100644
--- a/im-platform/src/main/java/com/bx/implatform/controller/RegisterController.java
+++ b/im-platform/src/main/java/com/bx/implatform/controller/RegisterController.java
@@ -1,8 +1,8 @@
package com.bx.implatform.controller;
-import com.bx.common.result.Result;
-import com.bx.common.result.ResultUtils;
+import com.bx.implatform.result.Result;
+import com.bx.implatform.result.ResultUtils;
import com.bx.implatform.service.IUserService;
import com.bx.implatform.vo.RegisterVO;
import io.swagger.annotations.Api;
diff --git a/im-platform/src/main/java/com/bx/implatform/controller/UserController.java b/im-platform/src/main/java/com/bx/implatform/controller/UserController.java
index 603c8ef..3cc5cdf 100644
--- a/im-platform/src/main/java/com/bx/implatform/controller/UserController.java
+++ b/im-platform/src/main/java/com/bx/implatform/controller/UserController.java
@@ -1,13 +1,12 @@
package com.bx.implatform.controller;
-
-import com.bx.common.result.Result;
-import com.bx.common.result.ResultUtils;
-import com.bx.common.util.BeanUtils;
import com.bx.implatform.entity.User;
+import com.bx.implatform.result.Result;
+import com.bx.implatform.result.ResultUtils;
import com.bx.implatform.service.IUserService;
import com.bx.implatform.session.SessionContext;
import com.bx.implatform.session.UserSession;
+import com.bx.implatform.util.BeanUtils;
import com.bx.implatform.vo.UserVO;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
diff --git a/commom/src/main/java/com/bx/common/enums/ResultCode.java b/im-platform/src/main/java/com/bx/implatform/enums/ResultCode.java
similarity index 96%
rename from commom/src/main/java/com/bx/common/enums/ResultCode.java
rename to im-platform/src/main/java/com/bx/implatform/enums/ResultCode.java
index ec41f5c..bc6f96c 100644
--- a/commom/src/main/java/com/bx/common/enums/ResultCode.java
+++ b/im-platform/src/main/java/com/bx/implatform/enums/ResultCode.java
@@ -1,4 +1,4 @@
-package com.bx.common.enums;
+package com.bx.implatform.enums;
/**
* 响应码枚举
diff --git a/im-platform/src/main/java/com/bx/implatform/exception/GlobalException.java b/im-platform/src/main/java/com/bx/implatform/exception/GlobalException.java
index b3deac0..fdd189d 100644
--- a/im-platform/src/main/java/com/bx/implatform/exception/GlobalException.java
+++ b/im-platform/src/main/java/com/bx/implatform/exception/GlobalException.java
@@ -1,6 +1,6 @@
package com.bx.implatform.exception;
-import com.bx.common.enums.ResultCode;
+import com.bx.implatform.enums.ResultCode;
import lombok.Data;
import java.io.Serializable;
diff --git a/im-platform/src/main/java/com/bx/implatform/exception/GlobalExceptionHandler.java b/im-platform/src/main/java/com/bx/implatform/exception/GlobalExceptionHandler.java
index 693ff39..0c44af8 100644
--- a/im-platform/src/main/java/com/bx/implatform/exception/GlobalExceptionHandler.java
+++ b/im-platform/src/main/java/com/bx/implatform/exception/GlobalExceptionHandler.java
@@ -1,9 +1,9 @@
package com.bx.implatform.exception;
import cn.hutool.json.JSONException;
-import com.bx.common.enums.ResultCode;
-import com.bx.common.result.Result;
-import com.bx.common.result.ResultUtils;
+import com.bx.implatform.enums.ResultCode;
+import com.bx.implatform.result.Result;
+import com.bx.implatform.result.ResultUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.converter.HttpMessageNotReadableException;
diff --git a/commom/src/main/java/com/bx/common/generator/CodeGenerator.java b/im-platform/src/main/java/com/bx/implatform/generator/CodeGenerator.java
similarity index 99%
rename from commom/src/main/java/com/bx/common/generator/CodeGenerator.java
rename to im-platform/src/main/java/com/bx/implatform/generator/CodeGenerator.java
index 7e94971..d0f423f 100644
--- a/commom/src/main/java/com/bx/common/generator/CodeGenerator.java
+++ b/im-platform/src/main/java/com/bx/implatform/generator/CodeGenerator.java
@@ -1,4 +1,4 @@
-package com.bx.common.generator;
+package com.bx.implatform.generator;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
diff --git a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java
new file mode 100644
index 0000000..44b7ae6
--- /dev/null
+++ b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java
@@ -0,0 +1,34 @@
+package com.bx.implatform.listener;
+
+import com.bx.common.enums.ListenerType;
+import com.bx.common.enums.MessageType;
+import com.bx.common.model.im.GroupMessageInfo;
+import com.bx.common.model.im.SendResult;
+import com.bx.imclient.annotation.IMListener;
+import com.bx.imclient.listener.MessageListener;
+import com.bx.implatform.contant.RedisKey;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+
+
+@Slf4j
+@IMListener(type = ListenerType.GROUP_MESSAGE)
+public class GroupMessageListener implements MessageListener {
+
+ @Autowired
+ private RedisTemplate redisTemplate;
+
+ @Override
+ public void process(SendResult result){
+ GroupMessageInfo messageInfo = (GroupMessageInfo) result.getMessageInfo();
+ if(messageInfo.getType().equals(MessageType.TIP)){
+ // 提示类数据不记录
+ return;
+ }
+ // 保存该用户已拉取的最大消息id
+ String key = RedisKey.IM_GROUP_READED_POSITION + messageInfo.getGroupId()+":"+result.getRecvId();
+ redisTemplate.opsForValue().set(key,messageInfo.getId());
+ }
+
+}
diff --git a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java
new file mode 100644
index 0000000..49ad576
--- /dev/null
+++ b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java
@@ -0,0 +1,40 @@
+package com.bx.implatform.listener;
+
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
+import com.bx.common.enums.ListenerType;
+import com.bx.common.enums.MessageStatus;
+import com.bx.common.enums.MessageType;
+import com.bx.common.model.im.PrivateMessageInfo;
+import com.bx.common.model.im.SendResult;
+import com.bx.imclient.annotation.IMListener;
+import com.bx.imclient.listener.MessageListener;
+import com.bx.implatform.entity.PrivateMessage;
+import com.bx.implatform.service.IPrivateMessageService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+
+
+@Slf4j
+@IMListener(type = ListenerType.PRIVATE_MESSAGE)
+public class PrivateMessageListener implements MessageListener {
+
+ @Autowired
+ private IPrivateMessageService privateMessageService;
+
+ @Override
+ public void process(SendResult result){
+ PrivateMessageInfo messageInfo = (PrivateMessageInfo) result.getMessageInfo();
+ if(messageInfo.getType().equals(MessageType.TIP)){
+ // 提示类数据不记录
+ return;
+ }
+ // 更新消息状态
+ UpdateWrapper updateWrapper = new UpdateWrapper<>();
+ updateWrapper.lambda().eq(PrivateMessage::getId,messageInfo.getId())
+ .eq(PrivateMessage::getStatus, MessageStatus.UNREAD.getCode())
+ .set(PrivateMessage::getStatus, MessageStatus.ALREADY_READ.getCode());
+ privateMessageService.update(updateWrapper);
+ log.info("消息已读,消息id:{},发送者:{},接收者:{}",messageInfo.getId(),messageInfo.getSendId(),messageInfo.getRecvId());
+ }
+
+}
diff --git a/commom/src/main/java/com/bx/common/result/Result.java b/im-platform/src/main/java/com/bx/implatform/result/Result.java
similarity index 79%
rename from commom/src/main/java/com/bx/common/result/Result.java
rename to im-platform/src/main/java/com/bx/implatform/result/Result.java
index 08006a4..16e630b 100644
--- a/commom/src/main/java/com/bx/common/result/Result.java
+++ b/im-platform/src/main/java/com/bx/implatform/result/Result.java
@@ -1,4 +1,4 @@
-package com.bx.common.result;
+package com.bx.implatform.result;
import lombok.Data;
diff --git a/commom/src/main/java/com/bx/common/result/ResultUtils.java b/im-platform/src/main/java/com/bx/implatform/result/ResultUtils.java
similarity index 94%
rename from commom/src/main/java/com/bx/common/result/ResultUtils.java
rename to im-platform/src/main/java/com/bx/implatform/result/ResultUtils.java
index 0accdca..50326fd 100644
--- a/commom/src/main/java/com/bx/common/result/ResultUtils.java
+++ b/im-platform/src/main/java/com/bx/implatform/result/ResultUtils.java
@@ -1,7 +1,7 @@
-package com.bx.common.result;
+package com.bx.implatform.result;
-import com.bx.common.enums.ResultCode;
+import com.bx.implatform.enums.ResultCode;
public class ResultUtils {
@@ -50,7 +50,7 @@ public class ResultUtils {
return result;
}
- public static final Result error(ResultCode resultCode, String messsage,T data){
+ public static final Result error(ResultCode resultCode, String messsage, T data){
Result result=new Result();
result.setCode(resultCode.getCode());
result.setMessage(messsage);
diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java
index f5df55a..f074d76 100644
--- a/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java
+++ b/im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java
@@ -2,10 +2,10 @@ package com.bx.implatform.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.ResultCode;
+import com.bx.implatform.contant.RedisKey;
import com.bx.implatform.entity.Friend;
import com.bx.implatform.entity.User;
+import com.bx.implatform.enums.ResultCode;
import com.bx.implatform.exception.GlobalException;
import com.bx.implatform.mapper.FriendMapper;
import com.bx.implatform.service.IFriendService;
diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMemberServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMemberServiceImpl.java
index 69616b1..e62de44 100644
--- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMemberServiceImpl.java
+++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMemberServiceImpl.java
@@ -3,7 +3,7 @@ package com.bx.implatform.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.bx.common.contant.RedisKey;
+import com.bx.implatform.contant.RedisKey;
import com.bx.implatform.entity.GroupMember;
import com.bx.implatform.mapper.GroupMemberMapper;
import com.bx.implatform.service.IGroupMemberService;
diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java
index f2f40c6..dea178a 100644
--- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java
+++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java
@@ -3,29 +3,32 @@ package com.bx.implatform.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bx.common.contant.Constant;
-import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.MessageStatusEnum;
-import com.bx.common.enums.MessageTypeEnum;
-import com.bx.common.enums.ResultCode;
+import com.bx.common.enums.MessageStatus;
+import com.bx.common.enums.MessageType;
import com.bx.common.model.im.GroupMessageInfo;
-import com.bx.common.util.BeanUtils;
+import com.bx.imclient.IMClient;
+import com.bx.implatform.contant.RedisKey;
import com.bx.implatform.entity.Group;
import com.bx.implatform.entity.GroupMember;
import com.bx.implatform.entity.GroupMessage;
+import com.bx.implatform.enums.ResultCode;
import com.bx.implatform.exception.GlobalException;
import com.bx.implatform.mapper.GroupMessageMapper;
import com.bx.implatform.service.IGroupMemberService;
import com.bx.implatform.service.IGroupMessageService;
import com.bx.implatform.service.IGroupService;
import com.bx.implatform.session.SessionContext;
+import com.bx.implatform.util.BeanUtils;
import com.bx.implatform.vo.GroupMessageVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@@ -42,6 +45,9 @@ public class GroupMessageServiceImpl extends ServiceImpl redisTemplate;
+ @Autowired
+ private IMClient imClient;
+
/**
* 发送群聊消息(与mysql所有交换都要进行缓存)
*
@@ -102,12 +108,12 @@ public class GroupMessageServiceImpl extends ServiceImpl userIds = groupMemberService.findUserIdsByGroupId(msg.getGroupId());
GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class);
- msgInfo.setType(MessageTypeEnum.TIP.getCode());
+ msgInfo.setType(MessageType.TIP.getCode());
String content = String.format("'%s'撤回了一条消息",member.getAliasName());
msgInfo.setContent(content);
msgInfo.setSendTime(new Date());
@@ -124,22 +130,17 @@ public class GroupMessageServiceImpl extends ServiceImpl recvIds = new LinkedList();
recvIds.add(userId);
List members = groupMemberService.findByUserId(userId);
for(GroupMember member:members){
// 获取群聊已读的最大消息id,只推送未读消息
- key = RedisKey.IM_GROUP_READED_POSITION + member.getGroupId()+":"+userId;
+ String key = RedisKey.IM_GROUP_READED_POSITION + member.getGroupId()+":"+userId;
Integer maxReadedId = (Integer)redisTemplate.opsForValue().get(key);
QueryWrapper wrapper = new QueryWrapper();
wrapper.lambda().eq(GroupMessage::getGroupId,member.getGroupId())
.gt(GroupMessage::getSendTime,member.getCreatedTime())
- .ne(GroupMessage::getStatus,MessageStatusEnum.RECALL.getCode());
+ .ne(GroupMessage::getStatus, MessageStatus.RECALL.getCode());
if(maxReadedId!=null){
wrapper.lambda().gt(GroupMessage::getId,maxReadedId);
}
@@ -151,11 +152,11 @@ public class GroupMessageServiceImpl extends ServiceImpl messageInfos = messages.stream().map(m->{
GroupMessageInfo msgInfo = BeanUtils.copyProperties(m, GroupMessageInfo.class);
- msgInfo.setRecvIds(recvIds);
return msgInfo;
}).collect(Collectors.toList());
- key = RedisKey.IM_UNREAD_GROUP_MESSAGE + serverId;
- redisTemplate.opsForList().rightPushAll(key,messageInfos.toArray());
+ // 发送消息
+ GroupMessageInfo[] infoArr = messageInfos.toArray(new GroupMessageInfo[messageInfos.size()]);
+ imClient.sendGroupMessage(Collections.singletonList(userId), infoArr);
log.info("拉取未读群聊消息,用户id:{},群聊id:{},数量:{}",userId,member.getGroupId(),messageInfos.size());
}
@@ -184,7 +185,7 @@ public class GroupMessageServiceImpl extends ServiceImpl wrapper = new QueryWrapper<>();
wrapper.lambda().eq(GroupMessage::getGroupId,groupId)
.gt(GroupMessage::getSendTime,member.getCreatedTime())
- .ne(GroupMessage::getStatus,MessageStatusEnum.RECALL.getCode())
+ .ne(GroupMessage::getStatus, MessageStatus.RECALL.getCode())
.orderByDesc(GroupMessage::getId)
.last("limit "+stIdx + ","+size);
@@ -198,29 +199,6 @@ public class GroupMessageServiceImpl extends ServiceImpl userIds, GroupMessageInfo msgInfo){
- // 根据群聊每个成员所连的IM-server,进行分组
- Map> serverMap = new ConcurrentHashMap<>();
- userIds.parallelStream().forEach(id->{
- String key = RedisKey.IM_USER_SERVER_ID + id;
- Integer serverId = (Integer)redisTemplate.opsForValue().get(key);
- if(serverId != null){
- if(serverMap.containsKey(serverId)){
- serverMap.get(serverId).add(id);
- }else {
- // 此处需要加锁,否则list可以会被覆盖
- synchronized(serverMap){
- List list = Collections.synchronizedList(new LinkedList());
- list.add(id);
- serverMap.put(serverId,list);
- }
- }
- }
- });
- // 逐个server发送
- for (Map.Entry> entry : serverMap.entrySet()) {
- msgInfo.setRecvIds(new LinkedList<>(entry.getValue()));
- String key = RedisKey.IM_UNREAD_GROUP_MESSAGE +entry.getKey();
- redisTemplate.opsForList().rightPush(key,msgInfo);
- }
+ imClient.sendGroupMessage(userIds,msgInfo);
}
}
diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java
index d286a43..ab72d03 100644
--- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java
+++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java
@@ -2,14 +2,13 @@ package com.bx.implatform.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.bx.common.contant.Constant;
-import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.ResultCode;
-import com.bx.common.util.BeanUtils;
+import com.bx.implatform.contant.Constant;
+import com.bx.implatform.contant.RedisKey;
import com.bx.implatform.entity.Friend;
import com.bx.implatform.entity.Group;
import com.bx.implatform.entity.GroupMember;
import com.bx.implatform.entity.User;
+import com.bx.implatform.enums.ResultCode;
import com.bx.implatform.exception.GlobalException;
import com.bx.implatform.mapper.GroupMapper;
import com.bx.implatform.service.IFriendService;
@@ -18,6 +17,7 @@ import com.bx.implatform.service.IGroupService;
import com.bx.implatform.service.IUserService;
import com.bx.implatform.session.SessionContext;
import com.bx.implatform.session.UserSession;
+import com.bx.implatform.util.BeanUtils;
import com.bx.implatform.vo.GroupInviteVO;
import com.bx.implatform.vo.GroupMemberVO;
import com.bx.implatform.vo.GroupVO;
diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java
index 5bfdcb1..5a4fd3b 100644
--- a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java
+++ b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java
@@ -4,17 +4,18 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bx.common.contant.Constant;
import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.MessageStatusEnum;
-import com.bx.common.enums.MessageTypeEnum;
-import com.bx.common.enums.ResultCode;
+import com.bx.common.enums.MessageStatus;
+import com.bx.common.enums.MessageType;
import com.bx.common.model.im.PrivateMessageInfo;
-import com.bx.common.util.BeanUtils;
+import com.bx.imclient.IMClient;
import com.bx.implatform.entity.PrivateMessage;
+import com.bx.implatform.enums.ResultCode;
import com.bx.implatform.exception.GlobalException;
import com.bx.implatform.mapper.PrivateMessageMapper;
import com.bx.implatform.service.IFriendService;
import com.bx.implatform.service.IPrivateMessageService;
import com.bx.implatform.session.SessionContext;
+import com.bx.implatform.util.BeanUtils;
import com.bx.implatform.vo.PrivateMessageVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -33,7 +34,8 @@ public class PrivateMessageServiceImpl extends ServiceImpl redisTemplate;
-
+ @Autowired
+ private IMClient imClient;
/**
* 发送私聊消息
*
@@ -50,18 +52,12 @@ public class PrivateMessageServiceImpl extends ServiceImpl wp.eq(PrivateMessage::getRecvId, userId)
.eq(PrivateMessage::getSendId, friendId)))
- .ne(PrivateMessage::getStatus, MessageStatusEnum.RECALL.getCode())
+ .ne(PrivateMessage::getStatus, MessageStatus.RECALL.getCode())
.orderByDesc(PrivateMessage::getId)
.last("limit " + stIdx + "," + size);
@@ -154,7 +144,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl queryWrapper = new QueryWrapper<>();
queryWrapper.lambda().eq(PrivateMessage::getRecvId, userId)
- .eq(PrivateMessage::getStatus, MessageStatusEnum.UNREAD);
+ .eq(PrivateMessage::getStatus, MessageStatus.UNREAD);
List messages = this.list(queryWrapper);
// 上传至redis,等待推送
if (!messages.isEmpty()) {
@@ -162,8 +152,8 @@ public class PrivateMessageServiceImpl extends ServiceImpl redisTemplate;
-
- @Autowired
- private IPrivateMessageService privateMessageService;
-
- @PostConstruct
- public void init(){
- for(int i=0;i updateWrapper = new UpdateWrapper<>();
- updateWrapper.lambda().eq(PrivateMessage::getId,msgId)
- .eq(PrivateMessage::getStatus,MessageStatusEnum.UNREAD.getCode())
- .set(PrivateMessage::getStatus, MessageStatusEnum.ALREADY_READ.getCode());
- privateMessageService.update(updateWrapper);
- log.info("消息已读,id:{}",msgId);
- }
- }catch (Exception e){
- log.error(e.getMessage());
- Thread.sleep(200);
- }finally {
- // 下一次循环
- if(!executorService.isShutdown()){
- executorService.submit(this);
- }
- }
- }
- }
-}
diff --git a/commom/src/main/java/com/bx/common/util/BeanUtils.java b/im-platform/src/main/java/com/bx/implatform/util/BeanUtils.java
similarity index 99%
rename from commom/src/main/java/com/bx/common/util/BeanUtils.java
rename to im-platform/src/main/java/com/bx/implatform/util/BeanUtils.java
index 5914212..9bc74be 100644
--- a/commom/src/main/java/com/bx/common/util/BeanUtils.java
+++ b/im-platform/src/main/java/com/bx/implatform/util/BeanUtils.java
@@ -1,4 +1,4 @@
-package com.bx.common.util;
+package com.bx.implatform.util;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
@@ -12,7 +12,6 @@ import java.util.Set;
public class BeanUtils {
-
private static void handleReflectionException(Exception e) {
ReflectionUtils.handleReflectionException(e);
}
diff --git a/commom/src/main/java/com/bx/common/util/DateTimeUtils.java b/im-platform/src/main/java/com/bx/implatform/util/DateTimeUtils.java
similarity index 99%
rename from commom/src/main/java/com/bx/common/util/DateTimeUtils.java
rename to im-platform/src/main/java/com/bx/implatform/util/DateTimeUtils.java
index 5d79b0f..4cad011 100644
--- a/commom/src/main/java/com/bx/common/util/DateTimeUtils.java
+++ b/im-platform/src/main/java/com/bx/implatform/util/DateTimeUtils.java
@@ -1,4 +1,4 @@
-package com.bx.common.util;
+package com.bx.implatform.util;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
diff --git a/im-platform/src/main/java/com/bx/implatform/util/MinioUtil.java b/im-platform/src/main/java/com/bx/implatform/util/MinioUtil.java
index bf4f1cd..ff62244 100644
--- a/im-platform/src/main/java/com/bx/implatform/util/MinioUtil.java
+++ b/im-platform/src/main/java/com/bx/implatform/util/MinioUtil.java
@@ -1,7 +1,6 @@
package com.bx.implatform.util;
-import com.bx.common.util.DateTimeUtils;
import io.minio.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
diff --git a/im-platform/src/main/resources/application.yml b/im-platform/src/main/resources/application.yml
index 71a18c9..a75e938 100644
--- a/im-platform/src/main/resources/application.yml
+++ b/im-platform/src/main/resources/application.yml
@@ -23,7 +23,7 @@ mybatis-plus:
configuration:
# 是否开启自动驼峰命名规则(camel case)映射,即从经典数据库列名 A_COLUMN(下划线命名) 到经典 Java 属性名 aColumn(驼峰命名) 的类似映射
map-underscore-to-camel-case: false
- log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
+ #log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# mapper
mapper-locations:
# *.xml的具体路径
diff --git a/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java
index 241cbb2..7e4b227 100644
--- a/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java
+++ b/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java
@@ -1,8 +1,9 @@
package com.bx.imserver.task;
import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.WSCmdEnum;
+import com.bx.common.enums.IMCmdType;
import com.bx.common.model.im.GroupMessageInfo;
+import com.bx.common.model.im.IMRecvInfo;
import com.bx.imserver.websocket.WebsocketServer;
import com.bx.imserver.websocket.processor.MessageProcessor;
import com.bx.imserver.websocket.processor.ProcessorFactory;
@@ -28,13 +29,13 @@ public class PullUnreadGroupMessageTask extends AbstractPullMessageTask {
@Override
public void pullMessage() {
// 从redis拉取未读消息
- String key = RedisKey.IM_UNREAD_GROUP_MESSAGE + WSServer.getServerId();
+ String key = RedisKey.IM_UNREAD_GROUP_QUEUE + WSServer.getServerId();
List messageInfos = redisTemplate.opsForList().range(key,0,-1);
for(Object o: messageInfos){
redisTemplate.opsForList().leftPop(key);
- GroupMessageInfo messageInfo = (GroupMessageInfo)o;
- MessageProcessor processor = ProcessorFactory.createProcessor(WSCmdEnum.GROUP_MESSAGE);
- processor.process(messageInfo);
+ IMRecvInfo recvInfo = (IMRecvInfo)o;
+ MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE);
+ processor.process(recvInfo);
}
}
diff --git a/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java
index bf515ef..a68038b 100644
--- a/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java
+++ b/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java
@@ -2,7 +2,8 @@ package com.bx.imserver.task;
import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.WSCmdEnum;
+import com.bx.common.enums.IMCmdType;
+import com.bx.common.model.im.IMRecvInfo;
import com.bx.common.model.im.PrivateMessageInfo;
import com.bx.imserver.websocket.WebsocketServer;
import com.bx.imserver.websocket.processor.MessageProcessor;
@@ -28,13 +29,13 @@ public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask {
@Override
public void pullMessage() {
// 从redis拉取未读消息
- String key = RedisKey.IM_UNREAD_PRIVATE_MESSAGE + WSServer.getServerId();
+ String key = RedisKey.IM_UNREAD_PRIVATE_QUEUE + WSServer.getServerId();
List messageInfos = redisTemplate.opsForList().range(key,0,-1);
for(Object o: messageInfos){
redisTemplate.opsForList().leftPop(key);
- PrivateMessageInfo messageInfo = (PrivateMessageInfo)o;
- MessageProcessor processor = ProcessorFactory.createProcessor(WSCmdEnum.PRIVATE_MESSAGE);
- processor.process(messageInfo);
+ IMRecvInfo recvInfo = (IMRecvInfo)o;
+ MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE);
+ processor.process(recvInfo);
}
}
diff --git a/im-server/src/main/java/com/bx/imserver/websocket/WebSocketHandler.java b/im-server/src/main/java/com/bx/imserver/websocket/WebSocketHandler.java
index 8e8e9f6..7012426 100644
--- a/im-server/src/main/java/com/bx/imserver/websocket/WebSocketHandler.java
+++ b/im-server/src/main/java/com/bx/imserver/websocket/WebSocketHandler.java
@@ -1,7 +1,7 @@
package com.bx.imserver.websocket;
import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.WSCmdEnum;
+import com.bx.common.enums.IMCmdType;
import com.bx.common.model.im.SendInfo;
import com.bx.common.util.SpringContextHolder;
import com.bx.imserver.websocket.processor.MessageProcessor;
@@ -33,7 +33,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, SendInfo sendInfo) throws Exception {
// 创建处理器进行处理
- MessageProcessor processor = ProcessorFactory.createProcessor(WSCmdEnum.fromCode(sendInfo.getCmd()));
+ MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.fromCode(sendInfo.getCmd()));
processor.process(ctx,processor.transForm(sendInfo.getData()));
}
diff --git a/im-server/src/main/java/com/bx/imserver/websocket/processor/GroupMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/websocket/processor/GroupMessageProcessor.java
index 67ace27..0c9f531 100644
--- a/im-server/src/main/java/com/bx/imserver/websocket/processor/GroupMessageProcessor.java
+++ b/im-server/src/main/java/com/bx/imserver/websocket/processor/GroupMessageProcessor.java
@@ -1,10 +1,12 @@
package com.bx.imserver.websocket.processor;
import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.MessageTypeEnum;
-import com.bx.common.enums.WSCmdEnum;
+import com.bx.common.enums.IMCmdType;
+import com.bx.common.enums.SendResultType;
import com.bx.common.model.im.GroupMessageInfo;
+import com.bx.common.model.im.IMRecvInfo;
import com.bx.common.model.im.SendInfo;
+import com.bx.common.model.im.SendResult;
import com.bx.imserver.websocket.WebsocketChannelCtxHolder;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
@@ -17,38 +19,58 @@ import java.util.List;
@Slf4j
@Component
-public class GroupMessageProcessor extends MessageProcessor {
+public class GroupMessageProcessor extends MessageProcessor> {
@Autowired
private RedisTemplate redisTemplate;
@Async
@Override
- public void process(GroupMessageInfo data) {
- log.info("接收到群消息,发送者:{},群id:{},接收id:{},内容:{}",data.getSendId(),data.getGroupId(),data.getRecvIds(),data.getContent());
- List recvIds = data.getRecvIds();
- // 接收者id列表不需要传输,节省带宽
- data.setRecvIds(null);
+ public void process(IMRecvInfo recvInfo) {
+ GroupMessageInfo messageInfo = recvInfo.getData();
+ List recvIds = recvInfo.getRecvIds();
+ log.info("接收到群消息,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent());
for(Long recvId:recvIds){
- ChannelHandlerContext channelCtx = WebsocketChannelCtxHolder.getChannelCtx(recvId);
- if(channelCtx != null){
- // 自己发的消息不用推送
- if(recvId != data.getSendId()){
- // 推送消息到用户
- SendInfo sendInfo = new SendInfo();
- sendInfo.setCmd(WSCmdEnum.GROUP_MESSAGE.getCode());
- sendInfo.setData(data);
- channelCtx.channel().writeAndFlush(sendInfo);
+ try {
+ ChannelHandlerContext channelCtx = WebsocketChannelCtxHolder.getChannelCtx(recvId);
+ if(channelCtx != null){
+ // 自己发的消息不用推送
+ if(recvId != messageInfo.getSendId()){
+ // 推送消息到用户
+ SendInfo sendInfo = new SendInfo();
+ sendInfo.setCmd(IMCmdType.GROUP_MESSAGE.getCode());
+ sendInfo.setData(messageInfo);
+ channelCtx.channel().writeAndFlush(sendInfo);
+ // 消息发送成功确认
+ String key = RedisKey.IM_RESULT_GROUP_QUEUE;
+ SendResult sendResult = new SendResult();
+ sendResult.setRecvId(recvId);
+ sendResult.setResult(SendResultType.SUCCESS);
+ sendResult.setMessageInfo(messageInfo);
+ redisTemplate.opsForList().rightPush(key,sendResult);
+ }
+ }else {
+ // 消息发送失败确认
+ String key = RedisKey.IM_RESULT_GROUP_QUEUE;
+ SendResult sendResult = new SendResult();
+ sendResult.setRecvId(recvId);
+ sendResult.setResult(SendResultType.FAIL);
+ sendResult.setFailReason("未找到WS连接");
+ sendResult.setMessageInfo(messageInfo);
+ redisTemplate.opsForList().rightPush(key,sendResult);
+ log.error("未找到WS连接,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent());
}
- if(data.getType() != MessageTypeEnum.TIP.getCode()){
- // 设置已读最大id
- String key = RedisKey.IM_GROUP_READED_POSITION + data.getGroupId()+":"+recvId;
- redisTemplate.opsForValue().set(key,data.getId());
- }
- }else {
- log.error("未找到WS连接,发送者:{},群id:{},接收id:{},内容:{}",data.getSendId(),data.getGroupId(),data.getRecvIds());
+ }catch (Exception e){
+ // 消息发送失败确认
+ String key = RedisKey.IM_RESULT_GROUP_QUEUE;
+ SendResult sendResult = new SendResult();
+ sendResult.setRecvId(recvId);
+ sendResult.setResult(SendResultType.FAIL);
+ sendResult.setFailReason("未知异常");
+ sendResult.setMessageInfo(messageInfo);
+ redisTemplate.opsForList().rightPush(key,sendResult);
+ log.error("发送消息异常,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent());
}
-
}
}
diff --git a/im-server/src/main/java/com/bx/imserver/websocket/processor/HeartbeatProcessor.java b/im-server/src/main/java/com/bx/imserver/websocket/processor/HeartbeatProcessor.java
index 1701299..0bbc743 100644
--- a/im-server/src/main/java/com/bx/imserver/websocket/processor/HeartbeatProcessor.java
+++ b/im-server/src/main/java/com/bx/imserver/websocket/processor/HeartbeatProcessor.java
@@ -3,7 +3,7 @@ package com.bx.imserver.websocket.processor;
import cn.hutool.core.bean.BeanUtil;
import com.bx.common.contant.Constant;
import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.WSCmdEnum;
+import com.bx.common.enums.IMCmdType;
import com.bx.common.model.im.HeartbeatInfo;
import com.bx.common.model.im.SendInfo;
import com.bx.imserver.websocket.WebsocketServer;
@@ -32,7 +32,7 @@ public class HeartbeatProcessor extends MessageProcessor {
public void process(ChannelHandlerContext ctx, HeartbeatInfo beatInfo) {
// 响应ws
SendInfo sendInfo = new SendInfo();
- sendInfo.setCmd(WSCmdEnum.HEART_BEAT.getCode());
+ sendInfo.setCmd(IMCmdType.HEART_BEAT.getCode());
ctx.channel().writeAndFlush(sendInfo);
// 设置属性
diff --git a/im-server/src/main/java/com/bx/imserver/websocket/processor/LoginProcessor.java b/im-server/src/main/java/com/bx/imserver/websocket/processor/LoginProcessor.java
index 64bd23a..a241afd 100644
--- a/im-server/src/main/java/com/bx/imserver/websocket/processor/LoginProcessor.java
+++ b/im-server/src/main/java/com/bx/imserver/websocket/processor/LoginProcessor.java
@@ -3,7 +3,7 @@ package com.bx.imserver.websocket.processor;
import cn.hutool.core.bean.BeanUtil;
import com.bx.common.contant.Constant;
import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.WSCmdEnum;
+import com.bx.common.enums.IMCmdType;
import com.bx.common.model.im.LoginInfo;
import com.bx.common.model.im.SendInfo;
import com.bx.imserver.websocket.WebsocketChannelCtxHolder;
@@ -36,7 +36,7 @@ public class LoginProcessor extends MessageProcessor {
if(context != null){
// 不允许多地登录,强制下线
SendInfo sendInfo = new SendInfo();
- sendInfo.setCmd(WSCmdEnum.FORCE_LOGUT.getCode());
+ sendInfo.setCmd(IMCmdType.FORCE_LOGUT.getCode());
context.channel().writeAndFlush(sendInfo);
}
// 绑定用户和channel
@@ -52,7 +52,7 @@ public class LoginProcessor extends MessageProcessor {
redisTemplate.opsForValue().set(key, WSServer.getServerId(), Constant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS);
// 响应ws
SendInfo sendInfo = new SendInfo();
- sendInfo.setCmd(WSCmdEnum.LOGIN.getCode());
+ sendInfo.setCmd(IMCmdType.LOGIN.getCode());
ctx.channel().writeAndFlush(sendInfo);
}
diff --git a/im-server/src/main/java/com/bx/imserver/websocket/processor/PrivateMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/websocket/processor/PrivateMessageProcessor.java
index c8513ba..bd2b83c 100644
--- a/im-server/src/main/java/com/bx/imserver/websocket/processor/PrivateMessageProcessor.java
+++ b/im-server/src/main/java/com/bx/imserver/websocket/processor/PrivateMessageProcessor.java
@@ -1,10 +1,12 @@
package com.bx.imserver.websocket.processor;
import com.bx.common.contant.RedisKey;
-import com.bx.common.enums.MessageTypeEnum;
-import com.bx.common.enums.WSCmdEnum;
+import com.bx.common.enums.IMCmdType;
+import com.bx.common.enums.SendResultType;
+import com.bx.common.model.im.IMRecvInfo;
import com.bx.common.model.im.PrivateMessageInfo;
import com.bx.common.model.im.SendInfo;
+import com.bx.common.model.im.SendResult;
import com.bx.imserver.websocket.WebsocketChannelCtxHolder;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
@@ -14,30 +16,52 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
-public class PrivateMessageProcessor extends MessageProcessor {
+public class PrivateMessageProcessor extends MessageProcessor> {
@Autowired
private RedisTemplate redisTemplate;
@Override
- public void process(PrivateMessageInfo data) {
- log.info("接收到消息,发送者:{},接收者:{},内容:{}",data.getSendId(),data.getRecvId(),data.getContent());
- // 一个用户可以同时登陆,所以有多个channel
- ChannelHandlerContext channelCtx = WebsocketChannelCtxHolder.getChannelCtx(data.getRecvId());
- if(channelCtx != null ){
- // 推送消息到用户
- SendInfo sendInfo = new SendInfo();
- sendInfo.setCmd(WSCmdEnum.PRIVATE_MESSAGE.getCode());
- sendInfo.setData(data);
- channelCtx.channel().writeAndFlush(sendInfo);
-
- if(data.getType() != MessageTypeEnum.TIP.getCode()) {
- // 已读消息推送至redis,等待更新数据库
- String key = RedisKey.IM_READED_PRIVATE_MESSAGE_ID;
- redisTemplate.opsForList().rightPush(key, data.getId());
+ public void process(IMRecvInfo recvInfo) {
+ PrivateMessageInfo messageInfo = recvInfo.getData();
+ Long recvId = recvInfo.getRecvIds().get(0);
+ log.info("接收到消息,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent());
+ try{
+ ChannelHandlerContext channelCtx = WebsocketChannelCtxHolder.getChannelCtx(recvId);
+ if(channelCtx != null ){
+ // 推送消息到用户
+ SendInfo sendInfo = new SendInfo();
+ sendInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.getCode());
+ sendInfo.setData(messageInfo);
+ channelCtx.channel().writeAndFlush(sendInfo);
+ // 消息发送成功确认
+ String key = RedisKey.IM_RESULT_PRIVATE_QUEUE;
+ SendResult sendResult = new SendResult();
+ sendResult.setRecvId(recvId);
+ sendResult.setResult(SendResultType.SUCCESS);
+ sendResult.setMessageInfo(messageInfo);
+ redisTemplate.opsForList().rightPush(key,sendResult);
+ }else{
+ // 消息推送失败确认
+ String key = RedisKey.IM_RESULT_PRIVATE_QUEUE;
+ SendResult sendResult = new SendResult();
+ sendResult.setRecvId(recvId);
+ sendResult.setResult(SendResultType.FAIL);
+ sendResult.setFailReason("未找到WS连接");
+ sendResult.setMessageInfo(messageInfo);
+ redisTemplate.opsForList().rightPush(key,sendResult);
+ log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent());
}
- }else{
- log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",data.getSendId(),data.getRecvId(),data.getContent());
+ }catch (Exception e){
+ // 消息推送失败确认
+ String key = RedisKey.IM_RESULT_PRIVATE_QUEUE;
+ SendResult sendResult = new SendResult();
+ sendResult.setRecvId(recvId);
+ sendResult.setResult(SendResultType.FAIL);
+ sendResult.setFailReason("未知异常");
+ sendResult.setMessageInfo(messageInfo);
+ redisTemplate.opsForList().rightPush(key,sendResult);
+ log.error("发送异常,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent(),e);
}
}
diff --git a/im-server/src/main/java/com/bx/imserver/websocket/processor/ProcessorFactory.java b/im-server/src/main/java/com/bx/imserver/websocket/processor/ProcessorFactory.java
index 7f86cae..c15ed32 100644
--- a/im-server/src/main/java/com/bx/imserver/websocket/processor/ProcessorFactory.java
+++ b/im-server/src/main/java/com/bx/imserver/websocket/processor/ProcessorFactory.java
@@ -1,11 +1,11 @@
package com.bx.imserver.websocket.processor;
-import com.bx.common.enums.WSCmdEnum;
+import com.bx.common.enums.IMCmdType;
import com.bx.common.util.SpringContextHolder;
public class ProcessorFactory {
- public static MessageProcessor createProcessor(WSCmdEnum cmd){
+ public static MessageProcessor createProcessor(IMCmdType cmd){
MessageProcessor processor = null;
switch (cmd){
case LOGIN:
diff --git a/pom.xml b/pom.xml
index fc15ee7..47b9492 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,6 +13,7 @@
im-platform
im-server
commom
+ im-client