From 8fa8efc051f39fe37300e34a16abd24e74a833ab Mon Sep 17 00:00:00 2001 From: xsx <825657193@qq.com> Date: Tue, 16 Jul 2024 21:18:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=B0=86redis=E5=B0=81=E8=A3=85?= =?UTF-8?q?=E6=88=90MQ?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/db/db.sql => db/im-platfrom.sql | 5 +- im-client/pom.xml | 5 - .../main/java/com/bx/imclient/IMClient.java | 2 + .../com/bx/imclient/config/RedisConfig.java | 31 ----- .../java/com/bx/imclient/sender/IMSender.java | 28 ++--- .../task/AbstractMessageResultTask.java | 15 +-- .../task/GroupMessageResultResultTask.java | 15 ++- .../task/PrivateMessageResultResultTask.java | 15 ++- im-commom/pom.xml | 5 + .../com/bx/imcommon/mq/RedisMQConfig.java | 33 ++++++ .../com/bx/imcommon/mq/RedisMQConsumer.java | 13 +++ .../com/bx/imcommon/mq/RedisMQListener.java | 29 +++++ .../com/bx/imcommon/mq/RedisMQPullTask.java | 109 ++++++++++++++++++ .../com/bx/imcommon/mq/RedisMQTemplate.java | 42 +++++++ .../util/ThreadPoolExecutorFactory.java | 29 +---- .../java/com/bx/implatform/IMPlatformApp.java | 13 +-- .../com/bx/implatform/contant/Constant.java | 13 ++- .../com/bx/implatform/contant/RedisKey.java | 31 +++-- .../com/bx/implatform/dto/GroupBanDTO.java | 21 ++++ .../com/bx/implatform/dto/GroupUnbanDTO.java | 19 +++ .../com/bx/implatform/dto/UserBanDTO.java | 22 ++++ .../com/bx/implatform/enums/MessageType.java | 59 ++-------- .../task/UserBannedConsumerTask.java | 50 ++++++++ .../task/AbstractPullMessageTask.java | 12 +- 24 files changed, 430 insertions(+), 186 deletions(-) rename im-platform/src/main/resources/db/db.sql => db/im-platfrom.sql (94%) delete mode 100644 im-client/src/main/java/com/bx/imclient/config/RedisConfig.java create mode 100644 im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConfig.java create mode 100644 im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java create mode 100644 im-commom/src/main/java/com/bx/imcommon/mq/RedisMQListener.java create mode 100644 im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java create mode 100644 im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java create mode 100644 im-platform/src/main/java/com/bx/implatform/dto/GroupBanDTO.java create mode 100644 im-platform/src/main/java/com/bx/implatform/dto/GroupUnbanDTO.java create mode 100644 im-platform/src/main/java/com/bx/implatform/dto/UserBanDTO.java create mode 100644 im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java diff --git a/im-platform/src/main/resources/db/db.sql b/db/im-platfrom.sql similarity index 94% rename from im-platform/src/main/resources/db/db.sql rename to db/im-platfrom.sql index 1ea4b86..a5b44a9 100644 --- a/im-platform/src/main/resources/db/db.sql +++ b/db/im-platfrom.sql @@ -7,6 +7,8 @@ create table `im_user`( `head_image_thumb` varchar(255) default '' comment '用户头像缩略图', `password` varchar(255) not null comment '密码(明文)', `sex` tinyint(1) default 0 comment '性别 0:男 1:女', + `is_banned` tinyint(1) default 0 comment '是否被封禁 0:否 1:是', + `reason` varchar(255) comment '被封禁原因', `type` smallint default 1 comment '用户类型 1:普通用户 2:审核账户', `signature` varchar(1024) default '' comment '个性签名', `last_login_time` datetime DEFAULT null comment '最后登录时间', @@ -45,7 +47,8 @@ create table `im_group`( `head_image` varchar(255) default '' comment '群头像', `head_image_thumb` varchar(255) default '' comment '群头像缩略图', `notice` varchar(1024) default '' comment '群公告', - `remark` varchar(255) default '' comment '群备注', + `is_banned` tinyint(1) default 0 comment '是否被封禁 0:否 1:是', + `reason` varchar(255) comment '被封禁原因', `deleted` tinyint(1) default 0 comment '是否已删除', `created_time` datetime default CURRENT_TIMESTAMP comment '创建时间' )ENGINE=InnoDB CHARSET=utf8mb3 comment '群'; diff --git a/im-client/pom.xml b/im-client/pom.xml index 320a3cb..774bafb 100644 --- a/im-client/pom.xml +++ b/im-client/pom.xml @@ -17,10 +17,5 @@ im-commom 2.0.0 - - - org.springframework.boot - spring-boot-starter-data-redis - \ No newline at end of file diff --git a/im-client/src/main/java/com/bx/imclient/IMClient.java b/im-client/src/main/java/com/bx/imclient/IMClient.java index e94f0b8..7a476b1 100644 --- a/im-client/src/main/java/com/bx/imclient/IMClient.java +++ b/im-client/src/main/java/com/bx/imclient/IMClient.java @@ -65,4 +65,6 @@ public class IMClient { } + + } diff --git a/im-client/src/main/java/com/bx/imclient/config/RedisConfig.java b/im-client/src/main/java/com/bx/imclient/config/RedisConfig.java deleted file mode 100644 index 5ef996c..0000000 --- a/im-client/src/main/java/com/bx/imclient/config/RedisConfig.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.bx.imclient.config; - -import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.serializer.StringRedisSerializer; - -@Configuration("IMRedisConfig") -public class RedisConfig { - - @Bean("IMRedisTemplate") - public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { - RedisTemplate redisTemplate = new RedisTemplate(); - redisTemplate.setConnectionFactory(redisConnectionFactory); - // 设置值(value)的序列化采用FastJsonRedisSerializer - redisTemplate.setValueSerializer(fastJsonRedisSerializer()); - redisTemplate.setHashValueSerializer(fastJsonRedisSerializer()); - // 设置键(key)的序列化采用StringRedisSerializer。 - redisTemplate.setKeySerializer(new StringRedisSerializer()); - redisTemplate.setHashKeySerializer(new StringRedisSerializer()); - redisTemplate.afterPropertiesSet(); - return redisTemplate; - } - - public FastJsonRedisSerializer fastJsonRedisSerializer(){ - return new FastJsonRedisSerializer<>(Object.class); - } - -} diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java index 1e0a7f6..2a4fb88 100644 --- a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -8,20 +8,20 @@ import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.enums.IMSendCode; import com.bx.imcommon.enums.IMTerminalType; import com.bx.imcommon.model.*; +import com.bx.imcommon.mq.RedisMQTemplate; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; -import javax.annotation.Resource; import java.util.*; @Service @RequiredArgsConstructor public class IMSender { - @Resource(name="IMRedisTemplate") - private RedisTemplate redisTemplate; + @Autowired + private RedisMQTemplate redisMQTemplate; @Value("${spring.application.name}") private String appName; @@ -34,7 +34,7 @@ public class IMSender { for (Integer terminal : message.getRecvTerminals()) { // 获取对方连接的channelId String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.toString()); - Integer serverId = (Integer)redisTemplate.opsForValue().get(key); + Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key); // 如果对方在线,将数据存储至redis,等待拉取推送 if (serverId != null) { String sendKey = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, serverId.toString()); @@ -45,7 +45,7 @@ public class IMSender { recvInfo.setSender(message.getSender()); recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getRecvId(), terminal))); recvInfo.setData(message.getData()); - redisTemplate.opsForList().rightPush(sendKey, recvInfo); + redisMQTemplate.opsForList().rightPush(sendKey, recvInfo); } else { IMSendResult result = new IMSendResult(); result.setSender(message.getSender()); @@ -65,7 +65,7 @@ public class IMSender { } // 获取终端连接的channelId String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getSender().getId().toString(), terminal.toString()); - Integer serverId = (Integer)redisTemplate.opsForValue().get(key); + Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key); // 如果终端在线,将数据存储至redis,等待拉取推送 if (serverId != null) { String sendKey = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, serverId.toString()); @@ -76,7 +76,7 @@ public class IMSender { recvInfo.setSender(message.getSender()); recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getSender().getId(), terminal))); recvInfo.setData(message.getData()); - redisTemplate.opsForList().rightPush(sendKey, recvInfo); + redisMQTemplate.opsForList().rightPush(sendKey, recvInfo); } } } @@ -97,7 +97,7 @@ public class IMSender { }); } // 批量拉取 - List serverIds = redisTemplate.opsForValue().multiGet(sendMap.keySet()); + List serverIds = redisMQTemplate.opsForValue().multiGet(sendMap.keySet()); // 格式:map<服务器id,list<接收方>> Map> serverMap = new HashMap<>(); List offLineUsers = new LinkedList<>(); @@ -123,7 +123,7 @@ public class IMSender { recvInfo.setData(message.getData()); // 推送至队列 String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, entry.getKey().toString()); - redisTemplate.opsForList().rightPush(key, recvInfo); + redisMQTemplate.opsForList().rightPush(key, recvInfo); } // 推送给自己的其他终端 @@ -134,7 +134,7 @@ public class IMSender { } // 获取终端连接的channelId String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getSender().getId().toString(), terminal.toString()); - Integer serverId = (Integer)redisTemplate.opsForValue().get(key); + Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key); // 如果终端在线,将数据存储至redis,等待拉取推送 if (serverId != null) { IMRecvInfo recvInfo = new IMRecvInfo(); @@ -145,7 +145,7 @@ public class IMSender { recvInfo.setSendResult(false); recvInfo.setData(message.getData()); String sendKey = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, serverId.toString()); - redisTemplate.opsForList().rightPush(sendKey, recvInfo); + redisMQTemplate.opsForList().rightPush(sendKey, recvInfo); } } } @@ -178,7 +178,7 @@ public class IMSender { } } // 批量拉取 - List serverIds = redisTemplate.opsForValue().multiGet(userMap.keySet()); + List serverIds = redisMQTemplate.opsForValue().multiGet(userMap.keySet()); int idx = 0; Map> onlineMap = new HashMap<>(); for (Map.Entry entry : userMap.entrySet()) { @@ -195,7 +195,7 @@ public class IMSender { public Boolean isOnline(Long userId) { String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), "*"); - return !Objects.requireNonNull(redisTemplate.keys(key)).isEmpty(); + return !Objects.requireNonNull(redisMQTemplate.keys(key)).isEmpty(); } public List getOnlineUser(List userIds){ diff --git a/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java b/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java index 154f2f3..001ffd2 100644 --- a/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java @@ -5,13 +5,13 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; -import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; @Slf4j public abstract class AbstractMessageResultTask implements CommandLineRunner { - private static final ExecutorService EXECUTOR_SERVICE = ThreadPoolExecutorFactory.getThreadPoolExecutor(); + private static final ScheduledThreadPoolExecutor EXECUTOR_SERVICE = ThreadPoolExecutorFactory.getThreadPoolExecutor(); @Override public void run(String... args) { @@ -26,19 +26,12 @@ public abstract class AbstractMessageResultTask implements CommandLineRunner { log.error("任务调度异常", e); } if (!EXECUTOR_SERVICE.isShutdown()) { - Thread.sleep(100); - EXECUTOR_SERVICE.execute(this); + EXECUTOR_SERVICE.schedule(this,100, TimeUnit.MICROSECONDS); } } }); } - @PreDestroy - public void destroy() { - log.info("{}线程任务关闭", this.getClass().getSimpleName()); - EXECUTOR_SERVICE.shutdown(); - } - public abstract void pullMessage() throws InterruptedException; } diff --git a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java index 400220d..d9fa639 100644 --- a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java @@ -6,12 +6,11 @@ import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; +import com.bx.imcommon.mq.RedisMQTemplate; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; - -import javax.annotation.Resource; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -21,8 +20,8 @@ import java.util.Objects; @RequiredArgsConstructor public class GroupMessageResultResultTask extends AbstractMessageResultTask { - @Resource(name = "IMRedisTemplate") - private RedisTemplate redisTemplate; + @Autowired + private RedisMQTemplate redisMQTemplate; @Value("${spring.application.name}") private String appName; @@ -46,12 +45,12 @@ public class GroupMessageResultResultTask extends AbstractMessageResultTask { List loadBatch() { String key = StrUtil.join(":", IMRedisKey.IM_RESULT_GROUP_QUEUE, appName); //这个接口redis6.2以上才支持 - //List list = redisTemplate.opsForList().leftPop(key, batchSize); + //List list = redisMQTemplate.opsForList().leftPop(key, batchSize); List results = new LinkedList<>(); - JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + JSONObject jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key); while (!Objects.isNull(jsonObject) && results.size() < batchSize) { results.add(jsonObject.toJavaObject(IMSendResult.class)); - jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key); } return results; } diff --git a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java index 2451db1..ac7355c 100644 --- a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java @@ -6,13 +6,12 @@ import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; +import com.bx.imcommon.mq.RedisMQTemplate; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; - -import javax.annotation.Resource; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -22,8 +21,8 @@ import java.util.Objects; @RequiredArgsConstructor public class PrivateMessageResultResultTask extends AbstractMessageResultTask { - @Resource(name = "IMRedisTemplate") - private RedisTemplate redisTemplate; + @Autowired + private RedisMQTemplate redisMQTemplate; @Value("${spring.application.name}") private String appName; @@ -47,12 +46,12 @@ public class PrivateMessageResultResultTask extends AbstractMessageResultTask { List loadBatch() { String key = StrUtil.join(":", IMRedisKey.IM_RESULT_PRIVATE_QUEUE, appName); //这个接口redis6.2以上才支持 - //List list = redisTemplate.opsForList().leftPop(key, batchSize); + //List list = redisMQTemplate.opsForList().leftPop(key, batchSize); List results = new LinkedList<>(); - JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + JSONObject jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key); while (!Objects.isNull(jsonObject) && results.size() < batchSize) { results.add(jsonObject.toJavaObject(IMSendResult.class)); - jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + jsonObject = (JSONObject) redisMQTemplate.opsForList().leftPop(key); } return results; } diff --git a/im-commom/pom.xml b/im-commom/pom.xml index 43fb929..61b263c 100644 --- a/im-commom/pom.xml +++ b/im-commom/pom.xml @@ -65,5 +65,10 @@ slf4j-api 1.7.36 + + + org.springframework.boot + spring-boot-starter-data-redis + \ No newline at end of file diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConfig.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConfig.java new file mode 100644 index 0000000..24ba4ba --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConfig.java @@ -0,0 +1,33 @@ +package com.bx.imcommon.mq; + +import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.StringRedisSerializer; + + +@Configuration +public class RedisMQConfig { + + @Bean + public RedisMQTemplate redisMQTemplate(RedisConnectionFactory redisConnectionFactory) { + RedisMQTemplate redisMQTemplate = new RedisMQTemplate(); + redisMQTemplate.setConnectionFactory(redisConnectionFactory); + // 设置值(value)的序列化采用FastJsonRedisSerializer + redisMQTemplate.setValueSerializer(fastJsonRedisSerializer()); + redisMQTemplate.setHashValueSerializer(fastJsonRedisSerializer()); + // 设置键(key)的序列化采用StringRedisSerializer。 + redisMQTemplate.setKeySerializer(new StringRedisSerializer()); + redisMQTemplate.setHashKeySerializer(new StringRedisSerializer()); + redisMQTemplate.afterPropertiesSet(); + return redisMQTemplate; + } + + @Bean + public FastJsonRedisSerializer fastJsonRedisSerializer(){ + return new FastJsonRedisSerializer<>(Object.class); + } + +} diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java new file mode 100644 index 0000000..b9dd6bc --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java @@ -0,0 +1,13 @@ +package com.bx.imcommon.mq; + +import java.util.List; + +/** + * redis 队列消费者抽象类 + */ +public abstract class RedisMQConsumer { + + public void onMessage(T data){} + + public void onMessage(List datas){} +} diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQListener.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQListener.java new file mode 100644 index 0000000..eb5fd1e --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQListener.java @@ -0,0 +1,29 @@ +package com.bx.imcommon.mq; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * redis 队列消费监听注解 + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface RedisMQListener { + + /** + * 队列,也是redis的key + */ + String queue(); + + /** + * 一次性拉取的数据数量 + */ + int batchSize() default 1; + + /** + * 拉取间隔周期,单位:ms + */ + int period() default 100; +} diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java new file mode 100644 index 0000000..e0a3700 --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java @@ -0,0 +1,109 @@ +package com.bx.imcommon.mq; + +import com.alibaba.fastjson.JSONObject; + +import com.bx.imcommon.util.ThreadPoolExecutorFactory; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import javax.annotation.Resource; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.*; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * reids 队列拉取定时任务 + * + * @author: Blue + * @date: 2024-07-15 + * @version: 1.0 + */ +@Slf4j +@Component +public class RedisMQPullTask implements CommandLineRunner { + + private static final ScheduledThreadPoolExecutor EXECUTOR_SERVICE = + ThreadPoolExecutorFactory.getThreadPoolExecutor(); + + @Autowired(required = false) + private List consumers = Collections.emptyList(); + + @Autowired + private RedisMQTemplate redisMQTemplate; + + @Override + public void run(String... args) { + consumers.forEach((consumer -> { + // 注解参数 + RedisMQListener annotation = consumer.getClass().getAnnotation(RedisMQListener.class); + String key = annotation.queue(); + int batchSize = annotation.batchSize(); + int period = annotation.period(); + // 获取泛型类型 + Type superClass = consumer.getClass().getGenericSuperclass(); + Type type = ((ParameterizedType)superClass).getActualTypeArguments()[0]; + EXECUTOR_SERVICE.execute(new Runnable() { + @Override + public void run() { + List datas = new LinkedList<>(); + try { + // 拉取一个批次的数据 + List objects = pullBatch(key, batchSize); + for (Object obj : objects) { + if (obj instanceof JSONObject) { + JSONObject jsonObject = (JSONObject)obj; + Object data = jsonObject.toJavaObject(type); + consumer.onMessage(data); + datas.add(data); + } + } + if(!datas.isEmpty()){ + consumer.onMessage(datas); + } + } catch (Exception e) { + log.error("数据消费异常,队列:{}", key, e); + } + // 继续消费数据 + if (!EXECUTOR_SERVICE.isShutdown()) { + if (datas.size() < batchSize) { + // 数据已经消费完,等待下一个周期继续拉取 + EXECUTOR_SERVICE.schedule(this, period, TimeUnit.MICROSECONDS); + } else { + // 数据没有消费完,直接开启下一个消费周期 + EXECUTOR_SERVICE.execute(this); + } + } + } + }); + })); + } + + private List pullBatch(String key, Integer batchSize) { + List objects = new LinkedList<>(); + if (redisMQTemplate.isSupportBatchPull()) { + // 版本大于6.2,支持批量拉取 + objects = redisMQTemplate.opsForList().leftPop(key, 100); + } else { + // 版本小于6.2,只能逐条拉取 + Object obj = redisMQTemplate.opsForList().leftPop(key); + while (!Objects.isNull(obj) && objects.size() < batchSize) { + objects.add(obj); + obj = redisMQTemplate.opsForList().leftPop(key); + } + } + return objects; + } + + @PreDestroy + public void destory(){ + ThreadPoolExecutorFactory.shutDown(); + } +} diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java new file mode 100644 index 0000000..f966579 --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java @@ -0,0 +1,42 @@ +package com.bx.imcommon.mq; + +import cn.hutool.core.util.StrUtil; +import org.apache.logging.log4j.util.Strings; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.core.RedisTemplate; + +import java.util.Properties; + +/** + * @author: 谢绍许 + * @date: 2024-07-16 + * @version: 1.0 + */ +public class RedisMQTemplate extends RedisTemplate { + + private String version = Strings.EMPTY; + + public String getVersion() { + if (version.isEmpty()) { + RedisConnection redisConnection = this.getConnectionFactory().getConnection(); + Properties properties = redisConnection.info(); + version = properties.getProperty("redis_version"); + } + return version; + } + + /** + * 是否支持批量拉取,redis版本大于6.2支持批量拉取 + * @return + */ + Boolean isSupportBatchPull() { + String version = getVersion(); + String[] arr = version.split("\\."); + if (arr.length < 2) { + return false; + } + Integer firVersion = Integer.valueOf(arr[0]); + Integer secVersion = Integer.valueOf(arr[1]); + return firVersion > 6 || (firVersion == 6 && secVersion >= 2); + } +} diff --git a/im-commom/src/main/java/com/bx/imcommon/util/ThreadPoolExecutorFactory.java b/im-commom/src/main/java/com/bx/imcommon/util/ThreadPoolExecutorFactory.java index d07e0f4..e2cdd6d 100644 --- a/im-commom/src/main/java/com/bx/imcommon/util/ThreadPoolExecutorFactory.java +++ b/im-commom/src/main/java/com/bx/imcommon/util/ThreadPoolExecutorFactory.java @@ -2,9 +2,7 @@ package com.bx.imcommon.util; import lombok.extern.slf4j.Slf4j; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * 创建单例线程池 @@ -36,7 +34,7 @@ public final class ThreadPoolExecutorFactory { /** * 线程池对象 */ - private static volatile ThreadPoolExecutor threadPoolExecutor = null; + private static volatile ScheduledThreadPoolExecutor threadPoolExecutor = null; /** * 构造方法私有化 @@ -47,32 +45,17 @@ public final class ThreadPoolExecutorFactory { } } - /** - * 重写readResolve方法 - */ - private Object readResolve() { - //重写readResolve方法,防止序列化破坏单例 - return ThreadPoolExecutorFactory.getThreadPoolExecutor(); - } /** * 双检锁创建线程安全的单例 */ - public static ThreadPoolExecutor getThreadPoolExecutor() { + public static ScheduledThreadPoolExecutor getThreadPoolExecutor() { if (null == threadPoolExecutor) { synchronized (ThreadPoolExecutorFactory.class) { if (null == threadPoolExecutor) { - threadPoolExecutor = new ThreadPoolExecutor( + threadPoolExecutor = new ScheduledThreadPoolExecutor( //核心线程数 CORE_POOL_SIZE, - //最大线程数,包含临时线程 - MAX_IMUM_POOL_SIZE, - //临时线程的存活时间 - KEEP_ALIVE_TIME, - //时间单位(毫秒) - TimeUnit.MILLISECONDS, - //等待队列 - new LinkedBlockingQueue<>(QUEUE_SIZE), //拒绝策略 new ThreadPoolExecutor.CallerRunsPolicy() ); @@ -85,13 +68,13 @@ public final class ThreadPoolExecutorFactory { /** * 关闭线程池 */ - public void shutDown() { + public static void shutDown() { if (threadPoolExecutor != null) { threadPoolExecutor.shutdown(); } } - public void execute(Runnable runnable) { + public static void execute(Runnable runnable) { if (runnable == null) { return; } diff --git a/im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java b/im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java index e2aabee..ecb30a4 100644 --- a/im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java +++ b/im-platform/src/main/java/com/bx/implatform/IMPlatformApp.java @@ -1,25 +1,18 @@ package com.bx.implatform; -import cn.hutool.core.util.StrUtil; -import com.bx.implatform.contant.RedisKey; import lombok.extern.slf4j.Slf4j; import org.mybatis.spring.annotation.MapperScan; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; +import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.EnableAspectJAutoProxy; -import org.springframework.data.redis.core.RedisTemplate; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Set; + @Slf4j @EnableAspectJAutoProxy(exposeProxy = true) +@ComponentScan(basePackages = {"com.bx"}) @MapperScan(basePackages = {"com.bx.implatform.mapper"}) @SpringBootApplication(exclude = {SecurityAutoConfiguration.class})// 禁用secrity public class IMPlatformApp { diff --git a/im-platform/src/main/java/com/bx/implatform/contant/Constant.java b/im-platform/src/main/java/com/bx/implatform/contant/Constant.java index 6d23dfc..08e4125 100644 --- a/im-platform/src/main/java/com/bx/implatform/contant/Constant.java +++ b/im-platform/src/main/java/com/bx/implatform/contant/Constant.java @@ -2,20 +2,21 @@ package com.bx.implatform.contant; public final class Constant { - private Constant() { - } - + /** + * 系统用户id + */ + public static final Long SYS_USER_ID = 0L; /** * 最大图片上传大小 */ - public static final long MAX_IMAGE_SIZE = 20 * 1024 * 1024; + public static final Long MAX_IMAGE_SIZE = 20 * 1024 * 1024L; /** * 最大上传文件大小 */ - public static final long MAX_FILE_SIZE = 20 * 1024 * 1024; + public static final Long MAX_FILE_SIZE = 20 * 1024 * 1024L; /** * 群聊最大人数 */ - public static final long MAX_GROUP_MEMBER = 500; + public static final Long MAX_GROUP_MEMBER = 500L; } diff --git a/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java b/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java index 026e3c9..f6fbdd3 100644 --- a/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java +++ b/im-platform/src/main/java/com/bx/implatform/contant/RedisKey.java @@ -18,31 +18,38 @@ public final class RedisKey { * webrtc 群通话 */ public static final String IM_WEBRTC_GROUP_SESSION = "im:webrtc:group:session"; + + /** + * 用户被封禁消息队列 + */ + public static final String IM_QUEUE_USER_BANNED = "im:queue:user:banned"; + + /** + * 群聊被封禁消息队列 + */ + public static final String IM_QUEUE_GROUP_BANNED = "im:queue:group:banned"; + /** - * 缓存前缀 + * 群聊解封消息队列 */ - public static final String IM_CACHE = "im:cache:"; + public static final String IM_QUEUE_GROUP_UNBAN = "im:queue:user:unban"; + + /** * 缓存是否好友:bool */ - public static final String IM_CACHE_FRIEND = IM_CACHE + "friend"; + public static final String IM_CACHE_FRIEND = "im:cache:friend"; /** * 缓存群聊信息 */ - public static final String IM_CACHE_GROUP = IM_CACHE + "group"; + public static final String IM_CACHE_GROUP = "im:cache:group"; /** * 缓存群聊成员id */ - public static final String IM_CACHE_GROUP_MEMBER_ID = IM_CACHE + "group_member_ids"; - - /** - * 分布式锁前缀 - */ - public static final String IM_LOCK = "im:lock:"; - + public static final String IM_CACHE_GROUP_MEMBER_ID = "im:cache:group_member_ids"; /** * 分布式锁前缀 */ - public static final String IM_LOCK_RTC_GROUP = IM_LOCK + "rtc:group"; + public static final String IM_LOCK_RTC_GROUP = "im:lock:rtc:group"; } diff --git a/im-platform/src/main/java/com/bx/implatform/dto/GroupBanDTO.java b/im-platform/src/main/java/com/bx/implatform/dto/GroupBanDTO.java new file mode 100644 index 0000000..ec36509 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/dto/GroupBanDTO.java @@ -0,0 +1,21 @@ +package com.bx.implatform.dto; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author: Blue + * @date: 2024-07-14 + * @version: 1.0 + */ +@Data +@ApiModel(description = "群组封禁") +public class GroupBanDTO { + + @ApiModelProperty(value = "群组id") + private Long id; + + @ApiModelProperty(value = "封禁原因") + private String reason; +} diff --git a/im-platform/src/main/java/com/bx/implatform/dto/GroupUnbanDTO.java b/im-platform/src/main/java/com/bx/implatform/dto/GroupUnbanDTO.java new file mode 100644 index 0000000..7a9a7fd --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/dto/GroupUnbanDTO.java @@ -0,0 +1,19 @@ +package com.bx.implatform.dto; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author: Blue + * @date: 2024-07-14 + * @version: 1.0 + */ +@Data +@ApiModel(description = "群组解锁") +public class GroupUnbanDTO { + + @ApiModelProperty(value = "群组id") + private Long id; + +} diff --git a/im-platform/src/main/java/com/bx/implatform/dto/UserBanDTO.java b/im-platform/src/main/java/com/bx/implatform/dto/UserBanDTO.java new file mode 100644 index 0000000..38571f9 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/dto/UserBanDTO.java @@ -0,0 +1,22 @@ +package com.bx.implatform.dto; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author: Blue + * @date: 2024-07-14 + * @version: 1.0 + */ +@Data +@ApiModel("用户锁定DTO") +public class UserBanDTO { + + @ApiModelProperty(value = "用户id") + private Long id; + + @ApiModelProperty(value = "锁定原因") + private String reason; + +} diff --git a/im-platform/src/main/java/com/bx/implatform/enums/MessageType.java b/im-platform/src/main/java/com/bx/implatform/enums/MessageType.java index 21537d4..7a69c2c 100644 --- a/im-platform/src/main/java/com/bx/implatform/enums/MessageType.java +++ b/im-platform/src/main/java/com/bx/implatform/enums/MessageType.java @@ -9,6 +9,7 @@ import lombok.AllArgsConstructor; * 20-29: 提示类消息: 在会话中间显示的提示 * 30-39: UI交互类消息: 显示加载状态等 * 40-49: 操作交互类消息: 语音通话、视频通话消息等 + * 50-60: 后台操作类消息: 用户封禁、群组封禁等 * 100-199: 单人语音通话rtc信令 * 200-299: 多人语音通话rtc信令 * @@ -16,62 +17,22 @@ import lombok.AllArgsConstructor; @AllArgsConstructor public enum MessageType { - /** - * 文字 - */ - TEXT(0, "文字"), - /** - * 图片 - */ - IMAGE(1, "图片"), - /** - * 文件 - */ - FILE(2, "文件"), - /** - * 音频 - */ - AUDIO(3, "音频"), - /** - * 视频 - */ - VIDEO(4, "视频"), - - /** - * 撤回 - */ + TEXT(0, "文字消息"), + IMAGE(1, "图片消息"), + FILE(2, "文件消息"), + AUDIO(3, "语音消息"), + VIDEO(4, "视频消息"), RECALL(10, "撤回"), - /** - * 已读 - */ READED(11, "已读"), - - /** - * 消息已读回执 - */ RECEIPT(12, "消息已读回执"), - /** - * 时间提示 - */ TIP_TIME(20,"时间提示"), - /** - * 文字提示 - */ TIP_TEXT(21,"文字提示"), - /** - * 消息加载标记 - */ - LOADING(30,"加载中"), - - /** - * 语音通话提示 - */ + LOADING(30,"加载中标记"), ACT_RT_VOICE(40,"语音通话"), - /** - * 视频通话提示 - */ ACT_RT_VIDEO(41,"视频通话"), - + USER_BANNED(50,"用户封禁"), + GROUP_BANNED(51,"群聊封禁"), + GROUP_UNBAN(52,"群聊解封"), RTC_CALL_VOICE(100, "语音呼叫"), RTC_CALL_VIDEO(101, "视频呼叫"), RTC_ACCEPT(102, "接受"), diff --git a/im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java b/im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java new file mode 100644 index 0000000..31f61b8 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java @@ -0,0 +1,50 @@ +package com.bx.implatform.task; + +import com.bx.imclient.IMClient; +import com.bx.imcommon.enums.IMTerminalType; +import com.bx.imcommon.model.IMPrivateMessage; +import com.bx.imcommon.model.IMUserInfo; +import com.bx.imcommon.mq.RedisMQConsumer; +import com.bx.imcommon.mq.RedisMQListener; +import com.bx.implatform.contant.Constant; +import com.bx.implatform.contant.RedisKey; +import com.bx.implatform.dto.UserBanDTO; +import com.bx.implatform.service.IUserService; +import com.bx.implatform.util.BeanUtils; +import com.bx.implatform.vo.PrivateMessageVO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.security.core.parameters.P; +import org.springframework.stereotype.Component; + +/** + * @author: 谢绍许 + * @date: 2024-07-15 + * @version: 1.0 + */ +@Slf4j +@Component +@RequiredArgsConstructor +@RedisMQListener(queue = RedisKey.IM_QUEUE_USER_BANNED) +public class UserBannedConsumerTask extends RedisMQConsumer { + + private final IMClient imClient; + @Override + public void onMessage(UserBanDTO dto) { + + log.info("用户被封禁处理,userId:{},原因:{}",dto.getId(),dto.getReason()); + // 推送消息 + + PrivateMessageVO msgInfo = new PrivateMessageVO(); + msgInfo.setRecvId(dto.getId()); + msgInfo.setSendId(Constant.SYS_USER_ID); + msgInfo.setContent(dto.getReason()); + IMPrivateMessage sendMessage = new IMPrivateMessage<>(); + sendMessage.setSender(new IMUserInfo(Constant.SYS_USER_ID, IMTerminalType.WEB.code())); + sendMessage.setRecvId(dto.getId()); + sendMessage.setSendToSelf(false); + sendMessage.setData(msgInfo); + sendMessage.setSendResult(true); + imClient.sendPrivateMessage(sendMessage); + } +} diff --git a/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java index 6752869..905c5fc 100644 --- a/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java @@ -9,11 +9,13 @@ import org.springframework.boot.CommandLineRunner; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; @Slf4j public abstract class AbstractPullMessageTask implements CommandLineRunner { - private static final ExecutorService EXECUTOR_SERVICE = ThreadPoolExecutorFactory.getThreadPoolExecutor(); + private static final ScheduledThreadPoolExecutor EXECUTOR_SERVICE = ThreadPoolExecutorFactory.getThreadPoolExecutor(); @Autowired private IMServerGroup serverGroup; @@ -32,18 +34,12 @@ public abstract class AbstractPullMessageTask implements CommandLineRunner { log.error("任务调度异常", e); } if (!EXECUTOR_SERVICE.isShutdown()) { - Thread.sleep(100); - EXECUTOR_SERVICE.execute(this); + EXECUTOR_SERVICE.schedule(this,100, TimeUnit.MICROSECONDS); } } }); } - @PreDestroy - public void destroy() { - log.info("{}线程任务关闭", this.getClass().getSimpleName()); - EXECUTOR_SERVICE.shutdown(); - } public abstract void pullMessage() throws InterruptedException; }