|
@@ -6,9 +6,12 @@ import org.springframework.stereotype.Service;
|
|
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
|
+import java.util.List;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
|
+import java.util.concurrent.ScheduledFuture;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -27,6 +30,9 @@ public class SseServiceImpl implements shop.alien.dining.service.SseService {
|
|
|
// 定时任务执行器,用于发送心跳
|
|
// 定时任务执行器,用于发送心跳
|
|
|
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
|
|
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
|
|
|
|
|
|
|
|
|
|
+ /** 每个 SSE 连接对应的心跳任务,连接移除时必须 cancel,否则会对已 complete 的 emitter 继续 send 误报 ERROR */
|
|
|
|
|
+ private final ConcurrentHashMap<String, ScheduledFuture<?>> heartbeatTasks = new ConcurrentHashMap<>();
|
|
|
|
|
+
|
|
|
@Override
|
|
@Override
|
|
|
public SseEmitter createConnection(Integer tableId) {
|
|
public SseEmitter createConnection(Integer tableId) {
|
|
|
log.info("创建SSE连接, tableId={}", tableId);
|
|
log.info("创建SSE连接, tableId={}", tableId);
|
|
@@ -109,14 +115,10 @@ public class SseServiceImpl implements shop.alien.dining.service.SseService {
|
|
|
log.info("关闭SSE连接, tableId={}", tableId);
|
|
log.info("关闭SSE连接, tableId={}", tableId);
|
|
|
ConcurrentHashMap<String, SseEmitter> tableConnections = connections.get(tableId);
|
|
ConcurrentHashMap<String, SseEmitter> tableConnections = connections.get(tableId);
|
|
|
if (tableConnections != null) {
|
|
if (tableConnections != null) {
|
|
|
- tableConnections.forEach((connectionId, emitter) -> {
|
|
|
|
|
- try {
|
|
|
|
|
- emitter.complete();
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("关闭SSE连接失败, tableId={}, connectionId={}", tableId, connectionId, e);
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
- connections.remove(tableId);
|
|
|
|
|
|
|
+ List<String> ids = new ArrayList<>(tableConnections.keySet());
|
|
|
|
|
+ for (String connectionId : ids) {
|
|
|
|
|
+ removeConnection(tableId, connectionId);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -125,11 +127,19 @@ public class SseServiceImpl implements shop.alien.dining.service.SseService {
|
|
|
*/
|
|
*/
|
|
|
private boolean isClientDisconnect(Throwable ex) {
|
|
private boolean isClientDisconnect(Throwable ex) {
|
|
|
if (ex == null) return false;
|
|
if (ex == null) return false;
|
|
|
|
|
+ String cn = ex.getClass().getSimpleName();
|
|
|
|
|
+ if ("AsyncRequestNotUsableException".equals(cn) || "ClientAbortException".equals(cn)) {
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
String msg = ex.getMessage();
|
|
String msg = ex.getMessage();
|
|
|
if (msg != null) {
|
|
if (msg != null) {
|
|
|
String lower = msg.toLowerCase();
|
|
String lower = msg.toLowerCase();
|
|
|
if (lower.contains("broken pipe") || lower.contains("connection reset")
|
|
if (lower.contains("broken pipe") || lower.contains("connection reset")
|
|
|
- || lower.contains("connection closed") || lower.contains("an established connection was aborted")) {
|
|
|
|
|
|
|
+ || lower.contains("connection closed") || lower.contains("an established connection was aborted")
|
|
|
|
|
+ || lower.contains("connection abort") || lower.contains("abort") || lower.contains("stream closed")
|
|
|
|
|
+ || lower.contains("closed channel") || lower.contains("already completed")
|
|
|
|
|
+ || lower.contains("responsebodyemitter") || lower.contains("not usable")
|
|
|
|
|
+ || msg.contains("中止") || msg.contains("已关闭")) {
|
|
|
return true;
|
|
return true;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -140,6 +150,10 @@ public class SseServiceImpl implements shop.alien.dining.service.SseService {
|
|
|
* 移除连接
|
|
* 移除连接
|
|
|
*/
|
|
*/
|
|
|
private void removeConnection(Integer tableId, String connectionId) {
|
|
private void removeConnection(Integer tableId, String connectionId) {
|
|
|
|
|
+ ScheduledFuture<?> hb = heartbeatTasks.remove(connectionId);
|
|
|
|
|
+ if (hb != null) {
|
|
|
|
|
+ hb.cancel(false);
|
|
|
|
|
+ }
|
|
|
ConcurrentHashMap<String, SseEmitter> tableConnections = connections.get(tableId);
|
|
ConcurrentHashMap<String, SseEmitter> tableConnections = connections.get(tableId);
|
|
|
if (tableConnections != null) {
|
|
if (tableConnections != null) {
|
|
|
SseEmitter emitter = tableConnections.remove(connectionId);
|
|
SseEmitter emitter = tableConnections.remove(connectionId);
|
|
@@ -160,13 +174,20 @@ public class SseServiceImpl implements shop.alien.dining.service.SseService {
|
|
|
* 启动心跳任务
|
|
* 启动心跳任务
|
|
|
*/
|
|
*/
|
|
|
private void startHeartbeat(Integer tableId, String connectionId, SseEmitter emitter) {
|
|
private void startHeartbeat(Integer tableId, String connectionId, SseEmitter emitter) {
|
|
|
- scheduler.scheduleAtFixedRate(() -> {
|
|
|
|
|
- try {
|
|
|
|
|
- if (emitter != null) {
|
|
|
|
|
- emitter.send(SseEmitter.event()
|
|
|
|
|
- .name("heartbeat")
|
|
|
|
|
- .data("ping"));
|
|
|
|
|
|
|
+ final ScheduledFuture<?>[] holder = new ScheduledFuture<?>[1];
|
|
|
|
|
+ holder[0] = scheduler.scheduleAtFixedRate(() -> {
|
|
|
|
|
+ ConcurrentHashMap<String, SseEmitter> tableConnections = connections.get(tableId);
|
|
|
|
|
+ if (tableConnections == null || tableConnections.get(connectionId) != emitter) {
|
|
|
|
|
+ heartbeatTasks.remove(connectionId, holder[0]);
|
|
|
|
|
+ if (holder[0] != null) {
|
|
|
|
|
+ holder[0].cancel(false);
|
|
|
}
|
|
}
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ emitter.send(SseEmitter.event()
|
|
|
|
|
+ .name("heartbeat")
|
|
|
|
|
+ .data("ping"));
|
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
|
if (isClientDisconnect(e)) {
|
|
if (isClientDisconnect(e)) {
|
|
|
log.debug("心跳时客户端已断开, tableId={}, connectionId={}", tableId, connectionId);
|
|
log.debug("心跳时客户端已断开, tableId={}, connectionId={}", tableId, connectionId);
|
|
@@ -175,6 +196,7 @@ public class SseServiceImpl implements shop.alien.dining.service.SseService {
|
|
|
}
|
|
}
|
|
|
removeConnection(tableId, connectionId);
|
|
removeConnection(tableId, connectionId);
|
|
|
}
|
|
}
|
|
|
- }, 30, 30, TimeUnit.SECONDS); // 每30秒发送一次心跳
|
|
|
|
|
|
|
+ }, 30, 30, TimeUnit.SECONDS);
|
|
|
|
|
+ heartbeatTasks.put(connectionId, holder[0]);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|