This commit is contained in:
Mythie
2025-01-02 15:33:37 +11:00
committed by David Nguyen
parent 9183f668d3
commit f7a98180d7
413 changed files with 29538 additions and 1606 deletions

View File

@ -1,5 +1,7 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import type { Context as HonoContext } from 'hono';
import type { JobDefinition, SimpleTriggerJobOptions } from './_internal/job';
export abstract class BaseJobProvider {
@ -16,4 +18,8 @@ export abstract class BaseJobProvider {
public getApiHandler(): (req: NextApiRequest, res: NextApiResponse) => Promise<Response | void> {
throw new Error('Not implemented');
}
public getHonoApiHandler(): (req: HonoContext) => Promise<Response | void> {
throw new Error('Not implemented');
}
}

View File

@ -27,4 +27,8 @@ export class JobClient<T extends ReadonlyArray<JobDefinition> = []> {
public getApiHandler() {
return this._provider.getApiHandler();
}
public getHonoApiHandler() {
return this._provider.getHonoApiHandler();
}
}

View File

@ -1,8 +1,10 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import type { NextRequest } from 'next/server';
import type { Context as HonoContext } from 'hono';
import type { Context, Handler, InngestFunction } from 'inngest';
import { Inngest as InngestClient } from 'inngest';
import { serve as createHonoPagesRoute } from 'inngest/hono';
import type { Logger } from 'inngest/middleware/logger';
import { serve as createPagesRoute } from 'inngest/next';
import { json } from 'micro';
@ -94,6 +96,18 @@ export class InngestJobProvider extends BaseJobProvider {
};
}
// Todo: Do we need to handle the above?
public getHonoApiHandler() {
return async (context: HonoContext) => {
const handler = createHonoPagesRoute({
client: this._client,
functions: this._functions,
});
return await handler(context);
};
}
private convertInngestIoToJobRunIo(ctx: Context.Any & { logger: Logger }) {
const { step } = ctx;

View File

@ -1,10 +1,11 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { sha256 } from '@noble/hashes/sha256';
import { BackgroundJobStatus, Prisma } from '@prisma/client';
import type { Context as HonoContext } from 'hono';
import { json } from 'micro';
import { prisma } from '@documenso/prisma';
import { BackgroundJobStatus, Prisma } from '@documenso/prisma/client';
import { NEXT_PRIVATE_INTERNAL_WEBAPP_URL } from '../../constants/app';
import { sign } from '../../server-only/crypto/sign';
@ -213,6 +214,152 @@ export class LocalJobProvider extends BaseJobProvider {
};
}
public getHonoApiHandler(): (context: HonoContext) => Promise<Response | void> {
return async (context: HonoContext) => {
const req = context.req;
if (req.method !== 'POST') {
context.text('Method not allowed', 405);
return;
}
const jobId = req.header('x-job-id');
const signature = req.header('x-job-signature');
const isRetry = req.header('x-job-retry') !== undefined;
const options = await req
.json()
.then(async (data) => ZSimpleTriggerJobOptionsSchema.parseAsync(data))
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
.then((data) => data as SimpleTriggerJobOptions)
.catch(() => null);
if (!options) {
context.text('Bad request', 400);
return;
}
const definition = this._jobDefinitions[options.name];
if (
typeof jobId !== 'string' ||
typeof signature !== 'string' ||
typeof options !== 'object'
) {
context.text('Bad request', 400);
return;
}
if (!definition) {
context.text('Job not found', 404);
return;
}
if (definition && !definition.enabled) {
console.log('Attempted to trigger a disabled job', options.name);
context.text('Job not found', 404);
return;
}
if (!signature || !verify(options, signature)) {
context.text('Unauthorized', 401);
return;
}
if (definition.trigger.schema) {
const result = definition.trigger.schema.safeParse(options.payload);
if (!result.success) {
context.text('Bad request', 400);
return;
}
}
console.log(`[JOBS]: Triggering job ${options.name} with payload`, options.payload);
let backgroundJob = await prisma.backgroundJob
.update({
where: {
id: jobId,
status: BackgroundJobStatus.PENDING,
},
data: {
status: BackgroundJobStatus.PROCESSING,
retried: {
increment: isRetry ? 1 : 0,
},
lastRetriedAt: isRetry ? new Date() : undefined,
},
})
.catch(() => null);
if (!backgroundJob) {
context.text('Job not found', 404);
return;
}
try {
await definition.handler({
payload: options.payload,
io: this.createJobRunIO(jobId),
});
backgroundJob = await prisma.backgroundJob.update({
where: {
id: jobId,
status: BackgroundJobStatus.PROCESSING,
},
data: {
status: BackgroundJobStatus.COMPLETED,
completedAt: new Date(),
},
});
} catch (error) {
console.log(`[JOBS]: Job ${options.name} failed`, error);
const taskHasExceededRetries = error instanceof BackgroundTaskExceededRetriesError;
const jobHasExceededRetries =
backgroundJob.retried >= backgroundJob.maxRetries &&
!(error instanceof BackgroundTaskFailedError);
if (taskHasExceededRetries || jobHasExceededRetries) {
backgroundJob = await prisma.backgroundJob.update({
where: {
id: jobId,
status: BackgroundJobStatus.PROCESSING,
},
data: {
status: BackgroundJobStatus.FAILED,
completedAt: new Date(),
},
});
context.text('Task exceeded retries', 500);
return;
}
backgroundJob = await prisma.backgroundJob.update({
where: {
id: jobId,
status: BackgroundJobStatus.PROCESSING,
},
data: {
status: BackgroundJobStatus.PENDING,
},
});
await this.submitJobToEndpoint({
jobId,
jobDefinitionId: backgroundJob.jobId,
data: options,
});
}
context.text('OK', 200);
};
}
private async submitJobToEndpoint(options: {
jobId: string;
jobDefinitionId: string;

View File

@ -58,6 +58,11 @@ export class TriggerJobProvider extends BaseJobProvider {
return handler;
}
// Hono v2 is being deprecated so not sure if we will be required.
// public getHonoApiHandler(): (req: HonoContext) => Promise<Response | void> {
// throw new Error('Not implemented');
// }
private convertTriggerIoToJobRunIo(io: IO) {
return {
wait: io.wait,