feat: add job queue

This commit is contained in:
Ephraim Atta-Duncan
2024-04-07 18:30:16 +00:00
parent fc70f78e61
commit 53abb8f00b
7 changed files with 315 additions and 4 deletions

View File

@ -0,0 +1,52 @@
import type { WorkHandler } from 'pg-boss';
import PgBoss from 'pg-boss';
import { jobHandlers } from './job';
type QueueState = {
isReady: boolean;
queue: PgBoss | null;
};
let initPromise: Promise<PgBoss> | null = null;
const state: QueueState = {
isReady: false,
queue: null,
};
export async function initQueue() {
if (state.isReady) {
return state.queue as PgBoss;
}
if (initPromise) {
return initPromise;
}
initPromise = (async () => {
const queue = new PgBoss({
connectionString: 'postgres://postgres:password@127.0.0.1:54321/queue',
schema: 'documenso_queue',
});
try {
await queue.start();
} catch (error) {
console.error('Failed to start queue', error);
}
await Promise.all(
Object.entries(jobHandlers).map(async ([job, jobHandler]) => {
await queue.work(job, jobHandler as WorkHandler<unknown>);
}),
);
state.isReady = true;
state.queue = queue;
return queue;
})();
return initPromise;
}

View File

@ -0,0 +1,34 @@
import type { WorkHandler } from 'pg-boss';
import { initQueue } from '.';
import {
type SendDocumentOptions as SendCompletedDocumentOptions,
sendCompletedEmail,
} from '../document/send-completed-email';
type JobOptions = {
'send-completed-email': SendCompletedDocumentOptions;
};
export const jobHandlers: {
[K in keyof JobOptions]: WorkHandler<JobOptions[K]>;
} = {
'send-completed-email': async ({ data }) => {
await sendCompletedEmail({
documentId: data.documentId,
requestMetadata: data.requestMetadata,
});
},
};
export const queueJob = async ({
job,
args,
}: {
job: keyof JobOptions;
args?: JobOptions[keyof JobOptions];
}) => {
const queue = await initQueue();
await queue.send(job, args ?? {});
};