Browse Source

消息撤回后采用逻辑删除

master
xie.bx 3 years ago
parent
commit
04167892c5
  1. 4
      commom/src/main/java/com/bx/common/enums/MessageStatusEnum.java
  2. 6
      im-platform/src/main/java/com/bx/implatform/entity/GroupMessage.java
  3. 10
      im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java
  4. 6
      im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java
  5. 1
      im-platform/src/main/java/com/bx/implatform/task/PullAlreadyReadMessageTask.java
  6. 7
      im-platform/src/main/resources/db/db.sql
  7. 9
      im-server/src/main/java/com/bx/imserver/websocket/processor/GroupMessageProcessor.java
  8. 10
      im-server/src/main/java/com/bx/imserver/websocket/processor/PrivateMessageProcessor.java

4
commom/src/main/java/com/bx/common/enums/MessageStatusEnum.java

@ -4,8 +4,8 @@ package com.bx.common.enums;
public enum MessageStatusEnum { public enum MessageStatusEnum {
UNREAD(0,"未读"), UNREAD(0,"未读"),
ALREADY_READ(1,"已读"); ALREADY_READ(1,"已读"),
RECALL(2,"已撤回");
private Integer code; private Integer code;

6
im-platform/src/main/java/com/bx/implatform/entity/GroupMessage.java

@ -56,6 +56,12 @@ public class GroupMessage extends Model<GroupMessage> {
@TableField("type") @TableField("type")
private Integer type; private Integer type;
/**
* 状态
*/
@TableField("status")
private Integer status;
/** /**
* 发送时间 * 发送时间
*/ */

10
im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java

@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bx.common.contant.Constant; import com.bx.common.contant.Constant;
import com.bx.common.contant.RedisKey; import com.bx.common.contant.RedisKey;
import com.bx.common.enums.MessageStatusEnum;
import com.bx.common.enums.MessageTypeEnum; import com.bx.common.enums.MessageTypeEnum;
import com.bx.common.enums.ResultCode; import com.bx.common.enums.ResultCode;
import com.bx.common.model.im.GroupMessageInfo; import com.bx.common.model.im.GroupMessageInfo;
@ -100,14 +101,16 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
if(member == null){ if(member == null){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"您已不在群聊里面,无法撤回消息"); throw new GlobalException(ResultCode.PROGRAM_ERROR,"您已不在群聊里面,无法撤回消息");
} }
// 直接物理删除 // 修改数据库
this.removeById(id); msg.setStatus(MessageStatusEnum.RECALL.getCode());
this.updateById(msg);
// 群发 // 群发
List<Long> userIds = groupMemberService.findUserIdsByGroupId(msg.getGroupId()); List<Long> userIds = groupMemberService.findUserIdsByGroupId(msg.getGroupId());
GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class); GroupMessageInfo msgInfo = BeanUtils.copyProperties(msg, GroupMessageInfo.class);
msgInfo.setType(MessageTypeEnum.TIP.getCode()); msgInfo.setType(MessageTypeEnum.TIP.getCode());
String content = String.format("'%s'撤回了一条消息",member.getAliasName()); String content = String.format("'%s'撤回了一条消息",member.getAliasName());
msgInfo.setContent(content); msgInfo.setContent(content);
msgInfo.setSendTime(new Date());
this.sendMessage(userIds,msgInfo); this.sendMessage(userIds,msgInfo);
log.info("撤回群聊消息,发送id:{},群聊id:{},内容:{}",userId,msg.getGroupId(),msg.getContent()); log.info("撤回群聊消息,发送id:{},群聊id:{},内容:{}",userId,msg.getGroupId(),msg.getContent());
} }
@ -135,7 +138,8 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
Integer maxReadedId = (Integer)redisTemplate.opsForValue().get(key); Integer maxReadedId = (Integer)redisTemplate.opsForValue().get(key);
QueryWrapper<GroupMessage> wrapper = new QueryWrapper(); QueryWrapper<GroupMessage> wrapper = new QueryWrapper();
wrapper.lambda().eq(GroupMessage::getGroupId,member.getGroupId()) wrapper.lambda().eq(GroupMessage::getGroupId,member.getGroupId())
.gt(GroupMessage::getSendTime,member.getCreatedTime()); .gt(GroupMessage::getSendTime,member.getCreatedTime())
.ne(GroupMessage::getStatus,MessageStatusEnum.RECALL.getCode());
if(maxReadedId!=null){ if(maxReadedId!=null){
wrapper.lambda().gt(GroupMessage::getId,maxReadedId); wrapper.lambda().gt(GroupMessage::getId,maxReadedId);
} }

6
im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java

@ -84,8 +84,9 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
if(System.currentTimeMillis() - msg.getSendTime().getTime() > Constant.ALLOW_RECALL_SECOND * 1000){ if(System.currentTimeMillis() - msg.getSendTime().getTime() > Constant.ALLOW_RECALL_SECOND * 1000){
throw new GlobalException(ResultCode.PROGRAM_ERROR,"消息已发送超过5分钟,无法撤回"); throw new GlobalException(ResultCode.PROGRAM_ERROR,"消息已发送超过5分钟,无法撤回");
} }
// 直接物理删除 // 修改消息状态
this.removeById(id); msg.setStatus(MessageStatusEnum.RECALL.getCode());
this.updateById(msg);
// 获取对方连接的channelId // 获取对方连接的channelId
String key = RedisKey.IM_USER_SERVER_ID+msg.getRecvId(); String key = RedisKey.IM_USER_SERVER_ID+msg.getRecvId();
Integer serverId = (Integer)redisTemplate.opsForValue().get(key); Integer serverId = (Integer)redisTemplate.opsForValue().get(key);
@ -94,6 +95,7 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
String sendKey = RedisKey.IM_UNREAD_PRIVATE_MESSAGE + serverId; String sendKey = RedisKey.IM_UNREAD_PRIVATE_MESSAGE + serverId;
PrivateMessageInfo msgInfo = BeanUtils.copyProperties(msg, PrivateMessageInfo.class); PrivateMessageInfo msgInfo = BeanUtils.copyProperties(msg, PrivateMessageInfo.class);
msgInfo.setType(MessageTypeEnum.TIP.getCode()); msgInfo.setType(MessageTypeEnum.TIP.getCode());
msgInfo.setSendTime(new Date());
msgInfo.setContent("对方撤回了一条消息"); msgInfo.setContent("对方撤回了一条消息");
redisTemplate.opsForList().rightPush(sendKey,msgInfo); redisTemplate.opsForList().rightPush(sendKey,msgInfo);
} }

1
im-platform/src/main/java/com/bx/implatform/task/PullAlreadyReadMessageTask.java

@ -55,6 +55,7 @@ public class PullAlreadyReadMessageTask {
if(msgId!=null){ if(msgId!=null){
UpdateWrapper<PrivateMessage> updateWrapper = new UpdateWrapper<>(); UpdateWrapper<PrivateMessage> updateWrapper = new UpdateWrapper<>();
updateWrapper.lambda().eq(PrivateMessage::getId,msgId) updateWrapper.lambda().eq(PrivateMessage::getId,msgId)
.eq(PrivateMessage::getStatus,MessageStatusEnum.UNREAD.getCode())
.set(PrivateMessage::getStatus, MessageStatusEnum.ALREADY_READ.getCode()); .set(PrivateMessage::getStatus, MessageStatusEnum.ALREADY_READ.getCode());
privateMessageService.update(updateWrapper); privateMessageService.update(updateWrapper);
log.info("消息已读,id:{}",msgId); log.info("消息已读,id:{}",msgId);

7
im-platform/src/main/resources/db/db.sql

@ -30,8 +30,8 @@ create table `im_private_message`(
`send_id` bigint not null comment '发送用户id', `send_id` bigint not null comment '发送用户id',
`recv_id` bigint not null comment '接收用户id', `recv_id` bigint not null comment '接收用户id',
`content` text comment '发送内容', `content` text comment '发送内容',
`type` tinyint(1) NOT NULL comment '消息类型 0:文字 1:图片 2:文件', `type` tinyint(1) NOT NULL comment '消息类型 0:文字 1:图片 2:文件 3:语音 10:系统提示',
`status` tinyint(1) NOT NULL comment '状态 0:未读 1:已读 ', `status` tinyint(1) NOT NULL comment '状态 0:未读 1:已读 2:撤回',
`send_time` datetime DEFAULT CURRENT_TIMESTAMP comment '发送时间', `send_time` datetime DEFAULT CURRENT_TIMESTAMP comment '发送时间',
key `idx_send_recv_id` (`send_id`,`recv_id`) key `idx_send_recv_id` (`send_id`,`recv_id`)
)ENGINE=InnoDB CHARSET=utf8mb3 comment '私聊消息'; )ENGINE=InnoDB CHARSET=utf8mb3 comment '私聊消息';
@ -67,7 +67,8 @@ create table `im_group_message`(
`group_id` bigint not null comment '群id', `group_id` bigint not null comment '群id',
`send_id` bigint not null comment '发送用户id', `send_id` bigint not null comment '发送用户id',
`content` text comment '发送内容', `content` text comment '发送内容',
`type` tinyint(1) NOT NULL comment '消息类型 0:文字 1:图片 2:文件', `type` tinyint(1) NOT NULL comment '消息类型 0:文字 1:图片 2:文件 3:语音 10:系统提示' ,
`status` tinyint(1) DEFAULT 0 comment '状态 0:正常 2:撤回',
`send_time` datetime DEFAULT CURRENT_TIMESTAMP comment '发送时间', `send_time` datetime DEFAULT CURRENT_TIMESTAMP comment '发送时间',
key `idx_group_id` (group_id) key `idx_group_id` (group_id)
)ENGINE=InnoDB CHARSET=utf8mb3 comment '群消息'; )ENGINE=InnoDB CHARSET=utf8mb3 comment '群消息';

9
im-server/src/main/java/com/bx/imserver/websocket/processor/GroupMessageProcessor.java

@ -1,6 +1,7 @@
package com.bx.imserver.websocket.processor; package com.bx.imserver.websocket.processor;
import com.bx.common.contant.RedisKey; import com.bx.common.contant.RedisKey;
import com.bx.common.enums.MessageTypeEnum;
import com.bx.common.enums.WSCmdEnum; import com.bx.common.enums.WSCmdEnum;
import com.bx.common.model.im.GroupMessageInfo; import com.bx.common.model.im.GroupMessageInfo;
import com.bx.common.model.im.SendInfo; import com.bx.common.model.im.SendInfo;
@ -39,9 +40,11 @@ public class GroupMessageProcessor extends MessageProcessor<GroupMessageInfo> {
sendInfo.setData(data); sendInfo.setData(data);
channelCtx.channel().writeAndFlush(sendInfo); channelCtx.channel().writeAndFlush(sendInfo);
} }
// 设置已读最大id if(data.getType() != MessageTypeEnum.TIP.getCode()){
String key = RedisKey.IM_GROUP_READED_POSITION + data.getGroupId()+":"+recvId; // 设置已读最大id
redisTemplate.opsForValue().set(key,data.getId()); String key = RedisKey.IM_GROUP_READED_POSITION + data.getGroupId()+":"+recvId;
redisTemplate.opsForValue().set(key,data.getId());
}
}else { }else {
log.error("未找到WS连接,发送者:{},群id:{},接收id:{},内容:{}",data.getSendId(),data.getGroupId(),data.getRecvIds()); log.error("未找到WS连接,发送者:{},群id:{},接收id:{},内容:{}",data.getSendId(),data.getGroupId(),data.getRecvIds());
} }

10
im-server/src/main/java/com/bx/imserver/websocket/processor/PrivateMessageProcessor.java

@ -1,6 +1,7 @@
package com.bx.imserver.websocket.processor; package com.bx.imserver.websocket.processor;
import com.bx.common.contant.RedisKey; import com.bx.common.contant.RedisKey;
import com.bx.common.enums.MessageTypeEnum;
import com.bx.common.enums.WSCmdEnum; import com.bx.common.enums.WSCmdEnum;
import com.bx.common.model.im.PrivateMessageInfo; import com.bx.common.model.im.PrivateMessageInfo;
import com.bx.common.model.im.SendInfo; import com.bx.common.model.im.SendInfo;
@ -29,9 +30,12 @@ public class PrivateMessageProcessor extends MessageProcessor<PrivateMessageInf
sendInfo.setCmd(WSCmdEnum.PRIVATE_MESSAGE.getCode()); sendInfo.setCmd(WSCmdEnum.PRIVATE_MESSAGE.getCode());
sendInfo.setData(data); sendInfo.setData(data);
channelCtx.channel().writeAndFlush(sendInfo); channelCtx.channel().writeAndFlush(sendInfo);
// 已读消息推送至redis,等待更新数据库
String key = RedisKey.IM_READED_PRIVATE_MESSAGE_ID; if(data.getType() != MessageTypeEnum.TIP.getCode()) {
redisTemplate.opsForList().rightPush(key,data.getId()); // 已读消息推送至redis,等待更新数据库
String key = RedisKey.IM_READED_PRIVATE_MESSAGE_ID;
redisTemplate.opsForList().rightPush(key, data.getId());
}
}else{ }else{
log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",data.getSendId(),data.getRecvId(),data.getContent()); log.error("未找到WS连接,发送者:{},接收者:{},内容:{}",data.getSendId(),data.getRecvId(),data.getContent());
} }

Loading…
Cancel
Save