diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java index ba51bc5..a2ecded 100644 --- a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java @@ -52,11 +52,6 @@ public class RedisMQPullTask implements CommandLineRunner { public void run() { List datas = new LinkedList<>(); try { - if(redisMQTemplate.isClose()){ - // 如果redis未初始化或已断开,3s后再重新尝试消费 - EXECUTOR.schedule(this, 3, TimeUnit.SECONDS); - return; - } if (consumer.isReady()) { String key = consumer.generateKey(); // 拉取一个批次的数据 @@ -75,6 +70,8 @@ public class RedisMQPullTask implements CommandLineRunner { } } catch (Exception e) { log.error("数据消费异常,队列:{}", queue, e); + // 出现异常,10s后再重新尝试消费 + EXECUTOR.schedule(this, 10, TimeUnit.SECONDS); return; } // 继续消费数据 diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java index c98a036..dbb927b 100644 --- a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java @@ -2,10 +2,9 @@ package com.bx.imcommon.mq; import org.apache.logging.log4j.util.Strings; import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisConnectionUtils; import org.springframework.data.redis.core.RedisTemplate; -import java.util.Objects; import java.util.Properties; /** @@ -19,9 +18,10 @@ public class RedisMQTemplate extends RedisTemplate { public String getVersion() { if (version.isEmpty()) { - RedisConnection redisConnection = this.getConnectionFactory().getConnection(); - Properties properties = redisConnection.info(); + RedisConnection connection = RedisConnectionUtils.getConnection(getConnectionFactory()); + Properties properties = connection.info(); version = properties.getProperty("redis_version"); + RedisConnectionUtils.releaseConnection(connection,getConnectionFactory()); } return version; } @@ -41,12 +41,4 @@ public class RedisMQTemplate extends RedisTemplate { return firVersion > 6 || (firVersion == 6 && secVersion >= 2); } - - Boolean isClose(){ - try { - return getConnectionFactory().getConnection().isClosed(); - }catch (Exception e){ - return true; - } - } } diff --git a/im-platform/src/main/java/com/bx/implatform/config/TaskSchedulerConfig.java b/im-platform/src/main/java/com/bx/implatform/config/TaskSchedulerConfig.java new file mode 100644 index 0000000..ed14e09 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/config/TaskSchedulerConfig.java @@ -0,0 +1,27 @@ +package com.bx.implatform.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +/** + * @author: Blue + * @date: 2024-09-01 + * @version: 1.0 + */ + +@EnableScheduling +@Configuration +public class TaskSchedulerConfig { + + @Bean + public TaskScheduler taskScheduler() { + ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.setPoolSize(10); // 设置线程池大小 + taskScheduler.setThreadNamePrefix("scheduled-task-"); + taskScheduler.initialize(); + return taskScheduler; + } +} diff --git a/im-platform/src/main/java/com/bx/implatform/task/GroupBannedConsumerTask.java b/im-platform/src/main/java/com/bx/implatform/task/consumer/GroupBannedConsumerTask.java similarity index 98% rename from im-platform/src/main/java/com/bx/implatform/task/GroupBannedConsumerTask.java rename to im-platform/src/main/java/com/bx/implatform/task/consumer/GroupBannedConsumerTask.java index baacfba..1fc7f57 100644 --- a/im-platform/src/main/java/com/bx/implatform/task/GroupBannedConsumerTask.java +++ b/im-platform/src/main/java/com/bx/implatform/task/consumer/GroupBannedConsumerTask.java @@ -1,4 +1,4 @@ -package com.bx.implatform.task; +package com.bx.implatform.task.consumer; import com.bx.imclient.IMClient; import com.bx.imcommon.enums.IMTerminalType; diff --git a/im-platform/src/main/java/com/bx/implatform/task/GroupUnbanConsumerTask.java b/im-platform/src/main/java/com/bx/implatform/task/consumer/GroupUnbanConsumerTask.java similarity index 98% rename from im-platform/src/main/java/com/bx/implatform/task/GroupUnbanConsumerTask.java rename to im-platform/src/main/java/com/bx/implatform/task/consumer/GroupUnbanConsumerTask.java index a5adda0..e2c99c2 100644 --- a/im-platform/src/main/java/com/bx/implatform/task/GroupUnbanConsumerTask.java +++ b/im-platform/src/main/java/com/bx/implatform/task/consumer/GroupUnbanConsumerTask.java @@ -1,4 +1,4 @@ -package com.bx.implatform.task; +package com.bx.implatform.task.consumer; import com.bx.imclient.IMClient; import com.bx.imcommon.enums.IMTerminalType; diff --git a/im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java b/im-platform/src/main/java/com/bx/implatform/task/consumer/UserBannedConsumerTask.java similarity index 97% rename from im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java rename to im-platform/src/main/java/com/bx/implatform/task/consumer/UserBannedConsumerTask.java index 4b792fb..c84b986 100644 --- a/im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java +++ b/im-platform/src/main/java/com/bx/implatform/task/consumer/UserBannedConsumerTask.java @@ -1,4 +1,4 @@ -package com.bx.implatform.task; +package com.bx.implatform.task.consumer; import com.bx.imclient.IMClient; import com.bx.imcommon.model.IMSystemMessage; diff --git a/im-platform/src/main/java/com/bx/implatform/task/schedule/ReloadSensitiveWordTask.java b/im-platform/src/main/java/com/bx/implatform/task/schedule/ReloadSensitiveWordTask.java new file mode 100644 index 0000000..2ff15ac --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/task/schedule/ReloadSensitiveWordTask.java @@ -0,0 +1,26 @@ +package com.bx.implatform.task.schedule; + +import com.bx.implatform.util.SensitiveFilterUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +/** + * @author: Blue + * @date: 2024-09-01 + * @version: 1.0 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class ReloadSensitiveWordTask { + + private final SensitiveFilterUtil sensitiveFilterUtil; + + @Scheduled(fixedRate = 60000) + public void run() { + log.info("【定时任务】重新装载敏感词..."); + sensitiveFilterUtil.reload(); + } +} diff --git a/im-platform/src/main/java/com/bx/implatform/util/SensitiveFilterUtil.java b/im-platform/src/main/java/com/bx/implatform/util/SensitiveFilterUtil.java index 0491c14..c3e83d7 100644 --- a/im-platform/src/main/java/com/bx/implatform/util/SensitiveFilterUtil.java +++ b/im-platform/src/main/java/com/bx/implatform/util/SensitiveFilterUtil.java @@ -14,7 +14,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** * 敏感词过滤器——SensitiveFilter @@ -35,7 +34,7 @@ public final class SensitiveFilterUtil { /** * 根节点 */ - private static final TrieNode ROOT_NODE = new TrieNode(); + private static TrieNode ROOT_NODE = new TrieNode(); /** * 线程池 @@ -86,41 +85,41 @@ public final class SensitiveFilterUtil { * @date 2023/12/4 11:18 */ @PostConstruct - public void init() { - // 每120s装载一次敏感词 - EXECUTOR_SERVICE.scheduleAtFixedRate(() -> { - List keywords = sensitiveWordService.findAllEnabledWords(); - keywords.forEach(keyword->{ - if(StrUtil.isNotEmpty(keyword)){ - // 添加到前缀树 - addKeyword(keyword); - } - }); - },0,120, TimeUnit.SECONDS); + public void reload() { + // 使用copy on write的方式,防止出现并发问题 + TrieNode newNode = new TrieNode(); + List keywords = sensitiveWordService.findAllEnabledWords(); + keywords.forEach(keyword -> { + if (StrUtil.isNotEmpty(keyword)) { + // 添加到前缀树 + addKeyword(newNode,keyword); + } + }); + ROOT_NODE = newNode; } /** * 3、将一个敏感词添加到前缀树中 * + * @param node * @param keyword * @author NXY * @date 2023/12/4 11:15 */ - private void addKeyword(String keyword) { - TrieNode tempNode = ROOT_NODE; + private void addKeyword(TrieNode node, String keyword) { for (int i = 0; i < keyword.length(); i++) { char c = keyword.charAt(i); - TrieNode subNode = tempNode.getSubNode(c); + TrieNode subNode = node.getSubNode(c); if (subNode == null) { // 初始化子节点 subNode = new TrieNode(); - tempNode.addSubNode(c, subNode); + node.addSubNode(c, subNode); } // 指向子节点,进入下一轮循环 - tempNode = subNode; + node = subNode; // 设置结束标识 if (i == keyword.length() - 1) { - tempNode.setKeywordEnd(true); + node.setKeywordEnd(true); } } } @@ -195,9 +194,9 @@ public final class SensitiveFilterUtil { /** * 判断是否为符号 ——特殊符号 * + * @return boolean * @author NXY * @date 2023/12/4 11:17 - * @return boolean */ private boolean isSymbol(Character c) { // 0x2E80~0x9FFF 是东亚文字范围 diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java b/im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java index 117334c..f68e0f9 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java @@ -6,27 +6,14 @@ import com.bx.imserver.util.SpringContextHolder; public class ProcessorFactory { public static AbstractMessageProcessor createProcessor(IMCmdType cmd) { - AbstractMessageProcessor processor = null; - switch (cmd) { - case LOGIN: - processor = SpringContextHolder.getApplicationContext().getBean(LoginProcessor.class); - break; - case HEART_BEAT: - processor = SpringContextHolder.getApplicationContext().getBean(HeartbeatProcessor.class); - break; - case PRIVATE_MESSAGE: - processor = SpringContextHolder.getApplicationContext().getBean(PrivateMessageProcessor.class); - break; - case GROUP_MESSAGE: - processor = SpringContextHolder.getApplicationContext().getBean(GroupMessageProcessor.class); - break; - case SYSTEM_MESSAGE: - processor = SpringContextHolder.getApplicationContext().getBean(SystemMessageProcessor.class); - break; - default: - break; - } - return processor; + return switch (cmd) { + case LOGIN->SpringContextHolder.getApplicationContext().getBean(LoginProcessor.class); + case HEART_BEAT -> SpringContextHolder.getApplicationContext().getBean(HeartbeatProcessor.class); + case PRIVATE_MESSAGE->SpringContextHolder.getApplicationContext().getBean(PrivateMessageProcessor.class); + case GROUP_MESSAGE->SpringContextHolder.getApplicationContext().getBean(GroupMessageProcessor.class); + case SYSTEM_MESSAGE->SpringContextHolder.getApplicationContext().getBean(SystemMessageProcessor.class); + default -> null; + }; } }