mirror of
https://github.com/documenso/documenso.git
synced 2025-11-19 19:21:39 +10:00
fix: wip
This commit is contained in:
@ -1,5 +1,3 @@
|
||||
import type { NextApiRequest, NextApiResponse } from 'next';
|
||||
|
||||
import type { Context as HonoContext } from 'hono';
|
||||
|
||||
import type { JobDefinition, SimpleTriggerJobOptions } from './_internal/job';
|
||||
@ -15,11 +13,7 @@ export abstract class BaseJobProvider {
|
||||
throw new Error('Not implemented');
|
||||
}
|
||||
|
||||
public getApiHandler(): (req: NextApiRequest, res: NextApiResponse) => Promise<Response | void> {
|
||||
throw new Error('Not implemented');
|
||||
}
|
||||
|
||||
public getHonoApiHandler(): (req: HonoContext) => Promise<Response | void> {
|
||||
public getApiHandler(): (req: HonoContext) => Promise<Response | void> {
|
||||
throw new Error('Not implemented');
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,7 +5,6 @@ import type { JobDefinition, TriggerJobOptions } from './_internal/job';
|
||||
import type { BaseJobProvider as JobClientProvider } from './base';
|
||||
import { InngestJobProvider } from './inngest';
|
||||
import { LocalJobProvider } from './local';
|
||||
import { TriggerJobProvider } from './trigger';
|
||||
|
||||
export class JobClient<T extends ReadonlyArray<JobDefinition> = []> {
|
||||
private _provider: JobClientProvider;
|
||||
@ -13,7 +12,6 @@ export class JobClient<T extends ReadonlyArray<JobDefinition> = []> {
|
||||
public constructor(definitions: T) {
|
||||
this._provider = match(env('NEXT_PRIVATE_JOBS_PROVIDER'))
|
||||
.with('inngest', () => InngestJobProvider.getInstance())
|
||||
.with('trigger', () => TriggerJobProvider.getInstance())
|
||||
.otherwise(() => LocalJobProvider.getInstance());
|
||||
|
||||
definitions.forEach((definition) => {
|
||||
@ -28,8 +26,4 @@ export class JobClient<T extends ReadonlyArray<JobDefinition> = []> {
|
||||
public getApiHandler() {
|
||||
return this._provider.getApiHandler();
|
||||
}
|
||||
|
||||
public getHonoApiHandler() {
|
||||
return this._provider.getHonoApiHandler();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,13 +1,8 @@
|
||||
import type { NextApiRequest, NextApiResponse } from 'next';
|
||||
import type { NextRequest } from 'next/server';
|
||||
|
||||
import type { Context as HonoContext } from 'hono';
|
||||
import type { Context, Handler, InngestFunction } from 'inngest';
|
||||
import { Inngest as InngestClient } from 'inngest';
|
||||
import { serve as createHonoPagesRoute } from 'inngest/hono';
|
||||
import type { Logger } from 'inngest/middleware/logger';
|
||||
import { serve as createPagesRoute } from 'inngest/next';
|
||||
import { json } from 'micro';
|
||||
|
||||
import { env } from '../../utils/env';
|
||||
import type { JobDefinition, JobRunIO, SimpleTriggerJobOptions } from './_internal/job';
|
||||
@ -76,29 +71,29 @@ export class InngestJobProvider extends BaseJobProvider {
|
||||
});
|
||||
}
|
||||
|
||||
public getApiHandler() {
|
||||
const handler = createPagesRoute({
|
||||
client: this._client,
|
||||
functions: this._functions,
|
||||
});
|
||||
// public getApiHandler() {
|
||||
// const handler = createPagesRoute({
|
||||
// client: this._client,
|
||||
// functions: this._functions,
|
||||
// });
|
||||
|
||||
return async (req: NextApiRequest, res: NextApiResponse) => {
|
||||
// Since body-parser is disabled for this route we need to patch in the parsed body
|
||||
if (req.headers['content-type'] === 'application/json') {
|
||||
Object.assign(req, {
|
||||
body: await json(req),
|
||||
});
|
||||
}
|
||||
// return async (req: NextApiRequest, res: NextApiResponse) => {
|
||||
// // Since body-parser is disabled for this route we need to patch in the parsed body
|
||||
// if (req.headers['content-type'] === 'application/json') {
|
||||
// Object.assign(req, {
|
||||
// body: await json(req),
|
||||
// });
|
||||
// }
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
|
||||
const nextReq = req as unknown as NextRequest;
|
||||
// // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
|
||||
// const nextReq = req as unknown as NextRequest;
|
||||
|
||||
return await handler(nextReq, res);
|
||||
};
|
||||
}
|
||||
// return await handler(nextReq, res);
|
||||
// };
|
||||
// }
|
||||
|
||||
// Todo: Do we need to handle the above?
|
||||
public getHonoApiHandler() {
|
||||
public getApiHandler() {
|
||||
return async (context: HonoContext) => {
|
||||
const handler = createHonoPagesRoute({
|
||||
client: this._client,
|
||||
|
||||
@ -1,9 +1,6 @@
|
||||
import type { NextApiRequest, NextApiResponse } from 'next';
|
||||
|
||||
import { sha256 } from '@noble/hashes/sha256';
|
||||
import { BackgroundJobStatus, Prisma } from '@prisma/client';
|
||||
import type { Context as HonoContext } from 'hono';
|
||||
import { json } from 'micro';
|
||||
|
||||
import { prisma } from '@documenso/prisma';
|
||||
|
||||
@ -71,150 +68,7 @@ export class LocalJobProvider extends BaseJobProvider {
|
||||
);
|
||||
}
|
||||
|
||||
public getApiHandler() {
|
||||
return async (req: NextApiRequest, res: NextApiResponse) => {
|
||||
if (req.method !== 'POST') {
|
||||
res.status(405).send('Method not allowed');
|
||||
}
|
||||
|
||||
const jobId = req.headers['x-job-id'];
|
||||
const signature = req.headers['x-job-signature'];
|
||||
const isRetry = req.headers['x-job-retry'] !== undefined;
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
|
||||
const options = await json(req)
|
||||
.then(async (data) => ZSimpleTriggerJobOptionsSchema.parseAsync(data))
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
|
||||
.then((data) => data as SimpleTriggerJobOptions)
|
||||
.catch(() => null);
|
||||
|
||||
if (!options) {
|
||||
res.status(400).send('Bad request');
|
||||
return;
|
||||
}
|
||||
|
||||
const definition = this._jobDefinitions[options.name];
|
||||
|
||||
if (
|
||||
typeof jobId !== 'string' ||
|
||||
typeof signature !== 'string' ||
|
||||
typeof options !== 'object'
|
||||
) {
|
||||
res.status(400).send('Bad request');
|
||||
return;
|
||||
}
|
||||
|
||||
if (!definition) {
|
||||
res.status(404).send('Job not found');
|
||||
return;
|
||||
}
|
||||
|
||||
if (definition && !definition.enabled) {
|
||||
console.log('Attempted to trigger a disabled job', options.name);
|
||||
|
||||
res.status(404).send('Job not found');
|
||||
return;
|
||||
}
|
||||
|
||||
if (!signature || !verify(options, signature)) {
|
||||
res.status(401).send('Unauthorized');
|
||||
return;
|
||||
}
|
||||
|
||||
if (definition.trigger.schema) {
|
||||
const result = definition.trigger.schema.safeParse(options.payload);
|
||||
|
||||
if (!result.success) {
|
||||
res.status(400).send('Bad request');
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[JOBS]: Triggering job ${options.name} with payload`, options.payload);
|
||||
|
||||
let backgroundJob = await prisma.backgroundJob
|
||||
.update({
|
||||
where: {
|
||||
id: jobId,
|
||||
status: BackgroundJobStatus.PENDING,
|
||||
},
|
||||
data: {
|
||||
status: BackgroundJobStatus.PROCESSING,
|
||||
retried: {
|
||||
increment: isRetry ? 1 : 0,
|
||||
},
|
||||
lastRetriedAt: isRetry ? new Date() : undefined,
|
||||
},
|
||||
})
|
||||
.catch(() => null);
|
||||
|
||||
if (!backgroundJob) {
|
||||
res.status(404).send('Job not found');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await definition.handler({
|
||||
payload: options.payload,
|
||||
io: this.createJobRunIO(jobId),
|
||||
});
|
||||
|
||||
backgroundJob = await prisma.backgroundJob.update({
|
||||
where: {
|
||||
id: jobId,
|
||||
status: BackgroundJobStatus.PROCESSING,
|
||||
},
|
||||
data: {
|
||||
status: BackgroundJobStatus.COMPLETED,
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
console.log(`[JOBS]: Job ${options.name} failed`, error);
|
||||
|
||||
const taskHasExceededRetries = error instanceof BackgroundTaskExceededRetriesError;
|
||||
const jobHasExceededRetries =
|
||||
backgroundJob.retried >= backgroundJob.maxRetries &&
|
||||
!(error instanceof BackgroundTaskFailedError);
|
||||
|
||||
if (taskHasExceededRetries || jobHasExceededRetries) {
|
||||
backgroundJob = await prisma.backgroundJob.update({
|
||||
where: {
|
||||
id: jobId,
|
||||
status: BackgroundJobStatus.PROCESSING,
|
||||
},
|
||||
data: {
|
||||
status: BackgroundJobStatus.FAILED,
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
res.status(500).send('Task exceeded retries');
|
||||
return;
|
||||
}
|
||||
|
||||
backgroundJob = await prisma.backgroundJob.update({
|
||||
where: {
|
||||
id: jobId,
|
||||
status: BackgroundJobStatus.PROCESSING,
|
||||
},
|
||||
data: {
|
||||
status: BackgroundJobStatus.PENDING,
|
||||
},
|
||||
});
|
||||
|
||||
await this.submitJobToEndpoint({
|
||||
jobId,
|
||||
jobDefinitionId: backgroundJob.jobId,
|
||||
data: options,
|
||||
});
|
||||
}
|
||||
|
||||
res.status(200).send('OK');
|
||||
};
|
||||
}
|
||||
|
||||
public getHonoApiHandler(): (context: HonoContext) => Promise<Response | void> {
|
||||
public getApiHandler(): (context: HonoContext) => Promise<Response | void> {
|
||||
return async (context: HonoContext) => {
|
||||
const req = context.req;
|
||||
|
||||
|
||||
@ -1,79 +0,0 @@
|
||||
import { createPagesRoute } from '@trigger.dev/nextjs';
|
||||
import type { IO } from '@trigger.dev/sdk';
|
||||
import { TriggerClient, eventTrigger } from '@trigger.dev/sdk';
|
||||
|
||||
import { env } from '../../utils/env';
|
||||
import type { JobDefinition, JobRunIO, SimpleTriggerJobOptions } from './_internal/job';
|
||||
import { BaseJobProvider } from './base';
|
||||
|
||||
export class TriggerJobProvider extends BaseJobProvider {
|
||||
private static _instance: TriggerJobProvider;
|
||||
|
||||
private _client: TriggerClient;
|
||||
|
||||
private constructor(options: { client: TriggerClient }) {
|
||||
super();
|
||||
|
||||
this._client = options.client;
|
||||
}
|
||||
|
||||
static getInstance() {
|
||||
if (!this._instance) {
|
||||
const client = new TriggerClient({
|
||||
id: 'documenso-app',
|
||||
apiKey: env('NEXT_PRIVATE_TRIGGER_API_KEY'),
|
||||
apiUrl: env('NEXT_PRIVATE_TRIGGER_API_URL'),
|
||||
});
|
||||
|
||||
this._instance = new TriggerJobProvider({ client });
|
||||
}
|
||||
|
||||
return this._instance;
|
||||
}
|
||||
|
||||
public defineJob<N extends string, T>(job: JobDefinition<N, T>): void {
|
||||
this._client.defineJob({
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
version: job.version,
|
||||
trigger: eventTrigger({
|
||||
name: job.trigger.name,
|
||||
schema: job.trigger.schema,
|
||||
}),
|
||||
run: async (payload, io) => job.handler({ payload, io: this.convertTriggerIoToJobRunIo(io) }),
|
||||
});
|
||||
}
|
||||
|
||||
public async triggerJob(options: SimpleTriggerJobOptions): Promise<void> {
|
||||
await this._client.sendEvent({
|
||||
id: options.id,
|
||||
name: options.name,
|
||||
payload: options.payload,
|
||||
timestamp: options.timestamp ? new Date(options.timestamp) : undefined,
|
||||
});
|
||||
}
|
||||
|
||||
public getApiHandler() {
|
||||
const { handler } = createPagesRoute(this._client);
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
// Hono v2 is being deprecated so not sure if we will be required.
|
||||
// public getHonoApiHandler(): (req: HonoContext) => Promise<Response | void> {
|
||||
// throw new Error('Not implemented');
|
||||
// }
|
||||
|
||||
private convertTriggerIoToJobRunIo(io: IO) {
|
||||
return {
|
||||
wait: io.wait,
|
||||
logger: io.logger,
|
||||
runTask: async (cacheKey, callback) => io.runTask(cacheKey, callback),
|
||||
triggerJob: async (cacheKey, payload) =>
|
||||
io.sendEvent(cacheKey, {
|
||||
...payload,
|
||||
timestamp: payload.timestamp ? new Date(payload.timestamp) : undefined,
|
||||
}),
|
||||
} satisfies JobRunIO;
|
||||
}
|
||||
}
|
||||
@ -1,9 +0,0 @@
|
||||
import { NextRequest } from 'next/server';
|
||||
|
||||
export const toNextRequest = (req: Request) => {
|
||||
const headers = Object.fromEntries(req.headers.entries());
|
||||
|
||||
return new NextRequest(req, {
|
||||
headers: headers,
|
||||
});
|
||||
};
|
||||
@ -1,28 +0,0 @@
|
||||
import { NextApiResponse } from 'next';
|
||||
import { NextResponse } from 'next/server';
|
||||
|
||||
type NarrowedResponse<T> = T extends NextResponse
|
||||
? NextResponse
|
||||
: T extends NextApiResponse<infer U>
|
||||
? NextApiResponse<U>
|
||||
: never;
|
||||
|
||||
export const withStaleWhileRevalidate = <T>(
|
||||
res: NarrowedResponse<T>,
|
||||
cacheInSeconds = 60,
|
||||
staleCacheInSeconds = 300,
|
||||
) => {
|
||||
if ('headers' in res) {
|
||||
res.headers.set(
|
||||
'Cache-Control',
|
||||
`public, s-maxage=${cacheInSeconds}, stale-while-revalidate=${staleCacheInSeconds}`,
|
||||
);
|
||||
} else {
|
||||
res.setHeader(
|
||||
'Cache-Control',
|
||||
`public, s-maxage=${cacheInSeconds}, stale-while-revalidate=${staleCacheInSeconds}`,
|
||||
);
|
||||
}
|
||||
|
||||
return res;
|
||||
};
|
||||
@ -1,6 +1,3 @@
|
||||
import type { NextApiRequest } from 'next';
|
||||
|
||||
import type { RequestInternal } from 'next-auth';
|
||||
import { z } from 'zod';
|
||||
|
||||
const ZIpSchema = z.string().ip();
|
||||
@ -53,35 +50,3 @@ export const extractRequestMetadata = (req: Request): RequestMetadata => {
|
||||
userAgent: userAgent ?? undefined,
|
||||
};
|
||||
};
|
||||
|
||||
export const extractNextApiRequestMetadata = (req: NextApiRequest): RequestMetadata => {
|
||||
const parsedIp = ZIpSchema.safeParse(req.headers['x-forwarded-for'] || req.socket.remoteAddress);
|
||||
|
||||
const ipAddress = parsedIp.success ? parsedIp.data : undefined;
|
||||
const userAgent = req.headers['user-agent'];
|
||||
|
||||
return {
|
||||
ipAddress,
|
||||
userAgent,
|
||||
};
|
||||
};
|
||||
|
||||
export const extractNextAuthRequestMetadata = (
|
||||
req: Pick<RequestInternal, 'body' | 'query' | 'headers' | 'method'>,
|
||||
): RequestMetadata => {
|
||||
return extractNextHeaderRequestMetadata(req.headers ?? {});
|
||||
};
|
||||
|
||||
export const extractNextHeaderRequestMetadata = (
|
||||
headers: Record<string, string>,
|
||||
): RequestMetadata => {
|
||||
const parsedIp = ZIpSchema.safeParse(headers?.['x-forwarded-for']);
|
||||
|
||||
const ipAddress = parsedIp.success ? parsedIp.data : undefined;
|
||||
const userAgent = headers?.['user-agent'];
|
||||
|
||||
return {
|
||||
ipAddress,
|
||||
userAgent,
|
||||
};
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user