Browse Source

IM-server支持TCP方式连接

master
xie.bx 3 years ago
parent
commit
5d025168f1
  1. 10
      im-client/src/main/java/com/bx/imclient/sender/IMSender.java
  2. 9
      im-server/src/main/java/com/bx/imserver/IMServerApp.java
  3. 8
      im-server/src/main/java/com/bx/imserver/constant/Constant.java
  4. 13
      im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java
  5. 10
      im-server/src/main/java/com/bx/imserver/netty/IMServer.java
  6. 43
      im-server/src/main/java/com/bx/imserver/netty/IMServerMap.java
  7. 4
      im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java
  8. 6
      im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java
  9. 6
      im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java
  10. 15
      im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java
  11. 2
      im-server/src/main/java/com/bx/imserver/netty/processor/MessageProcessor.java
  12. 6
      im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java
  13. 2
      im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java
  14. 94
      im-server/src/main/java/com/bx/imserver/netty/tcp/TcpSocketServer.java
  15. 29
      im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolDecoder.java
  16. 22
      im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolEncoder.java
  17. 102
      im-server/src/main/java/com/bx/imserver/netty/ws/WebSocketServer.java
  18. 2
      im-server/src/main/java/com/bx/imserver/netty/ws/endecode/MessageProtocolDecoder.java
  19. 2
      im-server/src/main/java/com/bx/imserver/netty/ws/endecode/MessageProtocolEncoder.java
  20. 4
      im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java
  21. 11
      im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java
  22. 11
      im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java
  23. 101
      im-server/src/main/java/com/bx/imserver/ws/WebsocketServer.java
  24. 13
      im-server/src/main/resources/application.yml

10
im-client/src/main/java/com/bx/imclient/sender/IMSender.java

@ -69,11 +69,11 @@ public class IMSender {
String key = RedisKey.IM_USER_SERVER_ID + id; String key = RedisKey.IM_USER_SERVER_ID + id;
Integer serverId = (Integer)redisTemplate.opsForValue().get(key); Integer serverId = (Integer)redisTemplate.opsForValue().get(key);
if(serverId != null){ if(serverId != null){
if(serverMap.containsKey(serverId)){ // 此处需要加锁,否则list可以会被覆盖
serverMap.get(serverId).add(id); synchronized(serverMap){
}else { if(serverMap.containsKey(serverId)){
// 此处需要加锁,否则list可以会被覆盖 serverMap.get(serverId).add(id);
synchronized(serverMap){ }else {
List<Long> list = Collections.synchronizedList(new LinkedList<Long>()); List<Long> list = Collections.synchronizedList(new LinkedList<Long>());
list.add(id); list.add(id);
serverMap.put(serverId,list); serverMap.put(serverId,list);

9
im-server/src/main/java/com/bx/imserver/IMServerApp.java

@ -1,9 +1,6 @@
package com.bx.imserver; package com.bx.imserver;
import com.bx.imserver.ws.WebsocketServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -18,11 +15,6 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication @SpringBootApplication
public class IMServerApp implements CommandLineRunner { public class IMServerApp implements CommandLineRunner {
@Value("${websocket.port}")
private int port;
@Autowired
private WebsocketServer WSServer;
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(IMServerApp.class,args); SpringApplication.run(IMServerApp.class,args);
@ -30,6 +22,5 @@ public class IMServerApp implements CommandLineRunner {
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
WSServer.start(port);
} }
} }

8
im-server/src/main/java/com/bx/imserver/constant/Constant.java

@ -1,8 +0,0 @@
package com.bx.imserver.constant;
public class Constant {
// public static String LOCAL_SERVER_ID = UUID.randomUUID().toString();
}

13
im-server/src/main/java/com/bx/imserver/ws/WebSocketHandler.java → im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java

@ -1,12 +1,11 @@
package com.bx.imserver.ws; package com.bx.imserver.netty;
import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSendInfo;
import com.bx.imserver.processor.MessageProcessor; import com.bx.imserver.netty.processor.MessageProcessor;
import com.bx.imserver.processor.ProcessorFactory; import com.bx.imserver.netty.processor.ProcessorFactory;
import com.bx.imserver.util.SpringContextHolder; import com.bx.imserver.util.SpringContextHolder;
import com.bx.imserver.util.UserChannelCtxHolder;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
@ -22,7 +21,7 @@ import org.springframework.data.redis.core.RedisTemplate;
* 浏览器连接状态监控 * 浏览器连接状态监控
*/ */
@Slf4j @Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<IMSendInfo> { public class IMChannelHandler extends SimpleChannelInboundHandler<IMSendInfo> {
/** /**
* 读取到消息后进行处理 * 读取到消息后进行处理
@ -67,11 +66,11 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<IMSendInfo> {
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
AttributeKey<Long> attr = AttributeKey.valueOf("USER_ID"); AttributeKey<Long> attr = AttributeKey.valueOf("USER_ID");
Long userId = ctx.channel().attr(attr).get(); Long userId = ctx.channel().attr(attr).get();
ChannelHandlerContext context = UserChannelCtxHolder.getChannelCtx(userId); ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(userId);
// 判断一下,避免异地登录导致的误删 // 判断一下,避免异地登录导致的误删
if(context != null && ctx.channel().id().equals(context.channel().id())){ if(context != null && ctx.channel().id().equals(context.channel().id())){
// 移除channel // 移除channel
UserChannelCtxHolder.removeChannelCtx(userId); UserChannelCtxMap.removeChannelCtx(userId);
// 用户下线 // 用户下线
RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate"); RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate");
String key = RedisKey.IM_USER_SERVER_ID + userId; String key = RedisKey.IM_USER_SERVER_ID + userId;

10
im-server/src/main/java/com/bx/imserver/netty/IMServer.java

@ -0,0 +1,10 @@
package com.bx.imserver.netty;
public interface IMServer {
boolean isReady();
void start();
void stop();
}

43
im-server/src/main/java/com/bx/imserver/netty/IMServerMap.java

@ -0,0 +1,43 @@
package com.bx.imserver.netty;
import com.bx.imcommon.contant.RedisKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.List;
@Slf4j
@Component
public class IMServerMap implements CommandLineRunner {
public static volatile long serverId = 0;
@Autowired
RedisTemplate<String,Object> redisTemplate;
@Autowired
private List<IMServer> imServers;
@Override
public void run(String... args) throws Exception {
// 初始化SERVER_ID
String key = RedisKey.IM_MAX_SERVER_ID;
serverId = redisTemplate.opsForValue().increment(key,1);
// 启动服务
for(IMServer imServer:imServers){
imServer.start();
}
}
@PreDestroy
public void destroy(){
// 停止服务
for(IMServer imServer:imServers){
imServer.stop();
}
}
}

4
im-server/src/main/java/com/bx/imserver/util/UserChannelCtxHolder.java → im-server/src/main/java/com/bx/imserver/netty/UserChannelCtxMap.java

@ -1,4 +1,4 @@
package com.bx.imserver.util; package com.bx.imserver.netty;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -6,7 +6,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class UserChannelCtxHolder { public class UserChannelCtxMap {
/* /*
* 维护userId和ctx的关联关系格式:Map<userId,ctx> * 维护userId和ctx的关联关系格式:Map<userId,ctx>

6
im-server/src/main/java/com/bx/imserver/processor/GroupMessageProcessor.java → im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java

@ -1,4 +1,4 @@
package com.bx.imserver.processor; package com.bx.imserver.netty.processor;
import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
@ -7,7 +7,7 @@ import com.bx.imcommon.model.GroupMessageInfo;
import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMRecvInfo;
import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSendInfo;
import com.bx.imcommon.model.SendResult; import com.bx.imcommon.model.SendResult;
import com.bx.imserver.util.UserChannelCtxHolder; import com.bx.imserver.netty.UserChannelCtxMap;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -32,7 +32,7 @@ public class GroupMessageProcessor extends MessageProcessor<IMRecvInfo<GroupMes
log.info("接收到群消息,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent()); log.info("接收到群消息,发送者:{},群id:{},接收id:{},内容:{}",messageInfo.getSendId(),messageInfo.getGroupId(),recvIds,messageInfo.getContent());
for(Long recvId:recvIds){ for(Long recvId:recvIds){
try { try {
ChannelHandlerContext channelCtx = UserChannelCtxHolder.getChannelCtx(recvId); ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId);
if(channelCtx != null){ if(channelCtx != null){
// 自己发的消息不用推送 // 自己发的消息不用推送
if(recvId != messageInfo.getSendId()){ if(recvId != messageInfo.getSendId()){

6
im-server/src/main/java/com/bx/imserver/processor/HeartbeatProcessor.java → im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java

@ -1,4 +1,4 @@
package com.bx.imserver.processor; package com.bx.imserver.netty.processor;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import com.bx.imcommon.contant.Constant; import com.bx.imcommon.contant.Constant;
@ -6,7 +6,7 @@ import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.HeartbeatInfo; import com.bx.imcommon.model.HeartbeatInfo;
import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSendInfo;
import com.bx.imserver.ws.WebsocketServer; import com.bx.imserver.netty.ws.WebSocketServer;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -23,7 +23,7 @@ public class HeartbeatProcessor extends MessageProcessor<HeartbeatInfo> {
@Autowired @Autowired
private WebsocketServer WSServer; private WebSocketServer WSServer;
@Autowired @Autowired
RedisTemplate<String,Object> redisTemplate; RedisTemplate<String,Object> redisTemplate;

15
im-server/src/main/java/com/bx/imserver/processor/LoginProcessor.java → im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java

@ -1,4 +1,4 @@
package com.bx.imserver.processor; package com.bx.imserver.netty.processor;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import com.bx.imcommon.contant.Constant; import com.bx.imcommon.contant.Constant;
@ -6,8 +6,9 @@ import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSendInfo;
import com.bx.imcommon.model.LoginInfo; import com.bx.imcommon.model.LoginInfo;
import com.bx.imserver.util.UserChannelCtxHolder; import com.bx.imserver.netty.IMServerMap;
import com.bx.imserver.ws.WebsocketServer; import com.bx.imserver.netty.UserChannelCtxMap;
import com.bx.imserver.netty.ws.WebSocketServer;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -24,7 +25,7 @@ public class LoginProcessor extends MessageProcessor<LoginInfo> {
@Autowired @Autowired
private WebsocketServer WSServer; private WebSocketServer WSServer;
@Autowired @Autowired
RedisTemplate<String,Object> redisTemplate; RedisTemplate<String,Object> redisTemplate;
@ -32,7 +33,7 @@ public class LoginProcessor extends MessageProcessor<LoginInfo> {
@Override @Override
synchronized public void process(ChannelHandlerContext ctx, LoginInfo loginInfo) { synchronized public void process(ChannelHandlerContext ctx, LoginInfo loginInfo) {
log.info("用户登录,userId:{}",loginInfo.getUserId()); log.info("用户登录,userId:{}",loginInfo.getUserId());
ChannelHandlerContext context = UserChannelCtxHolder.getChannelCtx(loginInfo.getUserId()); ChannelHandlerContext context = UserChannelCtxMap.getChannelCtx(loginInfo.getUserId());
if(context != null){ if(context != null){
// 不允许多地登录,强制下线 // 不允许多地登录,强制下线
IMSendInfo sendInfo = new IMSendInfo(); IMSendInfo sendInfo = new IMSendInfo();
@ -40,7 +41,7 @@ public class LoginProcessor extends MessageProcessor<LoginInfo> {
context.channel().writeAndFlush(sendInfo); context.channel().writeAndFlush(sendInfo);
} }
// 绑定用户和channel // 绑定用户和channel
UserChannelCtxHolder.addChannelCtx(loginInfo.getUserId(),ctx); UserChannelCtxMap.addChannelCtx(loginInfo.getUserId(),ctx);
// 设置用户id属性 // 设置用户id属性
AttributeKey<Long> attr = AttributeKey.valueOf("USER_ID"); AttributeKey<Long> attr = AttributeKey.valueOf("USER_ID");
ctx.channel().attr(attr).set(loginInfo.getUserId()); ctx.channel().attr(attr).set(loginInfo.getUserId());
@ -49,7 +50,7 @@ public class LoginProcessor extends MessageProcessor<LoginInfo> {
ctx.channel().attr(attr).set(0L); ctx.channel().attr(attr).set(0L);
// 在redis上记录每个user的channelId,15秒没有心跳,则自动过期 // 在redis上记录每个user的channelId,15秒没有心跳,则自动过期
String key = RedisKey.IM_USER_SERVER_ID+loginInfo.getUserId(); String key = RedisKey.IM_USER_SERVER_ID+loginInfo.getUserId();
redisTemplate.opsForValue().set(key, WSServer.getServerId(), Constant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS); redisTemplate.opsForValue().set(key, IMServerMap.serverId, Constant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS);
// 响应ws // 响应ws
IMSendInfo sendInfo = new IMSendInfo(); IMSendInfo sendInfo = new IMSendInfo();
sendInfo.setCmd(IMCmdType.LOGIN.code()); sendInfo.setCmd(IMCmdType.LOGIN.code());

2
im-server/src/main/java/com/bx/imserver/processor/MessageProcessor.java → im-server/src/main/java/com/bx/imserver/netty/processor/MessageProcessor.java

@ -1,4 +1,4 @@
package com.bx.imserver.processor; package com.bx.imserver.netty.processor;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;

6
im-server/src/main/java/com/bx/imserver/processor/PrivateMessageProcessor.java → im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java

@ -1,4 +1,4 @@
package com.bx.imserver.processor; package com.bx.imserver.netty.processor;
import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
@ -7,7 +7,7 @@ import com.bx.imcommon.model.IMRecvInfo;
import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSendInfo;
import com.bx.imcommon.model.PrivateMessageInfo; import com.bx.imcommon.model.PrivateMessageInfo;
import com.bx.imcommon.model.SendResult; import com.bx.imcommon.model.SendResult;
import com.bx.imserver.util.UserChannelCtxHolder; import com.bx.imserver.netty.UserChannelCtxMap;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -27,7 +27,7 @@ public class PrivateMessageProcessor extends MessageProcessor<IMRecvInfo<Privat
Long recvId = recvInfo.getRecvIds().get(0); Long recvId = recvInfo.getRecvIds().get(0);
log.info("接收到消息,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent()); log.info("接收到消息,发送者:{},接收者:{},内容:{}",messageInfo.getSendId(),recvId,messageInfo.getContent());
try{ try{
ChannelHandlerContext channelCtx = UserChannelCtxHolder.getChannelCtx(recvId); ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(recvId);
if(channelCtx != null ){ if(channelCtx != null ){
// 推送消息到用户 // 推送消息到用户
IMSendInfo sendInfo = new IMSendInfo(); IMSendInfo sendInfo = new IMSendInfo();

2
im-server/src/main/java/com/bx/imserver/processor/ProcessorFactory.java → im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java

@ -1,4 +1,4 @@
package com.bx.imserver.processor; package com.bx.imserver.netty.processor;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
import com.bx.imserver.util.SpringContextHolder; import com.bx.imserver.util.SpringContextHolder;

94
im-server/src/main/java/com/bx/imserver/netty/tcp/TcpSocketServer.java

@ -0,0 +1,94 @@
package com.bx.imserver.netty.tcp;
import com.bx.imserver.netty.IMChannelHandler;
import com.bx.imserver.netty.IMServer;
import com.bx.imserver.netty.tcp.endecode.MessageProtocolDecoder;
import com.bx.imserver.netty.tcp.endecode.MessageProtocolEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* TCP服务器,用于连接非网页的客户端,协议格式 4字节内容的长度+IMSendInfo的JSON序列化
*
* @author Blue
* @date 2022-11-20
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "tcpsocket", value = "enable", havingValue = "true",matchIfMissing = true)
public class TcpSocketServer implements IMServer {
private volatile boolean ready = false;
@Value("${tcpsocket.port}")
private int port;
private ServerBootstrap bootstrap = new ServerBootstrap();
private EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workGroup = new NioEventLoopGroup();
@Override
public boolean isReady() {
return ready;
}
@Override
public void start() {
// 设置为主从线程模型
bootstrap.group(bossGroup, workGroup)
// 设置服务端NIO通信类型
.channel(NioServerSocketChannel.class)
// 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理
.childHandler(new ChannelInitializer<Channel>() {
// 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等
@Override
protected void initChannel(Channel ch) throws Exception {
// 获取职责链
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("encode",new MessageProtocolEncoder());
pipeline.addLast("decode",new MessageProtocolDecoder());
pipeline.addLast("handler", new IMChannelHandler());
}
})
// bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。
// 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。
// backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列
.option(ChannelOption.SO_BACKLOG, 5)
// 表示连接保活,相当于心跳机制,默认为7200s
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
// 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理
Channel channel = bootstrap.bind(port).sync().channel();
// 就绪标志
this.ready = true;
log.info("tcp server 初始化完成,端口:{}",port);
// 等待服务端口关闭
//channel.closeFuture().sync();
} catch (InterruptedException e) {
log.info("tcp server 初始化异常",e);
}
}
@Override
public void stop(){
log.info("tcp server 停止");
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
this.ready = false;
}
}

29
im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolDecoder.java

@ -0,0 +1,29 @@
package com.bx.imserver.netty.tcp.endecode;
import com.bx.imcommon.model.IMSendInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
public class MessageProtocolDecoder extends ReplayingDecoder {
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes()< 4){
return;
}
// 获取到包的长度
long length=byteBuf.readLong();
// 转成IMSendInfo
ByteBuf contentBuf = byteBuf.readBytes((int)length);
String content = contentBuf.toString(CharsetUtil.UTF_8);
ObjectMapper objectMapper = new ObjectMapper();
IMSendInfo sendInfo = objectMapper.readValue(content, IMSendInfo.class);
list.add(sendInfo);
}
}

22
im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolEncoder.java

@ -0,0 +1,22 @@
package com.bx.imserver.netty.tcp.endecode;
import com.bx.imcommon.model.IMSendInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MessageProtocolEncoder extends MessageToByteEncoder<IMSendInfo> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, IMSendInfo sendInfo, ByteBuf byteBuf) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
String content = objectMapper.writeValueAsString(sendInfo);
byte[] bytes = content.getBytes("UTF-8");
// 写入长度
byteBuf.writeLong(bytes.length);
// 写入命令体
byteBuf.writeBytes(bytes);
}
}

102
im-server/src/main/java/com/bx/imserver/netty/ws/WebSocketServer.java

@ -0,0 +1,102 @@
package com.bx.imserver.netty.ws;
import com.bx.imserver.netty.IMChannelHandler;
import com.bx.imserver.netty.IMServer;
import com.bx.imserver.netty.ws.endecode.MessageProtocolDecoder;
import com.bx.imserver.netty.ws.endecode.MessageProtocolEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* WS服务器,用于连接网页的客户端,协议格式: 直接IMSendInfo的JSON序列化
*
* @author Blue
* @date 2022-11-20
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "websocket", value = "enable", havingValue = "true",matchIfMissing = true)
public class WebSocketServer implements IMServer {
@Value("${websocket.port}")
private int port;
private volatile boolean ready = false;
private ServerBootstrap bootstrap = new ServerBootstrap();
private EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workGroup = new NioEventLoopGroup();
@Override
public boolean isReady(){
return ready;
}
@Override
public void start() {
// 设置为主从线程模型
bootstrap.group(bossGroup, workGroup)
// 设置服务端NIO通信类型
.channel(NioServerSocketChannel.class)
// 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理
.childHandler(new ChannelInitializer<Channel>() {
// 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等
@Override
protected void initChannel(Channel ch) throws Exception {
// 获取职责链
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
pipeline.addLast(new WebSocketServerProtocolHandler("/im"));
pipeline.addLast("encode",new MessageProtocolEncoder());
pipeline.addLast("decode",new MessageProtocolDecoder());
pipeline.addLast("handler", new IMChannelHandler());
}
})
// bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。
// 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。
// backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列
.option(ChannelOption.SO_BACKLOG, 5)
// 表示连接保活,相当于心跳机制,默认为7200s
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
// 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理
Channel channel = bootstrap.bind(port).sync().channel();
// 就绪标志
this.ready = true;
log.info("websocket server 初始化完成,端口:{}",port);
// 等待服务端口关闭
//channel.closeFuture().sync();
} catch (InterruptedException e) {
log.info("websocket server 初始化异常",e);
}
}
@Override
public void stop() {
log.info("websocket server 停止");
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
this.ready = false;
}
}

2
im-server/src/main/java/com/bx/imserver/ws/endecode/MessageProtocolDecoder.java → im-server/src/main/java/com/bx/imserver/netty/ws/endecode/MessageProtocolDecoder.java

@ -1,4 +1,4 @@
package com.bx.imserver.ws.endecode; package com.bx.imserver.netty.ws.endecode;
import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSendInfo;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;

2
im-server/src/main/java/com/bx/imserver/ws/endecode/MessageProtocolEncoder.java → im-server/src/main/java/com/bx/imserver/netty/ws/endecode/MessageProtocolEncoder.java

@ -1,4 +1,4 @@
package com.bx.imserver.ws.endecode; package com.bx.imserver.netty.ws.endecode;
import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSendInfo;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;

4
im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java

@ -1,6 +1,6 @@
package com.bx.imserver.task; package com.bx.imserver.task;
import com.bx.imserver.ws.WebsocketServer; import com.bx.imserver.netty.ws.WebSocketServer;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -17,7 +17,7 @@ public abstract class AbstractPullMessageTask{
private ExecutorService executorService; private ExecutorService executorService;
@Autowired @Autowired
private WebsocketServer WSServer; private WebSocketServer WSServer;
public AbstractPullMessageTask(){ public AbstractPullMessageTask(){
this.threadNum = 1; this.threadNum = 1;

11
im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java

@ -4,9 +4,10 @@ import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.GroupMessageInfo; import com.bx.imcommon.model.GroupMessageInfo;
import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMRecvInfo;
import com.bx.imserver.processor.MessageProcessor; import com.bx.imserver.netty.IMServerMap;
import com.bx.imserver.processor.ProcessorFactory; import com.bx.imserver.netty.processor.MessageProcessor;
import com.bx.imserver.ws.WebsocketServer; import com.bx.imserver.netty.processor.ProcessorFactory;
import com.bx.imserver.netty.ws.WebSocketServer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
@ -19,7 +20,7 @@ import java.util.List;
public class PullUnreadGroupMessageTask extends AbstractPullMessageTask { public class PullUnreadGroupMessageTask extends AbstractPullMessageTask {
@Autowired @Autowired
private WebsocketServer WSServer; private WebSocketServer WSServer;
@Autowired @Autowired
private RedisTemplate<String,Object> redisTemplate; private RedisTemplate<String,Object> redisTemplate;
@ -29,7 +30,7 @@ public class PullUnreadGroupMessageTask extends AbstractPullMessageTask {
@Override @Override
public void pullMessage() { public void pullMessage() {
// 从redis拉取未读消息 // 从redis拉取未读消息
String key = RedisKey.IM_UNREAD_GROUP_QUEUE + WSServer.getServerId(); String key = RedisKey.IM_UNREAD_GROUP_QUEUE + IMServerMap.serverId;
List messageInfos = redisTemplate.opsForList().range(key,0,-1); List messageInfos = redisTemplate.opsForList().range(key,0,-1);
for(Object o: messageInfos){ for(Object o: messageInfos){
redisTemplate.opsForList().leftPop(key); redisTemplate.opsForList().leftPop(key);

11
im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java

@ -5,9 +5,10 @@ import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMRecvInfo;
import com.bx.imcommon.model.PrivateMessageInfo; import com.bx.imcommon.model.PrivateMessageInfo;
import com.bx.imserver.processor.MessageProcessor; import com.bx.imserver.netty.IMServerMap;
import com.bx.imserver.processor.ProcessorFactory; import com.bx.imserver.netty.processor.MessageProcessor;
import com.bx.imserver.ws.WebsocketServer; import com.bx.imserver.netty.processor.ProcessorFactory;
import com.bx.imserver.netty.ws.WebSocketServer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
@ -21,7 +22,7 @@ import java.util.List;
public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask { public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask {
@Autowired @Autowired
private WebsocketServer WSServer; private WebSocketServer WSServer;
@Autowired @Autowired
private RedisTemplate<String,Object> redisTemplate; private RedisTemplate<String,Object> redisTemplate;
@ -29,7 +30,7 @@ public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask {
@Override @Override
public void pullMessage() { public void pullMessage() {
// 从redis拉取未读消息 // 从redis拉取未读消息
String key = RedisKey.IM_UNREAD_PRIVATE_QUEUE + WSServer.getServerId(); String key = RedisKey.IM_UNREAD_PRIVATE_QUEUE + IMServerMap.serverId;
List messageInfos = redisTemplate.opsForList().range(key,0,-1); List messageInfos = redisTemplate.opsForList().range(key,0,-1);
for(Object o: messageInfos){ for(Object o: messageInfos){
redisTemplate.opsForList().leftPop(key); redisTemplate.opsForList().leftPop(key);

101
im-server/src/main/java/com/bx/imserver/ws/WebsocketServer.java

@ -1,101 +0,0 @@
package com.bx.imserver.ws;
import com.bx.imcommon.contant.RedisKey;
import com.bx.imserver.ws.endecode.MessageProtocolDecoder;
import com.bx.imserver.ws.endecode.MessageProtocolEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class WebsocketServer {
public static long serverId = 0;
@Autowired
RedisTemplate<String,Object> redisTemplate;
private volatile boolean ready = false;
@PostConstruct
public void init(){
// 初始化SERVER_ID
String key = RedisKey.IM_MAX_SERVER_ID;
serverId = redisTemplate.opsForValue().increment(key,1);
}
public boolean isReady(){
return ready;
}
public long getServerId(){
return serverId;
}
public void start(int port) {
// 服务端启动辅助类,用于设置TCP相关参数
ServerBootstrap bootstrap = new ServerBootstrap();
// 获取Reactor线程池
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
// 设置为主从线程模型
bootstrap.group(bossGroup, workGroup)
// 设置服务端NIO通信类型
.channel(NioServerSocketChannel.class)
// 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理
.childHandler(new ChannelInitializer<Channel>() {
// 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等
@Override
protected void initChannel(Channel ch) throws Exception {
// 获取职责链
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
pipeline.addLast(new WebSocketServerProtocolHandler("/im"));
pipeline.addLast("encode",new MessageProtocolEncoder());
pipeline.addLast("decode",new MessageProtocolDecoder());
pipeline.addLast("handler", new WebSocketHandler());
}
})
// bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。
// 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。
// backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列
.option(ChannelOption.SO_BACKLOG, 5)
// 表示连接保活,相当于心跳机制,默认为7200s
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
// 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理
Channel channel = bootstrap.bind(port).sync().channel();
// 就绪标志
this.ready = true;
log.info("websocket server 初始化完成....");
// 等待服务端口关闭
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}

13
im-server/src/main/resources/application.yml

@ -1,11 +1,16 @@
server: server:
port: 8877 port: 8877
websocket:
port: 8878
spring: spring:
redis: redis:
host: 127.0.0.1 host: 127.0.0.1
port: 6379 port: 6379
database: 1 database: 1
websocket:
enable: true
port: 8878
tcpsocket:
enable: false # 暂时不开启
port: 8879
Loading…
Cancel
Save