Browse Source

代码优化

master
xie.bx 3 years ago
parent
commit
87896d1df9
  1. 3
      im-client/src/main/java/com/bx/imclient/task/AbstractPullMessageTask.java
  2. 4
      im-platform/src/main/java/com/bx/implatform/config/MvcConfig.java
  3. 2
      im-platform/src/main/java/com/bx/implatform/service/IFriendService.java
  4. 1
      im-platform/src/main/java/com/bx/implatform/service/IGroupMemberService.java
  5. 2
      im-platform/src/main/java/com/bx/implatform/service/IGroupService.java
  6. 6
      im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java
  7. 2
      im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java
  8. 14
      im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java
  9. 18
      im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java
  10. 1083
      im-platform/src/main/java/com/bx/implatform/util/DateTimeUtils.java
  11. 5
      im-server/src/main/java/com/bx/imserver/IMServerApp.java
  12. 2
      im-server/src/main/java/com/bx/imserver/constant/ChannelAttrKey.java
  13. 4
      im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java
  14. 2
      im-server/src/main/java/com/bx/imserver/netty/processor/AbstractMessageProcessor.java
  15. 3
      im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java
  16. 6
      im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java
  17. 8
      im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java
  18. 5
      im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java
  19. 12
      im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java
  20. 1
      im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolDecoder.java
  21. 27
      im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java
  22. 9
      im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java
  23. 8
      im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java

3
im-client/src/main/java/com/bx/imclient/task/AbstractPullMessageTask.java

@ -5,8 +5,7 @@ import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.Executors;
@Slf4j @Slf4j
public abstract class AbstractPullMessageTask { public abstract class AbstractPullMessageTask {

4
im-platform/src/main/java/com/bx/implatform/config/MvcConfig.java

@ -16,7 +16,7 @@ public class MvcConfig implements WebMvcConfigurer {
@Override @Override
public void addInterceptors(InterceptorRegistry registry) { public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(XssInterceptor()) registry.addInterceptor(xssInterceptor())
.addPathPatterns("/**"); .addPathPatterns("/**");
registry.addInterceptor(authInterceptor()) registry.addInterceptor(authInterceptor())
.addPathPatterns("/**") .addPathPatterns("/**")
@ -30,7 +30,7 @@ public class MvcConfig implements WebMvcConfigurer {
} }
@Bean @Bean
public XssInterceptor XssInterceptor() { public XssInterceptor xssInterceptor() {
return new XssInterceptor(); return new XssInterceptor();
} }

2
im-platform/src/main/java/com/bx/implatform/service/IFriendService.java

@ -11,7 +11,7 @@ public interface IFriendService extends IService<Friend> {
Boolean isFriend(Long userId1, Long userId2); Boolean isFriend(Long userId1, Long userId2);
List<Friend> findFriendByUserId(Long UserId); List<Friend> findFriendByUserId(Long userId);
void addFriend(Long friendId); void addFriend(Long friendId);

1
im-platform/src/main/java/com/bx/implatform/service/IGroupMemberService.java

@ -18,7 +18,6 @@ public interface IGroupMemberService extends IService<GroupMember> {
List<Long> findUserIdsByGroupId(Long groupId); List<Long> findUserIdsByGroupId(Long groupId);
boolean save(GroupMember member);
boolean saveOrUpdateBatch(Long groupId,List<GroupMember> members); boolean saveOrUpdateBatch(Long groupId,List<GroupMember> members);

2
im-platform/src/main/java/com/bx/implatform/service/IGroupService.java

@ -26,7 +26,7 @@ public interface IGroupService extends IService<Group> {
void invite(GroupInviteVO vo); void invite(GroupInviteVO vo);
Group GetById(Long groupId); Group getById(Long groupId);
GroupVO findById(Long groupId); GroupVO findById(Long groupId);

6
im-platform/src/main/java/com/bx/implatform/service/impl/FriendServiceImpl.java

@ -36,13 +36,13 @@ public class FriendServiceImpl extends ServiceImpl<FriendMapper, Friend> impleme
/** /**
* 查询用户的所有好友 * 查询用户的所有好友
* *
* @param UserId 用户id * @param userId 用户id
* @return * @return
*/ */
@Override @Override
public List<Friend> findFriendByUserId(Long UserId) { public List<Friend> findFriendByUserId(Long userId) {
QueryWrapper<Friend> queryWrapper = new QueryWrapper(); QueryWrapper<Friend> queryWrapper = new QueryWrapper();
queryWrapper.lambda().eq(Friend::getUserId,UserId); queryWrapper.lambda().eq(Friend::getUserId,userId);
List<Friend> friends = this.list(queryWrapper); List<Friend> friends = this.list(queryWrapper);
return friends; return friends;
} }

2
im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java

@ -75,7 +75,7 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
msg.setSendTime(new Date()); msg.setSendTime(new Date());
this.save(msg); this.save(msg);
// 不用发给自己 // 不用发给自己
userIds = userIds.stream().filter(id->userId!=id).collect(Collectors.toList()); userIds = userIds.stream().filter(id->userId.equals(id)).collect(Collectors.toList());
// 群发 // 群发
GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class); GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class);
imClient.sendGroupMessage(userIds,msgInfo); imClient.sendGroupMessage(userIds,msgInfo);

14
im-platform/src/main/java/com/bx/implatform/service/impl/GroupServiceImpl.java

@ -100,7 +100,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
// 校验是不是群主,只有群主能改信息 // 校验是不是群主,只有群主能改信息
Group group = this.getById(vo.getId()); Group group = this.getById(vo.getId());
// 群主有权修改群基本信息 // 群主有权修改群基本信息
if(group.getOwnerId() == session.getUserId()){ if(group.getOwnerId().equals(session.getUserId()) ){
group = BeanUtils.copyProperties(vo,Group.class); group = BeanUtils.copyProperties(vo,Group.class);
this.updateById(group); this.updateById(group);
} }
@ -129,7 +129,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
public void deleteGroup(Long groupId) { public void deleteGroup(Long groupId) {
UserSession session = SessionContext.getSession(); UserSession session = SessionContext.getSession();
Group group = this.getById(groupId); Group group = this.getById(groupId);
if(group.getOwnerId() != session.getUserId()){ if(group.getOwnerId().equals(session.getUserId())){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"只有群主才有权限解除群聊"); throw new GlobalException(ResultCode.PROGRAM_ERROR,"只有群主才有权限解除群聊");
} }
// 逻辑删除群数据 // 逻辑删除群数据
@ -151,7 +151,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
public void quitGroup(Long groupId) { public void quitGroup(Long groupId) {
Long userId = SessionContext.getSession().getUserId(); Long userId = SessionContext.getSession().getUserId();
Group group = this.getById(groupId); Group group = this.getById(groupId);
if(group.getOwnerId() == userId){ if(group.getOwnerId().equals(userId)){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您是群主,不可退出群聊"); throw new GlobalException(ResultCode.PROGRAM_ERROR,"您是群主,不可退出群聊");
} }
// 删除群聊成员 // 删除群聊成员
@ -171,10 +171,10 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
public void kickGroup(Long groupId, Long userId) { public void kickGroup(Long groupId, Long userId) {
UserSession session = SessionContext.getSession(); UserSession session = SessionContext.getSession();
Group group = this.getById(groupId); Group group = this.getById(groupId);
if(group.getOwnerId() != session.getUserId()){ if(group.getOwnerId().equals(session.getUserId()) ){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您不是群主,没有权限踢人"); throw new GlobalException(ResultCode.PROGRAM_ERROR,"您不是群主,没有权限踢人");
} }
if(userId == session.getUserId()){ if(userId.equals(session.getUserId())){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"亲,不能自己踢自己哟"); throw new GlobalException(ResultCode.PROGRAM_ERROR,"亲,不能自己踢自己哟");
} }
// 删除群聊成员 // 删除群聊成员
@ -204,7 +204,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
*/ */
@Cacheable(value = "#groupId") @Cacheable(value = "#groupId")
@Override @Override
public Group GetById(Long groupId){ public Group getById(Long groupId){
Group group = super.getById(groupId); Group group = super.getById(groupId);
if(group == null){ if(group == null){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"群组不存在"); throw new GlobalException(ResultCode.PROGRAM_ERROR,"群组不存在");
@ -276,7 +276,7 @@ public class GroupServiceImpl extends ServiceImpl<GroupMapper, Group> implements
// 批量保存成员数据 // 批量保存成员数据
List<GroupMember> groupMembers = friendsList.stream() List<GroupMember> groupMembers = friendsList.stream()
.map(f -> { .map(f -> {
Optional<GroupMember> optional = members.stream().filter(m->m.getUserId()==f.getFriendId()).findFirst(); Optional<GroupMember> optional = members.stream().filter(m->m.getUserId().equals(f.getFriendId())).findFirst();
GroupMember groupMember = optional.isPresent()? optional.get():new GroupMember(); GroupMember groupMember = optional.isPresent()? optional.get():new GroupMember();
groupMember.setGroupId(vo.getGroupId()); groupMember.setGroupId(vo.getGroupId());
groupMember.setUserId(f.getFriendId()); groupMember.setUserId(f.getFriendId());

18
im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java

@ -36,8 +36,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
@Autowired @Autowired
private IFriendService friendService; private IFriendService friendService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired @Autowired
private IMClient imClient; private IMClient imClient;
/** /**
@ -61,7 +60,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
this.save(msg); this.save(msg);
// 推送消息 // 推送消息
PrivateMessageInfo msgInfo = BeanUtils.copyProperties(msg, PrivateMessageInfo.class); PrivateMessageInfo msgInfo = BeanUtils.copyProperties(msg, PrivateMessageInfo.class);
IMPrivateMessage sendMessage = new IMPrivateMessage(); IMPrivateMessage<PrivateMessageInfo> sendMessage = new IMPrivateMessage();
sendMessage.setSendId(msgInfo.getSendId()); sendMessage.setSendId(msgInfo.getSendId());
sendMessage.setRecvId(msgInfo.getRecvId()); sendMessage.setRecvId(msgInfo.getRecvId());
sendMessage.setSendTerminal(session.getTerminal()); sendMessage.setSendTerminal(session.getTerminal());
@ -99,7 +98,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
msgInfo.setSendTime(new Date()); msgInfo.setSendTime(new Date());
msgInfo.setContent("对方撤回了一条消息"); msgInfo.setContent("对方撤回了一条消息");
IMPrivateMessage sendMessage = new IMPrivateMessage(); IMPrivateMessage<PrivateMessageInfo> sendMessage = new IMPrivateMessage();
sendMessage.setSendId(msgInfo.getSendId()); sendMessage.setSendId(msgInfo.getSendId());
sendMessage.setRecvId(msgInfo.getRecvId()); sendMessage.setRecvId(msgInfo.getRecvId());
sendMessage.setSendTerminal(session.getTerminal()); sendMessage.setSendTerminal(session.getTerminal());
@ -135,10 +134,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
.last("limit " + stIdx + "," + size); .last("limit " + stIdx + "," + size);
List<PrivateMessage> messages = this.list(wrapper); List<PrivateMessage> messages = this.list(wrapper);
List<PrivateMessageInfo> messageInfos = messages.stream().map(m -> { List<PrivateMessageInfo> messageInfos = messages.stream().map(m -> BeanUtils.copyProperties(m, PrivateMessageInfo.class)).collect(Collectors.toList());
PrivateMessageInfo info = BeanUtils.copyProperties(m, PrivateMessageInfo.class);
return info;
}).collect(Collectors.toList());
log.info("拉取聊天记录,用户id:{},好友id:{},数量:{}", userId, friendId, messageInfos.size()); log.info("拉取聊天记录,用户id:{},好友id:{},数量:{}", userId, friendId, messageInfos.size());
return messageInfos; return messageInfos;
} }
@ -146,7 +142,6 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
/** /**
* 异步拉取私聊消息通过websocket异步推送 * 异步拉取私聊消息通过websocket异步推送
* *
* @return
*/ */
@Override @Override
public void pullUnreadMessage() { public void pullUnreadMessage() {
@ -162,10 +157,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
List<PrivateMessage> messages = this.list(queryWrapper); List<PrivateMessage> messages = this.list(queryWrapper);
// 上传至redis,等待推送 // 上传至redis,等待推送
if (!messages.isEmpty()) { if (!messages.isEmpty()) {
List<PrivateMessageInfo> messageInfos = messages.stream().map(m -> { List<PrivateMessageInfo> messageInfos = messages.stream().map(m -> BeanUtils.copyProperties(m, PrivateMessageInfo.class)).collect(Collectors.toList());
PrivateMessageInfo msgInfo = BeanUtils.copyProperties(m, PrivateMessageInfo.class);
return msgInfo;
}).collect(Collectors.toList());
// 推送消息 // 推送消息
IMPrivateMessage<PrivateMessageInfo> sendMessage = new IMPrivateMessage(); IMPrivateMessage<PrivateMessageInfo> sendMessage = new IMPrivateMessage();
sendMessage.setRecvId(userId); sendMessage.setRecvId(userId);

1083
im-platform/src/main/java/com/bx/implatform/util/DateTimeUtils.java

File diff suppressed because it is too large

5
im-server/src/main/java/com/bx/imserver/IMServerApp.java

@ -13,14 +13,11 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling @EnableScheduling
@ComponentScan(basePackages={"com.bx"}) @ComponentScan(basePackages={"com.bx"})
@SpringBootApplication @SpringBootApplication
public class IMServerApp implements CommandLineRunner { public class IMServerApp {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(IMServerApp.class,args); SpringApplication.run(IMServerApp.class,args);
} }
public void run(String... args) throws Exception {
}
} }

2
im-server/src/main/java/com/bx/imserver/constant/ChannelAttrKey.java

@ -7,6 +7,6 @@ public class ChannelAttrKey {
// 终端类型 // 终端类型
public static final String TERMINAL_TYPE = "TERMINAL_TYPE"; public static final String TERMINAL_TYPE = "TERMINAL_TYPE";
// 心跳次数 // 心跳次数
public static final String HEARTBEAt_TIMES = "HEARTBEAt_TIMES"; public static final String HEARTBEAT_TIMES = "HEARTBEAt_TIMES";
} }

4
im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java

@ -4,7 +4,7 @@ import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSendInfo;
import com.bx.imserver.constant.ChannelAttrKey; import com.bx.imserver.constant.ChannelAttrKey;
import com.bx.imserver.netty.processor.MessageProcessor; import com.bx.imserver.netty.processor.AbstractMessageProcessor;
import com.bx.imserver.netty.processor.ProcessorFactory; import com.bx.imserver.netty.processor.ProcessorFactory;
import com.bx.imserver.util.SpringContextHolder; import com.bx.imserver.util.SpringContextHolder;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -34,7 +34,7 @@ public class IMChannelHandler extends SimpleChannelInboundHandler<IMSendInfo> {
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, IMSendInfo sendInfo) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, IMSendInfo sendInfo) throws Exception {
// 创建处理器进行处理 // 创建处理器进行处理
MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.fromCode(sendInfo.getCmd())); AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.fromCode(sendInfo.getCmd()));
processor.process(ctx,processor.transForm(sendInfo.getData())); processor.process(ctx,processor.transForm(sendInfo.getData()));
} }

2
im-server/src/main/java/com/bx/imserver/netty/processor/MessageProcessor.java → im-server/src/main/java/com/bx/imserver/netty/processor/AbstractMessageProcessor.java

@ -3,7 +3,7 @@ package com.bx.imserver.netty.processor;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
public abstract class MessageProcessor<T> { public abstract class AbstractMessageProcessor<T> {
public void process(ChannelHandlerContext ctx,T data){} public void process(ChannelHandlerContext ctx,T data){}

3
im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java

@ -3,7 +3,6 @@ package com.bx.imserver.netty.processor;
import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.enums.IMSendCode; import com.bx.imcommon.enums.IMSendCode;
import com.bx.imcommon.model.GroupMessageInfo;
import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMRecvInfo;
import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSendInfo;
import com.bx.imcommon.model.SendResult; import com.bx.imcommon.model.SendResult;
@ -19,7 +18,7 @@ import java.util.List;
@Slf4j @Slf4j
@Component @Component
public class GroupMessageProcessor extends MessageProcessor<IMRecvInfo> { public class GroupMessageProcessor extends AbstractMessageProcessor<IMRecvInfo> {
@Autowired @Autowired
private RedisTemplate<String,Object> redisTemplate; private RedisTemplate<String,Object> redisTemplate;

6
im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java

@ -20,11 +20,11 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Component @Component
public class HeartbeatProcessor extends MessageProcessor<HeartbeatInfo> { public class HeartbeatProcessor extends AbstractMessageProcessor<HeartbeatInfo> {
@Autowired @Autowired
private WebSocketServer WSServer; private WebSocketServer wsServer;
@Autowired @Autowired
RedisTemplate<String,Object> redisTemplate; RedisTemplate<String,Object> redisTemplate;
@ -37,7 +37,7 @@ public class HeartbeatProcessor extends MessageProcessor<HeartbeatInfo> {
ctx.channel().writeAndFlush(sendInfo); ctx.channel().writeAndFlush(sendInfo);
// 设置属性 // 设置属性
AttributeKey<Long> heartBeatAttr = AttributeKey.valueOf(ChannelAttrKey.HEARTBEAt_TIMES); AttributeKey<Long> heartBeatAttr = AttributeKey.valueOf(ChannelAttrKey.HEARTBEAT_TIMES);
Long heartbeatTimes = ctx.channel().attr(heartBeatAttr).get(); Long heartbeatTimes = ctx.channel().attr(heartBeatAttr).get();
ctx.channel().attr(heartBeatAttr).set(++heartbeatTimes); ctx.channel().attr(heartBeatAttr).set(++heartbeatTimes);
if(heartbeatTimes%10 == 0){ if(heartbeatTimes%10 == 0){

8
im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java

@ -2,7 +2,6 @@ package com.bx.imserver.netty.processor;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bx.imcommon.contant.Constant; import com.bx.imcommon.contant.Constant;
import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
@ -23,16 +22,11 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Component @Component
public class LoginProcessor extends MessageProcessor<LoginInfo> { public class LoginProcessor extends AbstractMessageProcessor<LoginInfo> {
@Autowired
private WebSocketServer WSServer;
@Autowired @Autowired
RedisTemplate<String,Object> redisTemplate; RedisTemplate<String,Object> redisTemplate;

5
im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java

@ -5,7 +5,6 @@ import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.enums.IMSendCode; import com.bx.imcommon.enums.IMSendCode;
import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMRecvInfo;
import com.bx.imcommon.model.IMSendInfo; import com.bx.imcommon.model.IMSendInfo;
import com.bx.imcommon.model.PrivateMessageInfo;
import com.bx.imcommon.model.SendResult; import com.bx.imcommon.model.SendResult;
import com.bx.imserver.netty.UserChannelCtxMap; import com.bx.imserver.netty.UserChannelCtxMap;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -14,11 +13,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Map;
@Slf4j @Slf4j
@Component @Component
public class PrivateMessageProcessor extends MessageProcessor<IMRecvInfo> { public class PrivateMessageProcessor extends AbstractMessageProcessor<IMRecvInfo> {
@Autowired @Autowired
private RedisTemplate<String,Object> redisTemplate; private RedisTemplate<String,Object> redisTemplate;

12
im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java

@ -5,20 +5,20 @@ import com.bx.imserver.util.SpringContextHolder;
public class ProcessorFactory { public class ProcessorFactory {
public static MessageProcessor createProcessor(IMCmdType cmd){ public static AbstractMessageProcessor createProcessor(IMCmdType cmd){
MessageProcessor processor = null; AbstractMessageProcessor processor = null;
switch (cmd){ switch (cmd){
case LOGIN: case LOGIN:
processor = (MessageProcessor) SpringContextHolder.getApplicationContext().getBean(LoginProcessor.class); processor = (AbstractMessageProcessor) SpringContextHolder.getApplicationContext().getBean(LoginProcessor.class);
break; break;
case HEART_BEAT: case HEART_BEAT:
processor = (MessageProcessor) SpringContextHolder.getApplicationContext().getBean(HeartbeatProcessor.class); processor = (AbstractMessageProcessor) SpringContextHolder.getApplicationContext().getBean(HeartbeatProcessor.class);
break; break;
case PRIVATE_MESSAGE: case PRIVATE_MESSAGE:
processor = (MessageProcessor)SpringContextHolder.getApplicationContext().getBean(PrivateMessageProcessor.class); processor = (AbstractMessageProcessor)SpringContextHolder.getApplicationContext().getBean(PrivateMessageProcessor.class);
break; break;
case GROUP_MESSAGE: case GROUP_MESSAGE:
processor = (MessageProcessor)SpringContextHolder.getApplicationContext().getBean(GroupMessageProcessor.class); processor = (AbstractMessageProcessor)SpringContextHolder.getApplicationContext().getBean(GroupMessageProcessor.class);
break; break;
default: default:
break; break;

1
im-server/src/main/java/com/bx/imserver/netty/tcp/endecode/MessageProtocolDecoder.java

@ -13,6 +13,7 @@ import java.util.List;
@Slf4j @Slf4j
public class MessageProtocolDecoder extends ReplayingDecoder { public class MessageProtocolDecoder extends ReplayingDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes()< 4){ if(byteBuf.readableBytes()< 4){
return; return;

27
im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java

@ -7,11 +7,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.Executors;
@Slf4j @Slf4j
public abstract class AbstractPullMessageTask{ public abstract class AbstractPullMessageTask {
private int threadNum = 1; private int threadNum = 1;
private ExecutorService executorService; private ExecutorService executorService;
@ -19,34 +18,34 @@ public abstract class AbstractPullMessageTask{
@Autowired @Autowired
private IMServerGroup serverGroup; private IMServerGroup serverGroup;
public AbstractPullMessageTask(){ public AbstractPullMessageTask() {
this.threadNum = 1; this.threadNum = 1;
} }
public AbstractPullMessageTask(int threadNum){ public AbstractPullMessageTask(int threadNum) {
this.threadNum = threadNum; this.threadNum = threadNum;
} }
@PostConstruct @PostConstruct
public void init(){ public void init() {
// 初始化定时器 // 初始化定时器
executorService = Executors.newFixedThreadPool(threadNum); executorService = Executors.newFixedThreadPool(threadNum);
for(int i=0;i<threadNum;i++){ for (int i = 0; i < threadNum; i++) {
executorService.execute(new Runnable() { executorService.execute(new Runnable() {
@SneakyThrows @SneakyThrows
@Override @Override
public void run() { public void run() {
try{ try {
if(serverGroup.isReady()){ if (serverGroup.isReady()) {
pullMessage(); pullMessage();
} }
Thread.sleep(100); Thread.sleep(100);
}catch (Exception e){ } catch (Exception e) {
log.error("任务调度异常",e); log.error("任务调度异常", e);
Thread.sleep(200); Thread.sleep(200);
} }
if(!executorService.isShutdown()){ if (!executorService.isShutdown()) {
executorService.execute(this); executorService.execute(this);
} }
} }
@ -55,8 +54,8 @@ public abstract class AbstractPullMessageTask{
} }
@PreDestroy @PreDestroy
public void destroy(){ public void destroy() {
log.info("{}线程任务关闭",this.getClass().getSimpleName()); log.info("{}线程任务关闭", this.getClass().getSimpleName());
executorService.shutdown(); executorService.shutdown();
} }

9
im-server/src/main/java/com/bx/imserver/task/PullUnreadGroupMessageTask.java

@ -2,12 +2,10 @@ package com.bx.imserver.task;
import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.GroupMessageInfo;
import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMRecvInfo;
import com.bx.imserver.netty.IMServerGroup; import com.bx.imserver.netty.IMServerGroup;
import com.bx.imserver.netty.processor.MessageProcessor; import com.bx.imserver.netty.processor.AbstractMessageProcessor;
import com.bx.imserver.netty.processor.ProcessorFactory; import com.bx.imserver.netty.processor.ProcessorFactory;
import com.bx.imserver.netty.ws.WebSocketServer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
@ -19,9 +17,6 @@ import java.util.List;
@Component @Component
public class PullUnreadGroupMessageTask extends AbstractPullMessageTask { public class PullUnreadGroupMessageTask extends AbstractPullMessageTask {
@Autowired
private WebSocketServer WSServer;
@Autowired @Autowired
private RedisTemplate<String,Object> redisTemplate; private RedisTemplate<String,Object> redisTemplate;
@ -33,7 +28,7 @@ public class PullUnreadGroupMessageTask extends AbstractPullMessageTask {
for(Object o: messageInfos){ for(Object o: messageInfos){
redisTemplate.opsForList().leftPop(key); redisTemplate.opsForList().leftPop(key);
IMRecvInfo recvInfo = (IMRecvInfo)o; IMRecvInfo recvInfo = (IMRecvInfo)o;
MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE); AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE);
processor.process(recvInfo); processor.process(recvInfo);
} }
} }

8
im-server/src/main/java/com/bx/imserver/task/PullUnreadPrivateMessageTask.java

@ -4,11 +4,9 @@ package com.bx.imserver.task;
import com.bx.imcommon.contant.RedisKey; import com.bx.imcommon.contant.RedisKey;
import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMCmdType;
import com.bx.imcommon.model.IMRecvInfo; import com.bx.imcommon.model.IMRecvInfo;
import com.bx.imcommon.model.PrivateMessageInfo;
import com.bx.imserver.netty.IMServerGroup; import com.bx.imserver.netty.IMServerGroup;
import com.bx.imserver.netty.processor.MessageProcessor; import com.bx.imserver.netty.processor.AbstractMessageProcessor;
import com.bx.imserver.netty.processor.ProcessorFactory; import com.bx.imserver.netty.processor.ProcessorFactory;
import com.bx.imserver.netty.ws.WebSocketServer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
@ -21,8 +19,6 @@ import java.util.List;
@Component @Component
public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask { public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask {
@Autowired
private WebSocketServer WSServer;
@Autowired @Autowired
private RedisTemplate<String,Object> redisTemplate; private RedisTemplate<String,Object> redisTemplate;
@ -35,7 +31,7 @@ public class PullUnreadPrivateMessageTask extends AbstractPullMessageTask {
for(Object o: messageInfos){ for(Object o: messageInfos){
redisTemplate.opsForList().leftPop(key); redisTemplate.opsForList().leftPop(key);
IMRecvInfo recvInfo = (IMRecvInfo)o; IMRecvInfo recvInfo = (IMRecvInfo)o;
MessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE); AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE);
processor.process(recvInfo); processor.process(recvInfo);
} }

Loading…
Cancel
Save