From 9f28d8c891391473caf59d11553597ad80582dac Mon Sep 17 00:00:00 2001 From: xsx <825657193@qq.com> Date: Tue, 16 Jul 2024 22:58:49 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20im-client=E5=A2=9E=E5=8A=A0=E5=8F=91?= =?UTF-8?q?=E9=80=81=E7=B3=BB=E7=BB=9F=E6=B6=88=E6=81=AFapi?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/bx/imclient/IMClient.java | 11 ++++ .../java/com/bx/imclient/sender/IMSender.java | 59 ++++++++++++++++- .../task/SystemMessageResultResultTask.java | 43 ++++++++++++ .../com/bx/imcommon/contant/IMRedisKey.java | 14 +++- .../java/com/bx/imcommon/enums/IMCmdType.java | 6 +- .../com/bx/imcommon/enums/IMListenerType.java | 8 ++- .../bx/imcommon/model/IMPrivateMessage.java | 2 +- .../com/bx/imcommon/model/IMSendResult.java | 2 +- .../bx/imcommon/model/IMSystemMessage.java | 34 ++++++++++ .../com/bx/imcommon/mq/RedisMQConsumer.java | 6 ++ .../com/bx/imcommon/mq/RedisMQPullTask.java | 8 +-- .../listener/SystemMessageListener.java | 36 ++++++++++ .../task/UserBannedConsumerTask.java | 29 +++------ .../com/bx/implatform/vo/SystemMessageVO.java | 27 ++++++++ .../processor/GroupMessageProcessor.java | 4 +- .../processor/PrivateMessageProcessor.java | 6 +- .../netty/processor/ProcessorFactory.java | 3 + .../processor/SystemMessageProcessor.java | 65 +++++++++++++++++++ .../imserver/task/PullGroupMessageTask.java | 2 +- .../imserver/task/PullPrivateMessageTask.java | 2 +- .../imserver/task/PullSystemMessageTask.java | 34 ++++++++++ 21 files changed, 359 insertions(+), 42 deletions(-) create mode 100644 im-client/src/main/java/com/bx/imclient/task/SystemMessageResultResultTask.java create mode 100644 im-commom/src/main/java/com/bx/imcommon/model/IMSystemMessage.java create mode 100644 im-platform/src/main/java/com/bx/implatform/listener/SystemMessageListener.java create mode 100644 im-platform/src/main/java/com/bx/implatform/vo/SystemMessageVO.java create mode 100644 im-server/src/main/java/com/bx/imserver/netty/processor/SystemMessageProcessor.java create mode 100644 im-server/src/main/java/com/bx/imserver/task/PullSystemMessageTask.java diff --git a/im-client/src/main/java/com/bx/imclient/IMClient.java b/im-client/src/main/java/com/bx/imclient/IMClient.java index 7a476b1..e13518e 100644 --- a/im-client/src/main/java/com/bx/imclient/IMClient.java +++ b/im-client/src/main/java/com/bx/imclient/IMClient.java @@ -4,6 +4,7 @@ import com.bx.imclient.sender.IMSender; import com.bx.imcommon.enums.IMTerminalType; import com.bx.imcommon.model.IMGroupMessage; import com.bx.imcommon.model.IMPrivateMessage; +import com.bx.imcommon.model.IMSystemMessage; import lombok.AllArgsConstructor; import org.springframework.context.annotation.Configuration; @@ -46,6 +47,16 @@ public class IMClient { return imSender.getOnlineTerminal(userIds); } + /** + * 发送系统消息(发送结果通过MessageListener接收) + * + * @param message 私有消息 + */ + public void sendSystemMessage(IMSystemMessage message){ + imSender.sendSystemMessage(message); + } + + /** * 发送私聊消息(发送结果通过MessageListener接收) * diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java index 2a4fb88..61d0a4d 100644 --- a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -28,6 +28,59 @@ public class IMSender { private final MessageListenerMulticaster listenerMulticaster; + + public void sendSystemMessage(IMSystemMessage message){ + // 根据群聊每个成员所连的IM-server,进行分组 + Map sendMap = new HashMap<>(); + for (Integer terminal : message.getRecvTerminals()) { + message.getRecvIds().forEach(id -> { + String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, id.toString(), terminal.toString()); + sendMap.put(key,new IMUserInfo(id, terminal)); + }); + } + // 批量拉取 + List serverIds = redisMQTemplate.opsForValue().multiGet(sendMap.keySet()); + // 格式:map<服务器id,list<接收方>> + Map> serverMap = new HashMap<>(); + List offLineUsers = new LinkedList<>(); + int idx = 0; + for (Map.Entry entry : sendMap.entrySet()) { + Integer serverId = (Integer)serverIds.get(idx++); + if (!Objects.isNull(serverId)) { + List list = serverMap.computeIfAbsent(serverId, o -> new LinkedList<>()); + list.add(entry.getValue()); + } else { + // 加入离线列表 + offLineUsers.add(entry.getValue()); + } + } + // 逐个server发送 + for (Map.Entry> entry : serverMap.entrySet()) { + IMRecvInfo recvInfo = new IMRecvInfo(); + recvInfo.setCmd(IMCmdType.SYSTEM_MESSAGE.code()); + recvInfo.setReceivers(new LinkedList<>(entry.getValue())); + recvInfo.setServiceName(appName); + recvInfo.setSendResult(message.getSendResult()); + recvInfo.setData(message.getData()); + // 推送至队列 + String key = String.join(":", IMRedisKey.IM_MESSAGE_SYSTEM_QUEUE, entry.getKey().toString()); + redisMQTemplate.opsForList().rightPush(key, recvInfo); + } + // 对离线用户回复消息状态 + if(message.getSendResult() && !offLineUsers.isEmpty()){ + List results = new LinkedList<>(); + for (IMUserInfo offLineUser : offLineUsers) { + IMSendResult result = new IMSendResult(); + result.setReceiver(offLineUser); + result.setCode(IMSendCode.NOT_ONLINE.code()); + result.setData(message.getData()); + results.add(result); + } + listenerMulticaster.multicast(IMListenerType.SYSTEM_MESSAGE, results); + } + } + + public void sendPrivateMessage(IMPrivateMessage message) { List results = new LinkedList<>(); if(!Objects.isNull(message.getRecvId())){ @@ -84,10 +137,10 @@ public class IMSender { if(message.getSendResult() && !results.isEmpty()){ listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); } + } public void sendGroupMessage(IMGroupMessage message) { - // 根据群聊每个成员所连的IM-server,进行分组 Map sendMap = new HashMap<>(); for (Integer terminal : message.getRecvTerminals()) { @@ -104,7 +157,7 @@ public class IMSender { int idx = 0; for (Map.Entry entry : sendMap.entrySet()) { Integer serverId = (Integer)serverIds.get(idx++); - if (serverId != null) { + if (!Objects.isNull(serverId)) { List list = serverMap.computeIfAbsent(serverId, o -> new LinkedList<>()); list.add(entry.getValue()); } else { @@ -136,7 +189,7 @@ public class IMSender { String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getSender().getId().toString(), terminal.toString()); Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key); // 如果终端在线,将数据存储至redis,等待拉取推送 - if (serverId != null) { + if (!Objects.isNull(serverId)) { IMRecvInfo recvInfo = new IMRecvInfo(); recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); recvInfo.setSender(message.getSender()); diff --git a/im-client/src/main/java/com/bx/imclient/task/SystemMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/SystemMessageResultResultTask.java new file mode 100644 index 0000000..7bd3491 --- /dev/null +++ b/im-client/src/main/java/com/bx/imclient/task/SystemMessageResultResultTask.java @@ -0,0 +1,43 @@ +package com.bx.imclient.task; + +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSONObject; +import com.bx.imclient.listener.MessageListenerMulticaster; +import com.bx.imcommon.contant.IMRedisKey; +import com.bx.imcommon.enums.IMListenerType; +import com.bx.imcommon.model.IMSendResult; +import com.bx.imcommon.mq.RedisMQConsumer; +import com.bx.imcommon.mq.RedisMQListener; +import com.bx.imcommon.mq.RedisMQTemplate; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + +@Slf4j +@Component +@RequiredArgsConstructor +@RedisMQListener(queue = IMRedisKey.IM_RESULT_SYSTEM_QUEUE, batchSize = 100, period = 100) +public class SystemMessageResultResultTask extends RedisMQConsumer { + + @Value("${spring.application.name}") + private String appName; + + private final MessageListenerMulticaster listenerMulticaster; + + @Override + public void onMessage(List results) { + listenerMulticaster.multicast(IMListenerType.SYSTEM_MESSAGE, results); + } + + @Override + public String generateKey() { + return StrUtil.join(":", IMRedisKey.IM_RESULT_SYSTEM_QUEUE, appName); + } + +} diff --git a/im-commom/src/main/java/com/bx/imcommon/contant/IMRedisKey.java b/im-commom/src/main/java/com/bx/imcommon/contant/IMRedisKey.java index eccbac8..a7a8bf3 100644 --- a/im-commom/src/main/java/com/bx/imcommon/contant/IMRedisKey.java +++ b/im-commom/src/main/java/com/bx/imcommon/contant/IMRedisKey.java @@ -2,7 +2,6 @@ package com.bx.imcommon.contant; public final class IMRedisKey { - private IMRedisKey() {} /** * im-server最大id,从0开始递增 @@ -13,13 +12,22 @@ public final class IMRedisKey { */ public static final String IM_USER_SERVER_ID = "im:user:server_id"; /** - * 未读私聊消息队列 + * 系统消息队列 + */ + public static final String IM_MESSAGE_SYSTEM_QUEUE = "im:message:system"; + /** + * 私聊消息队列 */ public static final String IM_MESSAGE_PRIVATE_QUEUE = "im:message:private"; /** - * 未读群聊消息队列 + * 群聊消息队列 */ public static final String IM_MESSAGE_GROUP_QUEUE = "im:message:group"; + + /** + * 系统消息发送结果队列 + */ + public static final String IM_RESULT_SYSTEM_QUEUE = "im:result:system"; /** * 私聊消息发送结果队列 */ diff --git a/im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java b/im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java index c302aa1..a6731ec 100644 --- a/im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java +++ b/im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java @@ -24,7 +24,11 @@ public enum IMCmdType { /** * 群发消息 */ - GROUP_MESSAGE(4, "群发消息"); + GROUP_MESSAGE(4, "群发消息"), + /** + * 系统消息 + */ + SYSTEM_MESSAGE(5,"系统消息"); private final Integer code; diff --git a/im-commom/src/main/java/com/bx/imcommon/enums/IMListenerType.java b/im-commom/src/main/java/com/bx/imcommon/enums/IMListenerType.java index 0448d2d..5dbb1b2 100644 --- a/im-commom/src/main/java/com/bx/imcommon/enums/IMListenerType.java +++ b/im-commom/src/main/java/com/bx/imcommon/enums/IMListenerType.java @@ -15,7 +15,13 @@ public enum IMListenerType { /** * 群聊消息 */ - GROUP_MESSAGE(2, "群聊消息"); + GROUP_MESSAGE(2, "群聊消息"), + /** + * 系统消息 + */ + SYSTEM_MESSAGE(3, "群聊消息"); + + private final Integer code; diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java b/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java index 206c882..e33b540 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java @@ -26,7 +26,7 @@ public class IMPrivateMessage { private List recvTerminals = IMTerminalType.codes(); /** - * 是否发送给自己的其他终端,默认true + * 是否同步消息给自己的其他终端,默认true */ private Boolean sendToSelf = true; diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMSendResult.java b/im-commom/src/main/java/com/bx/imcommon/model/IMSendResult.java index c0354b9..9f4688c 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/IMSendResult.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMSendResult.java @@ -16,7 +16,7 @@ public class IMSendResult { private IMUserInfo receiver; /** - * 发送状态 IMCmdType + * 发送状态编码 IMSendCode */ private Integer code; diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMSystemMessage.java b/im-commom/src/main/java/com/bx/imcommon/model/IMSystemMessage.java new file mode 100644 index 0000000..2241cb5 --- /dev/null +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMSystemMessage.java @@ -0,0 +1,34 @@ +package com.bx.imcommon.model; + +import com.bx.imcommon.enums.IMTerminalType; +import lombok.Data; + +import java.util.LinkedList; +import java.util.List; + +@Data +public class IMSystemMessage { + + + /** + * 接收者id列表,为空表示向所有在线用户广播 + */ + private List recvIds = new LinkedList<>(); + + /** + * 接收者终端类型,默认全部 + */ + private List recvTerminals = IMTerminalType.codes(); + + /** + * 是否需要回推发送结果,默认true + */ + private Boolean sendResult = true; + + /** + * 消息内容 + */ + private T data; + + +} diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java index b9dd6bc..e3ec88d 100644 --- a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java @@ -10,4 +10,10 @@ public abstract class RedisMQConsumer { public void onMessage(T data){} public void onMessage(List datas){} + + public String generateKey(){ + // 默认队列名就是redis的key + RedisMQListener annotation = this.getClass().getAnnotation(RedisMQListener.class); + return annotation.queue(); + } } diff --git a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java index e0a3700..e53ae27 100644 --- a/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java +++ b/im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java @@ -1,18 +1,12 @@ package com.bx.imcommon.mq; import com.alibaba.fastjson.JSONObject; - import com.bx.imcommon.util.ThreadPoolExecutorFactory; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; -import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; - import javax.annotation.PreDestroy; -import javax.annotation.Resource; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.*; @@ -44,7 +38,7 @@ public class RedisMQPullTask implements CommandLineRunner { consumers.forEach((consumer -> { // 注解参数 RedisMQListener annotation = consumer.getClass().getAnnotation(RedisMQListener.class); - String key = annotation.queue(); + String key = consumer.generateKey(); int batchSize = annotation.batchSize(); int period = annotation.period(); // 获取泛型类型 diff --git a/im-platform/src/main/java/com/bx/implatform/listener/SystemMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/SystemMessageListener.java new file mode 100644 index 0000000..2f22099 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/listener/SystemMessageListener.java @@ -0,0 +1,36 @@ +package com.bx.implatform.listener; + +import cn.hutool.core.collection.CollUtil; +import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; +import com.bx.imclient.annotation.IMListener; +import com.bx.imclient.listener.MessageListener; +import com.bx.imcommon.enums.IMListenerType; +import com.bx.imcommon.enums.IMSendCode; +import com.bx.imcommon.model.IMSendResult; +import com.bx.implatform.entity.PrivateMessage; +import com.bx.implatform.enums.MessageStatus; +import com.bx.implatform.service.IPrivateMessageService; +import com.bx.implatform.vo.PrivateMessageVO; +import com.bx.implatform.vo.SystemMessageVO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Slf4j +@IMListener(type = IMListenerType.SYSTEM_MESSAGE) +public class SystemMessageListener implements MessageListener { + + @Override + public void process(List> results) { + for(IMSendResult result : results){ + SystemMessageVO messageInfo = result.getData(); + if (result.getCode().equals(IMSendCode.SUCCESS.code())) { + log.info("消息送达,消息id:{},接收者:{},终端:{}", messageInfo.getId(), result.getReceiver().getId(), result.getReceiver().getTerminal()); + } + } + } +} diff --git a/im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java b/im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java index 31f61b8..e112f9d 100644 --- a/im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java +++ b/im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java @@ -1,21 +1,17 @@ package com.bx.implatform.task; import com.bx.imclient.IMClient; -import com.bx.imcommon.enums.IMTerminalType; -import com.bx.imcommon.model.IMPrivateMessage; -import com.bx.imcommon.model.IMUserInfo; +import com.bx.imcommon.model.IMSystemMessage; import com.bx.imcommon.mq.RedisMQConsumer; import com.bx.imcommon.mq.RedisMQListener; -import com.bx.implatform.contant.Constant; import com.bx.implatform.contant.RedisKey; import com.bx.implatform.dto.UserBanDTO; -import com.bx.implatform.service.IUserService; -import com.bx.implatform.util.BeanUtils; -import com.bx.implatform.vo.PrivateMessageVO; +import com.bx.implatform.enums.MessageType; +import com.bx.implatform.vo.SystemMessageVO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.security.core.parameters.P; import org.springframework.stereotype.Component; +import java.util.Collections; /** * @author: 谢绍许 @@ -31,20 +27,15 @@ public class UserBannedConsumerTask extends RedisMQConsumer { private final IMClient imClient; @Override public void onMessage(UserBanDTO dto) { - log.info("用户被封禁处理,userId:{},原因:{}",dto.getId(),dto.getReason()); - // 推送消息 - - PrivateMessageVO msgInfo = new PrivateMessageVO(); - msgInfo.setRecvId(dto.getId()); - msgInfo.setSendId(Constant.SYS_USER_ID); + // 推送消息将用户赶下线 + SystemMessageVO msgInfo = new SystemMessageVO(); + msgInfo.setType(MessageType.USER_BANNED.code()); msgInfo.setContent(dto.getReason()); - IMPrivateMessage sendMessage = new IMPrivateMessage<>(); - sendMessage.setSender(new IMUserInfo(Constant.SYS_USER_ID, IMTerminalType.WEB.code())); - sendMessage.setRecvId(dto.getId()); - sendMessage.setSendToSelf(false); + IMSystemMessage sendMessage = new IMSystemMessage<>(); + sendMessage.setRecvIds(Collections.singletonList(dto.getId())); sendMessage.setData(msgInfo); sendMessage.setSendResult(true); - imClient.sendPrivateMessage(sendMessage); + imClient.sendSystemMessage(sendMessage); } } diff --git a/im-platform/src/main/java/com/bx/implatform/vo/SystemMessageVO.java b/im-platform/src/main/java/com/bx/implatform/vo/SystemMessageVO.java new file mode 100644 index 0000000..692b6e1 --- /dev/null +++ b/im-platform/src/main/java/com/bx/implatform/vo/SystemMessageVO.java @@ -0,0 +1,27 @@ +package com.bx.implatform.vo; + +import com.bx.imcommon.serializer.DateToLongSerializer; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.Date; + +@Data +@ApiModel("系统消息VO") +public class SystemMessageVO { + + @ApiModelProperty(value = " 消息id") + private Long id; + + @ApiModelProperty(value = " 发送内容") + private String content; + + @ApiModelProperty(value = "消息内容类型 MessageType") + private Integer type; + + @ApiModelProperty(value = " 发送时间") + @JsonSerialize(using = DateToLongSerializer.class) + private Date sendTime; +} diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java index eb75f17..6f62d91 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java @@ -13,10 +13,10 @@ import io.netty.channel.ChannelHandlerContext; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Objects; @Slf4j @Component @@ -33,7 +33,7 @@ public class GroupMessageProcessor extends AbstractMessageProcessor for (IMUserInfo receiver : receivers) { try { ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(receiver.getId(), receiver.getTerminal()); - if (channelCtx != null) { + if (!Objects.isNull(channelCtx)) { // 推送消息到用户 IMSendInfo sendInfo = new IMSendInfo(); sendInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java index 26803c5..3cae629 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java @@ -15,6 +15,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import java.util.Objects; + @Slf4j @Component @RequiredArgsConstructor @@ -26,10 +28,10 @@ public class PrivateMessageProcessor extends AbstractMessageProcessor sendInfo = new IMSendInfo<>(); sendInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java b/im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java index 72aeb98..117334c 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java @@ -20,6 +20,9 @@ public class ProcessorFactory { case GROUP_MESSAGE: processor = SpringContextHolder.getApplicationContext().getBean(GroupMessageProcessor.class); break; + case SYSTEM_MESSAGE: + processor = SpringContextHolder.getApplicationContext().getBean(SystemMessageProcessor.class); + break; default: break; } diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/SystemMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/SystemMessageProcessor.java new file mode 100644 index 0000000..2020252 --- /dev/null +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/SystemMessageProcessor.java @@ -0,0 +1,65 @@ +package com.bx.imserver.netty.processor; + +import cn.hutool.core.util.StrUtil; +import com.bx.imcommon.contant.IMRedisKey; +import com.bx.imcommon.enums.IMCmdType; +import com.bx.imcommon.enums.IMSendCode; +import com.bx.imcommon.model.IMRecvInfo; +import com.bx.imcommon.model.IMSendInfo; +import com.bx.imcommon.model.IMSendResult; +import com.bx.imcommon.model.IMUserInfo; +import com.bx.imserver.netty.UserChannelCtxMap; +import io.netty.channel.ChannelHandlerContext; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +@Slf4j +@Component +@RequiredArgsConstructor +public class SystemMessageProcessor extends AbstractMessageProcessor { + + private final RedisTemplate redisTemplate; + + @Override + public void process(IMRecvInfo recvInfo) { + IMUserInfo receiver = recvInfo.getReceivers().get(0); + log.info("接收到系统消息,接收者:{},内容:{}", receiver.getId(), recvInfo.getData()); + try { + ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(receiver.getId(), receiver.getTerminal()); + if (!Objects.isNull(channelCtx)) { + // 推送消息到用户 + IMSendInfo sendInfo = new IMSendInfo<>(); + sendInfo.setCmd(IMCmdType.SYSTEM_MESSAGE.code()); + sendInfo.setData(recvInfo.getData()); + channelCtx.channel().writeAndFlush(sendInfo); + // 消息发送成功确认 + sendResult(recvInfo, IMSendCode.SUCCESS); + } else { + // 消息推送失败确认 + sendResult(recvInfo, IMSendCode.NOT_FIND_CHANNEL); + log.error("未找到channel,接收者:{},内容:{}",receiver.getId(), recvInfo.getData()); + } + } catch (Exception e) { + // 消息推送失败确认 + sendResult(recvInfo, IMSendCode.UNKONW_ERROR); + log.error("发送异常,,接收者:{},内容:{}", receiver.getId(), recvInfo.getData(), e); + } + + } + + private void sendResult(IMRecvInfo recvInfo, IMSendCode sendCode) { + if (recvInfo.getSendResult()) { + IMSendResult result = new IMSendResult<>(); + result.setReceiver(recvInfo.getReceivers().get(0)); + result.setCode(sendCode.code()); + result.setData(recvInfo.getData()); + // 推送到结果队列 + String key = StrUtil.join(":",IMRedisKey.IM_RESULT_SYSTEM_QUEUE,recvInfo.getServiceName()); + redisTemplate.opsForList().rightPush(key, result); + } + } +} diff --git a/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java index 6d8e83f..48d37b5 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java @@ -23,7 +23,7 @@ public class PullGroupMessageTask extends AbstractPullMessageTask { @Override public void pullMessage() { - // 从redis拉取未读消息 + // 从redis拉取消息 String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, IMServerGroup.serverId + ""); JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); while (!Objects.isNull(jsonObject)) { diff --git a/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java index 5edd0d6..007414e 100644 --- a/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java +++ b/im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java @@ -23,7 +23,7 @@ public class PullPrivateMessageTask extends AbstractPullMessageTask { @Override public void pullMessage() { - // 从redis拉取未读消息 + // 从redis拉取消息 String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, IMServerGroup.serverId + ""); JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); while (!Objects.isNull(jsonObject)) { diff --git a/im-server/src/main/java/com/bx/imserver/task/PullSystemMessageTask.java b/im-server/src/main/java/com/bx/imserver/task/PullSystemMessageTask.java new file mode 100644 index 0000000..c4a3f6d --- /dev/null +++ b/im-server/src/main/java/com/bx/imserver/task/PullSystemMessageTask.java @@ -0,0 +1,34 @@ +package com.bx.imserver.task; + +import com.bx.imcommon.contant.IMRedisKey; +import com.bx.imcommon.enums.IMCmdType; +import com.bx.imcommon.model.IMRecvInfo; +import com.bx.imcommon.mq.RedisMQConsumer; +import com.bx.imcommon.mq.RedisMQListener; +import com.bx.imserver.netty.IMServerGroup; +import com.bx.imserver.netty.processor.AbstractMessageProcessor; +import com.bx.imserver.netty.processor.ProcessorFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author: 谢绍许 + * @date: 2024-07-16 + * @version: 1.0 + */ +@Slf4j +@Component +@RedisMQListener(queue = IMRedisKey.IM_MESSAGE_SYSTEM_QUEUE,batchSize = 10) +public class PullSystemMessageTask extends RedisMQConsumer { + + @Override + public void onMessage(IMRecvInfo recvInfo) { + AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.SYSTEM_MESSAGE); + processor.process(recvInfo); + } + + public String generateKey(){ + // 队列名:im:message:system:{服务id} + return String.join(":", IMRedisKey.IM_MESSAGE_SYSTEM_QUEUE, IMServerGroup.serverId + ""); + } +}