From def6d68d2666f9d36f787896b5e3363f8f6da774 Mon Sep 17 00:00:00 2001 From: xsx <825657193@qq.com> Date: Sat, 28 Sep 2024 18:59:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/bx/imserver/netty/IMChannelHandler.java | 4 ++-- .../main/java/com/bx/imserver/netty/IMServerGroup.java | 7 +++---- .../bx/imserver/netty/processor/HeartbeatProcessor.java | 8 ++++---- .../com/bx/imserver/netty/processor/LoginProcessor.java | 6 +++--- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java b/im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java index a932093..298f1c3 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java +++ b/im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java @@ -3,6 +3,7 @@ package com.bx.imserver.netty; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMSendInfo; +import com.bx.imcommon.mq.RedisMQTemplate; import com.bx.imserver.constant.ChannelAttrKey; import com.bx.imserver.netty.processor.AbstractMessageProcessor; import com.bx.imserver.netty.processor.ProcessorFactory; @@ -13,7 +14,6 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.RedisTemplate; /** * WebSocket 长连接下 文本帧的处理器 @@ -71,7 +71,7 @@ public class IMChannelHandler extends SimpleChannelInboundHandler { // 移除channel UserChannelCtxMap.removeChannelCtx(userId, terminal); // 用户下线 - RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class); + RedisMQTemplate redisTemplate = SpringContextHolder.getBean(RedisMQTemplate.class); String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), terminal.toString()); redisTemplate.delete(key); log.info("断开连接,userId:{},终端类型:{},{}", userId, terminal, ctx.channel().id().asLongText()); diff --git a/im-server/src/main/java/com/bx/imserver/netty/IMServerGroup.java b/im-server/src/main/java/com/bx/imserver/netty/IMServerGroup.java index 1498d18..8c5a36a 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/IMServerGroup.java +++ b/im-server/src/main/java/com/bx/imserver/netty/IMServerGroup.java @@ -1,11 +1,11 @@ package com.bx.imserver.netty; import com.bx.imcommon.contant.IMRedisKey; +import com.bx.imcommon.mq.RedisMQTemplate; import jakarta.annotation.PreDestroy; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.List; @@ -17,14 +17,13 @@ public class IMServerGroup implements CommandLineRunner { public static volatile long serverId = 0; - private final RedisTemplate redisTemplate; + private final RedisMQTemplate redisMQTemplate; private final List imServers; /*** * 判断服务器是否就绪 * - * @return **/ public boolean isReady() { for (IMServer imServer : imServers) { @@ -39,7 +38,7 @@ public class IMServerGroup implements CommandLineRunner { public void run(String... args) { // 初始化SERVER_ID String key = IMRedisKey.IM_MAX_SERVER_ID; - serverId = redisTemplate.opsForValue().increment(key, 1); + serverId = redisMQTemplate.opsForValue().increment(key, 1); // 启动服务 for (IMServer imServer : imServers) { imServer.start(); diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java index 275a271..be90c40 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java @@ -6,12 +6,12 @@ import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMHeartbeatInfo; import com.bx.imcommon.model.IMSendInfo; +import com.bx.imcommon.mq.RedisMQTemplate; import com.bx.imserver.constant.ChannelAttrKey; import io.netty.channel.ChannelHandlerContext; import io.netty.util.AttributeKey; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.HashMap; @@ -22,12 +22,12 @@ import java.util.concurrent.TimeUnit; @RequiredArgsConstructor public class HeartbeatProcessor extends AbstractMessageProcessor { - private final RedisTemplate redisTemplate; + private final RedisMQTemplate redisMQTemplate; @Override public void process(ChannelHandlerContext ctx, IMHeartbeatInfo beatInfo) { // 响应ws - IMSendInfo sendInfo = new IMSendInfo(); + IMSendInfo sendInfo = new IMSendInfo<>(); sendInfo.setCmd(IMCmdType.HEART_BEAT.code()); ctx.channel().writeAndFlush(sendInfo); // 设置属性 @@ -41,7 +41,7 @@ public class HeartbeatProcessor extends AbstractMessageProcessor terminalAttr = AttributeKey.valueOf(ChannelAttrKey.TERMINAL_TYPE); Integer terminal = ctx.channel().attr(terminalAttr).get(); String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), terminal.toString()); - redisTemplate.expire(key, IMConstant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS); + redisMQTemplate.expire(key, IMConstant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS); } AttributeKey userIdAttr = AttributeKey.valueOf(ChannelAttrKey.USER_ID); Long userId = ctx.channel().attr(userIdAttr).get(); diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java index 780fe19..c3ef039 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java @@ -8,6 +8,7 @@ import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMLoginInfo; import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSessionInfo; +import com.bx.imcommon.mq.RedisMQTemplate; import com.bx.imcommon.util.JwtUtil; import com.bx.imserver.constant.ChannelAttrKey; import com.bx.imserver.netty.IMServerGroup; @@ -17,7 +18,6 @@ import io.netty.util.AttributeKey; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.HashMap; @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit; @RequiredArgsConstructor public class LoginProcessor extends AbstractMessageProcessor { - private final RedisTemplate redisTemplate; + private final RedisMQTemplate redisMQTemplate; @Value("${jwt.accessToken.secret}") private String accessTokenSecret; @@ -66,7 +66,7 @@ public class LoginProcessor extends AbstractMessageProcessor { ctx.channel().attr(heartBeatAttr).set(0L); // 在redis上记录每个user的channelId,15秒没有心跳,则自动过期 String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), terminal.toString()); - redisTemplate.opsForValue().set(key, IMServerGroup.serverId, IMConstant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS); + redisMQTemplate.opsForValue().set(key, IMServerGroup.serverId, IMConstant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS); // 响应ws IMSendInfo sendInfo = new IMSendInfo<>(); sendInfo.setCmd(IMCmdType.LOGIN.code());