Revert "fix: convert socket sessions to cacheHandler"

This reverts commit 733aee3977.
This commit is contained in:
DecDuck
2025-05-08 21:43:54 +10:00
parent 0f773a9779
commit 2fb909f73d
2 changed files with 13 additions and 13 deletions

View File

@ -1,9 +1,9 @@
import notificationSystem from "~/server/internal/notifications"; import notificationSystem from "~/server/internal/notifications";
import aclManager from "~/server/internal/acls"; import aclManager from "~/server/internal/acls";
import cacheHandler from "~/server/internal/cache";
// TODO add web socket sessions for horizontal scaling
// Peer ID to user ID // Peer ID to user ID
const socketSessions = cacheHandler.createCache<string>("notificationSocketSessions"); const socketSessions = new Map<string, string>();
export default defineWebSocketHandler({ export default defineWebSocketHandler({
async open(peer) { async open(peer) {
@ -23,7 +23,7 @@ export default defineWebSocketHandler({
userIds.push("system"); userIds.push("system");
} }
await socketSessions.set(peer.id, userId); socketSessions.set(peer.id, userId);
for (const listenUserId of userIds) { for (const listenUserId of userIds) {
notificationSystem.listen(listenUserId, peer.id, (notification) => { notificationSystem.listen(listenUserId, peer.id, (notification) => {
@ -32,7 +32,7 @@ export default defineWebSocketHandler({
} }
}, },
async close(peer, _details) { async close(peer, _details) {
const userId = await socketSessions.get(peer.id); const userId = socketSessions.get(peer.id);
if (!userId) { if (!userId) {
console.log(`skipping websocket close for ${peer.id}`); console.log(`skipping websocket close for ${peer.id}`);
return; return;
@ -40,6 +40,6 @@ export default defineWebSocketHandler({
notificationSystem.unlisten(userId, peer.id); notificationSystem.unlisten(userId, peer.id);
notificationSystem.unlisten("system", peer.id); // In case we were listening as 'system' notificationSystem.unlisten("system", peer.id); // In case we were listening as 'system'
await socketSessions.remove(peer.id); socketSessions.delete(peer.id);
}, },
}); });

View File

@ -1,9 +1,9 @@
import taskHandler from "~/server/internal/tasks"; import taskHandler from "~/server/internal/tasks";
import type { MinimumRequestObject } from "~/server/h3"; import type { MinimumRequestObject } from "~/server/h3";
import cacheHandler from "~/server/internal/cache";
// TODO add web socket sessions for horizontal scaling
// ID to admin // ID to admin
const socketHeaders = cacheHandler.createCache<MinimumRequestObject>("taskSocketHeaders"); const socketHeaders = new Map<string, MinimumRequestObject>();
export default defineWebSocketHandler({ export default defineWebSocketHandler({
async open(peer) { async open(peer) {
@ -13,15 +13,15 @@ export default defineWebSocketHandler({
return; return;
} }
await socketHeaders.set(peer.id, { socketHeaders.set(peer.id, {
headers: request.headers ?? new Headers(), headers: request.headers ?? new Headers(),
}); });
peer.send(`connect`); peer.send(`connect`);
}, },
async message(peer, message) { message(peer, message) {
if (!peer.id) return; if (!peer.id) return;
const headers = await socketHeaders.get(peer.id); const headers = socketHeaders.get(peer.id);
if (!headers) return; if (headers === undefined) return;
const text = message.text(); const text = message.text();
if (text.startsWith("connect/")) { if (text.startsWith("connect/")) {
const id = text.substring("connect/".length); const id = text.substring("connect/".length);
@ -29,10 +29,10 @@ export default defineWebSocketHandler({
return; return;
} }
}, },
async close(peer, _details) { close(peer, _details) {
if (!peer.id) return; if (!peer.id) return;
if (!socketHeaders.has(peer.id)) return; if (!socketHeaders.has(peer.id)) return;
await socketHeaders.remove(peer.id); socketHeaders.delete(peer.id);
taskHandler.disconnectAll(peer.id); taskHandler.disconnectAll(peer.id);
}, },