fixed task system

This commit is contained in:
DecDuck
2024-10-21 21:50:21 +11:00
parent c355f6fdbb
commit e1c1d7ea39
2 changed files with 19 additions and 13 deletions

View File

@ -3,12 +3,16 @@ import session from "~/server/internal/session";
import { v4 as uuidv4 } from "uuid";
import taskHandler, { TaskMessage } from "~/server/internal/tasks";
// TODO add web socket sessions for horizontal scaling
// ID to admin
const socketSessions: { [key: string]: boolean } = {};
export default defineWebSocketHandler({
open(peer) {
const dummyEvent = {
node: {
req: {
headers: peer.headers,
headers: peer.request?.headers,
},
},
} as unknown as H3Event;
@ -18,29 +22,31 @@ export default defineWebSocketHandler({
return;
}
const admin = session.getAdminUser(dummyEvent);
const peerId = uuidv4();
peer.ctx.id = peerId;
peer.ctx.admin = admin !== undefined;
socketSessions[peer.id] = admin !== undefined;
const rtMsg: TaskMessage = {
id: "connect",
name: "Connect",
success: true,
progress: 0,
error: undefined,
log: [],
};
peer.send(rtMsg);
peer.send(JSON.stringify(rtMsg));
},
message(peer, message) {
if (!peer.ctx.id) return;
if (!peer.id) return;
if (socketSessions[peer.id] === undefined) return;
const text = message.text();
if (text.startsWith("connect/")) {
const id = text.substring("connect/".length);
taskHandler.connect(peer.ctx.id, id, peer, peer.ctx.admin);
taskHandler.connect(peer.id, id, peer, socketSessions[peer.id]);
return;
}
},
close(peer, details) {
if (!peer.ctx.id) return;
if (!peer.id) return;
if (socketSessions[peer.id] === undefined) return;
delete socketSessions[peer.id];
},
});

View File

@ -41,7 +41,7 @@ class TaskHandler {
};
for (const client of Object.keys(taskEntry.clients)) {
if (!this.clientRegistry[client]) continue;
this.clientRegistry[client].send(taskMessage);
this.clientRegistry[client].send(JSON.stringify(taskMessage));
}
updateCollectTimeout = undefined;
}, 100);
@ -91,9 +91,9 @@ class TaskHandler {
connect(id: string, taskId: string, peer: PeerImpl, isAdmin = false) {
const task = this.taskRegistry[taskId];
if (!task) return false;
if (!task) return "Invalid task";
if (task.requireAdmin && !isAdmin) return false;
if (task.requireAdmin && !isAdmin) return "Requires admin";
this.clientRegistry[id] = peer;
this.taskRegistry[taskId].clients[id] = true; // Uniquely insert client to avoid sending duplicate traffic
@ -106,7 +106,7 @@ class TaskHandler {
log: task.log,
progress: task.progress,
};
peer.send(catchupMessage);
peer.send(JSON.stringify(catchupMessage));
return true;
}
@ -150,7 +150,7 @@ export type TaskMessage = {
};
export type PeerImpl = {
send: (message: TaskMessage) => void;
send: (message: string) => void;
};
export const taskHandler = new TaskHandler();