committed by
Gitee
22 changed files with 200 additions and 90 deletions
@ -1,34 +1,59 @@ |
|||||
package com.bx.imclient.task; |
package com.bx.imclient.task; |
||||
|
|
||||
|
import cn.hutool.core.util.StrUtil; |
||||
import com.alibaba.fastjson.JSONObject; |
import com.alibaba.fastjson.JSONObject; |
||||
import com.bx.imclient.listener.MessageListenerMulticaster; |
import com.bx.imclient.listener.MessageListenerMulticaster; |
||||
import com.bx.imcommon.contant.IMRedisKey; |
import com.bx.imcommon.contant.IMRedisKey; |
||||
import com.bx.imcommon.enums.IMListenerType; |
import com.bx.imcommon.enums.IMListenerType; |
||||
import com.bx.imcommon.model.IMSendResult; |
import com.bx.imcommon.model.IMSendResult; |
||||
import lombok.AllArgsConstructor; |
import lombok.RequiredArgsConstructor; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
import org.springframework.data.redis.core.RedisTemplate; |
import org.springframework.data.redis.core.RedisTemplate; |
||||
import org.springframework.stereotype.Component; |
import org.springframework.stereotype.Component; |
||||
|
|
||||
import javax.annotation.Resource; |
import javax.annotation.Resource; |
||||
import java.util.concurrent.TimeUnit; |
import java.util.LinkedList; |
||||
|
import java.util.List; |
||||
|
import java.util.Objects; |
||||
|
|
||||
|
|
||||
@Component |
@Component |
||||
@AllArgsConstructor |
@RequiredArgsConstructor |
||||
public class GroupMessageResultResultTask extends AbstractMessageResultTask { |
public class GroupMessageResultResultTask extends AbstractMessageResultTask { |
||||
|
|
||||
@Resource(name = "IMRedisTemplate") |
@Resource(name = "IMRedisTemplate") |
||||
private RedisTemplate<String,Object> redisTemplate; |
private RedisTemplate<String,Object> redisTemplate; |
||||
|
|
||||
|
@Value("${spring.application.name}") |
||||
|
private String appName; |
||||
|
|
||||
|
@Value("${im.result.batch:100}") |
||||
|
private int batchSize; |
||||
|
|
||||
private final MessageListenerMulticaster listenerMulticaster; |
private final MessageListenerMulticaster listenerMulticaster; |
||||
|
|
||||
@Override |
@Override |
||||
public void pullMessage() { |
public void pullMessage() { |
||||
String key = IMRedisKey.IM_RESULT_GROUP_QUEUE; |
List<IMSendResult> results; |
||||
JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); |
do { |
||||
if(jsonObject != null) { |
results = loadBatch(); |
||||
IMSendResult result = jsonObject.toJavaObject(IMSendResult.class); |
if(!results.isEmpty()){ |
||||
listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE,result); |
listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results); |
||||
|
} |
||||
|
} while (results.size() >= batchSize); |
||||
|
} |
||||
|
|
||||
|
List<IMSendResult> loadBatch() { |
||||
|
String key = StrUtil.join(":", IMRedisKey.IM_RESULT_GROUP_QUEUE, appName); |
||||
|
//这个接口redis6.2以上才支持
|
||||
|
//List<Object> list = redisTemplate.opsForList().leftPop(key, batchSize);
|
||||
|
List<IMSendResult> results = new LinkedList<>(); |
||||
|
JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); |
||||
|
while (!Objects.isNull(jsonObject) && results.size() < batchSize) { |
||||
|
results.add(jsonObject.toJavaObject(IMSendResult.class)); |
||||
|
jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); |
||||
} |
} |
||||
|
return results; |
||||
} |
} |
||||
|
|
||||
} |
} |
||||
|
|||||
@ -1,37 +1,60 @@ |
|||||
package com.bx.imclient.task; |
package com.bx.imclient.task; |
||||
|
|
||||
|
import cn.hutool.core.util.StrUtil; |
||||
import com.alibaba.fastjson.JSONObject; |
import com.alibaba.fastjson.JSONObject; |
||||
import com.bx.imclient.listener.MessageListenerMulticaster; |
import com.bx.imclient.listener.MessageListenerMulticaster; |
||||
import com.bx.imcommon.contant.IMRedisKey; |
import com.bx.imcommon.contant.IMRedisKey; |
||||
import com.bx.imcommon.enums.IMListenerType; |
import com.bx.imcommon.enums.IMListenerType; |
||||
import com.bx.imcommon.model.IMSendResult; |
import com.bx.imcommon.model.IMSendResult; |
||||
import lombok.AllArgsConstructor; |
import lombok.RequiredArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
import org.springframework.data.redis.core.RedisTemplate; |
import org.springframework.data.redis.core.RedisTemplate; |
||||
import org.springframework.stereotype.Component; |
import org.springframework.stereotype.Component; |
||||
|
|
||||
import javax.annotation.Resource; |
import javax.annotation.Resource; |
||||
import java.util.concurrent.TimeUnit; |
import java.util.LinkedList; |
||||
|
import java.util.List; |
||||
|
import java.util.Objects; |
||||
|
|
||||
@Slf4j |
@Slf4j |
||||
@Component |
@Component |
||||
@AllArgsConstructor |
@RequiredArgsConstructor |
||||
public class PrivateMessageResultResultTask extends AbstractMessageResultTask { |
public class PrivateMessageResultResultTask extends AbstractMessageResultTask { |
||||
|
|
||||
|
|
||||
@Resource(name = "IMRedisTemplate") |
@Resource(name = "IMRedisTemplate") |
||||
private RedisTemplate<String,Object> redisTemplate; |
private RedisTemplate<String, Object> redisTemplate; |
||||
|
|
||||
|
@Value("${spring.application.name}") |
||||
|
private String appName; |
||||
|
|
||||
|
@Value("${im.result.batch:100}") |
||||
|
private int batchSize; |
||||
|
|
||||
private final MessageListenerMulticaster listenerMulticaster; |
private final MessageListenerMulticaster listenerMulticaster; |
||||
|
|
||||
@Override |
@Override |
||||
public void pullMessage() { |
public void pullMessage() { |
||||
String key = IMRedisKey.IM_RESULT_PRIVATE_QUEUE; |
List<IMSendResult> results; |
||||
JSONObject jsonObject = (JSONObject)redisTemplate.opsForList().leftPop(key,10, TimeUnit.SECONDS); |
do { |
||||
if(jsonObject != null) { |
results = loadBatch(); |
||||
IMSendResult result = jsonObject.toJavaObject(IMSendResult.class); |
if(!results.isEmpty()){ |
||||
listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, result); |
listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results); |
||||
|
} |
||||
|
} while (results.size() >= batchSize); |
||||
|
} |
||||
|
|
||||
|
List<IMSendResult> loadBatch() { |
||||
|
String key = StrUtil.join(":", IMRedisKey.IM_RESULT_PRIVATE_QUEUE, appName); |
||||
|
//这个接口redis6.2以上才支持
|
||||
|
//List<Object> list = redisTemplate.opsForList().leftPop(key, batchSize);
|
||||
|
List<IMSendResult> results = new LinkedList<>(); |
||||
|
JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); |
||||
|
while (!Objects.isNull(jsonObject) && results.size() < batchSize) { |
||||
|
results.add(jsonObject.toJavaObject(IMSendResult.class)); |
||||
|
jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key); |
||||
} |
} |
||||
|
return results; |
||||
} |
} |
||||
|
|
||||
} |
} |
||||
|
|||||
Loading…
Reference in new issue