diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java index feff73e..353dcaf 100644 --- a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java @@ -28,8 +28,7 @@ import java.util.concurrent.TimeUnit; @Component public class RedisMQPullTask implements CommandLineRunner { - private static final ScheduledThreadPoolExecutor EXECUTOR_SERVICE = - ThreadPoolExecutorFactory.getThreadPoolExecutor(); + private static final ScheduledThreadPoolExecutor EXECUTOR = ThreadPoolExecutorFactory.getThreadPoolExecutor(); @Autowired(required = false) private List consumers = Collections.emptyList(); @@ -48,12 +47,15 @@ public class RedisMQPullTask implements CommandLineRunner { // 获取泛型类型 Type superClass = consumer.getClass().getGenericSuperclass(); Type type = ((ParameterizedType)superClass).getActualTypeArguments()[0]; - EXECUTOR_SERVICE.execute(new Runnable() { + EXECUTOR.execute(new Runnable() { @Override public void run() { List datas = new LinkedList<>(); try { - if(consumer.isReady()){ + if(redisMQTemplate.isClose()){ + return; + } + if (consumer.isReady()) { String key = consumer.generateKey(); // 拉取一个批次的数据 List objects = pullBatch(key, batchSize); @@ -65,23 +67,22 @@ public class RedisMQPullTask implements CommandLineRunner { datas.add(data); } } - if(!datas.isEmpty()){ + if (!datas.isEmpty()) { consumer.onMessage(datas); } } } catch (Exception e) { log.error("数据消费异常,队列:{}", queue, e); - EXECUTOR_SERVICE.schedule(this, period, TimeUnit.MICROSECONDS); return; } // 继续消费数据 - if (!EXECUTOR_SERVICE.isShutdown()) { + if (!EXECUTOR.isShutdown()) { if (datas.size() < batchSize) { // 数据已经消费完,等待下一个周期继续拉取 - EXECUTOR_SERVICE.schedule(this, period, TimeUnit.MICROSECONDS); + EXECUTOR.schedule(this, period, TimeUnit.MICROSECONDS); } else { // 数据没有消费完,直接开启下一个消费周期 - EXECUTOR_SERVICE.execute(this); + EXECUTOR.execute(this); } } } @@ -106,7 +107,8 @@ public class RedisMQPullTask implements CommandLineRunner { } @PreDestroy - public void destory(){ + public void destory() { + log.info("消费线程停止..."); ThreadPoolExecutorFactory.shutDown(); } } diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java index cc5f88b..c98a036 100644 --- a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java @@ -2,8 +2,10 @@ package com.bx.imcommon.mq; import org.apache.logging.log4j.util.Strings; import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; +import java.util.Objects; import java.util.Properties; /** @@ -38,4 +40,13 @@ public class RedisMQTemplate extends RedisTemplate { Integer secVersion = Integer.valueOf(arr[1]); return firVersion > 6 || (firVersion == 6 && secVersion >= 2); } + + + Boolean isClose(){ + try { + return getConnectionFactory().getConnection().isClosed(); + }catch (Exception e){ + return true; + } + } } diff --git a/im-platform/src/main/java/com/bx/implatform/entity/User.java b/im-platform/src/main/java/com/bx/implatform/entity/User.java index 6767630..ca292e6 100644 --- a/im-platform/src/main/java/com/bx/implatform/entity/User.java +++ b/im-platform/src/main/java/com/bx/implatform/entity/User.java @@ -84,9 +84,4 @@ public class User { */ private Integer type; - /** - * 客户端id,用于uni-push推送 - */ - private String cid; - } diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/UserServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/UserServiceImpl.java index 3b4dd3c..7ea154b 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/UserServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/UserServiceImpl.java @@ -108,7 +108,7 @@ public class UserServiceImpl extends ServiceImpl implements Us @Override public void register(RegisterDTO dto) { User user = this.findUserByUserName(dto.getUserName()); - if (Objects.isNull(user)) { + if (!Objects.isNull(user)) { throw new GlobalException(ResultCode.USERNAME_ALREADY_REGISTER); } user = BeanUtils.copyProperties(dto, User.class);