diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java index b6b0de5..9aa66d2 100644 --- a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -69,11 +69,11 @@ public class IMSender { String key = RedisKey.IM_USER_SERVER_ID + id; Integer serverId = (Integer)redisTemplate.opsForValue().get(key); if(serverId != null){ - if(serverMap.containsKey(serverId)){ - serverMap.get(serverId).add(id); - }else { - // 此处需要加锁,否则list可以会被覆盖 - synchronized(serverMap){ + // 此处需要加锁,否则list可以会被覆盖 + synchronized(serverMap){ + if(serverMap.containsKey(serverId)){ + serverMap.get(serverId).add(id); + }else { List list = Collections.synchronizedList(new LinkedList()); list.add(id); serverMap.put(serverId,list); diff --git a/im-server/src/main/java/com/bx/imserver/IMServerApp.java b/im-server/src/main/java/com/bx/imserver/IMServerApp.java index 236d5a8..78574aa 100644 --- a/im-server/src/main/java/com/bx/imserver/IMServerApp.java +++ b/im-server/src/main/java/com/bx/imserver/IMServerApp.java @@ -1,9 +1,6 @@ package com.bx.imserver; -import com.bx.imserver.ws.WebsocketServer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -18,11 +15,6 @@ import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication public class IMServerApp implements CommandLineRunner { - @Value("${websocket.port}") - private int port; - - @Autowired - private WebsocketServer WSServer; public static void main(String[] args) { SpringApplication.run(IMServerApp.class,args); @@ -30,6 +22,5 @@ public class IMServerApp implements CommandLineRunner { public void run(String... args) throws Exception { - WSServer.start(port); } } diff --git a/im-server/src/main/java/com/bx/imserver/constant/Constant.java b/im-server/src/main/java/com/bx/imserver/constant/Constant.java deleted file mode 100644 index e83e3e2..0000000 --- a/im-server/src/main/java/com/bx/imserver/constant/Constant.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.bx.imserver.constant; - -public class Constant { - - // public static String LOCAL_SERVER_ID = UUID.randomUUID().toString(); - - -} diff --git a/im-server/src/main/java/com/bx/imserver/ws/WebSocketHandler.java b/im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java similarity index 88% rename from im-server/src/main/java/com/bx/imserver/ws/WebSocketHandler.java rename to im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java index 9d76477..9a13f62 100644 --- a/im-server/src/main/java/com/bx/imserver/ws/WebSocketHandler.java +++ b/im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java @@ -1,12 +1,11 @@ -package com.bx.imserver.ws; +package com.bx.imserver.netty; import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMSendInfo; -import com.bx.imserver.processor.MessageProcessor; -import com.bx.imserver.processor.ProcessorFactory; +import com.bx.imserver.netty.processor.MessageProcessor; +import com.bx.imserver.netty.processor.ProcessorFactory; import com.bx.imserver.util.SpringContextHolder; -import com.bx.imserver.util.UserChannelCtxHolder; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; @@ -22,7 +21,7 @@ import org.springframework.data.redis.core.RedisTemplate; * 浏览器连接状态监控 */ @Slf4j -public class WebSocketHandler extends SimpleChannelInboundHandler { +public class IMChannelHandler extends SimpleChannelInboundHandler { /** * 读取到消息后进行处理 @@ -67,11 +66,11 @@ public class WebSocketHandler extends SimpleChannelInboundHandler { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { AttributeKey attr = AttributeKey.valueOf("USER_ID"); Long userId = ctx.channel().attr(attr).get(); - ChannelHandlerContext context = UserChannelCtxHolder.getChannelCtx(userId); + ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(userId); // 判断一下,避免异地登录导致的误删 if(context != null && ctx.channel().id().equals(context.channel().id())){ // 移除channel - UserChannelCtxHolder.removeChannelCtx(userId); + UserChannelCtxMap.removeChannelCtx(userId); // 用户下线 RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate"); String key = RedisKey.IM_USER_SERVER_ID + userId; diff --git a/im-server/src/main/java/com/bx/imserver/netty/IMServer.java b/im-server/src/main/java/com/bx/imserver/netty/IMServer.java new file mode 100644 index 0000000..286e356 --- /dev/null +++ b/im-server/src/main/java/com/bx/imserver/netty/IMServer.java @@ -0,0 +1,10 @@ +package com.bx.imserver.netty; + +public interface IMServer { + + boolean isReady(); + + void start(); + + void stop(); +} diff --git a/im-server/src/main/java/com/bx/imserver/netty/IMServerMap.java b/im-server/src/main/java/com/bx/imserver/netty/IMServerMap.java new file mode 100644 index 0000000..de6fff8 --- /dev/null +++ b/im-server/src/main/java/com/bx/imserver/netty/IMServerMap.java @@ -0,0 +1,43 @@ +package com.bx.imserver.netty; + +import com.bx.imcommon.contant.RedisKey; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import java.util.List; + +@Slf4j +@Component +public class IMServerMap implements CommandLineRunner { + + public static volatile long serverId = 0; + + @Autowired + RedisTemplate redisTemplate; + + @Autowired + private List imServers; + + @Override + public void run(String... args) throws Exception { + // 初始化SERVER_ID + String key = RedisKey.IM_MAX_SERVER_ID; + serverId = redisTemplate.opsForValue().increment(key,1); + // 启动服务 + for(IMServer imServer:imServers){ + imServer.start(); + } + } + + @PreDestroy + public void destroy(){ + // 停止服务 + for(IMServer imServer:imServers){ + imServer.stop(); + } + } +} diff --git a/im-server/src/main/java/com/bx/imserver/util/UserChannelCtxHolder.java b/im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java similarity index 91% rename from im-server/src/main/java/com/bx/imserver/util/UserChannelCtxHolder.java rename to im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java index e457e41..8f5de0a 100644 --- a/im-server/src/main/java/com/bx/imserver/util/UserChannelCtxHolder.java +++ b/im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java @@ -1,4 +1,4 @@ -package com.bx.imserver.util; +package com.bx.imserver.netty; import io.netty.channel.ChannelHandlerContext; @@ -6,7 +6,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class UserChannelCtxHolder { +public class UserChannelCtxMap { /* * 维护userId和ctx的关联关系,格式:Map diff --git a/im-server/src/main/java/com/bx/imserver/processor/GroupMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java similarity index 96% rename from im-server/src/main/java/com/bx/imserver/processor/GroupMessageProcessor.java rename to im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java index 5bf361f..b506551 100644 --- a/im-server/src/main/java/com/bx/imserver/processor/GroupMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java @@ -1,4 +1,4 @@ -package com.bx.imserver.processor; +package com.bx.imserver.netty.processor; import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.enums.IMCmdType; @@ -7,7 +7,7 @@ import com.bx.imcommon.model.GroupMessageInfo; import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.SendResult; -import com.bx.imserver.util.UserChannelCtxHolder; +import com.bx.imserver.netty.UserChannelCtxMap; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -32,7 +32,7 @@ public class GroupMessageProcessor extends MessageProcessor { @Autowired - private WebsocketServer WSServer; + private WebSocketServer WSServer; @Autowired RedisTemplate redisTemplate; diff --git a/im-server/src/main/java/com/bx/imserver/processor/LoginProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java similarity index 80% rename from im-server/src/main/java/com/bx/imserver/processor/LoginProcessor.java rename to im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java index c89f1f0..0285779 100644 --- a/im-server/src/main/java/com/bx/imserver/processor/LoginProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java @@ -1,4 +1,4 @@ -package com.bx.imserver.processor; +package com.bx.imserver.netty.processor; import cn.hutool.core.bean.BeanUtil; import com.bx.imcommon.contant.Constant; @@ -6,8 +6,9 @@ import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.LoginInfo; -import com.bx.imserver.util.UserChannelCtxHolder; -import com.bx.imserver.ws.WebsocketServer; +import com.bx.imserver.netty.IMServerMap; +import com.bx.imserver.netty.UserChannelCtxMap; +import com.bx.imserver.netty.ws.WebSocketServer; import io.netty.channel.ChannelHandlerContext; import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; @@ -24,7 +25,7 @@ public class LoginProcessor extends MessageProcessor { @Autowired - private WebsocketServer WSServer; + private WebSocketServer WSServer; @Autowired RedisTemplate redisTemplate; @@ -32,7 +33,7 @@ public class LoginProcessor extends MessageProcessor { @Override synchronized public void process(ChannelHandlerContext ctx, LoginInfo loginInfo) { log.info("用户登录,userId:{}",loginInfo.getUserId()); - ChannelHandlerContext context = UserChannelCtxHolder.getChannelCtx(loginInfo.getUserId()); + ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(loginInfo.getUserId()); if(context != null){ // 不允许多地登录,强制下线 IMSendInfo sendInfo = new IMSendInfo(); @@ -40,7 +41,7 @@ public class LoginProcessor extends MessageProcessor { context.channel().writeAndFlush(sendInfo); } // 绑定用户和channel - UserChannelCtxHolder.addChannelCtx(loginInfo.getUserId(),ctx); + UserChannelCtxMap.addChannelCtx(loginInfo.getUserId(),ctx); // 设置用户id属性 AttributeKey attr = AttributeKey.valueOf("USER_ID"); ctx.channel().attr(attr).set(loginInfo.getUserId()); @@ -49,7 +50,7 @@ public class LoginProcessor extends MessageProcessor { ctx.channel().attr(attr).set(0L); // 在redis上记录每个user的channelId,15秒没有心跳,则自动过期 String key = RedisKey.IM_USER_SERVER_ID+loginInfo.getUserId(); - redisTemplate.opsForValue().set(key, WSServer.getServerId(), Constant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS); + redisTemplate.opsForValue().set(key, IMServerMap.serverId, Constant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS); // 响应ws IMSendInfo sendInfo = new IMSendInfo(); sendInfo.setCmd(IMCmdType.LOGIN.code()); diff --git a/im-server/src/main/java/com/bx/imserver/processor/MessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/MessageProcessor.java similarity index 86% rename from im-server/src/main/java/com/bx/imserver/processor/MessageProcessor.java rename to im-server/src/main/java/com/bx/imserver/netty/processor/MessageProcessor.java index a5ebc5e..c600968 100644 --- a/im-server/src/main/java/com/bx/imserver/processor/MessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/MessageProcessor.java @@ -1,4 +1,4 @@ -package com.bx.imserver.processor; +package com.bx.imserver.netty.processor; import io.netty.channel.ChannelHandlerContext; diff --git a/im-server/src/main/java/com/bx/imserver/processor/PrivateMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java similarity index 94% rename from im-server/src/main/java/com/bx/imserver/processor/PrivateMessageProcessor.java rename to im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java index ce5f910..d40eb0e 100644 --- a/im-server/src/main/java/com/bx/imserver/processor/PrivateMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java @@ -1,4 +1,4 @@ -package com.bx.imserver.processor; +package com.bx.imserver.netty.processor; import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.enums.IMCmdType; @@ -7,7 +7,7 @@ import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.PrivateMessageInfo; import com.bx.imcommon.model.SendResult; -import com.bx.imserver.util.UserChannelCtxHolder; +import com.bx.imserver.netty.UserChannelCtxMap; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -27,7 +27,7 @@ public class PrivateMessageProcessor extends MessageProcessor() { + // 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等 + @Override + protected void initChannel(Channel ch) throws Exception { + // 获取职责链 + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS)); + pipeline.addLast("encode",new MessageProtocolEncoder()); + pipeline.addLast("decode",new MessageProtocolDecoder()); + pipeline.addLast("handler", new IMChannelHandler()); + } + }) + // bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。 + // 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。 + // backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列 + .option(ChannelOption.SO_BACKLOG, 5) + // 表示连接保活,相当于心跳机制,默认为7200s + .childOption(ChannelOption.SO_KEEPALIVE, true); + + try { + // 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理 + Channel channel = bootstrap.bind(port).sync().channel(); + // 就绪标志 + this.ready = true; + log.info("tcp server 初始化完成,端口:{}",port); + // 等待服务端口关闭 + //channel.closeFuture().sync(); + } catch (InterruptedException e) { + log.info("tcp server 初始化异常",e); + } + } + + @Override + public void stop(){ + log.info("tcp server 停止"); + bossGroup.shutdownGracefully(); + workGroup.shutdownGracefully(); + this.ready = false; + } + + +} diff --git a/im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolDecoder.java b/im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolDecoder.java new file mode 100644 index 0000000..0414568 --- /dev/null +++ b/im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolDecoder.java @@ -0,0 +1,29 @@ +package com.bx.imserver.netty.tcp.endecode; + +import com.bx.imcommon.model.IMSendInfo; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class MessageProtocolDecoder extends ReplayingDecoder { + + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + if(byteBuf.readableBytes()< 4){ + return; + } + // 获取到包的长度 + long length=byteBuf.readLong(); + // 转成IMSendInfo + ByteBuf contentBuf = byteBuf.readBytes((int)length); + String content = contentBuf.toString(CharsetUtil.UTF_8); + ObjectMapper objectMapper = new ObjectMapper(); + IMSendInfo sendInfo = objectMapper.readValue(content, IMSendInfo.class); + list.add(sendInfo); + } +} diff --git a/im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolEncoder.java b/im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolEncoder.java new file mode 100644 index 0000000..eea9e3d --- /dev/null +++ b/im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolEncoder.java @@ -0,0 +1,22 @@ +package com.bx.imserver.netty.tcp.endecode; + +import com.bx.imcommon.model.IMSendInfo; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +public class MessageProtocolEncoder extends MessageToByteEncoder { + + @Override + protected void encode(ChannelHandlerContext channelHandlerContext, IMSendInfo sendInfo, ByteBuf byteBuf) throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + String content = objectMapper.writeValueAsString(sendInfo); + byte[] bytes = content.getBytes("UTF-8"); + // 写入长度 + byteBuf.writeLong(bytes.length); + // 写入命令体 + byteBuf.writeBytes(bytes); + } + +} diff --git a/im-server/src/main/java/com/bx/imserver/netty/ws/WebSocketServer.java b/im-server/src/main/java/com/bx/imserver/netty/ws/WebSocketServer.java new file mode 100644 index 0000000..ffe8dc8 --- /dev/null +++ b/im-server/src/main/java/com/bx/imserver/netty/ws/WebSocketServer.java @@ -0,0 +1,102 @@ +package com.bx.imserver.netty.ws; + +import com.bx.imserver.netty.IMChannelHandler; +import com.bx.imserver.netty.IMServer; +import com.bx.imserver.netty.ws.endecode.MessageProtocolDecoder; +import com.bx.imserver.netty.ws.endecode.MessageProtocolEncoder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + + +/** + * WS服务器,用于连接网页的客户端,协议格式: 直接IMSendInfo的JSON序列化 + * + * @author Blue + * @date 2022-11-20 + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "websocket", value = "enable", havingValue = "true",matchIfMissing = true) +public class WebSocketServer implements IMServer { + + @Value("${websocket.port}") + private int port; + + private volatile boolean ready = false; + + private ServerBootstrap bootstrap = new ServerBootstrap(); + private EventLoopGroup bossGroup = new NioEventLoopGroup(); + private EventLoopGroup workGroup = new NioEventLoopGroup(); + + + @Override + public boolean isReady(){ + return ready; + } + + @Override + public void start() { + // 设置为主从线程模型 + bootstrap.group(bossGroup, workGroup) + // 设置服务端NIO通信类型 + .channel(NioServerSocketChannel.class) + // 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理 + .childHandler(new ChannelInitializer() { + // 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等 + @Override + protected void initChannel(Channel ch) throws Exception { + // 获取职责链 + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS)); + pipeline.addLast("http-codec", new HttpServerCodec()); + pipeline.addLast("aggregator", new HttpObjectAggregator(65535)); + pipeline.addLast("http-chunked", new ChunkedWriteHandler()); + pipeline.addLast(new WebSocketServerProtocolHandler("/im")); + pipeline.addLast("encode",new MessageProtocolEncoder()); + pipeline.addLast("decode",new MessageProtocolDecoder()); + pipeline.addLast("handler", new IMChannelHandler()); + } + }) + // bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。 + // 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。 + // backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列 + .option(ChannelOption.SO_BACKLOG, 5) + // 表示连接保活,相当于心跳机制,默认为7200s + .childOption(ChannelOption.SO_KEEPALIVE, true); + + try { + // 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理 + Channel channel = bootstrap.bind(port).sync().channel(); + // 就绪标志 + this.ready = true; + log.info("websocket server 初始化完成,端口:{}",port); + // 等待服务端口关闭 + //channel.closeFuture().sync(); + } catch (InterruptedException e) { + log.info("websocket server 初始化异常",e); + } + } + + @Override + public void stop() { + log.info("websocket server 停止"); + bossGroup.shutdownGracefully(); + workGroup.shutdownGracefully(); + this.ready = false; + } + + +} diff --git a/im-server/src/main/java/com/bx/imserver/ws/endecode/MessageProtocolDecoder.java b/im-server/src/main/java/com/bx/imserver/netty/ws/endecode/MessageProtocolDecoder.java similarity index 94% rename from im-server/src/main/java/com/bx/imserver/ws/endecode/MessageProtocolDecoder.java rename to im-server/src/main/java/com/bx/imserver/netty/ws/endecode/MessageProtocolDecoder.java index 0ed8342..7374048 100644 --- a/im-server/src/main/java/com/bx/imserver/ws/endecode/MessageProtocolDecoder.java +++ b/im-server/src/main/java/com/bx/imserver/netty/ws/endecode/MessageProtocolDecoder.java @@ -1,4 +1,4 @@ -package com.bx.imserver.ws.endecode; +package com.bx.imserver.netty.ws.endecode; import com.bx.imcommon.model.IMSendInfo; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/im-server/src/main/java/com/bx/imserver/ws/endecode/MessageProtocolEncoder.java b/im-server/src/main/java/com/bx/imserver/netty/ws/endecode/MessageProtocolEncoder.java similarity index 94% rename from im-server/src/main/java/com/bx/imserver/ws/endecode/MessageProtocolEncoder.java rename to im-server/src/main/java/com/bx/imserver/netty/ws/endecode/MessageProtocolEncoder.java index a99946b..967e39a 100644 --- a/im-server/src/main/java/com/bx/imserver/ws/endecode/MessageProtocolEncoder.java +++ b/im-server/src/main/java/com/bx/imserver/netty/ws/endecode/MessageProtocolEncoder.java @@ -1,4 +1,4 @@ -package com.bx.imserver.ws.endecode; +package com.bx.imserver.netty.ws.endecode; import com.bx.imcommon.model.IMSendInfo; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java index c5841e6..ea4625a 100644 --- a/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java @@ -1,6 +1,6 @@ package com.bx.imserver.task; -import com.bx.imserver.ws.WebsocketServer; +import com.bx.imserver.netty.ws.WebSocketServer; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -17,7 +17,7 @@ public abstract class AbstractPullMessageTask{ private ExecutorService executorService; @Autowired - private WebsocketServer WSServer; + private WebSocketServer WSServer; public AbstractPullMessageTask(){ this.threadNum = 1; diff --git a/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java index 03f508b..7be78c6 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java @@ -4,9 +4,10 @@ import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.GroupMessageInfo; import com.bx.imcommon.model.IMRecvInfo; -import com.bx.imserver.processor.MessageProcessor; -import com.bx.imserver.processor.ProcessorFactory; -import com.bx.imserver.ws.WebsocketServer; +import com.bx.imserver.netty.IMServerMap; +import com.bx.imserver.netty.processor.MessageProcessor; +import com.bx.imserver.netty.processor.ProcessorFactory; +import com.bx.imserver.netty.ws.WebSocketServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; @@ -19,7 +20,7 @@ import java.util.List; public class PullUnreadGroupMessageTask extends AbstractPullMessageTask { @Autowired - private WebsocketServer WSServer; + private WebSocketServer WSServer; @Autowired private RedisTemplate redisTemplate; @@ -29,7 +30,7 @@ public class PullUnreadGroupMessageTask extends AbstractPullMessageTask { @Override public void pullMessage() { // 从redis拉取未读消息 - String key = RedisKey.IM_UNREAD_GROUP_QUEUE + WSServer.getServerId(); + String key = RedisKey.IM_UNREAD_GROUP_QUEUE + IMServerMap.serverId; List messageInfos = redisTemplate.opsForList().range(key,0,-1); for(Object o: messageInfos){ redisTemplate.opsForList().leftPop(key); diff --git a/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java index 4b0c7cb..071033b 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java @@ -5,9 +5,10 @@ import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.PrivateMessageInfo; -import com.bx.imserver.processor.MessageProcessor; -import com.bx.imserver.processor.ProcessorFactory; -import com.bx.imserver.ws.WebsocketServer; +import com.bx.imserver.netty.IMServerMap; +import com.bx.imserver.netty.processor.MessageProcessor; +import com.bx.imserver.netty.processor.ProcessorFactory; +import com.bx.imserver.netty.ws.WebSocketServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; @@ -21,7 +22,7 @@ import java.util.List; public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask { @Autowired - private WebsocketServer WSServer; + private WebSocketServer WSServer; @Autowired private RedisTemplate redisTemplate; @@ -29,7 +30,7 @@ public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask { @Override public void pullMessage() { // 从redis拉取未读消息 - String key = RedisKey.IM_UNREAD_PRIVATE_QUEUE + WSServer.getServerId(); + String key = RedisKey.IM_UNREAD_PRIVATE_QUEUE + IMServerMap.serverId; List messageInfos = redisTemplate.opsForList().range(key,0,-1); for(Object o: messageInfos){ redisTemplate.opsForList().leftPop(key); diff --git a/im-server/src/main/java/com/bx/imserver/ws/WebsocketServer.java b/im-server/src/main/java/com/bx/imserver/ws/WebsocketServer.java deleted file mode 100644 index b51d33d..0000000 --- a/im-server/src/main/java/com/bx/imserver/ws/WebsocketServer.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.bx.imserver.ws; - -import com.bx.imcommon.contant.RedisKey; -import com.bx.imserver.ws.endecode.MessageProtocolDecoder; -import com.bx.imserver.ws.endecode.MessageProtocolEncoder; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.*; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; -import io.netty.handler.stream.ChunkedWriteHandler; -import io.netty.handler.timeout.IdleStateHandler; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import java.util.concurrent.TimeUnit; - -@Slf4j -@Component -public class WebsocketServer { - - public static long serverId = 0; - - @Autowired - RedisTemplate redisTemplate; - - private volatile boolean ready = false; - - - @PostConstruct - public void init(){ - // 初始化SERVER_ID - String key = RedisKey.IM_MAX_SERVER_ID; - serverId = redisTemplate.opsForValue().increment(key,1); - } - - public boolean isReady(){ - return ready; - } - - public long getServerId(){ - return serverId; - } - - public void start(int port) { - // 服务端启动辅助类,用于设置TCP相关参数 - ServerBootstrap bootstrap = new ServerBootstrap(); - // 获取Reactor线程池 - EventLoopGroup bossGroup = new NioEventLoopGroup(); - EventLoopGroup workGroup = new NioEventLoopGroup(); - // 设置为主从线程模型 - bootstrap.group(bossGroup, workGroup) - // 设置服务端NIO通信类型 - .channel(NioServerSocketChannel.class) - // 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理 - .childHandler(new ChannelInitializer() { - // 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等 - @Override - protected void initChannel(Channel ch) throws Exception { - // 获取职责链 - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS)); - pipeline.addLast("http-codec", new HttpServerCodec()); - pipeline.addLast("aggregator", new HttpObjectAggregator(65535)); - pipeline.addLast("http-chunked", new ChunkedWriteHandler()); - pipeline.addLast(new WebSocketServerProtocolHandler("/im")); - pipeline.addLast("encode",new MessageProtocolEncoder()); - pipeline.addLast("decode",new MessageProtocolDecoder()); - pipeline.addLast("handler", new WebSocketHandler()); - } - }) - // bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。 - // 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。 - // backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列 - .option(ChannelOption.SO_BACKLOG, 5) - // 表示连接保活,相当于心跳机制,默认为7200s - .childOption(ChannelOption.SO_KEEPALIVE, true); - - try { - // 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理 - Channel channel = bootstrap.bind(port).sync().channel(); - // 就绪标志 - this.ready = true; - log.info("websocket server 初始化完成...."); - // 等待服务端口关闭 - channel.closeFuture().sync(); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - // 优雅退出,释放线程池资源 - bossGroup.shutdownGracefully(); - workGroup.shutdownGracefully(); - } - } - -} diff --git a/im-server/src/main/resources/application.yml b/im-server/src/main/resources/application.yml index 261b9c2..ee5f9e2 100644 --- a/im-server/src/main/resources/application.yml +++ b/im-server/src/main/resources/application.yml @@ -1,11 +1,16 @@ server: port: 8877 -websocket: - port: 8878 - spring: redis: host: 127.0.0.1 port: 6379 - database: 1 \ No newline at end of file + database: 1 + +websocket: + enable: true + port: 8878 + +tcpsocket: + enable: false # 暂时不开启 + port: 8879 \ No newline at end of file