From 2fb909f73d4d9d636791b5e6937dd60e1eb4e3fc Mon Sep 17 00:00:00 2001 From: DecDuck Date: Thu, 8 May 2025 21:43:54 +1000 Subject: [PATCH] Revert "fix: convert socket sessions to cacheHandler" This reverts commit 733aee397793cf88cb1d0b250f4cdaaaed390128. --- server/api/v1/notifications/ws.get.ts | 10 +++++----- server/api/v1/task/index.get.ts | 16 ++++++++-------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/server/api/v1/notifications/ws.get.ts b/server/api/v1/notifications/ws.get.ts index 0bb2484..58db068 100644 --- a/server/api/v1/notifications/ws.get.ts +++ b/server/api/v1/notifications/ws.get.ts @@ -1,9 +1,9 @@ import notificationSystem from "~/server/internal/notifications"; import aclManager from "~/server/internal/acls"; -import cacheHandler from "~/server/internal/cache"; +// TODO add web socket sessions for horizontal scaling // Peer ID to user ID -const socketSessions = cacheHandler.createCache("notificationSocketSessions"); +const socketSessions = new Map(); export default defineWebSocketHandler({ async open(peer) { @@ -23,7 +23,7 @@ export default defineWebSocketHandler({ userIds.push("system"); } - await socketSessions.set(peer.id, userId); + socketSessions.set(peer.id, userId); for (const listenUserId of userIds) { notificationSystem.listen(listenUserId, peer.id, (notification) => { @@ -32,7 +32,7 @@ export default defineWebSocketHandler({ } }, async close(peer, _details) { - const userId = await socketSessions.get(peer.id); + const userId = socketSessions.get(peer.id); if (!userId) { console.log(`skipping websocket close for ${peer.id}`); return; @@ -40,6 +40,6 @@ export default defineWebSocketHandler({ notificationSystem.unlisten(userId, peer.id); notificationSystem.unlisten("system", peer.id); // In case we were listening as 'system' - await socketSessions.remove(peer.id); + socketSessions.delete(peer.id); }, }); diff --git a/server/api/v1/task/index.get.ts b/server/api/v1/task/index.get.ts index 8770e22..6f25a53 100644 --- a/server/api/v1/task/index.get.ts +++ b/server/api/v1/task/index.get.ts @@ -1,9 +1,9 @@ import taskHandler from "~/server/internal/tasks"; import type { MinimumRequestObject } from "~/server/h3"; -import cacheHandler from "~/server/internal/cache"; +// TODO add web socket sessions for horizontal scaling // ID to admin -const socketHeaders = cacheHandler.createCache("taskSocketHeaders"); +const socketHeaders = new Map(); export default defineWebSocketHandler({ async open(peer) { @@ -13,15 +13,15 @@ export default defineWebSocketHandler({ return; } - await socketHeaders.set(peer.id, { + socketHeaders.set(peer.id, { headers: request.headers ?? new Headers(), }); peer.send(`connect`); }, - async message(peer, message) { + message(peer, message) { if (!peer.id) return; - const headers = await socketHeaders.get(peer.id); - if (!headers) return; + const headers = socketHeaders.get(peer.id); + if (headers === undefined) return; const text = message.text(); if (text.startsWith("connect/")) { const id = text.substring("connect/".length); @@ -29,10 +29,10 @@ export default defineWebSocketHandler({ return; } }, - async close(peer, _details) { + close(peer, _details) { if (!peer.id) return; if (!socketHeaders.has(peer.id)) return; - await socketHeaders.remove(peer.id); + socketHeaders.delete(peer.id); taskHandler.disconnectAll(peer.id); },