Browse Source

!83 同步代码

Merge pull request !83 from blue/v_3.0.0
master
blue 2 years ago
committed by Gitee
parent
commit
4a65331e7c
No known key found for this signature in database GPG Key ID: 173E9B9CA92EEF8F
  1. 2
      README.md
  2. 39
      im-platform/src/main/java/com/bx/implatform/config/RedissonConfig.java
  3. 4
      im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java
  4. 26
      im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java
  5. 35
      im-server/src/main/java/com/bx/imserver/config/RedisConfig.java
  6. 2
      im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java

2
README.md

@ -57,7 +57,7 @@
- redis记录了每个用户的websocket连接的是哪个im-server,当用户发送消息时,im-platform将根据所连接的im-server的id,决定将消息推向哪个queue - redis记录了每个用户的websocket连接的是哪个im-server,当用户发送消息时,im-platform将根据所连接的im-server的id,决定将消息推向哪个queue
#### 本地快速部署 #### 本地启动
1.安装运行环境 1.安装运行环境
- 安装node:v18.19.0 - 安装node:v18.19.0
- 安装jdk:17 - 安装jdk:17

39
im-platform/src/main/java/com/bx/implatform/config/RedissonConfig.java

@ -1,39 +0,0 @@
package com.bx.implatform.config;
import cn.hutool.core.util.StrUtil;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
/**
* @author: Blue
* @date: 2024-06-09
* @version: 1.0
*/
//@Configuration
//@ConditionalOnClass(Config.class)
//@EnableConfigurationProperties(RedisProperties.class)
public class RedissonConfig {
@Bean
RedissonClient redissonClient(RedisProperties redisProperties) {
Config config = new Config();
config.setCodec(new StringCodec());
String address = "redis://" + redisProperties.getHost()+":"+redisProperties.getPort();
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(address)
.setDatabase(redisProperties.getDatabase());
if(StrUtil.isNotEmpty(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
}

4
im-platform/src/main/java/com/bx/implatform/listener/PrivateMessageListener.java

@ -40,8 +40,8 @@ public class PrivateMessageListener implements MessageListener<PrivateMessageVO>
if(CollUtil.isNotEmpty(messageIds)){ if(CollUtil.isNotEmpty(messageIds)){
UpdateWrapper<PrivateMessage> updateWrapper = new UpdateWrapper<>(); UpdateWrapper<PrivateMessage> updateWrapper = new UpdateWrapper<>();
updateWrapper.lambda().in(PrivateMessage::getId, messageIds) updateWrapper.lambda().in(PrivateMessage::getId, messageIds)
.eq(PrivateMessage::getStatus, MessageStatus.UNSEND.code()) .eq(PrivateMessage::getStatus, MessageStatus.UNSEND.code())
.set(PrivateMessage::getStatus, MessageStatus.SENDED.code()); .set(PrivateMessage::getStatus, MessageStatus.SENDED.code());
privateMessageService.update(updateWrapper); privateMessageService.update(updateWrapper);
} }
} }

26
im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java

@ -121,13 +121,13 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
long stIdx = (page - 1) * size; long stIdx = (page - 1) * size;
QueryWrapper<PrivateMessage> wrapper = new QueryWrapper<>(); QueryWrapper<PrivateMessage> wrapper = new QueryWrapper<>();
wrapper.lambda().and(wrap -> wrap.and( wrapper.lambda().and(wrap -> wrap.and(
wp -> wp.eq(PrivateMessage::getSendId, userId) wp -> wp.eq(PrivateMessage::getSendId, userId)
.eq(PrivateMessage::getRecvId, friendId)) .eq(PrivateMessage::getRecvId, friendId))
.or(wp -> wp.eq(PrivateMessage::getRecvId, userId) .or(wp -> wp.eq(PrivateMessage::getRecvId, userId)
.eq(PrivateMessage::getSendId, friendId))) .eq(PrivateMessage::getSendId, friendId)))
.ne(PrivateMessage::getStatus, MessageStatus.RECALL.code()) .ne(PrivateMessage::getStatus, MessageStatus.RECALL.code())
.orderByDesc(PrivateMessage::getId) .orderByDesc(PrivateMessage::getId)
.last("limit " + stIdx + "," + size); .last("limit " + stIdx + "," + size);
List<PrivateMessage> messages = this.list(wrapper); List<PrivateMessage> messages = this.list(wrapper);
List<PrivateMessageVO> messageInfos = messages.stream().map(m -> BeanUtils.copyProperties(m, PrivateMessageVO.class)).collect(Collectors.toList()); List<PrivateMessageVO> messageInfos = messages.stream().map(m -> BeanUtils.copyProperties(m, PrivateMessageVO.class)).collect(Collectors.toList());
@ -215,9 +215,9 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
// 修改消息状态为已读 // 修改消息状态为已读
LambdaUpdateWrapper<PrivateMessage> updateWrapper = Wrappers.lambdaUpdate(); LambdaUpdateWrapper<PrivateMessage> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(PrivateMessage::getSendId, friendId) updateWrapper.eq(PrivateMessage::getSendId, friendId)
.eq(PrivateMessage::getRecvId, session.getUserId()) .eq(PrivateMessage::getRecvId, session.getUserId())
.eq(PrivateMessage::getStatus, MessageStatus.SENDED.code()) .eq(PrivateMessage::getStatus, MessageStatus.SENDED.code())
.set(PrivateMessage::getStatus, MessageStatus.READED.code()); .set(PrivateMessage::getStatus, MessageStatus.READED.code());
this.update(updateWrapper); this.update(updateWrapper);
log.info("消息已读,接收方id:{},发送方id:{}", session.getUserId(), friendId); log.info("消息已读,接收方id:{},发送方id:{}", session.getUserId(), friendId);
} }
@ -228,11 +228,11 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
UserSession session = SessionContext.getSession(); UserSession session = SessionContext.getSession();
LambdaQueryWrapper<PrivateMessage> wrapper = Wrappers.lambdaQuery(); LambdaQueryWrapper<PrivateMessage> wrapper = Wrappers.lambdaQuery();
wrapper.eq(PrivateMessage::getSendId, session.getUserId()) wrapper.eq(PrivateMessage::getSendId, session.getUserId())
.eq(PrivateMessage::getRecvId, friendId) .eq(PrivateMessage::getRecvId, friendId)
.eq(PrivateMessage::getStatus, MessageStatus.READED.code()) .eq(PrivateMessage::getStatus, MessageStatus.READED.code())
.orderByDesc(PrivateMessage::getId) .orderByDesc(PrivateMessage::getId)
.select(PrivateMessage::getId) .select(PrivateMessage::getId)
.last("limit 1"); .last("limit 1");
PrivateMessage message = this.getOne(wrapper); PrivateMessage message = this.getOne(wrapper);
if(Objects.isNull(message)){ if(Objects.isNull(message)){
return -1L; return -1L;

35
im-server/src/main/java/com/bx/imserver/config/RedisConfig.java

@ -0,0 +1,35 @@
package com.bx.imserver.config;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Primary
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 设置值(value)的序列化采用FastJsonRedisSerializer
redisTemplate.setValueSerializer(fastJsonRedisSerializer());
redisTemplate.setHashValueSerializer(fastJsonRedisSerializer());
// 设置键(key)的序列化采用StringRedisSerializer。
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
public FastJsonRedisSerializer fastJsonRedisSerializer() {
return new FastJsonRedisSerializer<>(Object.class);
}
}

2
im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java

@ -71,7 +71,7 @@ public class IMChannelHandler extends SimpleChannelInboundHandler<IMSendInfo> {
// 移除channel // 移除channel
UserChannelCtxMap.removeChannelCtx(userId, terminal); UserChannelCtxMap.removeChannelCtx(userId, terminal);
// 用户下线 // 用户下线
RedisTemplate<String, Object> redisTemplate = SpringContextHolder.getBean("redisTemplate"); RedisTemplate<String, Object> redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), terminal.toString()); String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), terminal.toString());
redisTemplate.delete(key); redisTemplate.delete(key);
log.info("断开连接,userId:{},终端类型:{},{}", userId, terminal, ctx.channel().id().asLongText()); log.info("断开连接,userId:{},终端类型:{},{}", userId, terminal, ctx.channel().id().asLongText());

Loading…
Cancel
Save