diff --git a/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java b/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java index b3a93fc..9f8fa84 100644 --- a/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java +++ b/im-client/src/main/java/com/bx/imclient/IMAutoConfiguration.java @@ -9,4 +9,5 @@ import org.springframework.context.annotation.Configuration; @Configuration @ComponentScan("com.bx.imclient") public class IMAutoConfiguration { + } diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java index 26636dd..0ee2af3 100644 --- a/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java +++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListener.java @@ -3,8 +3,10 @@ package com.bx.imclient.listener; import com.bx.imcommon.model.IMSendResult; +import java.util.List; + public interface MessageListener { - void process(IMSendResult result); + void process(List> result); } diff --git a/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java index 05eb5b4..bd95284 100644 --- a/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java +++ b/im-client/src/main/java/com/bx/imclient/listener/MessageListenerMulticaster.java @@ -1,6 +1,7 @@ package com.bx.imclient.listener; +import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSONObject; import com.bx.imclient.annotation.IMListener; import com.bx.imcommon.enums.IMListenerType; @@ -19,19 +20,24 @@ public class MessageListenerMulticaster { @Autowired(required = false) private List messageListeners = Collections.emptyList(); - public void multicast(IMListenerType listenerType, IMSendResult result){ + public void multicast(IMListenerType listenerType, List results){ + if(CollUtil.isEmpty(results)){ + return; + } for(MessageListener listener:messageListeners){ IMListener annotation = listener.getClass().getAnnotation(IMListener.class); if(annotation!=null && (annotation.type().equals(IMListenerType.ALL) || annotation.type().equals(listenerType))){ - // 将data转回对象类型 - if(result.getData() instanceof JSONObject){ - Type superClass = listener.getClass().getGenericInterfaces()[0]; - Type type = ((ParameterizedType) superClass).getActualTypeArguments()[0]; - JSONObject data = (JSONObject)result.getData(); - result.setData(data.toJavaObject(type)); - } + results.forEach(result->{ + // 将data转回对象类型 + if(result.getData() instanceof JSONObject){ + Type superClass = listener.getClass().getGenericInterfaces()[0]; + Type type = ((ParameterizedType) superClass).getActualTypeArguments()[0]; + JSONObject data = (JSONObject)result.getData(); + result.setData(data.toJavaObject(type)); + } + }); // 回调到调用方处理 - listener.process(result); + listener.process(results); } } } diff --git a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java index 0eac628..6fc1297 100644 --- a/im-client/src/main/java/com/bx/imclient/sender/IMSender.java +++ b/im-client/src/main/java/com/bx/imclient/sender/IMSender.java @@ -9,6 +9,7 @@ import com.bx.imcommon.enums.IMSendCode; import com.bx.imcommon.enums.IMTerminalType; import com.bx.imcommon.model.*; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; @@ -22,9 +23,13 @@ public class IMSender { @Resource(name="IMRedisTemplate") private RedisTemplate redisTemplate; + @Value("${spring.application.name}") + private String appName; + private final MessageListenerMulticaster listenerMulticaster; public void sendPrivateMessage(IMPrivateMessage message) { + List results = new LinkedList<>(); for (Integer terminal : message.getRecvTerminals()) { // 获取对方连接的channelId String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.toString()); @@ -35,18 +40,18 @@ public class IMSender { IMRecvInfo recvInfo = new IMRecvInfo(); recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code()); recvInfo.setSendResult(message.getSendResult()); + recvInfo.setServiceName(appName); recvInfo.setSender(message.getSender()); recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getRecvId(), terminal))); recvInfo.setData(message.getData()); redisTemplate.opsForList().rightPush(sendKey, recvInfo); - } else if (message.getSendResult()) { - // 回复消息状态 + } else { IMSendResult result = new IMSendResult(); result.setSender(message.getSender()); result.setReceiver(new IMUserInfo(message.getRecvId(), terminal)); result.setCode(IMSendCode.NOT_ONLINE.code()); result.setData(message.getData()); - listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); + results.add(result); } } // 推送给自己的其他终端 @@ -72,10 +77,14 @@ public class IMSender { } } } - + // 对离线用户回复消息状态 + if(message.getSendResult() && !results.isEmpty()){ + listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); + } } public void sendGroupMessage(IMGroupMessage message) { + // 根据群聊每个成员所连的IM-server,进行分组 Map sendMap = new HashMap<>(); for (Integer terminal : message.getRecvTerminals()) { @@ -106,23 +115,14 @@ public class IMSender { recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code()); recvInfo.setReceivers(new LinkedList<>(entry.getValue())); recvInfo.setSender(message.getSender()); + recvInfo.setServiceName(appName); recvInfo.setSendResult(message.getSendResult()); recvInfo.setData(message.getData()); // 推送至队列 String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, entry.getKey().toString()); redisTemplate.opsForList().rightPush(key, recvInfo); } - // 对离线用户回复消息状态 - if (message.getSendResult()) { - for (IMUserInfo offLineUser : offLineUsers) { - IMSendResult result = new IMSendResult(); - result.setSender(message.getSender()); - result.setReceiver(offLineUser); - result.setCode(IMSendCode.NOT_ONLINE.code()); - result.setData(message.getData()); - listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, result); - } - } + // 推送给自己的其他终端 if (message.getSendToSelf()) { for (Integer terminal : IMTerminalType.codes()) { @@ -146,6 +146,20 @@ public class IMSender { } } } + // 对离线用户回复消息状态 + if(message.getSendResult() && !offLineUsers.isEmpty()){ + List results = new LinkedList<>(); + for (IMUserInfo offLineUser : offLineUsers) { + IMSendResult result = new IMSendResult(); + result.setSender(message.getSender()); + result.setReceiver(offLineUser); + result.setCode(IMSendCode.NOT_ONLINE.code()); + result.setData(message.getData()); + results.add(result); + } + listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results); + } + } public Map> getOnlineTerminal(List userIds){ diff --git a/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java b/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java index 86b8920..154f2f3 100644 --- a/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/AbstractMessageResultTask.java @@ -20,13 +20,13 @@ public abstract class AbstractMessageResultTask implements CommandLineRunner { @SneakyThrows @Override public void run() { - try{ + try { pullMessage(); - }catch (Exception e){ - log.error("任务调度异常",e); - Thread.sleep(200); + } catch (Exception e) { + log.error("任务调度异常", e); } - if(!EXECUTOR_SERVICE.isShutdown()){ + if (!EXECUTOR_SERVICE.isShutdown()) { + Thread.sleep(100); EXECUTOR_SERVICE.execute(this); } } @@ -35,10 +35,10 @@ public abstract class AbstractMessageResultTask implements CommandLineRunner { @PreDestroy - public void destroy(){ - log.info("{}线程任务关闭",this.getClass().getSimpleName()); + public void destroy() { + log.info("{}线程任务关闭", this.getClass().getSimpleName()); EXECUTOR_SERVICE.shutdown(); } - public abstract void pullMessage(); + public abstract void pullMessage() throws InterruptedException; } diff --git a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java index 4e6f08c..400220d 100644 --- a/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/GroupMessageResultResultTask.java @@ -1,34 +1,59 @@ package com.bx.imclient.task; +import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; -import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.concurrent.TimeUnit; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + @Component -@AllArgsConstructor +@RequiredArgsConstructor public class GroupMessageResultResultTask extends AbstractMessageResultTask { @Resource(name = "IMRedisTemplate") private RedisTemplate redisTemplate; + @Value("${spring.application.name}") + private String appName; + + @Value("${im.result.batch:100}") + private int batchSize; + private final MessageListenerMulticaster listenerMulticaster; @Override public void pullMessage() { - String key = IMRedisKey.IM_RESULT_GROUP_QUEUE; - JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); - if(jsonObject != null) { - IMSendResult result = jsonObject.toJavaObject(IMSendResult.class); - listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE,result); + List results; + do { + results = loadBatch(); + if(!results.isEmpty()){ + listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results); + } + } while (results.size() >= batchSize); + } + + List loadBatch() { + String key = StrUtil.join(":", IMRedisKey.IM_RESULT_GROUP_QUEUE, appName); + //这个接口redis6.2以上才支持 + //List list = redisTemplate.opsForList().leftPop(key, batchSize); + List results = new LinkedList<>(); + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject) && results.size() < batchSize) { + results.add(jsonObject.toJavaObject(IMSendResult.class)); + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } + return results; } } diff --git a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java index b90a08e..2451db1 100644 --- a/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java +++ b/im-client/src/main/java/com/bx/imclient/task/PrivateMessageResultResultTask.java @@ -1,37 +1,60 @@ package com.bx.imclient.task; +import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.bx.imclient.listener.MessageListenerMulticaster; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMListenerType; import com.bx.imcommon.model.IMSendResult; -import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.concurrent.TimeUnit; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; @Slf4j @Component -@AllArgsConstructor +@RequiredArgsConstructor public class PrivateMessageResultResultTask extends AbstractMessageResultTask { - @Resource(name = "IMRedisTemplate") - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; + + @Value("${spring.application.name}") + private String appName; + + @Value("${im.result.batch:100}") + private int batchSize; private final MessageListenerMulticaster listenerMulticaster; @Override public void pullMessage() { - String key = IMRedisKey.IM_RESULT_PRIVATE_QUEUE; - JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); - if(jsonObject != null) { - IMSendResult result = jsonObject.toJavaObject(IMSendResult.class); - listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); + List results; + do { + results = loadBatch(); + if(!results.isEmpty()){ + listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); + } + } while (results.size() >= batchSize); + } + + List loadBatch() { + String key = StrUtil.join(":", IMRedisKey.IM_RESULT_PRIVATE_QUEUE, appName); + //这个接口redis6.2以上才支持 + //List list = redisTemplate.opsForList().leftPop(key, batchSize); + List results = new LinkedList<>(); + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject) && results.size() < batchSize) { + results.add(jsonObject.toJavaObject(IMSendResult.class)); + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } + return results; } } diff --git a/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java b/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java index 9a6e0df..52c74e6 100644 --- a/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java +++ b/im-commom/src/main/java/com/bx/imcommon/model/IMRecvInfo.java @@ -27,6 +27,10 @@ public class IMRecvInfo { */ private Boolean sendResult; + /** + * 当前服务名(回调发送结果使用) + */ + private String serviceName; /** * 推送消息体 */ diff --git a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java index 745d606..d927b99 100644 --- a/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java +++ b/im-platform/src/main/java/com/bx/implatform/listener/GroupMessageListener.java @@ -7,10 +7,13 @@ import com.bx.imcommon.enums.IMSendCode; import com.bx.imcommon.model.IMSendResult; import com.bx.implatform.contant.RedisKey; import com.bx.implatform.vo.GroupMessageVO; +import com.bx.implatform.vo.PrivateMessageVO; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; +import java.util.List; + @Slf4j @IMListener(type = IMListenerType.GROUP_MESSAGE) @AllArgsConstructor @@ -19,9 +22,13 @@ public class GroupMessageListener implements MessageListener { private final RedisTemplate redisTemplate; @Override - public void process(IMSendResult result) { - GroupMessageVO messageInfo = result.getData(); - // 空空如也 + public void process(List> results) { + for(IMSendResult result:results){ + GroupMessageVO messageInfo = result.getData(); + if (result.getCode().equals(IMSendCode.SUCCESS.code())) { + log.info("消息送达,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal()); + } + } } } diff --git a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java index 3779365..8ff7d49 100644 --- a/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java +++ b/im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java @@ -1,5 +1,6 @@ package com.bx.implatform.listener; +import cn.hutool.core.collection.CollUtil; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.bx.imclient.annotation.IMListener; import com.bx.imclient.listener.MessageListener; @@ -14,6 +15,10 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + @Slf4j @IMListener(type = IMListenerType.PRIVATE_MESSAGE) public class PrivateMessageListener implements MessageListener { @@ -23,16 +28,23 @@ public class PrivateMessageListener implements MessageListener private IPrivateMessageService privateMessageService; @Override - public void process(IMSendResult result) { - PrivateMessageVO messageInfo = result.getData(); - // 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态 - if (result.getCode().equals(IMSendCode.SUCCESS.code())) { + public void process(List> results) { + Set messageIds = new HashSet<>(); + for(IMSendResult result : results){ + PrivateMessageVO messageInfo = result.getData(); + // 更新消息状态,这里只处理成功消息,失败的消息继续保持未读状态 + if (result.getCode().equals(IMSendCode.SUCCESS.code())) { + messageIds.add(messageInfo.getId()); + log.info("消息送达,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal()); + } + } + // 批量修改状态 + if(CollUtil.isNotEmpty(messageIds)){ UpdateWrapper updateWrapper = new UpdateWrapper<>(); - updateWrapper.lambda().eq(PrivateMessage::getId, messageInfo.getId()) + updateWrapper.lambda().in(PrivateMessage::getId, messageIds) .eq(PrivateMessage::getStatus, MessageStatus.UNSEND.code()) .set(PrivateMessage::getStatus, MessageStatus.SENDED.code()); privateMessageService.update(updateWrapper); - log.info("消息已读,消息id:{},发送者:{},接收者:{},终端:{}", messageInfo.getId(), result.getSender().getId(), result.getReceiver().getId(), result.getReceiver().getTerminal()); } } diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java index e3ecbaa..0a2edfd 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java @@ -1,6 +1,8 @@ package com.bx.implatform.service.impl; +import cn.hutool.core.collection.CollStreamUtil; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -86,6 +88,7 @@ public class GroupMessageServiceImpl extends ServiceImpl sendMessage = new IMGroupMessage<>(); sendMessage.setSender(new IMUserInfo(session.getUserId(), session.getTerminal())); sendMessage.setRecvIds(userIds); + sendMessage.setSendResult(false); sendMessage.setData(msgInfo); imClient.sendGroupMessage(sendMessage); log.info("发送群聊消息,发送id:{},群聊id:{},内容:{}", session.getUserId(), dto.getGroupId(), dto.getContent()); @@ -145,10 +148,11 @@ public class GroupMessageServiceImpl extends ServiceImpl loadMessage(Long minId) { UserSession session = SessionContext.getSession(); List members = groupMemberService.findByUserId(session.getUserId()); - List ids = members.stream().map(GroupMember::getGroupId).collect(Collectors.toList()); - if (CollectionUtil.isEmpty(ids)) { + if (CollectionUtil.isEmpty(members)) { return new ArrayList<>(); } + Map groupMemberMap = CollStreamUtil.toIdentityMap(members, GroupMember::getGroupId); + Set ids = groupMemberMap.keySet(); // 只能拉取最近1个月的 Date minDate = DateUtils.addMonths(new Date(), -1); LambdaQueryWrapper wrapper = Wrappers.lambdaQuery(); @@ -157,15 +161,21 @@ public class GroupMessageServiceImpl extends ServiceImpl messages = this.list(wrapper); // 转成vo - List vos = messages.stream().map(m -> { - GroupMessageVO vo = BeanUtils.copyProperties(m, GroupMessageVO.class); - // 被@用户列表 - if (StringUtils.isNotBlank(m.getAtUserIds())) { - List atIds = Splitter.on(",").trimResults().splitToList(m.getAtUserIds()); - vo.setAtUserIds(atIds.stream().map(Long::parseLong).collect(Collectors.toList())); - } - return vo; - }).collect(Collectors.toList()); + List vos = messages.stream() + .filter(m -> { + //排除加群之前的消息 + GroupMember member = groupMemberMap.get(m.getGroupId()); + return Objects.nonNull(member) && DateUtil.compare(member.getCreatedTime(), m.getSendTime()) <= 0; + }) + .map(m -> { + GroupMessageVO vo = BeanUtils.copyProperties(m, GroupMessageVO.class); + // 被@用户列表 + if (StringUtils.isNotBlank(m.getAtUserIds()) && Objects.nonNull(vo)) { + List atIds = Splitter.on(",").trimResults().splitToList(m.getAtUserIds()); + vo.setAtUserIds(atIds.stream().map(Long::parseLong).collect(Collectors.toList())); + } + return vo; + }).collect(Collectors.toList()); // 消息状态,数据库没有存群聊的消息状态,需要从redis取 List keys = ids.stream().map(id -> String.join(":", RedisKey.IM_GROUP_READED_POSITION, id.toString(), session.getUserId().toString())) .collect(Collectors.toList()); diff --git a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java index 01a2fd8..0ef663e 100644 --- a/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java +++ b/im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java @@ -65,6 +65,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl private final RedisTemplate redisTemplate; - @Async @Override public void process(IMRecvInfo recvInfo) { IMUserInfo sender = recvInfo.getSender(); @@ -64,7 +64,7 @@ public class GroupMessageProcessor extends AbstractMessageProcessor result.setCode(sendCode.code()); result.setData(recvInfo.getData()); // 推送到结果队列 - String key = IMRedisKey.IM_RESULT_GROUP_QUEUE; + String key = StrUtil.join(":",IMRedisKey.IM_RESULT_GROUP_QUEUE,recvInfo.getServiceName()); redisTemplate.opsForList().rightPush(key, result); } } diff --git a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java index 1f150fd..26803c5 100644 --- a/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java +++ b/im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java @@ -1,5 +1,6 @@ package com.bx.imserver.netty.processor; +import cn.hutool.core.util.StrUtil; import com.bx.imcommon.contant.IMRedisKey; import com.bx.imcommon.enums.IMCmdType; import com.bx.imcommon.enums.IMSendCode; @@ -57,7 +58,7 @@ public class PrivateMessageProcessor extends AbstractMessageProcessor redisTemplate; + private final RedisTemplate redisTemplate; @Override public void pullMessage() { // 从redis拉取未读消息 - String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE,IMServerGroup.serverId+""); - JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); - if(jsonObject!=null){ + String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, IMServerGroup.serverId + ""); + JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); + while (!Objects.isNull(jsonObject)) { IMRecvInfo recvInfo = jsonObject.toJavaObject(IMRecvInfo.class); AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.PRIVATE_MESSAGE); processor.process(recvInfo); + // 下一条消息 + jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); } } - } diff --git a/im-ui/src/api/date.js b/im-ui/src/api/date.js index 705f1b7..bba3e43 100644 --- a/im-ui/src/api/date.js +++ b/im-ui/src/api/date.js @@ -21,7 +21,7 @@ let toTimeText = (timeStamp, simple) => { //不属于今年 timeText = formatDateTime(dateTime); if(simple){ - timeText = timeText.substring(2,5); + timeText = timeText.substr(2,8); } } return timeText; diff --git a/im-ui/src/components/chat/ChatBox.vue b/im-ui/src/components/chat/ChatBox.vue index 973c62f..9d63cfa 100644 --- a/im-ui/src/components/chat/ChatBox.vue +++ b/im-ui/src/components/chat/ChatBox.vue @@ -256,7 +256,6 @@ let textNode = document.createTextNode(txt); range.insertNode(textNode) range.collapse(); - } let items = (event.clipboardData || window.clipboardData).items if (items.length) { diff --git a/im-uniapp/common/date.js b/im-uniapp/common/date.js index 705f1b7..bba3e43 100644 --- a/im-uniapp/common/date.js +++ b/im-uniapp/common/date.js @@ -21,7 +21,7 @@ let toTimeText = (timeStamp, simple) => { //不属于今年 timeText = formatDateTime(dateTime); if(simple){ - timeText = timeText.substring(2,5); + timeText = timeText.substr(2,8); } } return timeText; diff --git a/im-uniapp/components/chat-item/chat-item.vue b/im-uniapp/components/chat-item/chat-item.vue index 3ae1a97..8b8e2f5 100644 --- a/im-uniapp/components/chat-item/chat-item.vue +++ b/im-uniapp/components/chat-item/chat-item.vue @@ -7,7 +7,7 @@ {{chat.showName}} - {{$date.toTimeText(chat.lastSendTime)}} + {{$date.toTimeText(chat.lastSendTime,true)}} {{atText}}