Browse Source

消息结果推送使用批量方式

master
blue 2 years ago
parent
commit
6fc3a66c58
  1. 4
      im-client/src/main/java/com/bx/imclient/listener/MessageListener.java
  2. 25
      im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java
  3. 38
      im-client/src/main/java/com/bx/imclient/sender/IMSender.java
  4. 16
      im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java
  5. 33
      im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java
  6. 34
      im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java
  7. 13
      im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java
  8. 24
      im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java
  9. 1
      im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java
  10. 1
      im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java
  11. 4
      im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java
  12. 8
      im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java
  13. 15
      im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java

4
im-client/src/main/java/com/bx/imclient/listener/MessageListener.java

@ -3,8 +3,10 @@ package com.bx.imclient.listener;
import com.bx.imcommon.model.IMSendResult;
import java.util.List;
public interface MessageListener<T> {
void process(IMSendResult<T> result);
void process(List<IMSendResult<T>> result);
}

25
im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java

@ -1,6 +1,7 @@
package com.bx.imclient.listener;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSONObject;
import com.bx.imclient.annotation.IMListener;
import com.bx.imcommon.enums.IMListenerType;
@ -19,19 +20,25 @@ public class MessageListenerMulticaster {
@Autowired(required = false)
private List<MessageListener> messageListeners = Collections.emptyList();
public void multicast(IMListenerType listenerType, IMSendResult result){
public void multicast(IMListenerType listenerType, List<IMSendResult> results){
if(CollUtil.isEmpty(results)){
return;
}
for(MessageListener listener:messageListeners){
IMListener annotation = listener.getClass().getAnnotation(IMListener.class);
if(annotation!=null && (annotation.type().equals(IMListenerType.ALL) || annotation.type().equals(listenerType))){
// 将data转回对象类型
if(result.getData() instanceof JSONObject){
Type superClass = listener.getClass().getGenericInterfaces()[0];
Type type = ((ParameterizedType) superClass).getActualTypeArguments()[0];
JSONObject data = (JSONObject)result.getData();
result.setData(data.toJavaObject(type));
}
results.forEach(result->{
// 将data转回对象类型
if(result.getData() instanceof JSONObject){
Type superClass = listener.getClass().getGenericInterfaces()[0];
Type type = ((ParameterizedType) superClass).getActualTypeArguments()[0];
JSONObject data = (JSONObject)result.getData();
result.setData(data.toJavaObject(type));
}
});
// 回调到调用方处理
listener.process(result);
listener.process(results);
}
}
}

38
im-client/src/main/java/com/bx/imclient/sender/IMSender.java

@ -29,6 +29,7 @@ public class IMSender {
private final MessageListenerMulticaster listenerMulticaster;
public<T> void sendPrivateMessage(IMPrivateMessage<T> message) {
List<IMSendResult> results = new LinkedList<>();
for (Integer terminal : message.getRecvTerminals()) {
// 获取对方连接的channelId
String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.toString());
@ -44,14 +45,13 @@ public class IMSender {
recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getRecvId(), terminal)));
recvInfo.setData(message.getData());
redisTemplate.opsForList().rightPush(sendKey, recvInfo);
} else if (message.getSendResult()) {
// 回复消息状态
} else {
IMSendResult result = new IMSendResult();
result.setSender(message.getSender());
result.setReceiver(new IMUserInfo(message.getRecvId(), terminal));
result.setCode(IMSendCode.NOT_ONLINE.code());
result.setData(message.getData());
listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result);
results.add(result);
}
}
// 推送给自己的其他终端
@ -77,10 +77,14 @@ public class IMSender {
}
}
}
// 对离线用户回复消息状态
if(message.getSendResult() && !results.isEmpty()){
listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results);
}
}
public<T> void sendGroupMessage(IMGroupMessage<T> message) {
// 根据群聊每个成员所连的IM-server,进行分组
Map<String, IMUserInfo> sendMap = new HashMap<>();
for (Integer terminal : message.getRecvTerminals()) {
@ -118,17 +122,7 @@ public class IMSender {
String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, entry.getKey().toString());
redisTemplate.opsForList().rightPush(key, recvInfo);
}
// 对离线用户回复消息状态
if (message.getSendResult()) {
for (IMUserInfo offLineUser : offLineUsers) {
IMSendResult result = new IMSendResult();
result.setSender(message.getSender());
result.setReceiver(offLineUser);
result.setCode(IMSendCode.NOT_ONLINE.code());
result.setData(message.getData());
listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, result);
}
}
// 推送给自己的其他终端
if (message.getSendToSelf()) {
for (Integer terminal : IMTerminalType.codes()) {
@ -152,6 +146,20 @@ public class IMSender {
}
}
}
// 对离线用户回复消息状态
if(message.getSendResult() && !offLineUsers.isEmpty()){
List<IMSendResult> results = new LinkedList<>();
for (IMUserInfo offLineUser : offLineUsers) {
IMSendResult result = new IMSendResult();
result.setSender(message.getSender());
result.setReceiver(offLineUser);
result.setCode(IMSendCode.NOT_ONLINE.code());
result.setData(message.getData());
results.add(result);
}
listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results);
}
}
public Map<Long,List<IMTerminalType>> getOnlineTerminal(List<Long> userIds){

16
im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java

@ -20,13 +20,13 @@ public abstract class AbstractMessageResultTask implements CommandLineRunner {
@SneakyThrows
@Override
public void run() {
try{
try {
pullMessage();
}catch (Exception e){
log.error("任务调度异常",e);
Thread.sleep(200);
} catch (Exception e) {
log.error("任务调度异常", e);
}
if(!EXECUTOR_SERVICE.isShutdown()){
if (!EXECUTOR_SERVICE.isShutdown()) {
Thread.sleep(100);
EXECUTOR_SERVICE.execute(this);
}
}
@ -35,10 +35,10 @@ public abstract class AbstractMessageResultTask implements CommandLineRunner {
@PreDestroy
public void destroy(){
log.info("{}线程任务关闭",this.getClass().getSimpleName());
public void destroy() {
log.info("{}线程任务关闭", this.getClass().getSimpleName());
EXECUTOR_SERVICE.shutdown();
}
public abstract void pullMessage();
public abstract void pullMessage() throws InterruptedException;
}

33
im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java

@ -6,14 +6,16 @@ import com.bx.imclient.listener.MessageListenerMulticaster;
import com.bx.imcommon.contant.IMRedisKey;
import com.bx.imcommon.enums.IMListenerType;
import com.bx.imcommon.model.IMSendResult;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@Component
@RequiredArgsConstructor
@ -25,16 +27,33 @@ public class GroupMessageResultResultTask extends AbstractMessageResultTask {
@Value("${spring.application.name}")
private String appName;
@Value("${im.result.batch:100}")
private int batchSize;
private final MessageListenerMulticaster listenerMulticaster;
@Override
public void pullMessage() {
String key = StrUtil.join(":",IMRedisKey.IM_RESULT_GROUP_QUEUE,appName);
JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS);
if(jsonObject != null) {
IMSendResult result = jsonObject.toJavaObject(IMSendResult.class);
listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE,result);
List<IMSendResult> results;
do {
results = loadBatch();
if(!results.isEmpty()){
listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results);
}
} while (results.size() < batchSize);
}
List<IMSendResult> loadBatch() {
String key = StrUtil.join(":", IMRedisKey.IM_RESULT_GROUP_QUEUE, appName);
//这个接口redis6.2以上才支持
//List<Object> list = redisTemplate.opsForList().leftPop(key, 100);
List<IMSendResult> results = new LinkedList<>();
JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
while (!Objects.isNull(jsonObject) && results.size() < batchSize) {
results.add(jsonObject.toJavaObject(IMSendResult.class));
jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
}
return results;
}
}

34
im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java

@ -6,7 +6,6 @@ import com.bx.imclient.listener.MessageListenerMulticaster;
import com.bx.imcommon.contant.IMRedisKey;
import com.bx.imcommon.enums.IMListenerType;
import com.bx.imcommon.model.IMSendResult;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@ -14,7 +13,9 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@Slf4j
@Component
@ -22,21 +23,38 @@ import java.util.concurrent.TimeUnit;
public class PrivateMessageResultResultTask extends AbstractMessageResultTask {
@Resource(name = "IMRedisTemplate")
private RedisTemplate<String,Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Value("${spring.application.name}")
private String appName;
@Value("${im.result.batch:100}")
private int batchSize;
private final MessageListenerMulticaster listenerMulticaster;
@Override
public void pullMessage() {
String key = StrUtil.join(":",IMRedisKey.IM_RESULT_PRIVATE_QUEUE,appName);
JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS);
if(jsonObject != null) {
IMSendResult result = jsonObject.toJavaObject(IMSendResult.class);
listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result);
List<IMSendResult> results;
do {
results = loadBatch();
if(!results.isEmpty()){
listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results);
}
} while (results.size() < batchSize);
}
List<IMSendResult> loadBatch() {
String key = StrUtil.join(":", IMRedisKey.IM_RESULT_PRIVATE_QUEUE, appName);
//这个接口redis6.2以上才支持
//List<Object> list = redisTemplate.opsForList().leftPop(key, 100);
List<IMSendResult> results = new LinkedList<>();
JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
while (!Objects.isNull(jsonObject) && results.size() < batchSize) {
results.add(jsonObject.toJavaObject(IMSendResult.class));
jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
}
return results;
}
}

13
im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java

@ -7,10 +7,13 @@ import com.bx.imcommon.enums.IMSendCode;
import com.bx.imcommon.model.IMSendResult;
import com.bx.implatform.contant.RedisKey;
import com.bx.implatform.vo.GroupMessageVO;
import com.bx.implatform.vo.PrivateMessageVO;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.List;
@Slf4j
@IMListener(type = IMListenerType.GROUP_MESSAGE)
@AllArgsConstructor
@ -19,9 +22,13 @@ public class GroupMessageListener implements MessageListener<GroupMessageVO> {
private final RedisTemplate<String, Object> redisTemplate;
@Override
public void process(IMSendResult<GroupMessageVO> result) {
GroupMessageVO messageInfo = result.getData();
// 空空如也
public void process(List<IMSendResult<GroupMessageVO>> results) {
for(IMSendResult<GroupMessageVO> result:results){
GroupMessageVO messageInfo = result.getData();
if (result.getCode().equals(IMSendCode.SUCCESS.code())) {
log.info("消息送达,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal());
}
}
}
}

24
im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java

@ -1,5 +1,6 @@
package com.bx.implatform.listener;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.bx.imclient.annotation.IMListener;
import com.bx.imclient.listener.MessageListener;
@ -14,6 +15,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Slf4j
@IMListener(type = IMListenerType.PRIVATE_MESSAGE)
public class PrivateMessageListener implements MessageListener<PrivateMessageVO> {
@ -23,16 +28,23 @@ public class PrivateMessageListener implements MessageListener<PrivateMessageVO>
private IPrivateMessageService privateMessageService;
@Override
public void process(IMSendResult<PrivateMessageVO> result) {
PrivateMessageVO messageInfo = result.getData();
// 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态
if (result.getCode().equals(IMSendCode.SUCCESS.code())) {
public void process(List<IMSendResult<PrivateMessageVO>> results) {
Set<Long> messageIds = new HashSet<>();
for(IMSendResult<PrivateMessageVO> result : results){
PrivateMessageVO messageInfo = result.getData();
// 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态
if (result.getCode().equals(IMSendCode.SUCCESS.code())) {
messageIds.add(messageInfo.getId());
log.info("消息送达,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal());
}
}
// 批量修改状态
if(CollUtil.isNotEmpty(messageIds)){
UpdateWrapper<PrivateMessage> updateWrapper = new UpdateWrapper<>();
updateWrapper.lambda().eq(PrivateMessage::getId, messageInfo.getId())
updateWrapper.lambda().in(PrivateMessage::getId, messageIds)
.eq(PrivateMessage::getStatus, MessageStatus.UNSEND.code())
.set(PrivateMessage::getStatus, MessageStatus.SENDED.code());
privateMessageService.update(updateWrapper);
log.info("消息已读,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal());
}
}

1
im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java

@ -65,6 +65,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
sendMessage.setRecvId(msgInfo.getRecvId());
sendMessage.setSendToSelf(true);
sendMessage.setData(msgInfo);
sendMessage.setSendResult(true);
imClient.sendPrivateMessage(sendMessage);
log.info("发送私聊消息,发送id:{},接收id:{},内容:{}", session.getUserId(), dto.getRecvId(), dto.getContent());
return msg.getId();

1
im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java

@ -25,7 +25,6 @@ public class GroupMessageProcessor extends AbstractMessageProcessor<IMRecvInfo>
private final RedisTemplate<String, Object> redisTemplate;
@Async
@Override
public void process(IMRecvInfo recvInfo) {
IMUserInfo sender = recvInfo.getSender();

4
im-server/src/main/java/com/bx/imserver/task/AbstractPullMessageTask.java

@ -30,9 +30,9 @@ public abstract class AbstractPullMessageTask implements CommandLineRunner {
}
} catch (Exception e) {
log.error("任务调度异常", e);
Thread.sleep(200);
}
if (!EXECUTOR_SERVICE.isShutdown()) {
Thread.sleep(100);
EXECUTOR_SERVICE.execute(this);
}
}
@ -45,5 +45,5 @@ public abstract class AbstractPullMessageTask implements CommandLineRunner {
EXECUTOR_SERVICE.shutdown();
}
public abstract void pullMessage();
public abstract void pullMessage() throws InterruptedException;
}

8
im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java

@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import java.util.Objects;
@Slf4j
@Component
@ -25,11 +25,13 @@ public class PullGroupMessageTask extends AbstractPullMessageTask {
public void pullMessage() {
// 从redis拉取未读消息
String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, IMServerGroup.serverId + "");
JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key, 10, TimeUnit.SECONDS);
if (jsonObject != null) {
JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
while (!Objects.isNull(jsonObject)) {
IMRecvInfo recvInfo = jsonObject.toJavaObject(IMRecvInfo.class);
AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.GROUP_MESSAGE);
processor.process(recvInfo);
// 下一条消息
jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
}
}

15
im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java

@ -12,25 +12,26 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import java.util.Objects;
@Slf4j
@Component
@RequiredArgsConstructor
public class PullPrivateMessageTask extends AbstractPullMessageTask {
public class PullPrivateMessageTask extends AbstractPullMessageTask {
private final RedisTemplate<String,Object> redisTemplate;
private final RedisTemplate<String, Object> redisTemplate;
@Override
public void pullMessage() {
// 从redis拉取未读消息
String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE,IMServerGroup.serverId+"");
JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS);
if(jsonObject!=null){
String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, IMServerGroup.serverId + "");
JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
while (!Objects.isNull(jsonObject)) {
IMRecvInfo recvInfo = jsonObject.toJavaObject(IMRecvInfo.class);
AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE);
processor.process(recvInfo);
// 下一条消息
jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
}
}
}

Loading…
Cancel
Save