feat: ghetto durable compute

This commit is contained in:
Mythie
2024-05-16 15:44:39 +10:00
parent 61827ad729
commit 991f808890
20 changed files with 847 additions and 151 deletions

View File

@ -1,11 +1,16 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { sha256 } from '@noble/hashes/sha256';
import { json } from 'micro';
import { prisma } from '@documenso/prisma';
import { BackgroundJobStatus, Prisma } from '@documenso/prisma/client';
import { NEXT_PUBLIC_WEBAPP_URL } from '../../constants/app';
import { sign } from '../../server-only/crypto/sign';
import { verify } from '../../server-only/crypto/verify';
import type { JobDefinition, JobRunIO, TriggerJobOptions } from './_internal/job';
import type { Json } from './_internal/json';
import { BaseJobProvider } from './base';
export class LocalJobProvider extends BaseJobProvider {
@ -33,33 +38,57 @@ export class LocalJobProvider extends BaseJobProvider {
}
public async triggerJob(options: TriggerJobOptions) {
const signature = sign(options);
console.log({ jobDefinitions: this._jobDefinitions });
await Promise.race([
fetch(`${NEXT_PUBLIC_WEBAPP_URL()}/api/jobs/trigger`, {
method: 'POST',
body: JSON.stringify(options),
headers: {
'Content-Type': 'application/json',
'X-Job-Signature': signature,
},
const eligibleJobs = Object.values(this._jobDefinitions).filter(
(job) => job.trigger.name === options.name,
);
console.log({ options });
console.log(
'Eligible jobs:',
eligibleJobs.map((job) => job.name),
);
await Promise.all(
eligibleJobs.map(async (job) => {
// Ideally we will change this to a createMany with returning later once we upgrade Prisma
// @see: https://github.com/prisma/prisma/releases/tag/5.14.0
const pendingJob = await prisma.backgroundJob.create({
data: {
jobId: job.id,
name: job.name,
version: job.version,
payload: options.payload,
},
});
await this.submitJobToEndpoint({
jobId: pendingJob.id,
jobDefinitionId: pendingJob.jobId,
data: options,
});
}),
new Promise((resolve) => {
setTimeout(resolve, 150);
}),
]);
);
}
public getApiHandler() {
return async (req: NextApiRequest, res: NextApiResponse) => {
if (req.method === 'POST') {
const jobId = req.headers['x-job-id'];
const signature = req.headers['x-job-signature'];
const isRetry = req.headers['x-job-retry'] !== undefined;
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const options = (await json(req)) as TriggerJobOptions;
const definition = this._jobDefinitions[options.name];
if (typeof signature !== 'string' || typeof options !== 'object') {
if (
typeof jobId !== 'string' ||
typeof signature !== 'string' ||
typeof options !== 'object'
) {
res.status(400).send('Bad request');
return;
}
@ -92,10 +121,83 @@ export class LocalJobProvider extends BaseJobProvider {
console.log(`[JOBS]: Triggering job ${options.name} with payload`, options.payload);
await definition.handler({
payload: options.payload,
io: this.createJobRunIO(options.name),
});
let backgroundJob = await prisma.backgroundJob
.update({
where: {
id: jobId,
status: BackgroundJobStatus.PENDING,
},
data: {
status: BackgroundJobStatus.PROCESSING,
retried: {
increment: isRetry ? 1 : 0,
},
lastRetriedAt: isRetry ? new Date() : undefined,
},
})
.catch(() => null);
if (!backgroundJob) {
res.status(404).send('Job not found');
return;
}
try {
await definition.handler({
payload: options.payload,
io: this.createJobRunIO(jobId),
});
backgroundJob = await prisma.backgroundJob.update({
where: {
id: jobId,
status: BackgroundJobStatus.PROCESSING,
},
data: {
status: BackgroundJobStatus.COMPLETED,
completedAt: new Date(),
},
});
} catch (error) {
console.error(`[JOBS]: Job ${options.name} failed`, error);
const taskHasExceededRetries = error instanceof BackgroundTaskExceededRetriesError;
const jobHasExceededRetries =
backgroundJob.retried >= backgroundJob.maxRetries &&
!(error instanceof BackgroundTaskFailedError);
if (taskHasExceededRetries || jobHasExceededRetries) {
backgroundJob = await prisma.backgroundJob.update({
where: {
id: jobId,
status: BackgroundJobStatus.PROCESSING,
},
data: {
status: BackgroundJobStatus.FAILED,
completedAt: new Date(),
},
});
res.status(500).send('Task exceeded retries');
return;
}
backgroundJob = await prisma.backgroundJob.update({
where: {
id: jobId,
status: BackgroundJobStatus.PROCESSING,
},
data: {
status: BackgroundJobStatus.PENDING,
},
});
await this.submitJobToEndpoint({
jobId,
jobDefinitionId: backgroundJob.jobId,
data: options,
});
}
res.status(200).send('OK');
} else {
@ -104,9 +206,105 @@ export class LocalJobProvider extends BaseJobProvider {
};
}
private async submitJobToEndpoint(options: {
jobId: string;
jobDefinitionId: string;
data: TriggerJobOptions;
isRetry?: boolean;
}) {
const { jobId, jobDefinitionId, data, isRetry } = options;
const endpoint = `${NEXT_PUBLIC_WEBAPP_URL()}/api/jobs/${jobDefinitionId}/${jobId}`;
const signature = sign(data);
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'X-Job-Id': jobId,
'X-Job-Signature': signature,
};
if (isRetry) {
headers['X-Job-Retry'] = '1';
}
console.log('Submitting job to endpoint:', endpoint);
await Promise.race([
fetch(endpoint, {
method: 'POST',
body: JSON.stringify(data),
headers,
}).catch(() => null),
new Promise((resolve) => {
setTimeout(resolve, 150);
}),
]);
}
private createJobRunIO(jobId: string): JobRunIO {
return {
stableRun: async (_cacheKey, callback) => await callback(),
runTask: async <T extends void | Json>(cacheKey: string, callback: () => Promise<T>) => {
const hashedKey = Buffer.from(sha256(cacheKey)).toString('hex');
let task = await prisma.backgroundJobTask.findFirst({
where: {
id: `task-${hashedKey}--${jobId}`,
jobId,
},
});
if (!task) {
task = await prisma.backgroundJobTask.create({
data: {
id: `task-${hashedKey}--${jobId}`,
name: cacheKey,
jobId,
status: BackgroundJobStatus.PENDING,
},
});
}
if (task.status === BackgroundJobStatus.COMPLETED) {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
return task.result as T;
}
if (task.retried >= 3) {
throw new BackgroundTaskExceededRetriesError('Task exceeded retries');
}
try {
const result = await callback();
task = await prisma.backgroundJobTask.update({
where: {
id: task.id,
jobId,
},
data: {
status: BackgroundJobStatus.COMPLETED,
result: result === null ? Prisma.JsonNull : result,
completedAt: new Date(),
},
});
return result;
} catch {
task = await prisma.backgroundJobTask.update({
where: {
id: task.id,
jobId,
},
data: {
status: BackgroundJobStatus.PENDING,
retried: {
increment: 1,
},
},
});
throw new BackgroundTaskFailedError('Task failed');
}
},
triggerJob: async (_cacheKey, payload) => await this.triggerJob(payload),
logger: {
debug: (...args) => console.debug(`[${jobId}]`, ...args),
@ -122,3 +320,17 @@ export class LocalJobProvider extends BaseJobProvider {
};
}
}
class BackgroundTaskFailedError extends Error {
constructor(message: string) {
super(message);
this.name = 'BackgroundTaskFailedError';
}
}
class BackgroundTaskExceededRetriesError extends Error {
constructor(message: string) {
super(message);
this.name = 'BackgroundTaskExceededRetriesError';
}
}