Browse Source

feat: 将redis封装成MQ

master
xsx 2 years ago
parent
commit
8fa8efc051
  1. 5
      db/im-platfrom.sql
  2. 5
      im-client/pom.xml
  3. 2
      im-client/src/main/java/com/bx/imclient/IMClient.java
  4. 31
      im-client/src/main/java/com/bx/imclient/config/RedisConfig.java
  5. 28
      im-client/src/main/java/com/bx/imclient/sender/IMSender.java
  6. 15
      im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java
  7. 15
      im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java
  8. 15
      im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java
  9. 5
      im-commom/pom.xml
  10. 33
      im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConfig.java
  11. 13
      im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java
  12. 29
      im-commom/src/main/java/com/bx/imcommon/mq/RedisMQListener.java
  13. 109
      im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java
  14. 42
      im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java
  15. 29
      im-commom/src/main/java/com/bx/imcommon/util/ThreadPoolExecutorFactory.java
  16. 13
      im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java
  17. 13
      im-platform/src/main/java/com/bx/implatform/contant/Constant.java
  18. 31
      im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java
  19. 21
      im-platform/src/main/java/com/bx/implatform/dto/GroupBanDTO.java
  20. 19
      im-platform/src/main/java/com/bx/implatform/dto/GroupUnbanDTO.java
  21. 22
      im-platform/src/main/java/com/bx/implatform/dto/UserBanDTO.java
  22. 59
      im-platform/src/main/java/com/bx/implatform/enums/MessageType.java
  23. 50
      im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java
  24. 12
      im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java

5
im-platform/src/main/resources/db/db.sql → db/im-platfrom.sql

@ -7,6 +7,8 @@ create table `im_user`(
`head_image_thumb` varchar(255) default '' comment '用户头像缩略图',
`password` varchar(255) not null comment '密码(明文)',
`sex` tinyint(1) default 0 comment '性别 0:男 1:女',
`is_banned` tinyint(1) default 0 comment '是否被封禁 0:否 1:是',
`reason` varchar(255) comment '被封禁原因',
`type` smallint default 1 comment '用户类型 1:普通用户 2:审核账户',
`signature` varchar(1024) default '' comment '个性签名',
`last_login_time` datetime DEFAULT null comment '最后登录时间',
@ -45,7 +47,8 @@ create table `im_group`(
`head_image` varchar(255) default '' comment '群头像',
`head_image_thumb` varchar(255) default '' comment '群头像缩略图',
`notice` varchar(1024) default '' comment '群公告',
`remark` varchar(255) default '' comment '群备注',
`is_banned` tinyint(1) default 0 comment '是否被封禁 0:否 1:是',
`reason` varchar(255) comment '被封禁原因',
`deleted` tinyint(1) default 0 comment '是否已删除',
`created_time` datetime default CURRENT_TIMESTAMP comment '创建时间'
)ENGINE=InnoDB CHARSET=utf8mb3 comment '';

5
im-client/pom.xml

@ -17,10 +17,5 @@
<artifactId>im-commom</artifactId>
<version>2.0.0</version>
</dependency>
<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
</project>

2
im-client/src/main/java/com/bx/imclient/IMClient.java

@ -65,4 +65,6 @@ public class IMClient {
}
}

31
im-client/src/main/java/com/bx/imclient/config/RedisConfig.java

@ -1,31 +0,0 @@
package com.bx.imclient.config;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration("IMRedisConfig")
public class RedisConfig {
@Bean("IMRedisTemplate")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 设置值(value)的序列化采用FastJsonRedisSerializer
redisTemplate.setValueSerializer(fastJsonRedisSerializer());
redisTemplate.setHashValueSerializer(fastJsonRedisSerializer());
// 设置键(key)的序列化采用StringRedisSerializer。
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
public FastJsonRedisSerializer fastJsonRedisSerializer(){
return new FastJsonRedisSerializer<>(Object.class);
}
}

28
im-client/src/main/java/com/bx/imclient/sender/IMSender.java

@ -8,20 +8,20 @@ import com.bx.imcommon.enums.IMListenerType;
import com.bx.imcommon.enums.IMSendCode;
import com.bx.imcommon.enums.IMTerminalType;
import com.bx.imcommon.model.*;
import com.bx.imcommon.mq.RedisMQTemplate;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
@Service
@RequiredArgsConstructor
public class IMSender {
@Resource(name="IMRedisTemplate")
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedisMQTemplate redisMQTemplate;
@Value("${spring.application.name}")
private String appName;
@ -34,7 +34,7 @@ public class IMSender {
for (Integer terminal : message.getRecvTerminals()) {
// 获取对方连接的channelId
String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.toString());
Integer serverId = (Integer)redisTemplate.opsForValue().get(key);
Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key);
// 如果对方在线,将数据存储至redis,等待拉取推送
if (serverId != null) {
String sendKey = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, serverId.toString());
@ -45,7 +45,7 @@ public class IMSender {
recvInfo.setSender(message.getSender());
recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getRecvId(), terminal)));
recvInfo.setData(message.getData());
redisTemplate.opsForList().rightPush(sendKey, recvInfo);
redisMQTemplate.opsForList().rightPush(sendKey, recvInfo);
} else {
IMSendResult result = new IMSendResult();
result.setSender(message.getSender());
@ -65,7 +65,7 @@ public class IMSender {
}
// 获取终端连接的channelId
String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getSender().getId().toString(), terminal.toString());
Integer serverId = (Integer)redisTemplate.opsForValue().get(key);
Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key);
// 如果终端在线,将数据存储至redis,等待拉取推送
if (serverId != null) {
String sendKey = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, serverId.toString());
@ -76,7 +76,7 @@ public class IMSender {
recvInfo.setSender(message.getSender());
recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getSender().getId(), terminal)));
recvInfo.setData(message.getData());
redisTemplate.opsForList().rightPush(sendKey, recvInfo);
redisMQTemplate.opsForList().rightPush(sendKey, recvInfo);
}
}
}
@ -97,7 +97,7 @@ public class IMSender {
});
}
// 批量拉取
List<Object> serverIds = redisTemplate.opsForValue().multiGet(sendMap.keySet());
List<Object> serverIds = redisMQTemplate.opsForValue().multiGet(sendMap.keySet());
// 格式:map<服务器id,list<接收方>>
Map<Integer, List<IMUserInfo>> serverMap = new HashMap<>();
List<IMUserInfo> offLineUsers = new LinkedList<>();
@ -123,7 +123,7 @@ public class IMSender {
recvInfo.setData(message.getData());
// 推送至队列
String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, entry.getKey().toString());
redisTemplate.opsForList().rightPush(key, recvInfo);
redisMQTemplate.opsForList().rightPush(key, recvInfo);
}
// 推送给自己的其他终端
@ -134,7 +134,7 @@ public class IMSender {
}
// 获取终端连接的channelId
String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getSender().getId().toString(), terminal.toString());
Integer serverId = (Integer)redisTemplate.opsForValue().get(key);
Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key);
// 如果终端在线,将数据存储至redis,等待拉取推送
if (serverId != null) {
IMRecvInfo recvInfo = new IMRecvInfo();
@ -145,7 +145,7 @@ public class IMSender {
recvInfo.setSendResult(false);
recvInfo.setData(message.getData());
String sendKey = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, serverId.toString());
redisTemplate.opsForList().rightPush(sendKey, recvInfo);
redisMQTemplate.opsForList().rightPush(sendKey, recvInfo);
}
}
}
@ -178,7 +178,7 @@ public class IMSender {
}
}
// 批量拉取
List<Object> serverIds = redisTemplate.opsForValue().multiGet(userMap.keySet());
List<Object> serverIds = redisMQTemplate.opsForValue().multiGet(userMap.keySet());
int idx = 0;
Map<Long,List<IMTerminalType>> onlineMap = new HashMap<>();
for (Map.Entry<String,IMUserInfo> entry : userMap.entrySet()) {
@ -195,7 +195,7 @@ public class IMSender {
public Boolean isOnline(Long userId) {
String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), "*");
return !Objects.requireNonNull(redisTemplate.keys(key)).isEmpty();
return !Objects.requireNonNull(redisMQTemplate.keys(key)).isEmpty();
}
public List<Long> getOnlineUser(List<Long> userIds){

15
im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java

@ -5,13 +5,13 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class AbstractMessageResultTask implements CommandLineRunner {
private static final ExecutorService EXECUTOR_SERVICE = ThreadPoolExecutorFactory.getThreadPoolExecutor();
private static final ScheduledThreadPoolExecutor EXECUTOR_SERVICE = ThreadPoolExecutorFactory.getThreadPoolExecutor();
@Override
public void run(String... args) {
@ -26,19 +26,12 @@ public abstract class AbstractMessageResultTask implements CommandLineRunner {
log.error("任务调度异常", e);
}
if (!EXECUTOR_SERVICE.isShutdown()) {
Thread.sleep(100);
EXECUTOR_SERVICE.execute(this);
EXECUTOR_SERVICE.schedule(this,100, TimeUnit.MICROSECONDS);
}
}
});
}
@PreDestroy
public void destroy() {
log.info("{}线程任务关闭", this.getClass().getSimpleName());
EXECUTOR_SERVICE.shutdown();
}
public abstract void pullMessage() throws InterruptedException;
}

15
im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java

@ -6,12 +6,11 @@ import com.bx.imclient.listener.MessageListenerMulticaster;
import com.bx.imcommon.contant.IMRedisKey;
import com.bx.imcommon.enums.IMListenerType;
import com.bx.imcommon.model.IMSendResult;
import com.bx.imcommon.mq.RedisMQTemplate;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@ -21,8 +20,8 @@ import java.util.Objects;
@RequiredArgsConstructor
public class GroupMessageResultResultTask extends AbstractMessageResultTask {
@Resource(name = "IMRedisTemplate")
private RedisTemplate<String,Object> redisTemplate;
@Autowired
private RedisMQTemplate redisMQTemplate;
@Value("${spring.application.name}")
private String appName;
@ -46,12 +45,12 @@ public class GroupMessageResultResultTask extends AbstractMessageResultTask {
List<IMSendResult> loadBatch() {
String key = StrUtil.join(":", IMRedisKey.IM_RESULT_GROUP_QUEUE, appName);
//这个接口redis6.2以上才支持
//List<Object> list = redisTemplate.opsForList().leftPop(key, batchSize);
//List<Object> list = redisMQTemplate.opsForList().leftPop(key, batchSize);
List<IMSendResult> results = new LinkedList<>();
JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
JSONObject jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key);
while (!Objects.isNull(jsonObject) && results.size() < batchSize) {
results.add(jsonObject.toJavaObject(IMSendResult.class));
jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key);
}
return results;
}

15
im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java

@ -6,13 +6,12 @@ import com.bx.imclient.listener.MessageListenerMulticaster;
import com.bx.imcommon.contant.IMRedisKey;
import com.bx.imcommon.enums.IMListenerType;
import com.bx.imcommon.model.IMSendResult;
import com.bx.imcommon.mq.RedisMQTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@ -22,8 +21,8 @@ import java.util.Objects;
@RequiredArgsConstructor
public class PrivateMessageResultResultTask extends AbstractMessageResultTask {
@Resource(name = "IMRedisTemplate")
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedisMQTemplate redisMQTemplate;
@Value("${spring.application.name}")
private String appName;
@ -47,12 +46,12 @@ public class PrivateMessageResultResultTask extends AbstractMessageResultTask {
List<IMSendResult> loadBatch() {
String key = StrUtil.join(":", IMRedisKey.IM_RESULT_PRIVATE_QUEUE, appName);
//这个接口redis6.2以上才支持
//List<Object> list = redisTemplate.opsForList().leftPop(key, batchSize);
//List<Object> list = redisMQTemplate.opsForList().leftPop(key, batchSize);
List<IMSendResult> results = new LinkedList<>();
JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
JSONObject jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key);
while (!Objects.isNull(jsonObject) && results.size() < batchSize) {
results.add(jsonObject.toJavaObject(IMSendResult.class));
jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key);
}
return results;
}

5
im-commom/pom.xml

@ -65,5 +65,10 @@
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
</project>

33
im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConfig.java

@ -0,0 +1,33 @@
package com.bx.imcommon.mq;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisMQConfig {
@Bean
public RedisMQTemplate redisMQTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisMQTemplate redisMQTemplate = new RedisMQTemplate();
redisMQTemplate.setConnectionFactory(redisConnectionFactory);
// 设置值(value)的序列化采用FastJsonRedisSerializer
redisMQTemplate.setValueSerializer(fastJsonRedisSerializer());
redisMQTemplate.setHashValueSerializer(fastJsonRedisSerializer());
// 设置键(key)的序列化采用StringRedisSerializer。
redisMQTemplate.setKeySerializer(new StringRedisSerializer());
redisMQTemplate.setHashKeySerializer(new StringRedisSerializer());
redisMQTemplate.afterPropertiesSet();
return redisMQTemplate;
}
@Bean
public FastJsonRedisSerializer fastJsonRedisSerializer(){
return new FastJsonRedisSerializer<>(Object.class);
}
}

13
im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java

@ -0,0 +1,13 @@
package com.bx.imcommon.mq;
import java.util.List;
/**
* redis 队列消费者抽象类
*/
public abstract class RedisMQConsumer<T> {
public void onMessage(T data){}
public void onMessage(List<T> datas){}
}

29
im-commom/src/main/java/com/bx/imcommon/mq/RedisMQListener.java

@ -0,0 +1,29 @@
package com.bx.imcommon.mq;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* redis 队列消费监听注解
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisMQListener {
/**
* 队列也是redis的key
*/
String queue();
/**
* 一次性拉取的数据数量
*/
int batchSize() default 1;
/**
* 拉取间隔周期单位:ms
*/
int period() default 100;
}

109
im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java

@ -0,0 +1,109 @@
package com.bx.imcommon.mq;
import com.alibaba.fastjson.JSONObject;
import com.bx.imcommon.util.ThreadPoolExecutorFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* reids 队列拉取定时任务
*
* @author: Blue
* @date: 2024-07-15
* @version: 1.0
*/
@Slf4j
@Component
public class RedisMQPullTask implements CommandLineRunner {
private static final ScheduledThreadPoolExecutor EXECUTOR_SERVICE =
ThreadPoolExecutorFactory.getThreadPoolExecutor();
@Autowired(required = false)
private List<RedisMQConsumer> consumers = Collections.emptyList();
@Autowired
private RedisMQTemplate redisMQTemplate;
@Override
public void run(String... args) {
consumers.forEach((consumer -> {
// 注解参数
RedisMQListener annotation = consumer.getClass().getAnnotation(RedisMQListener.class);
String key = annotation.queue();
int batchSize = annotation.batchSize();
int period = annotation.period();
// 获取泛型类型
Type superClass = consumer.getClass().getGenericSuperclass();
Type type = ((ParameterizedType)superClass).getActualTypeArguments()[0];
EXECUTOR_SERVICE.execute(new Runnable() {
@Override
public void run() {
List<Object> datas = new LinkedList<>();
try {
// 拉取一个批次的数据
List<Object> objects = pullBatch(key, batchSize);
for (Object obj : objects) {
if (obj instanceof JSONObject) {
JSONObject jsonObject = (JSONObject)obj;
Object data = jsonObject.toJavaObject(type);
consumer.onMessage(data);
datas.add(data);
}
}
if(!datas.isEmpty()){
consumer.onMessage(datas);
}
} catch (Exception e) {
log.error("数据消费异常,队列:{}", key, e);
}
// 继续消费数据
if (!EXECUTOR_SERVICE.isShutdown()) {
if (datas.size() < batchSize) {
// 数据已经消费完,等待下一个周期继续拉取
EXECUTOR_SERVICE.schedule(this, period, TimeUnit.MICROSECONDS);
} else {
// 数据没有消费完,直接开启下一个消费周期
EXECUTOR_SERVICE.execute(this);
}
}
}
});
}));
}
private List<Object> pullBatch(String key, Integer batchSize) {
List<Object> objects = new LinkedList<>();
if (redisMQTemplate.isSupportBatchPull()) {
// 版本大于6.2,支持批量拉取
objects = redisMQTemplate.opsForList().leftPop(key, 100);
} else {
// 版本小于6.2,只能逐条拉取
Object obj = redisMQTemplate.opsForList().leftPop(key);
while (!Objects.isNull(obj) && objects.size() < batchSize) {
objects.add(obj);
obj = redisMQTemplate.opsForList().leftPop(key);
}
}
return objects;
}
@PreDestroy
public void destory(){
ThreadPoolExecutorFactory.shutDown();
}
}

42
im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java

@ -0,0 +1,42 @@
package com.bx.imcommon.mq;
import cn.hutool.core.util.StrUtil;
import org.apache.logging.log4j.util.Strings;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.Properties;
/**
* @author: 谢绍许
* @date: 2024-07-16
* @version: 1.0
*/
public class RedisMQTemplate extends RedisTemplate<String, Object> {
private String version = Strings.EMPTY;
public String getVersion() {
if (version.isEmpty()) {
RedisConnection redisConnection = this.getConnectionFactory().getConnection();
Properties properties = redisConnection.info();
version = properties.getProperty("redis_version");
}
return version;
}
/**
* 是否支持批量拉取redis版本大于6.2支持批量拉取
* @return
*/
Boolean isSupportBatchPull() {
String version = getVersion();
String[] arr = version.split("\\.");
if (arr.length < 2) {
return false;
}
Integer firVersion = Integer.valueOf(arr[0]);
Integer secVersion = Integer.valueOf(arr[1]);
return firVersion > 6 || (firVersion == 6 && secVersion >= 2);
}
}

29
im-commom/src/main/java/com/bx/imcommon/util/ThreadPoolExecutorFactory.java

@ -2,9 +2,7 @@ package com.bx.imcommon.util;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
* 创建单例线程池
@ -36,7 +34,7 @@ public final class ThreadPoolExecutorFactory {
/**
* 线程池对象
*/
private static volatile ThreadPoolExecutor threadPoolExecutor = null;
private static volatile ScheduledThreadPoolExecutor threadPoolExecutor = null;
/**
* 构造方法私有化
@ -47,32 +45,17 @@ public final class ThreadPoolExecutorFactory {
}
}
/**
* 重写readResolve方法
*/
private Object readResolve() {
//重写readResolve方法,防止序列化破坏单例
return ThreadPoolExecutorFactory.getThreadPoolExecutor();
}
/**
* 双检锁创建线程安全的单例
*/
public static ThreadPoolExecutor getThreadPoolExecutor() {
public static ScheduledThreadPoolExecutor getThreadPoolExecutor() {
if (null == threadPoolExecutor) {
synchronized (ThreadPoolExecutorFactory.class) {
if (null == threadPoolExecutor) {
threadPoolExecutor = new ThreadPoolExecutor(
threadPoolExecutor = new ScheduledThreadPoolExecutor(
//核心线程数
CORE_POOL_SIZE,
//最大线程数,包含临时线程
MAX_IMUM_POOL_SIZE,
//临时线程的存活时间
KEEP_ALIVE_TIME,
//时间单位(毫秒)
TimeUnit.MILLISECONDS,
//等待队列
new LinkedBlockingQueue<>(QUEUE_SIZE),
//拒绝策略
new ThreadPoolExecutor.CallerRunsPolicy()
);
@ -85,13 +68,13 @@ public final class ThreadPoolExecutorFactory {
/**
* 关闭线程池
*/
public void shutDown() {
public static void shutDown() {
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdown();
}
}
public void execute(Runnable runnable) {
public static void execute(Runnable runnable) {
if (runnable == null) {
return;
}

13
im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java

@ -1,25 +1,18 @@
package com.bx.implatform;
import cn.hutool.core.util.StrUtil;
import com.bx.implatform.contant.RedisKey;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@Slf4j
@EnableAspectJAutoProxy(exposeProxy = true)
@ComponentScan(basePackages = {"com.bx"})
@MapperScan(basePackages = {"com.bx.implatform.mapper"})
@SpringBootApplication(exclude = {SecurityAutoConfiguration.class})// 禁用secrity
public class IMPlatformApp {

13
im-platform/src/main/java/com/bx/implatform/contant/Constant.java

@ -2,20 +2,21 @@ package com.bx.implatform.contant;
public final class Constant {
private Constant() {
}
/**
* 系统用户id
*/
public static final Long SYS_USER_ID = 0L;
/**
* 最大图片上传大小
*/
public static final long MAX_IMAGE_SIZE = 20 * 1024 * 1024;
public static final Long MAX_IMAGE_SIZE = 20 * 1024 * 1024L;
/**
* 最大上传文件大小
*/
public static final long MAX_FILE_SIZE = 20 * 1024 * 1024;
public static final Long MAX_FILE_SIZE = 20 * 1024 * 1024L;
/**
* 群聊最大人数
*/
public static final long MAX_GROUP_MEMBER = 500;
public static final Long MAX_GROUP_MEMBER = 500L;
}

31
im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java

@ -18,31 +18,38 @@ public final class RedisKey {
* webrtc 群通话
*/
public static final String IM_WEBRTC_GROUP_SESSION = "im:webrtc:group:session";
/**
* 用户被封禁消息队列
*/
public static final String IM_QUEUE_USER_BANNED = "im:queue:user:banned";
/**
* 群聊被封禁消息队列
*/
public static final String IM_QUEUE_GROUP_BANNED = "im:queue:group:banned";
/**
* 缓存前缀
* 群聊解封消息队列
*/
public static final String IM_CACHE = "im:cache:";
public static final String IM_QUEUE_GROUP_UNBAN = "im:queue:user:unban";
/**
* 缓存是否好友bool
*/
public static final String IM_CACHE_FRIEND = IM_CACHE + "friend";
public static final String IM_CACHE_FRIEND = "im:cache:friend";
/**
* 缓存群聊信息
*/
public static final String IM_CACHE_GROUP = IM_CACHE + "group";
public static final String IM_CACHE_GROUP = "im:cache:group";
/**
* 缓存群聊成员id
*/
public static final String IM_CACHE_GROUP_MEMBER_ID = IM_CACHE + "group_member_ids";
/**
* 分布式锁前缀
*/
public static final String IM_LOCK = "im:lock:";
public static final String IM_CACHE_GROUP_MEMBER_ID = "im:cache:group_member_ids";
/**
* 分布式锁前缀
*/
public static final String IM_LOCK_RTC_GROUP = IM_LOCK + "rtc:group";
public static final String IM_LOCK_RTC_GROUP = "im:lock:rtc:group";
}

21
im-platform/src/main/java/com/bx/implatform/dto/GroupBanDTO.java

@ -0,0 +1,21 @@
package com.bx.implatform.dto;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author: Blue
* @date: 2024-07-14
* @version: 1.0
*/
@Data
@ApiModel(description = "群组封禁")
public class GroupBanDTO {
@ApiModelProperty(value = "群组id")
private Long id;
@ApiModelProperty(value = "封禁原因")
private String reason;
}

19
im-platform/src/main/java/com/bx/implatform/dto/GroupUnbanDTO.java

@ -0,0 +1,19 @@
package com.bx.implatform.dto;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author: Blue
* @date: 2024-07-14
* @version: 1.0
*/
@Data
@ApiModel(description = "群组解锁")
public class GroupUnbanDTO {
@ApiModelProperty(value = "群组id")
private Long id;
}

22
im-platform/src/main/java/com/bx/implatform/dto/UserBanDTO.java

@ -0,0 +1,22 @@
package com.bx.implatform.dto;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author: Blue
* @date: 2024-07-14
* @version: 1.0
*/
@Data
@ApiModel("用户锁定DTO")
public class UserBanDTO {
@ApiModelProperty(value = "用户id")
private Long id;
@ApiModelProperty(value = "锁定原因")
private String reason;
}

59
im-platform/src/main/java/com/bx/implatform/enums/MessageType.java

@ -9,6 +9,7 @@ import lombok.AllArgsConstructor;
* 20-29: 提示类消息: 在会话中间显示的提示
* 30-39: UI交互类消息: 显示加载状态等
* 40-49: 操作交互类消息: 语音通话视频通话消息等
* 50-60: 后台操作类消息: 用户封禁群组封禁等
* 100-199: 单人语音通话rtc信令
* 200-299: 多人语音通话rtc信令
*
@ -16,62 +17,22 @@ import lombok.AllArgsConstructor;
@AllArgsConstructor
public enum MessageType {
/**
* 文字
*/
TEXT(0, "文字"),
/**
* 图片
*/
IMAGE(1, "图片"),
/**
* 文件
*/
FILE(2, "文件"),
/**
* 音频
*/
AUDIO(3, "音频"),
/**
* 视频
*/
VIDEO(4, "视频"),
/**
* 撤回
*/
TEXT(0, "文字消息"),
IMAGE(1, "图片消息"),
FILE(2, "文件消息"),
AUDIO(3, "语音消息"),
VIDEO(4, "视频消息"),
RECALL(10, "撤回"),
/**
* 已读
*/
READED(11, "已读"),
/**
* 消息已读回执
*/
RECEIPT(12, "消息已读回执"),
/**
* 时间提示
*/
TIP_TIME(20,"时间提示"),
/**
* 文字提示
*/
TIP_TEXT(21,"文字提示"),
/**
* 消息加载标记
*/
LOADING(30,"加载中"),
/**
* 语音通话提示
*/
LOADING(30,"加载中标记"),
ACT_RT_VOICE(40,"语音通话"),
/**
* 视频通话提示
*/
ACT_RT_VIDEO(41,"视频通话"),
USER_BANNED(50,"用户封禁"),
GROUP_BANNED(51,"群聊封禁"),
GROUP_UNBAN(52,"群聊解封"),
RTC_CALL_VOICE(100, "语音呼叫"),
RTC_CALL_VIDEO(101, "视频呼叫"),
RTC_ACCEPT(102, "接受"),

50
im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java

@ -0,0 +1,50 @@
package com.bx.implatform.task;
import com.bx.imclient.IMClient;
import com.bx.imcommon.enums.IMTerminalType;
import com.bx.imcommon.model.IMPrivateMessage;
import com.bx.imcommon.model.IMUserInfo;
import com.bx.imcommon.mq.RedisMQConsumer;
import com.bx.imcommon.mq.RedisMQListener;
import com.bx.implatform.contant.Constant;
import com.bx.implatform.contant.RedisKey;
import com.bx.implatform.dto.UserBanDTO;
import com.bx.implatform.service.IUserService;
import com.bx.implatform.util.BeanUtils;
import com.bx.implatform.vo.PrivateMessageVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.core.parameters.P;
import org.springframework.stereotype.Component;
/**
* @author: 谢绍许
* @date: 2024-07-15
* @version: 1.0
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RedisMQListener(queue = RedisKey.IM_QUEUE_USER_BANNED)
public class UserBannedConsumerTask extends RedisMQConsumer<UserBanDTO> {
private final IMClient imClient;
@Override
public void onMessage(UserBanDTO dto) {
log.info("用户被封禁处理,userId:{},原因:{}",dto.getId(),dto.getReason());
// 推送消息
PrivateMessageVO msgInfo = new PrivateMessageVO();
msgInfo.setRecvId(dto.getId());
msgInfo.setSendId(Constant.SYS_USER_ID);
msgInfo.setContent(dto.getReason());
IMPrivateMessage<PrivateMessageVO> sendMessage = new IMPrivateMessage<>();
sendMessage.setSender(new IMUserInfo(Constant.SYS_USER_ID, IMTerminalType.WEB.code()));
sendMessage.setRecvId(dto.getId());
sendMessage.setSendToSelf(false);
sendMessage.setData(msgInfo);
sendMessage.setSendResult(true);
imClient.sendPrivateMessage(sendMessage);
}
}

12
im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java

@ -9,11 +9,13 @@ import org.springframework.boot.CommandLineRunner;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class AbstractPullMessageTask implements CommandLineRunner {
private static final ExecutorService EXECUTOR_SERVICE = ThreadPoolExecutorFactory.getThreadPoolExecutor();
private static final ScheduledThreadPoolExecutor EXECUTOR_SERVICE = ThreadPoolExecutorFactory.getThreadPoolExecutor();
@Autowired
private IMServerGroup serverGroup;
@ -32,18 +34,12 @@ public abstract class AbstractPullMessageTask implements CommandLineRunner {
log.error("任务调度异常", e);
}
if (!EXECUTOR_SERVICE.isShutdown()) {
Thread.sleep(100);
EXECUTOR_SERVICE.execute(this);
EXECUTOR_SERVICE.schedule(this,100, TimeUnit.MICROSECONDS);
}
}
});
}
@PreDestroy
public void destroy() {
log.info("{}线程任务关闭", this.getClass().getSimpleName());
EXECUTOR_SERVICE.shutdown();
}
public abstract void pullMessage() throws InterruptedException;
}

Loading…
Cancel
Save