committed by
Gitee
96 changed files with 1533 additions and 601 deletions
@ -1,26 +0,0 @@ |
|||
package com.bx.common.contant; |
|||
|
|||
public class RedisKey { |
|||
|
|||
// im-server最大id,从0开始递增
|
|||
public final static String IM_MAX_SERVER_ID = "im:max_server_id"; |
|||
// 用户ID所连接的IM-server的ID
|
|||
public final static String IM_USER_SERVER_ID = "im:user:server_id:"; |
|||
// 未读私聊消息队列
|
|||
public final static String IM_UNREAD_PRIVATE_MESSAGE = "im:unread:private:"; |
|||
// 未读群聊消息队列
|
|||
public final static String IM_UNREAD_GROUP_MESSAGE = "im:unread:group:"; |
|||
// 已读私聊消息id队列
|
|||
public final static String IM_READED_PRIVATE_MESSAGE_ID = "im:readed:private:id"; |
|||
// 已读群聊消息位置(已读最大id)
|
|||
public final static String IM_GROUP_READED_POSITION = "im:readed:group:position:"; |
|||
// 缓存前缀
|
|||
public final static String IM_CACHE = "im:cache:"; |
|||
// 缓存是否好友:bool
|
|||
public final static String IM_CACHE_FRIEND = IM_CACHE+"friend"; |
|||
// 缓存群聊信息
|
|||
public final static String IM_CACHE_GROUP = IM_CACHE+"group"; |
|||
// 缓存群聊成员id
|
|||
public final static String IM_CACHE_GROUP_MEMBER_ID = IM_CACHE+"group_member_ids"; |
|||
|
|||
} |
|||
@ -1,38 +0,0 @@ |
|||
package com.bx.common.enums; |
|||
|
|||
|
|||
public enum MessageTypeEnum { |
|||
|
|||
TEXT(0,"文字"), |
|||
FILE(1,"文件"), |
|||
IMAGE(2,"图片"), |
|||
VIDEO(3,"视频"), |
|||
TIP(10,"系统提示"); |
|||
|
|||
private Integer code; |
|||
|
|||
private String desc; |
|||
|
|||
MessageTypeEnum(Integer index, String desc) { |
|||
this.code =index; |
|||
this.desc=desc; |
|||
} |
|||
|
|||
public static MessageTypeEnum fromCode(Integer code){ |
|||
for (MessageTypeEnum typeEnum:values()) { |
|||
if (typeEnum.code.equals(code)) { |
|||
return typeEnum; |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
|
|||
public String getDesc() { |
|||
return desc; |
|||
} |
|||
|
|||
public Integer getCode(){ |
|||
return this.code; |
|||
} |
|||
} |
|||
@ -1,24 +0,0 @@ |
|||
package com.bx.common.model.im; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.util.Date; |
|||
import java.util.List; |
|||
|
|||
@Data |
|||
public class GroupMessageInfo { |
|||
|
|||
private Long id; |
|||
|
|||
private Long groupId; |
|||
|
|||
private Long sendId; |
|||
|
|||
private List<Long> recvIds; |
|||
|
|||
private String content; |
|||
|
|||
private Integer type; |
|||
|
|||
private Date sendTime; |
|||
} |
|||
@ -1,21 +0,0 @@ |
|||
package com.bx.common.model.im; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.util.Date; |
|||
|
|||
@Data |
|||
public class PrivateMessageInfo { |
|||
|
|||
private long id; |
|||
|
|||
private Long sendId; |
|||
|
|||
private Long recvId; |
|||
|
|||
private String content; |
|||
|
|||
private Integer type; |
|||
|
|||
private Date sendTime; |
|||
} |
|||
@ -1,11 +0,0 @@ |
|||
package com.bx.common.model.im; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class SendInfo<T> { |
|||
|
|||
private Integer cmd; |
|||
private T data; |
|||
|
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" |
|||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|||
<parent> |
|||
<artifactId>box-im</artifactId> |
|||
<groupId>com.bx</groupId> |
|||
<version>1.1.0</version> |
|||
</parent> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
|
|||
<artifactId>im-client</artifactId> |
|||
|
|||
<dependencies> |
|||
<dependency> |
|||
<groupId>com.bx</groupId> |
|||
<artifactId>im-commom</artifactId> |
|||
<version>1.1.0</version> |
|||
</dependency> |
|||
<!-- 引入redis --> |
|||
<dependency> |
|||
<groupId>org.springframework.boot</groupId> |
|||
<artifactId>spring-boot-starter-data-redis</artifactId> |
|||
</dependency> |
|||
</dependencies> |
|||
</project> |
|||
@ -0,0 +1,12 @@ |
|||
package com.bx.imclient; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.context.annotation.ComponentScan; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
|
|||
@Slf4j |
|||
@Configuration |
|||
@ComponentScan("com.bx.imclient") |
|||
public class IMAutoConfiguration { |
|||
} |
|||
@ -0,0 +1,41 @@ |
|||
package com.bx.imclient; |
|||
|
|||
import com.bx.imclient.listener.MessageListenerMulticaster; |
|||
import com.bx.imclient.sender.IMSender; |
|||
import com.bx.imcommon.model.GroupMessageInfo; |
|||
import com.bx.imcommon.model.PrivateMessageInfo; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Configuration |
|||
public class IMClient { |
|||
|
|||
@Autowired |
|||
private MessageListenerMulticaster listenerMulticaster; |
|||
@Autowired |
|||
private IMSender imSender; |
|||
|
|||
/** |
|||
* 发送私聊消息 |
|||
* |
|||
* @param recvId 接收用户id |
|||
* @param messageInfo 消息体,将转成json发送到客户端 |
|||
*/ |
|||
public void sendPrivateMessage(Long recvId, PrivateMessageInfo... messageInfo){ |
|||
imSender.sendPrivateMessage(recvId,messageInfo); |
|||
} |
|||
|
|||
/** |
|||
* 发送群聊消息 |
|||
* |
|||
* @param recvIds 群聊用户id列表 |
|||
* @param messageInfo 消息体,将转成json发送到客户端 |
|||
*/ |
|||
public void sendGroupMessage(List<Long> recvIds, GroupMessageInfo... messageInfo){ |
|||
imSender.sendGroupMessage(recvIds,messageInfo); |
|||
} |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,18 @@ |
|||
package com.bx.imclient.annotation; |
|||
|
|||
import com.bx.imcommon.enums.IMListenerType; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.lang.annotation.ElementType; |
|||
import java.lang.annotation.Retention; |
|||
import java.lang.annotation.RetentionPolicy; |
|||
import java.lang.annotation.Target; |
|||
|
|||
@Target({ElementType.TYPE,ElementType.FIELD}) |
|||
@Retention(RetentionPolicy.RUNTIME) |
|||
@Component |
|||
public @interface IMListener { |
|||
|
|||
IMListenerType type(); |
|||
|
|||
} |
|||
@ -0,0 +1,49 @@ |
|||
package com.bx.imclient.config; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonAutoDetect; |
|||
import com.fasterxml.jackson.annotation.JsonTypeInfo; |
|||
import com.fasterxml.jackson.annotation.PropertyAccessor; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import com.fasterxml.jackson.databind.SerializationFeature; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.data.redis.connection.RedisConnectionFactory; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; |
|||
import org.springframework.data.redis.serializer.StringRedisSerializer; |
|||
|
|||
import javax.annotation.Resource; |
|||
|
|||
@Configuration("IMRedisConfig") |
|||
public class RedisConfig { |
|||
|
|||
@Resource |
|||
private RedisConnectionFactory factory; |
|||
|
|||
@Bean("IMRedisTemplate") |
|||
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { |
|||
RedisTemplate<String, Object> redisTemplate = new RedisTemplate(); |
|||
redisTemplate.setConnectionFactory(redisConnectionFactory); |
|||
// 设置值(value)的序列化采用jackson2JsonRedisSerializer
|
|||
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer()); |
|||
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer()); |
|||
// 设置键(key)的序列化采用StringRedisSerializer。
|
|||
redisTemplate.setKeySerializer(new StringRedisSerializer()); |
|||
redisTemplate.setHashKeySerializer(new StringRedisSerializer()); |
|||
redisTemplate.afterPropertiesSet(); |
|||
return redisTemplate; |
|||
} |
|||
|
|||
@Bean |
|||
public Jackson2JsonRedisSerializer jackson2JsonRedisSerializer(){ |
|||
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); |
|||
ObjectMapper om = new ObjectMapper(); |
|||
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); |
|||
// 解决jackson2无法反序列化LocalDateTime的问题
|
|||
om.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); |
|||
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); |
|||
jackson2JsonRedisSerializer.setObjectMapper(om); |
|||
return jackson2JsonRedisSerializer; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,10 @@ |
|||
package com.bx.imclient.listener; |
|||
|
|||
|
|||
import com.bx.imcommon.model.SendResult; |
|||
|
|||
public interface MessageListener { |
|||
|
|||
void process(SendResult result); |
|||
|
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
package com.bx.imclient.listener; |
|||
|
|||
|
|||
import com.bx.imclient.annotation.IMListener; |
|||
import com.bx.imcommon.enums.IMListenerType; |
|||
import com.bx.imcommon.model.SendResult; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
|
|||
@Component |
|||
public class MessageListenerMulticaster { |
|||
|
|||
@Autowired(required = false) |
|||
private List<MessageListener> messageListeners = Collections.emptyList(); |
|||
|
|||
public void multicast(IMListenerType type, SendResult result){ |
|||
for(MessageListener listener:messageListeners){ |
|||
IMListener annotation = listener.getClass().getAnnotation(IMListener.class); |
|||
if(annotation!=null && (annotation.type().equals(IMListenerType.ALL) || annotation.type().equals(type))){ |
|||
listener.process(result); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,112 @@ |
|||
package com.bx.imclient.sender; |
|||
|
|||
import com.bx.imclient.listener.MessageListenerMulticaster; |
|||
import com.bx.imcommon.contant.RedisKey; |
|||
import com.bx.imcommon.enums.IMCmdType; |
|||
import com.bx.imcommon.enums.IMListenerType; |
|||
import com.bx.imcommon.enums.IMSendStatus; |
|||
import com.bx.imcommon.model.GroupMessageInfo; |
|||
import com.bx.imcommon.model.IMRecvInfo; |
|||
import com.bx.imcommon.model.PrivateMessageInfo; |
|||
import com.bx.imcommon.model.SendResult; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Qualifier; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.LinkedList; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
|
|||
@Service |
|||
public class IMSender { |
|||
|
|||
@Autowired |
|||
@Qualifier("IMRedisTemplate") |
|||
private RedisTemplate redisTemplate; |
|||
|
|||
@Autowired |
|||
private MessageListenerMulticaster listenerMulticaster; |
|||
|
|||
public void sendPrivateMessage(Long recvId, PrivateMessageInfo... messageInfos){ |
|||
// 获取对方连接的channelId
|
|||
String key = RedisKey.IM_USER_SERVER_ID + recvId; |
|||
Integer serverId = (Integer) redisTemplate.opsForValue().get(key); |
|||
// 如果对方在线,将数据存储至redis,等待拉取推送
|
|||
if (serverId != null) { |
|||
String sendKey = RedisKey.IM_UNREAD_PRIVATE_QUEUE + serverId; |
|||
IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length]; |
|||
for (int i=0;i<messageInfos.length;i++){ |
|||
IMRecvInfo<PrivateMessageInfo> recvInfo = new IMRecvInfo<>(); |
|||
recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); |
|||
List recvIds = new LinkedList(); |
|||
recvIds.add(recvId); |
|||
recvInfo.setRecvIds(recvIds); |
|||
recvInfo.setData(messageInfos[i]); |
|||
recvInfos[i] = recvInfo; |
|||
} |
|||
redisTemplate.opsForList().rightPushAll(sendKey, recvInfos); |
|||
}else{ |
|||
// 回复消息状态
|
|||
for(PrivateMessageInfo messageInfo : messageInfos ) { |
|||
SendResult result = new SendResult(); |
|||
result.setMessageInfo(messageInfo); |
|||
result.setRecvId(recvId); |
|||
result.setStatus(IMSendStatus.FAIL); |
|||
result.setFailReason("用户不在线"); |
|||
listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void sendGroupMessage(List<Long> recvIds, GroupMessageInfo... messageInfos){ |
|||
// 根据群聊每个成员所连的IM-server,进行分组
|
|||
List<Long> offLineIds = Collections.synchronizedList(new LinkedList<Long>()); |
|||
Map<Integer, List<Long>> serverMap = new ConcurrentHashMap<>(); |
|||
recvIds.parallelStream().forEach(id->{ |
|||
String key = RedisKey.IM_USER_SERVER_ID + id; |
|||
Integer serverId = (Integer)redisTemplate.opsForValue().get(key); |
|||
if(serverId != null){ |
|||
if(serverMap.containsKey(serverId)){ |
|||
serverMap.get(serverId).add(id); |
|||
}else { |
|||
// 此处需要加锁,否则list可以会被覆盖
|
|||
synchronized(serverMap){ |
|||
List<Long> list = Collections.synchronizedList(new LinkedList<Long>()); |
|||
list.add(id); |
|||
serverMap.put(serverId,list); |
|||
} |
|||
} |
|||
}else{ |
|||
offLineIds.add(id); |
|||
} |
|||
}); |
|||
// 逐个server发送
|
|||
for (Map.Entry<Integer,List<Long>> entry : serverMap.entrySet()) { |
|||
IMRecvInfo[] recvInfos = new IMRecvInfo[messageInfos.length]; |
|||
for (int i=0;i<messageInfos.length;i++){ |
|||
IMRecvInfo<GroupMessageInfo> recvInfo = new IMRecvInfo<>(); |
|||
recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); |
|||
recvInfo.setRecvIds(new LinkedList<>(entry.getValue())); |
|||
recvInfo.setData(messageInfos[i]); |
|||
recvInfos[i] = recvInfo; |
|||
} |
|||
String key = RedisKey.IM_UNREAD_GROUP_QUEUE +entry.getKey(); |
|||
redisTemplate.opsForList().rightPushAll(key,recvInfos); |
|||
} |
|||
// 不在线的用户,回复消息状态
|
|||
for(GroupMessageInfo messageInfo:messageInfos ){ |
|||
for(Long id : offLineIds){ |
|||
// 回复消息状态
|
|||
SendResult result = new SendResult(); |
|||
result.setMessageInfo(messageInfo); |
|||
result.setRecvId(id); |
|||
result.setStatus(IMSendStatus.FAIL); |
|||
result.setFailReason("用户不在线"); |
|||
listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE,result); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
package com.bx.imclient.task; |
|||
|
|||
import lombok.SneakyThrows; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import javax.annotation.PreDestroy; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
|
|||
@Slf4j |
|||
public abstract class AbstractPullMessageTask { |
|||
|
|||
private int threadNum = 8; |
|||
|
|||
private ExecutorService executorService = Executors.newFixedThreadPool(threadNum); |
|||
|
|||
@PostConstruct |
|||
public void init(){ |
|||
// 初始化定时器
|
|||
for(int i=0;i<threadNum;i++){ |
|||
executorService.execute(new Runnable() { |
|||
@SneakyThrows |
|||
@Override |
|||
public void run() { |
|||
try{ |
|||
pullMessage(); |
|||
}catch (Exception e){ |
|||
log.error("任务调度异常",e); |
|||
Thread.sleep(200); |
|||
} |
|||
if(!executorService.isShutdown()){ |
|||
executorService.execute(this); |
|||
} |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void destroy(){ |
|||
log.info("{}线程任务关闭",this.getClass().getSimpleName()); |
|||
executorService.shutdown(); |
|||
} |
|||
|
|||
public abstract void pullMessage(); |
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
package com.bx.imclient.task; |
|||
|
|||
import com.bx.imclient.listener.MessageListenerMulticaster; |
|||
import com.bx.imcommon.contant.RedisKey; |
|||
import com.bx.imcommon.enums.IMListenerType; |
|||
import com.bx.imcommon.model.SendResult; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Qualifier; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
@Component |
|||
public class PullSendResultGroupMessageTask extends AbstractPullMessageTask{ |
|||
|
|||
@Qualifier("IMRedisTemplate") |
|||
@Autowired |
|||
private RedisTemplate<String,Object> redisTemplate; |
|||
|
|||
@Autowired |
|||
private MessageListenerMulticaster listenerMulticaster; |
|||
|
|||
@Override |
|||
public void pullMessage() { |
|||
String key = RedisKey.IM_RESULT_GROUP_QUEUE; |
|||
SendResult result = (SendResult)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); |
|||
if(result != null) { |
|||
listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE,result); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,38 @@ |
|||
package com.bx.imclient.task; |
|||
|
|||
import com.bx.imclient.listener.MessageListenerMulticaster; |
|||
import com.bx.imcommon.contant.RedisKey; |
|||
import com.bx.imcommon.enums.IMListenerType; |
|||
import com.bx.imcommon.model.SendResult; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Qualifier; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class PullSendResultPrivateMessageTask extends AbstractPullMessageTask{ |
|||
|
|||
|
|||
@Qualifier("IMRedisTemplate") |
|||
@Autowired |
|||
private RedisTemplate<String,Object> redisTemplate; |
|||
|
|||
@Autowired |
|||
private MessageListenerMulticaster listenerMulticaster; |
|||
|
|||
@Override |
|||
public void pullMessage() { |
|||
String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; |
|||
SendResult result = (SendResult)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); |
|||
if(result != null) { |
|||
listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,2 @@ |
|||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ |
|||
com.bx.imclient.IMAutoConfiguration |
|||
@ -0,0 +1,10 @@ |
|||
package com.bx.imcommon.contant; |
|||
|
|||
|
|||
public class Constant { |
|||
|
|||
// 在线状态过期时间 600s
|
|||
public static final long ONLINE_TIMEOUT_SECOND = 600; |
|||
// 消息允许撤回时间 300s
|
|||
public static final long ALLOW_RECALL_SECOND = 300; |
|||
} |
|||
@ -0,0 +1,19 @@ |
|||
package com.bx.imcommon.contant; |
|||
|
|||
public class RedisKey { |
|||
|
|||
// im-server最大id,从0开始递增
|
|||
public final static String IM_MAX_SERVER_ID = "im:max_server_id"; |
|||
// 用户ID所连接的IM-server的ID
|
|||
public final static String IM_USER_SERVER_ID = "im:user:server_id:"; |
|||
// 未读私聊消息队列
|
|||
public final static String IM_UNREAD_PRIVATE_QUEUE = "im:unread:private:"; |
|||
// 未读群聊消息队列
|
|||
public final static String IM_UNREAD_GROUP_QUEUE = "im:unread:group:"; |
|||
// 私聊消息发送结果队列
|
|||
public final static String IM_RESULT_PRIVATE_QUEUE = "im:result:private"; |
|||
// 群聊消息发送结果队列
|
|||
public final static String IM_RESULT_GROUP_QUEUE = "im:result:group"; |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
package com.bx.imcommon.enums; |
|||
|
|||
public enum IMListenerType{ |
|||
ALL(0,"全部消息"), |
|||
PRIVATE_MESSAGE(1,"私聊消息"), |
|||
GROUP_MESSAGE(2,"群聊消息"); |
|||
|
|||
private Integer code; |
|||
|
|||
private String desc; |
|||
|
|||
IMListenerType(Integer index, String desc) { |
|||
this.code =index; |
|||
this.desc=desc; |
|||
} |
|||
|
|||
|
|||
public String description() { |
|||
return desc; |
|||
} |
|||
|
|||
|
|||
public Integer code(){ |
|||
return this.code; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
package com.bx.imcommon.enums; |
|||
|
|||
|
|||
public enum IMSendStatus { |
|||
|
|||
SUCCESS(0,"发送成功"), |
|||
FAIL(1,"发送失败"); |
|||
|
|||
private int code; |
|||
private String desc; |
|||
|
|||
// 构造方法
|
|||
IMSendStatus(int code, String desc) { |
|||
this.code = code; |
|||
this.desc = desc; |
|||
} |
|||
|
|||
public String description() { |
|||
return desc; |
|||
} |
|||
|
|||
|
|||
public Integer code(){ |
|||
return this.code; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,42 @@ |
|||
package com.bx.imcommon.model; |
|||
|
|||
import com.bx.imcommon.serializer.DateToLongSerializer; |
|||
import com.fasterxml.jackson.databind.annotation.JsonSerialize; |
|||
import lombok.Data; |
|||
|
|||
import java.util.Date; |
|||
|
|||
@Data |
|||
public class GroupMessageInfo { |
|||
|
|||
/* |
|||
* 消息id |
|||
*/ |
|||
private Long id; |
|||
|
|||
/* |
|||
* 群聊id |
|||
*/ |
|||
private Long groupId; |
|||
|
|||
/* |
|||
* 发送者id |
|||
*/ |
|||
private Long sendId; |
|||
|
|||
/* |
|||
* 消息内容 |
|||
*/ |
|||
private String content; |
|||
|
|||
/* |
|||
* 消息内容类型 具体枚举值由应用层定义 |
|||
*/ |
|||
private Integer type; |
|||
|
|||
/** |
|||
* 发送时间 |
|||
*/ |
|||
@JsonSerialize(using = DateToLongSerializer.class) |
|||
private Date sendTime; |
|||
} |
|||
@ -1,4 +1,4 @@ |
|||
package com.bx.common.model.im; |
|||
package com.bx.imcommon.model; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@ -0,0 +1,26 @@ |
|||
package com.bx.imcommon.model; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Data |
|||
public class IMRecvInfo<T> { |
|||
|
|||
/* |
|||
* 命令类型 |
|||
*/ |
|||
private Integer cmd; |
|||
|
|||
/* |
|||
* 接收者id列表 |
|||
*/ |
|||
private List<Long> recvIds; |
|||
|
|||
/* |
|||
* 推送消息体 |
|||
*/ |
|||
private T data; |
|||
} |
|||
|
|||
|
|||
@ -0,0 +1,18 @@ |
|||
package com.bx.imcommon.model; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class IMSendInfo<T> { |
|||
|
|||
/* |
|||
* 命令 |
|||
*/ |
|||
private Integer cmd; |
|||
|
|||
/* |
|||
* 推送消息体 |
|||
*/ |
|||
private T data; |
|||
|
|||
} |
|||
@ -1,4 +1,4 @@ |
|||
package com.bx.common.model.im; |
|||
package com.bx.imcommon.model; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@ -0,0 +1,42 @@ |
|||
package com.bx.imcommon.model; |
|||
|
|||
import com.bx.imcommon.serializer.DateToLongSerializer; |
|||
import com.fasterxml.jackson.databind.annotation.JsonSerialize; |
|||
import lombok.Data; |
|||
|
|||
import java.util.Date; |
|||
|
|||
@Data |
|||
public class PrivateMessageInfo { |
|||
|
|||
/* |
|||
* 消息id |
|||
*/ |
|||
private long id; |
|||
|
|||
/* |
|||
* 发送者id |
|||
*/ |
|||
private Long sendId; |
|||
|
|||
/* |
|||
* 接收者id |
|||
*/ |
|||
private Long recvId; |
|||
|
|||
/* |
|||
* 发送内容 |
|||
*/ |
|||
private String content; |
|||
|
|||
/* |
|||
* 消息内容类型 具体枚举值由应用层定义 |
|||
*/ |
|||
private Integer type; |
|||
|
|||
/** |
|||
* 发送时间 |
|||
*/ |
|||
@JsonSerialize(using = DateToLongSerializer.class) |
|||
private Date sendTime; |
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
package com.bx.imcommon.model; |
|||
|
|||
import com.bx.imcommon.enums.IMSendStatus; |
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class SendResult<T> { |
|||
|
|||
/* |
|||
* 接收者id |
|||
*/ |
|||
private Long recvId; |
|||
|
|||
/* |
|||
* 发送状态 |
|||
*/ |
|||
private IMSendStatus status; |
|||
|
|||
/* |
|||
* 失败原因 |
|||
*/ |
|||
private String failReason=""; |
|||
|
|||
/* |
|||
* 消息体(透传) |
|||
*/ |
|||
private T messageInfo; |
|||
|
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
package com.bx.imcommon.serializer; |
|||
|
|||
import com.fasterxml.jackson.core.JsonGenerator; |
|||
import com.fasterxml.jackson.core.JsonToken; |
|||
import com.fasterxml.jackson.core.type.WritableTypeId; |
|||
import com.fasterxml.jackson.databind.JsonSerializer; |
|||
import com.fasterxml.jackson.databind.SerializerProvider; |
|||
import com.fasterxml.jackson.databind.jsontype.TypeSerializer; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.Date; |
|||
|
|||
public class DateToLongSerializer extends JsonSerializer<Date> { |
|||
|
|||
@Override |
|||
public void serialize(Date date, JsonGenerator jsonGenerator, |
|||
SerializerProvider serializerProvider) throws IOException { |
|||
jsonGenerator.writeNumber(date.getTime()); |
|||
} |
|||
|
|||
@Override |
|||
public void serializeWithType(Date value, JsonGenerator gen, SerializerProvider serializers, TypeSerializer typeSer) throws IOException { |
|||
WritableTypeId typeIdDef = typeSer.writeTypePrefix(gen, |
|||
typeSer.typeId(value, JsonToken.VALUE_STRING)); |
|||
serialize(value, gen, serializers); |
|||
typeSer.writeTypeSuffix(gen, typeIdDef); |
|||
} |
|||
} |
|||
@ -1,17 +1,12 @@ |
|||
package com.bx.common.contant; |
|||
|
|||
package com.bx.implatform.contant; |
|||
|
|||
|
|||
public class Constant { |
|||
|
|||
// 最大图片上传大小
|
|||
public static final long MAX_IMAGE_SIZE = 5*1024*1024; |
|||
// 最大上传文件大小
|
|||
public static final long MAX_FILE_SIZE = 10*1024*1024; |
|||
// 群聊最大人数
|
|||
public static final long MAX_GROUP_MEMBER = 500; |
|||
// 在线状态过期时间 600s
|
|||
public static final long ONLINE_TIMEOUT_SECOND = 600; |
|||
// 消息允许撤回时间 300s
|
|||
public static final long ALLOW_RECALL_SECOND = 300; |
|||
|
|||
} |
|||
@ -0,0 +1,16 @@ |
|||
package com.bx.implatform.contant; |
|||
|
|||
public class RedisKey { |
|||
|
|||
// 已读群聊消息位置(已读最大id)
|
|||
public final static String IM_GROUP_READED_POSITION = "im:readed:group:position:"; |
|||
// 缓存前缀
|
|||
public final static String IM_CACHE = "im:cache:"; |
|||
// 缓存是否好友:bool
|
|||
public final static String IM_CACHE_FRIEND = IM_CACHE+"friend"; |
|||
// 缓存群聊信息
|
|||
public final static String IM_CACHE_GROUP = IM_CACHE+"group"; |
|||
// 缓存群聊成员id
|
|||
public final static String IM_CACHE_GROUP_MEMBER_ID = IM_CACHE+"group_member_ids"; |
|||
|
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
package com.bx.implatform.enums; |
|||
|
|||
|
|||
public enum MessageType { |
|||
|
|||
TEXT(0,"文字"), |
|||
FILE(1,"文件"), |
|||
IMAGE(2,"图片"), |
|||
VIDEO(3,"视频"), |
|||
TIP(10,"系统提示"); |
|||
|
|||
private Integer code; |
|||
|
|||
private String desc; |
|||
|
|||
MessageType(Integer index, String desc) { |
|||
this.code =index; |
|||
this.desc=desc; |
|||
} |
|||
|
|||
|
|||
public String description() { |
|||
return desc; |
|||
} |
|||
|
|||
public Integer code(){ |
|||
return this.code; |
|||
} |
|||
} |
|||
@ -1,4 +1,4 @@ |
|||
package com.bx.common.enums; |
|||
package com.bx.implatform.enums; |
|||
|
|||
/** |
|||
* 响应码枚举 |
|||
@ -1,4 +1,4 @@ |
|||
package com.bx.common.generator; |
|||
package com.bx.implatform.generator; |
|||
|
|||
import com.baomidou.mybatisplus.annotation.IdType; |
|||
import com.baomidou.mybatisplus.core.toolkit.StringPool; |
|||
@ -0,0 +1,38 @@ |
|||
package com.bx.implatform.listener; |
|||
|
|||
import com.bx.imclient.annotation.IMListener; |
|||
import com.bx.imclient.listener.MessageListener; |
|||
import com.bx.imcommon.enums.IMListenerType; |
|||
import com.bx.imcommon.enums.IMSendStatus; |
|||
import com.bx.imcommon.model.GroupMessageInfo; |
|||
import com.bx.imcommon.model.SendResult; |
|||
import com.bx.implatform.contant.RedisKey; |
|||
import com.bx.implatform.enums.MessageType; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
|
|||
|
|||
@Slf4j |
|||
@IMListener(type = IMListenerType.GROUP_MESSAGE) |
|||
public class GroupMessageListener implements MessageListener { |
|||
|
|||
@Autowired |
|||
private RedisTemplate<String,Object> redisTemplate; |
|||
|
|||
@Override |
|||
public void process(SendResult result){ |
|||
GroupMessageInfo messageInfo = (GroupMessageInfo) result.getMessageInfo(); |
|||
if(messageInfo.getType().equals(MessageType.TIP)){ |
|||
// 提示类数据不记录
|
|||
return; |
|||
} |
|||
|
|||
// 保存该用户已拉取的最大消息id
|
|||
if(result.getStatus().equals(IMSendStatus.SUCCESS)) { |
|||
String key = RedisKey.IM_GROUP_READED_POSITION + messageInfo.getGroupId() + ":" + result.getRecvId(); |
|||
redisTemplate.opsForValue().set(key, messageInfo.getId()); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
package com.bx.implatform.listener; |
|||
|
|||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; |
|||
import com.bx.imclient.annotation.IMListener; |
|||
import com.bx.imclient.listener.MessageListener; |
|||
import com.bx.imcommon.enums.IMListenerType; |
|||
import com.bx.imcommon.enums.IMSendStatus; |
|||
import com.bx.imcommon.model.PrivateMessageInfo; |
|||
import com.bx.imcommon.model.SendResult; |
|||
import com.bx.implatform.entity.PrivateMessage; |
|||
import com.bx.implatform.enums.MessageStatus; |
|||
import com.bx.implatform.enums.MessageType; |
|||
import com.bx.implatform.service.IPrivateMessageService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
|
|||
|
|||
@Slf4j |
|||
@IMListener(type = IMListenerType.PRIVATE_MESSAGE) |
|||
public class PrivateMessageListener implements MessageListener { |
|||
|
|||
@Autowired |
|||
private IPrivateMessageService privateMessageService; |
|||
|
|||
@Override |
|||
public void process(SendResult result){ |
|||
PrivateMessageInfo messageInfo = (PrivateMessageInfo) result.getMessageInfo(); |
|||
if(messageInfo.getType().equals(MessageType.TIP)){ |
|||
// 提示类数据不记录
|
|||
return; |
|||
} |
|||
// 更新消息状态
|
|||
if(result.getStatus().equals(IMSendStatus.SUCCESS)){ |
|||
UpdateWrapper<PrivateMessage> updateWrapper = new UpdateWrapper<>(); |
|||
updateWrapper.lambda().eq(PrivateMessage::getId,messageInfo.getId()) |
|||
.eq(PrivateMessage::getStatus, MessageStatus.UNREAD.code()) |
|||
.set(PrivateMessage::getStatus, MessageStatus.ALREADY_READ.code()); |
|||
privateMessageService.update(updateWrapper); |
|||
log.info("消息已读,消息id:{},发送者:{},接收者:{}",messageInfo.getId(),messageInfo.getSendId(),messageInfo.getRecvId()); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -1,4 +1,4 @@ |
|||
package com.bx.common.result; |
|||
package com.bx.implatform.result; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@ -1,74 +0,0 @@ |
|||
package com.bx.implatform.task; |
|||
|
|||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; |
|||
import com.bx.common.contant.RedisKey; |
|||
import com.bx.common.enums.MessageStatusEnum; |
|||
import com.bx.implatform.entity.PrivateMessage; |
|||
import com.bx.implatform.service.IPrivateMessageService; |
|||
import lombok.SneakyThrows; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import javax.annotation.PreDestroy; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class PullAlreadyReadMessageTask { |
|||
|
|||
private int threadNum = 8; |
|||
|
|||
private ExecutorService executorService = Executors.newFixedThreadPool(threadNum); |
|||
|
|||
@Autowired |
|||
private RedisTemplate<String,Object> redisTemplate; |
|||
|
|||
@Autowired |
|||
private IPrivateMessageService privateMessageService; |
|||
|
|||
@PostConstruct |
|||
public void init(){ |
|||
for(int i=0;i<threadNum;i++){ |
|||
executorService.execute(new Task()); |
|||
} |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void destroy(){ |
|||
log.info("{}线程任务关闭",this.getClass().getSimpleName()); |
|||
executorService.shutdown(); |
|||
} |
|||
|
|||
|
|||
protected class Task implements Runnable{ |
|||
@SneakyThrows |
|||
@Override |
|||
public void run() { |
|||
try { |
|||
String key = RedisKey.IM_READED_PRIVATE_MESSAGE_ID; |
|||
Integer msgId = (Integer)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); |
|||
if(msgId!=null){ |
|||
UpdateWrapper<PrivateMessage> updateWrapper = new UpdateWrapper<>(); |
|||
updateWrapper.lambda().eq(PrivateMessage::getId,msgId) |
|||
.eq(PrivateMessage::getStatus,MessageStatusEnum.UNREAD.getCode()) |
|||
.set(PrivateMessage::getStatus, MessageStatusEnum.ALREADY_READ.getCode()); |
|||
privateMessageService.update(updateWrapper); |
|||
log.info("消息已读,id:{}",msgId); |
|||
} |
|||
}catch (Exception e){ |
|||
log.error(e.getMessage()); |
|||
Thread.sleep(200); |
|||
}finally { |
|||
// 下一次循环
|
|||
if(!executorService.isShutdown()){ |
|||
executorService.submit(this); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -1,4 +1,4 @@ |
|||
package com.bx.common.util; |
|||
package com.bx.implatform.util; |
|||
|
|||
import cn.hutool.core.date.DateTime; |
|||
import cn.hutool.core.date.DateUtil; |
|||
@ -0,0 +1,77 @@ |
|||
package com.bx.imserver.processor; |
|||
|
|||
import com.bx.imcommon.contant.RedisKey; |
|||
import com.bx.imcommon.enums.IMCmdType; |
|||
import com.bx.imcommon.enums.IMSendStatus; |
|||
import com.bx.imcommon.model.GroupMessageInfo; |
|||
import com.bx.imcommon.model.IMRecvInfo; |
|||
import com.bx.imcommon.model.IMSendInfo; |
|||
import com.bx.imcommon.model.SendResult; |
|||
import com.bx.imserver.util.UserChannelCtxHolder; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.scheduling.annotation.Async; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class GroupMessageProcessor extends MessageProcessor<IMRecvInfo<GroupMessageInfo>> { |
|||
|
|||
@Autowired |
|||
private RedisTemplate<String,Object> redisTemplate; |
|||
|
|||
@Async |
|||
@Override |
|||
public void process(IMRecvInfo<GroupMessageInfo> recvInfo) { |
|||
GroupMessageInfo messageInfo = recvInfo.getData(); |
|||
List<Long> recvIds = recvInfo.getRecvIds(); |
|||
log.info("接收到群消息,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent()); |
|||
for(Long recvId:recvIds){ |
|||
try { |
|||
ChannelHandlerContext channelCtx = UserChannelCtxHolder.getChannelCtx(recvId); |
|||
if(channelCtx != null){ |
|||
// 自己发的消息不用推送
|
|||
if(recvId != messageInfo.getSendId()){ |
|||
// 推送消息到用户
|
|||
IMSendInfo sendInfo = new IMSendInfo(); |
|||
sendInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); |
|||
sendInfo.setData(messageInfo); |
|||
channelCtx.channel().writeAndFlush(sendInfo); |
|||
// 消息发送成功确认
|
|||
String key = RedisKey.IM_RESULT_GROUP_QUEUE; |
|||
SendResult sendResult = new SendResult(); |
|||
sendResult.setRecvId(recvId); |
|||
sendResult.setStatus(IMSendStatus.SUCCESS); |
|||
sendResult.setMessageInfo(messageInfo); |
|||
redisTemplate.opsForList().rightPush(key,sendResult); |
|||
} |
|||
}else { |
|||
// 消息发送失败确认
|
|||
String key = RedisKey.IM_RESULT_GROUP_QUEUE; |
|||
SendResult sendResult = new SendResult(); |
|||
sendResult.setRecvId(recvId); |
|||
sendResult.setStatus(IMSendStatus.FAIL); |
|||
sendResult.setFailReason("未找到WS连接"); |
|||
sendResult.setMessageInfo(messageInfo); |
|||
redisTemplate.opsForList().rightPush(key,sendResult); |
|||
log.error("未找到WS连接,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent()); |
|||
} |
|||
}catch (Exception e){ |
|||
// 消息发送失败确认
|
|||
String key = RedisKey.IM_RESULT_GROUP_QUEUE; |
|||
SendResult sendResult = new SendResult(); |
|||
sendResult.setRecvId(recvId); |
|||
sendResult.setStatus(IMSendStatus.FAIL); |
|||
sendResult.setFailReason("未知异常"); |
|||
sendResult.setMessageInfo(messageInfo); |
|||
redisTemplate.opsForList().rightPush(key,sendResult); |
|||
log.error("发送消息异常,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent()); |
|||
} |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -1,4 +1,4 @@ |
|||
package com.bx.imserver.websocket.processor; |
|||
package com.bx.imserver.processor; |
|||
|
|||
|
|||
import io.netty.channel.ChannelHandlerContext; |
|||
@ -0,0 +1,69 @@ |
|||
package com.bx.imserver.processor; |
|||
|
|||
import com.bx.imcommon.contant.RedisKey; |
|||
import com.bx.imcommon.enums.IMCmdType; |
|||
import com.bx.imcommon.enums.IMSendStatus; |
|||
import com.bx.imcommon.model.IMRecvInfo; |
|||
import com.bx.imcommon.model.IMSendInfo; |
|||
import com.bx.imcommon.model.PrivateMessageInfo; |
|||
import com.bx.imcommon.model.SendResult; |
|||
import com.bx.imserver.util.UserChannelCtxHolder; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class PrivateMessageProcessor extends MessageProcessor<IMRecvInfo<PrivateMessageInfo>> { |
|||
|
|||
@Autowired |
|||
private RedisTemplate<String,Object> redisTemplate; |
|||
|
|||
@Override |
|||
public void process(IMRecvInfo<PrivateMessageInfo> recvInfo) { |
|||
PrivateMessageInfo messageInfo = recvInfo.getData(); |
|||
Long recvId = recvInfo.getRecvIds().get(0); |
|||
log.info("接收到消息,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent()); |
|||
try{ |
|||
ChannelHandlerContext channelCtx = UserChannelCtxHolder.getChannelCtx(recvId); |
|||
if(channelCtx != null ){ |
|||
// 推送消息到用户
|
|||
IMSendInfo sendInfo = new IMSendInfo(); |
|||
sendInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); |
|||
sendInfo.setData(messageInfo); |
|||
channelCtx.channel().writeAndFlush(sendInfo); |
|||
// 消息发送成功确认
|
|||
String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; |
|||
SendResult sendResult = new SendResult(); |
|||
sendResult.setRecvId(recvId); |
|||
sendResult.setStatus(IMSendStatus.SUCCESS); |
|||
sendResult.setMessageInfo(messageInfo); |
|||
redisTemplate.opsForList().rightPush(key,sendResult); |
|||
}else{ |
|||
// 消息推送失败确认
|
|||
String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; |
|||
SendResult sendResult = new SendResult(); |
|||
sendResult.setRecvId(recvId); |
|||
sendResult.setStatus(IMSendStatus.FAIL); |
|||
sendResult.setFailReason("未找到WS连接"); |
|||
sendResult.setMessageInfo(messageInfo); |
|||
redisTemplate.opsForList().rightPush(key,sendResult); |
|||
log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent()); |
|||
} |
|||
}catch (Exception e){ |
|||
// 消息推送失败确认
|
|||
String key = RedisKey.IM_RESULT_PRIVATE_QUEUE; |
|||
SendResult sendResult = new SendResult(); |
|||
sendResult.setRecvId(recvId); |
|||
sendResult.setStatus(IMSendStatus.FAIL); |
|||
sendResult.setFailReason("未知异常"); |
|||
sendResult.setMessageInfo(messageInfo); |
|||
redisTemplate.opsForList().rightPush(key,sendResult); |
|||
log.error("发送异常,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent(),e); |
|||
} |
|||
|
|||
} |
|||
|
|||
} |
|||
@ -1,11 +1,11 @@ |
|||
package com.bx.imserver.websocket.processor; |
|||
package com.bx.imserver.processor; |
|||
|
|||
import com.bx.common.enums.WSCmdEnum; |
|||
import com.bx.common.util.SpringContextHolder; |
|||
import com.bx.imcommon.enums.IMCmdType; |
|||
import com.bx.imserver.util.SpringContextHolder; |
|||
|
|||
public class ProcessorFactory { |
|||
|
|||
public static MessageProcessor createProcessor(WSCmdEnum cmd){ |
|||
public static MessageProcessor createProcessor(IMCmdType cmd){ |
|||
MessageProcessor processor = null; |
|||
switch (cmd){ |
|||
case LOGIN: |
|||
@ -1,4 +1,4 @@ |
|||
package com.bx.common.util; |
|||
package com.bx.imserver.util; |
|||
|
|||
import org.springframework.beans.BeansException; |
|||
import org.springframework.context.ApplicationContext; |
|||
@ -1,55 +0,0 @@ |
|||
package com.bx.imserver.websocket.processor; |
|||
|
|||
import com.bx.common.contant.RedisKey; |
|||
import com.bx.common.enums.MessageTypeEnum; |
|||
import com.bx.common.enums.WSCmdEnum; |
|||
import com.bx.common.model.im.GroupMessageInfo; |
|||
import com.bx.common.model.im.SendInfo; |
|||
import com.bx.imserver.websocket.WebsocketChannelCtxHolder; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.scheduling.annotation.Async; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class GroupMessageProcessor extends MessageProcessor<GroupMessageInfo> { |
|||
|
|||
@Autowired |
|||
private RedisTemplate<String,Object> redisTemplate; |
|||
|
|||
@Async |
|||
@Override |
|||
public void process(GroupMessageInfo data) { |
|||
log.info("接收到群消息,发送者:{},群id:{},接收id:{},内容:{}",data.getSendId(),data.getGroupId(),data.getRecvIds(),data.getContent()); |
|||
List<Long> recvIds = data.getRecvIds(); |
|||
// 接收者id列表不需要传输,节省带宽
|
|||
data.setRecvIds(null); |
|||
for(Long recvId:recvIds){ |
|||
ChannelHandlerContext channelCtx = WebsocketChannelCtxHolder.getChannelCtx(recvId); |
|||
if(channelCtx != null){ |
|||
// 自己发的消息不用推送
|
|||
if(recvId != data.getSendId()){ |
|||
// 推送消息到用户
|
|||
SendInfo sendInfo = new SendInfo(); |
|||
sendInfo.setCmd(WSCmdEnum.GROUP_MESSAGE.getCode()); |
|||
sendInfo.setData(data); |
|||
channelCtx.channel().writeAndFlush(sendInfo); |
|||
} |
|||
if(data.getType() != MessageTypeEnum.TIP.getCode()){ |
|||
// 设置已读最大id
|
|||
String key = RedisKey.IM_GROUP_READED_POSITION + data.getGroupId()+":"+recvId; |
|||
redisTemplate.opsForValue().set(key,data.getId()); |
|||
} |
|||
}else { |
|||
log.error("未找到WS连接,发送者:{},群id:{},接收id:{},内容:{}",data.getSendId(),data.getGroupId(),data.getRecvIds()); |
|||
} |
|||
|
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -1,45 +0,0 @@ |
|||
package com.bx.imserver.websocket.processor; |
|||
|
|||
import com.bx.common.contant.RedisKey; |
|||
import com.bx.common.enums.MessageTypeEnum; |
|||
import com.bx.common.enums.WSCmdEnum; |
|||
import com.bx.common.model.im.PrivateMessageInfo; |
|||
import com.bx.common.model.im.SendInfo; |
|||
import com.bx.imserver.websocket.WebsocketChannelCtxHolder; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class PrivateMessageProcessor extends MessageProcessor<PrivateMessageInfo> { |
|||
|
|||
@Autowired |
|||
private RedisTemplate<String,Object> redisTemplate; |
|||
|
|||
@Override |
|||
public void process(PrivateMessageInfo data) { |
|||
log.info("接收到消息,发送者:{},接收者:{},内容:{}",data.getSendId(),data.getRecvId(),data.getContent()); |
|||
// 一个用户可以同时登陆,所以有多个channel
|
|||
ChannelHandlerContext channelCtx = WebsocketChannelCtxHolder.getChannelCtx(data.getRecvId()); |
|||
if(channelCtx != null ){ |
|||
// 推送消息到用户
|
|||
SendInfo sendInfo = new SendInfo(); |
|||
sendInfo.setCmd(WSCmdEnum.PRIVATE_MESSAGE.getCode()); |
|||
sendInfo.setData(data); |
|||
channelCtx.channel().writeAndFlush(sendInfo); |
|||
|
|||
if(data.getType() != MessageTypeEnum.TIP.getCode()) { |
|||
// 已读消息推送至redis,等待更新数据库
|
|||
String key = RedisKey.IM_READED_PRIVATE_MESSAGE_ID; |
|||
redisTemplate.opsForList().rightPush(key, data.getId()); |
|||
} |
|||
}else{ |
|||
log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",data.getSendId(),data.getRecvId(),data.getContent()); |
|||
} |
|||
|
|||
} |
|||
|
|||
} |
|||
@ -1,8 +1,8 @@ |
|||
package com.bx.imserver.websocket; |
|||
package com.bx.imserver.ws; |
|||
|
|||
import com.bx.common.contant.RedisKey; |
|||
import com.bx.imserver.websocket.endecode.MessageProtocolDecoder; |
|||
import com.bx.imserver.websocket.endecode.MessageProtocolEncoder; |
|||
import com.bx.imcommon.contant.RedisKey; |
|||
import com.bx.imserver.ws.endecode.MessageProtocolDecoder; |
|||
import com.bx.imserver.ws.endecode.MessageProtocolEncoder; |
|||
import io.netty.bootstrap.ServerBootstrap; |
|||
import io.netty.channel.*; |
|||
import io.netty.channel.nio.NioEventLoopGroup; |
|||
@ -0,0 +1,170 @@ |
|||
<template> |
|||
<el-drawer title="聊天历史记录" size="700px" :visible.sync="visible" direction="rtl" :before-close="handleClose"> |
|||
<div class="chat-history" v-loading="loading" |
|||
element-loading-text="拼命加载中"> |
|||
<el-scrollbar class="chat-history-scrollbar" ref="scrollbar" id="historyScrollbar" > |
|||
<ul> |
|||
<li v-for="(msgInfo,idx) in messages" :key="idx"> |
|||
<message-item :mine="msgInfo.sendId == mine.id" :headImage="headImage(msgInfo)" :showName="showName(msgInfo)" |
|||
:msgInfo="msgInfo" :menu="false"> |
|||
</message-item> |
|||
</li> |
|||
</ul> |
|||
</el-scrollbar> |
|||
</div> |
|||
</el-drawer> |
|||
</template> |
|||
|
|||
<script> |
|||
import MessageItem from './MessageItem.vue'; |
|||
|
|||
export default { |
|||
name: 'chatHistory', |
|||
components: { |
|||
MessageItem |
|||
}, |
|||
props: { |
|||
visible: { |
|||
type: Boolean |
|||
}, |
|||
chat: { |
|||
type: Object |
|||
}, |
|||
friend: { |
|||
type: Object |
|||
}, |
|||
group: { |
|||
type: Object |
|||
}, |
|||
groupMembers: { |
|||
type: Array, |
|||
} |
|||
}, |
|||
data() { |
|||
return { |
|||
page: 1, |
|||
size: 10, |
|||
messages: [], |
|||
loadAll: false, |
|||
loading: false, |
|||
lastScrollTime: new Date() |
|||
} |
|||
}, |
|||
methods: { |
|||
handleClose() { |
|||
this.page = 1; |
|||
this.messages = []; |
|||
this.loadAll = false; |
|||
this.$emit('close'); |
|||
}, |
|||
handleScroll() { |
|||
let high = this.$refs.scrollbar.$refs.wrap.scrollTop; //距离顶部的距离 |
|||
let timeDiff = new Date().getTime() - this.lastScrollTime.getTime(); |
|||
if ( high < 30 && timeDiff>500) { |
|||
this.lastScrollTime = new Date(); |
|||
this.loadMessages(); |
|||
|
|||
} |
|||
}, |
|||
loadMessages() { |
|||
if(this.loadAll){ |
|||
return this.$message.success("已到达顶部"); |
|||
} |
|||
let param = { |
|||
page: this.page++, |
|||
size: this.size |
|||
} |
|||
if (this.chat.type == 'GROUP') { |
|||
param.groupId = this.group.id; |
|||
} else { |
|||
param.friendId = this.friend.id; |
|||
} |
|||
this.loading = true; |
|||
this.$http({ |
|||
url: this.histroyAction, |
|||
method: 'get', |
|||
params: param |
|||
}).then(messages => { |
|||
messages.forEach(m => this.messages.unshift(m)); |
|||
this.loading = false; |
|||
if(messages.length <this.size){ |
|||
this.loadAll = true; |
|||
} |
|||
this.refreshScrollPos(); |
|||
}).catch(()=>{ |
|||
this.loading = false; |
|||
}) |
|||
}, |
|||
showName(msgInfo) { |
|||
if (this.chat.type == 'GROUP') { |
|||
let member = this.groupMembers.find((m) => m.userId == msgInfo.sendId); |
|||
return member ? member.aliasName : ""; |
|||
} else { |
|||
return msgInfo.sendId == this.mine.id ? this.mine.nickName : this.chat.showName |
|||
} |
|||
}, |
|||
headImage(msgInfo) { |
|||
if (this.chat.type == 'GROUP') { |
|||
let member = this.groupMembers.find((m) => m.userId == msgInfo.sendId); |
|||
return member ? member.headImage : ""; |
|||
} else { |
|||
return msgInfo.sendId == this.mine.id ? this.mine.headImageThumb : this.chat.headImage |
|||
} |
|||
}, |
|||
refreshScrollPos(){ |
|||
let scrollWrap = this.$refs.scrollbar.$refs.wrap; |
|||
let scrollHeight = scrollWrap.scrollHeight; |
|||
let scrollTop = scrollWrap.scrollTop; |
|||
this.$nextTick(() => { |
|||
let offsetTop = scrollWrap.scrollHeight - scrollHeight; |
|||
scrollWrap.scrollTop = scrollTop + offsetTop; |
|||
// 滚动条没出来,继续加载 |
|||
if(scrollWrap.scrollHeight == scrollHeight){ |
|||
this.loadMessages(); |
|||
} |
|||
}); |
|||
} |
|||
}, |
|||
computed: { |
|||
mine() { |
|||
return this.$store.state.userStore.userInfo; |
|||
}, |
|||
histroyAction() { |
|||
return `/message/${this.chat.type.toLowerCase()}/history`; |
|||
} |
|||
}, |
|||
watch: { |
|||
visible: { |
|||
handler(newValue, oldValue) { |
|||
if (newValue) { |
|||
this.loadMessages(); |
|||
this.$nextTick(() => { |
|||
document.getElementById('historyScrollbar').addEventListener("mousewheel", this.handleScroll,true); |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
</script> |
|||
|
|||
<style lang="scss"> |
|||
.chat-history { |
|||
display: flex; |
|||
height: 100%; |
|||
|
|||
.chat-history-scrollbar { |
|||
flex: 1; |
|||
.el-scrollbar__thumb { |
|||
background-color: #555555; |
|||
} |
|||
ul { |
|||
padding: 20px; |
|||
|
|||
li { |
|||
list-style-type: none; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
</style> |
|||
Loading…
Reference in new issue