feat: new ws handler

This commit is contained in:
DecDuck
2024-11-16 17:03:52 +11:00
parent 8463e35a10
commit bc0c47c487
3 changed files with 60 additions and 32 deletions

View File

@ -1,7 +1,15 @@
import type { TaskMessage } from "~/server/internal/tasks";
import { WebSocketHandler } from "./ws";
let ws: WebSocket | undefined;
const msgQueue: Array<string> = [];
const websocketHandler = new WebSocketHandler("/api/v1/task");
websocketHandler.listen((message) => {
const msg = JSON.parse(message) as TaskMessage;
const taskStates = useTaskStates();
const state = taskStates.value[msg.id];
if (!state) return;
state.value = msg;
});
const useTaskStates = () =>
useState<{ [key: string]: Ref<TaskMessage> }>("task-states", () => ({
@ -15,32 +23,6 @@ const useTaskStates = () =>
})),
}));
function initWs() {
const isSecure = location.protocol === "https:";
const url = (isSecure ? "wss://" : "ws://") + location.host + "/api/v1/task";
ws = new WebSocket(url);
ws.onmessage = (e) => {
const msg = JSON.parse(e.data) as TaskMessage;
const taskStates = useTaskStates();
const state = taskStates.value[msg.id];
if (!state) return;
state.value = msg;
};
ws.onopen = () => {
for (const message of msgQueue) {
ws?.send(message);
}
};
return ws;
}
function sendMessage(msg: string) {
if (!ws) return msgQueue.push(msg);
if (ws.readyState == 0) return msgQueue.push(msg);
return ws.send(msg);
}
export const useTaskReady = () => {
const taskStates = useTaskStates();
return taskStates.value["connect"];
@ -51,8 +33,6 @@ export const useTask = (taskId: string): Ref<TaskMessage> => {
const taskStates = useTaskStates();
if (taskStates.value[taskId]) return taskStates.value[taskId];
if (!ws) initWs();
taskStates.value[taskId] = useState(`task-${taskId}`, () => ({
id: taskId,
name: "loading...",
@ -61,6 +41,6 @@ export const useTask = (taskId: string): Ref<TaskMessage> => {
error: undefined,
log: [],
}));
sendMessage(`connect/${taskId}`);
websocketHandler.send(`connect/${taskId}`);
return taskStates.value[taskId];
};

48
composables/ws.ts Normal file
View File

@ -0,0 +1,48 @@
export type WebSocketCallback = (message: string) => void;
export class WebSocketHandler {
private listeners: Array<WebSocketCallback> = [];
private outQueue: Array<string> = [];
private inQueue: Array<string> = [];
private ws: WebSocket | undefined = undefined;
private connected: boolean = false;
constructor(route: string) {
if (import.meta.server) return;
const isSecure = location.protocol === "https:";
const url = (isSecure ? "wss://" : "ws://") + location.host + route;
this.ws = new WebSocket(url);
this.ws.onopen = () => {
this.connected = true;
for (const message of this.outQueue) {
this.ws?.send(message);
}
};
this.ws.onmessage = (e) => {
const message = e.data;
if (this.listeners.length == 0) {
this.inQueue.push(message);
return;
}
for (const listener of this.listeners) {
listener(message);
}
};
}
listen(callback: WebSocketCallback) {
this.listeners.push(callback);
}
send(message: string) {
if (!this.connected || !this.ws) {
this.outQueue.push(message);
return;
}
this.ws.send(message);
}
}