25 changed files with 22 additions and 645 deletions
@ -1,45 +0,0 @@ |
|||
import { getToken } from '@/utils/auth'; |
|||
import { ElNotification } from 'element-plus'; |
|||
import useNoticeStore from '@/store/modules/notice'; |
|||
|
|||
// 初始化
|
|||
export const initSSE = (url: any) => { |
|||
if (import.meta.env.VITE_APP_SSE === 'false') { |
|||
return; |
|||
} |
|||
|
|||
url = url + '?Authorization=Bearer ' + getToken() + '&clientid=' + import.meta.env.VITE_APP_CLIENT_ID |
|||
const { |
|||
data, |
|||
error |
|||
} = useEventSource(url, [], { |
|||
autoReconnect: { |
|||
retries: 10, |
|||
delay: 3000, |
|||
onFailed() { |
|||
console.log('Failed to connect after 10 retries'); |
|||
} |
|||
} |
|||
}); |
|||
|
|||
watch(error, () => { |
|||
console.log('SSE connection error:', error.value); |
|||
error.value = null; |
|||
}); |
|||
|
|||
watch(data, () => { |
|||
if (!data.value) return; |
|||
useNoticeStore().addNotice({ |
|||
message: data.value, |
|||
read: false, |
|||
time: new Date().toLocaleString() |
|||
}); |
|||
ElNotification({ |
|||
title: '消息', |
|||
message: data.value, |
|||
type: 'success', |
|||
duration: 3000 |
|||
}); |
|||
data.value = null; |
|||
}); |
|||
}; |
|||
@ -1,62 +0,0 @@ |
|||
package org.dromara.common.core.exception; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.NoArgsConstructor; |
|||
|
|||
import java.io.Serial; |
|||
|
|||
/** |
|||
* sse 特制异常 |
|||
* |
|||
* @author LionLi |
|||
*/ |
|||
@Data |
|||
@EqualsAndHashCode(callSuper = true) |
|||
@NoArgsConstructor |
|||
@AllArgsConstructor |
|||
public final class SseException extends RuntimeException { |
|||
|
|||
@Serial |
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
/** |
|||
* 错误码 |
|||
*/ |
|||
private Integer code; |
|||
|
|||
/** |
|||
* 错误提示 |
|||
*/ |
|||
private String message; |
|||
|
|||
/** |
|||
* 错误明细,内部调试错误 |
|||
*/ |
|||
private String detailMessage; |
|||
|
|||
public SseException(String message) { |
|||
this.message = message; |
|||
} |
|||
|
|||
public SseException(String message, Integer code) { |
|||
this.message = message; |
|||
this.code = code; |
|||
} |
|||
|
|||
@Override |
|||
public String getMessage() { |
|||
return message; |
|||
} |
|||
|
|||
public SseException setMessage(String message) { |
|||
this.message = message; |
|||
return this; |
|||
} |
|||
|
|||
public SseException setDetailMessage(String detailMessage) { |
|||
this.detailMessage = detailMessage; |
|||
return this; |
|||
} |
|||
} |
|||
@ -1,36 +0,0 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" |
|||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|||
<parent> |
|||
<groupId>org.dromara</groupId> |
|||
<artifactId>ruoyi-common</artifactId> |
|||
<version>${revision}</version> |
|||
</parent> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
|
|||
<artifactId>ruoyi-common-sse</artifactId> |
|||
|
|||
<description> |
|||
ruoyi-common-sse 模块 |
|||
</description> |
|||
|
|||
<dependencies> |
|||
<dependency> |
|||
<groupId>org.dromara</groupId> |
|||
<artifactId>ruoyi-common-core</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.dromara</groupId> |
|||
<artifactId>ruoyi-common-redis</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.dromara</groupId> |
|||
<artifactId>ruoyi-common-satoken</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.dromara</groupId> |
|||
<artifactId>ruoyi-common-json</artifactId> |
|||
</dependency> |
|||
</dependencies> |
|||
</project> |
|||
@ -1,36 +0,0 @@ |
|||
package org.dromara.common.sse.config; |
|||
|
|||
import org.dromara.common.sse.controller.SseController; |
|||
import org.dromara.common.sse.core.SseEmitterManager; |
|||
import org.dromara.common.sse.listener.SseTopicListener; |
|||
import org.springframework.boot.autoconfigure.AutoConfiguration; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
|||
import org.springframework.boot.context.properties.EnableConfigurationProperties; |
|||
import org.springframework.context.annotation.Bean; |
|||
|
|||
/** |
|||
* SSE 自动装配 |
|||
* |
|||
* @author Lion Li |
|||
*/ |
|||
@AutoConfiguration |
|||
@ConditionalOnProperty(value = "sse.enabled", havingValue = "true") |
|||
@EnableConfigurationProperties(SseProperties.class) |
|||
public class SseAutoConfiguration { |
|||
|
|||
@Bean |
|||
public SseEmitterManager sseEmitterManager() { |
|||
return new SseEmitterManager(); |
|||
} |
|||
|
|||
@Bean |
|||
public SseTopicListener sseTopicListener() { |
|||
return new SseTopicListener(); |
|||
} |
|||
|
|||
@Bean |
|||
public SseController sseController(SseEmitterManager sseEmitterManager) { |
|||
return new SseController(sseEmitterManager); |
|||
} |
|||
|
|||
} |
|||
@ -1,21 +0,0 @@ |
|||
package org.dromara.common.sse.config; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|||
|
|||
/** |
|||
* SSE 配置项 |
|||
* |
|||
* @author Lion Li |
|||
*/ |
|||
@Data |
|||
@ConfigurationProperties("sse") |
|||
public class SseProperties { |
|||
|
|||
private Boolean enabled; |
|||
|
|||
/** |
|||
* 路径 |
|||
*/ |
|||
private String path; |
|||
} |
|||
@ -1,87 +0,0 @@ |
|||
package org.dromara.common.sse.controller; |
|||
|
|||
import cn.dev33.satoken.annotation.SaIgnore; |
|||
import cn.dev33.satoken.stp.StpUtil; |
|||
import lombok.RequiredArgsConstructor; |
|||
import org.dromara.common.core.domain.R; |
|||
import org.dromara.common.satoken.utils.LoginHelper; |
|||
import org.dromara.common.sse.core.SseEmitterManager; |
|||
import org.dromara.common.sse.dto.SseMessageDto; |
|||
import org.springframework.beans.factory.DisposableBean; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
|||
import org.springframework.http.MediaType; |
|||
import org.springframework.web.bind.annotation.GetMapping; |
|||
import org.springframework.web.bind.annotation.RestController; |
|||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; |
|||
|
|||
import java.util.List; |
|||
|
|||
/** |
|||
* SSE 控制器 |
|||
* |
|||
* @author Lion Li |
|||
*/ |
|||
@RestController |
|||
@ConditionalOnProperty(value = "sse.enabled", havingValue = "true") |
|||
@RequiredArgsConstructor |
|||
public class SseController implements DisposableBean { |
|||
|
|||
private final SseEmitterManager sseEmitterManager; |
|||
|
|||
/** |
|||
* 建立 SSE 连接 |
|||
*/ |
|||
@GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) |
|||
public SseEmitter connect() { |
|||
String tokenValue = StpUtil.getTokenValue(); |
|||
Long userId = LoginHelper.getUserId(); |
|||
return sseEmitterManager.connect(userId, tokenValue); |
|||
} |
|||
|
|||
/** |
|||
* 关闭 SSE 连接 |
|||
*/ |
|||
@SaIgnore |
|||
@GetMapping(value = "${sse.path}/close") |
|||
public R<Void> close() { |
|||
String tokenValue = StpUtil.getTokenValue(); |
|||
Long userId = LoginHelper.getUserId(); |
|||
sseEmitterManager.disconnect(userId, tokenValue); |
|||
return R.ok(); |
|||
} |
|||
|
|||
/** |
|||
* 向特定用户发送消息 |
|||
* |
|||
* @param userId 目标用户的 ID |
|||
* @param msg 要发送的消息内容 |
|||
*/ |
|||
@GetMapping(value = "${sse.path}/send") |
|||
public R<Void> send(Long userId, String msg) { |
|||
SseMessageDto dto = new SseMessageDto(); |
|||
dto.setUserIds(List.of(userId)); |
|||
dto.setMessage(msg); |
|||
sseEmitterManager.publishMessage(dto); |
|||
return R.ok(); |
|||
} |
|||
|
|||
/** |
|||
* 向所有用户发送消息 |
|||
* |
|||
* @param msg 要发送的消息内容 |
|||
*/ |
|||
@GetMapping(value = "${sse.path}/sendAll") |
|||
public R<Void> send(String msg) { |
|||
sseEmitterManager.publishAll(msg); |
|||
return R.ok(); |
|||
} |
|||
|
|||
/** |
|||
* 清理资源。此方法目前不执行任何操作,但避免因未实现而导致错误 |
|||
*/ |
|||
@Override |
|||
public void destroy() throws Exception { |
|||
// 销毁时不需要做什么 此方法避免无用操作报错
|
|||
} |
|||
|
|||
} |
|||
@ -1,145 +0,0 @@ |
|||
package org.dromara.common.sse.core; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.common.redis.utils.RedisUtils; |
|||
import org.dromara.common.sse.dto.SseMessageDto; |
|||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.Map; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.function.Consumer; |
|||
|
|||
/** |
|||
* 管理 Server-Sent Events (SSE) 连接 |
|||
* |
|||
* @author Lion Li |
|||
*/ |
|||
@Slf4j |
|||
public class SseEmitterManager { |
|||
|
|||
/** |
|||
* 订阅的频道 |
|||
*/ |
|||
private final static String SSE_TOPIC = "global:sse"; |
|||
|
|||
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); |
|||
|
|||
/** |
|||
* 建立与指定用户的 SSE 连接 |
|||
* |
|||
* @param userId 用户的唯一标识符,用于区分不同用户的连接 |
|||
* @param token 用户的唯一令牌,用于识别具体的连接 |
|||
* @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件 |
|||
*/ |
|||
public SseEmitter connect(Long userId, String token) { |
|||
// 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap)
|
|||
// 每个用户可以有多个 SSE 连接,通过 token 进行区分
|
|||
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); |
|||
|
|||
// 创建一个新的 SseEmitter 实例,超时时间设置为 0 表示无限制
|
|||
SseEmitter emitter = new SseEmitter(0L); |
|||
|
|||
emitters.put(token, emitter); |
|||
|
|||
// 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token
|
|||
emitter.onCompletion(() -> emitters.remove(token)); |
|||
emitter.onTimeout(() -> emitters.remove(token)); |
|||
emitter.onError((e) -> emitters.remove(token)); |
|||
|
|||
try { |
|||
// 向客户端发送一条连接成功的事件
|
|||
emitter.send(SseEmitter.event().comment("connected")); |
|||
} catch (IOException e) { |
|||
// 如果发送消息失败,则从映射表中移除 emitter
|
|||
emitters.remove(token); |
|||
} |
|||
return emitter; |
|||
} |
|||
|
|||
/** |
|||
* 断开指定用户的 SSE 连接 |
|||
* |
|||
* @param userId 用户的唯一标识符,用于区分不同用户的连接 |
|||
* @param token 用户的唯一令牌,用于识别具体的连接 |
|||
*/ |
|||
public void disconnect(Long userId, String token) { |
|||
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); |
|||
if (emitters != null) { |
|||
try { |
|||
emitters.get(token).send(SseEmitter.event().comment("disconnected")); |
|||
} catch (Exception ignore) { |
|||
} |
|||
emitters.remove(token); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息 |
|||
* |
|||
* @param consumer 处理SSE消息的消费者函数 |
|||
*/ |
|||
public void subscribeMessage(Consumer<SseMessageDto> consumer) { |
|||
RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer); |
|||
} |
|||
|
|||
/** |
|||
* 向指定的用户会话发送消息 |
|||
* |
|||
* @param userId 要发送消息的用户id |
|||
* @param message 要发送的消息内容 |
|||
*/ |
|||
public void sendMessage(Long userId, String message) { |
|||
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); |
|||
if (emitters != null) { |
|||
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) { |
|||
try { |
|||
entry.getValue().send(SseEmitter.event() |
|||
.name("message") |
|||
.data(message)); |
|||
} catch (Exception e) { |
|||
emitters.remove(entry.getKey()); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 本机全用户会话发送消息 |
|||
* |
|||
* @param message 要发送的消息内容 |
|||
*/ |
|||
public void sendMessage(String message) { |
|||
for (Long userId : USER_TOKEN_EMITTERS.keySet()) { |
|||
sendMessage(userId, message); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 发布SSE订阅消息 |
|||
* |
|||
* @param sseMessageDto 要发布的SSE消息对象 |
|||
*/ |
|||
public void publishMessage(SseMessageDto sseMessageDto) { |
|||
SseMessageDto broadcastMessage = new SseMessageDto(); |
|||
broadcastMessage.setMessage(sseMessageDto.getMessage()); |
|||
broadcastMessage.setUserIds(sseMessageDto.getUserIds()); |
|||
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { |
|||
log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", |
|||
SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage()); |
|||
}); |
|||
} |
|||
|
|||
/** |
|||
* 向所有的用户发布订阅的消息(群发) |
|||
* |
|||
* @param message 要发布的消息内容 |
|||
*/ |
|||
public void publishAll(String message) { |
|||
SseMessageDto broadcastMessage = new SseMessageDto(); |
|||
broadcastMessage.setMessage(message); |
|||
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { |
|||
log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message); |
|||
}); |
|||
} |
|||
} |
|||
@ -1,29 +0,0 @@ |
|||
package org.dromara.common.sse.dto; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.io.Serial; |
|||
import java.io.Serializable; |
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 消息的dto |
|||
* |
|||
* @author zendwang |
|||
*/ |
|||
@Data |
|||
public class SseMessageDto implements Serializable { |
|||
|
|||
@Serial |
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
/** |
|||
* 需要推送到的session key 列表 |
|||
*/ |
|||
private List<Long> userIds; |
|||
|
|||
/** |
|||
* 需要发送的消息 |
|||
*/ |
|||
private String message; |
|||
} |
|||
@ -1,48 +0,0 @@ |
|||
package org.dromara.common.sse.listener; |
|||
|
|||
import cn.hutool.core.collection.CollUtil; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.common.sse.core.SseEmitterManager; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.boot.ApplicationArguments; |
|||
import org.springframework.boot.ApplicationRunner; |
|||
import org.springframework.core.Ordered; |
|||
|
|||
/** |
|||
* SSE 主题订阅监听器 |
|||
* |
|||
* @author Lion Li |
|||
*/ |
|||
@Slf4j |
|||
public class SseTopicListener implements ApplicationRunner, Ordered { |
|||
|
|||
@Autowired |
|||
private SseEmitterManager sseEmitterManager; |
|||
|
|||
/** |
|||
* 在Spring Boot应用程序启动时初始化SSE主题订阅监听器 |
|||
* |
|||
* @param args 应用程序参数 |
|||
* @throws Exception 初始化过程中可能抛出的异常 |
|||
*/ |
|||
@Override |
|||
public void run(ApplicationArguments args) throws Exception { |
|||
sseEmitterManager.subscribeMessage((message) -> { |
|||
log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage()); |
|||
// 如果key不为空就按照key发消息 如果为空就群发
|
|||
if (CollUtil.isNotEmpty(message.getUserIds())) { |
|||
message.getUserIds().forEach(key -> { |
|||
sseEmitterManager.sendMessage(key, message.getMessage()); |
|||
}); |
|||
} else { |
|||
sseEmitterManager.sendMessage(message.getMessage()); |
|||
} |
|||
}); |
|||
log.info("初始化SSE主题订阅监听器成功"); |
|||
} |
|||
|
|||
@Override |
|||
public int getOrder() { |
|||
return -1; |
|||
} |
|||
} |
|||
@ -1,58 +0,0 @@ |
|||
package org.dromara.common.sse.utils; |
|||
|
|||
import lombok.AccessLevel; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.common.core.utils.SpringUtils; |
|||
import org.dromara.common.sse.core.SseEmitterManager; |
|||
import org.dromara.common.sse.dto.SseMessageDto; |
|||
|
|||
/** |
|||
* SSE工具类 |
|||
* |
|||
* @author Lion Li |
|||
*/ |
|||
@Slf4j |
|||
@NoArgsConstructor(access = AccessLevel.PRIVATE) |
|||
public class SseMessageUtils { |
|||
|
|||
private final static SseEmitterManager MANAGER = SpringUtils.getBean(SseEmitterManager.class); |
|||
|
|||
/** |
|||
* 向指定的WebSocket会话发送消息 |
|||
* |
|||
* @param userId 要发送消息的用户id |
|||
* @param message 要发送的消息内容 |
|||
*/ |
|||
public static void sendMessage(Long userId, String message) { |
|||
MANAGER.sendMessage(userId, message); |
|||
} |
|||
|
|||
/** |
|||
* 本机全用户会话发送消息 |
|||
* |
|||
* @param message 要发送的消息内容 |
|||
*/ |
|||
public static void sendMessage(String message) { |
|||
MANAGER.sendMessage(message); |
|||
} |
|||
|
|||
/** |
|||
* 发布SSE订阅消息 |
|||
* |
|||
* @param sseMessageDto 要发布的SSE消息对象 |
|||
*/ |
|||
public static void publishMessage(SseMessageDto sseMessageDto) { |
|||
MANAGER.publishMessage(sseMessageDto); |
|||
} |
|||
|
|||
/** |
|||
* 向所有的用户发布订阅的消息(群发) |
|||
* |
|||
* @param message 要发布的消息内容 |
|||
*/ |
|||
public static void publishAll(String message) { |
|||
MANAGER.publishAll(message); |
|||
} |
|||
|
|||
} |
|||
@ -1 +0,0 @@ |
|||
org.dromara.common.sse.config.SseAutoConfiguration |
|||
Loading…
Reference in new issue