mirror of
https://github.com/documenso/documenso.git
synced 2025-11-12 15:53:02 +10:00
feat: migrate webhook execution to background jobs (#1694)
This commit is contained in:
@ -9,6 +9,7 @@ import { SEND_TEAM_DELETED_EMAIL_JOB_DEFINITION } from './definitions/emails/sen
|
|||||||
import { SEND_TEAM_MEMBER_JOINED_EMAIL_JOB_DEFINITION } from './definitions/emails/send-team-member-joined-email';
|
import { SEND_TEAM_MEMBER_JOINED_EMAIL_JOB_DEFINITION } from './definitions/emails/send-team-member-joined-email';
|
||||||
import { SEND_TEAM_MEMBER_LEFT_EMAIL_JOB_DEFINITION } from './definitions/emails/send-team-member-left-email';
|
import { SEND_TEAM_MEMBER_LEFT_EMAIL_JOB_DEFINITION } from './definitions/emails/send-team-member-left-email';
|
||||||
import { BULK_SEND_TEMPLATE_JOB_DEFINITION } from './definitions/internal/bulk-send-template';
|
import { BULK_SEND_TEMPLATE_JOB_DEFINITION } from './definitions/internal/bulk-send-template';
|
||||||
|
import { EXECUTE_WEBHOOK_JOB_DEFINITION } from './definitions/internal/execute-webhook';
|
||||||
import { SEAL_DOCUMENT_JOB_DEFINITION } from './definitions/internal/seal-document';
|
import { SEAL_DOCUMENT_JOB_DEFINITION } from './definitions/internal/seal-document';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -27,6 +28,7 @@ export const jobsClient = new JobClient([
|
|||||||
SEND_RECIPIENT_SIGNED_EMAIL_JOB_DEFINITION,
|
SEND_RECIPIENT_SIGNED_EMAIL_JOB_DEFINITION,
|
||||||
SEND_DOCUMENT_CANCELLED_EMAILS_JOB_DEFINITION,
|
SEND_DOCUMENT_CANCELLED_EMAILS_JOB_DEFINITION,
|
||||||
BULK_SEND_TEMPLATE_JOB_DEFINITION,
|
BULK_SEND_TEMPLATE_JOB_DEFINITION,
|
||||||
|
EXECUTE_WEBHOOK_JOB_DEFINITION,
|
||||||
] as const);
|
] as const);
|
||||||
|
|
||||||
export const jobs = jobsClient;
|
export const jobs = jobsClient;
|
||||||
|
|||||||
@ -0,0 +1,74 @@
|
|||||||
|
import { Prisma, WebhookCallStatus } from '@prisma/client';
|
||||||
|
|
||||||
|
import { prisma } from '@documenso/prisma';
|
||||||
|
|
||||||
|
import type { JobRunIO } from '../../client/_internal/job';
|
||||||
|
import type { TExecuteWebhookJobDefinition } from './execute-webhook';
|
||||||
|
|
||||||
|
export const run = async ({
|
||||||
|
payload,
|
||||||
|
io,
|
||||||
|
}: {
|
||||||
|
payload: TExecuteWebhookJobDefinition;
|
||||||
|
io: JobRunIO;
|
||||||
|
}) => {
|
||||||
|
const { event, webhookId, data } = payload;
|
||||||
|
|
||||||
|
const webhook = await prisma.webhook.findUniqueOrThrow({
|
||||||
|
where: {
|
||||||
|
id: webhookId,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const { webhookUrl: url, secret } = webhook;
|
||||||
|
|
||||||
|
await io.runTask('execute-webhook', async () => {
|
||||||
|
const payloadData = {
|
||||||
|
event,
|
||||||
|
payload: data,
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
webhookEndpoint: url,
|
||||||
|
};
|
||||||
|
|
||||||
|
const response = await fetch(url, {
|
||||||
|
method: 'POST',
|
||||||
|
body: JSON.stringify(payloadData),
|
||||||
|
headers: {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'X-Documenso-Secret': secret ?? '',
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const body = await response.text();
|
||||||
|
|
||||||
|
let responseBody: Prisma.InputJsonValue | Prisma.JsonNullValueInput = Prisma.JsonNull;
|
||||||
|
|
||||||
|
try {
|
||||||
|
responseBody = JSON.parse(body);
|
||||||
|
} catch (err) {
|
||||||
|
responseBody = body;
|
||||||
|
}
|
||||||
|
|
||||||
|
await prisma.webhookCall.create({
|
||||||
|
data: {
|
||||||
|
url,
|
||||||
|
event,
|
||||||
|
status: response.ok ? WebhookCallStatus.SUCCESS : WebhookCallStatus.FAILED,
|
||||||
|
requestBody: payloadData as Prisma.InputJsonValue,
|
||||||
|
responseCode: response.status,
|
||||||
|
responseBody,
|
||||||
|
responseHeaders: Object.fromEntries(response.headers.entries()),
|
||||||
|
webhookId: webhook.id,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`Webhook execution failed with status ${response.status}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: response.ok,
|
||||||
|
status: response.status,
|
||||||
|
};
|
||||||
|
});
|
||||||
|
};
|
||||||
34
packages/lib/jobs/definitions/internal/execute-webhook.ts
Normal file
34
packages/lib/jobs/definitions/internal/execute-webhook.ts
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
import { WebhookTriggerEvents } from '@prisma/client';
|
||||||
|
import { z } from 'zod';
|
||||||
|
|
||||||
|
import { ZRequestMetadataSchema } from '../../../universal/extract-request-metadata';
|
||||||
|
import { type JobDefinition } from '../../client/_internal/job';
|
||||||
|
|
||||||
|
const EXECUTE_WEBHOOK_JOB_DEFINITION_ID = 'internal.execute-webhook';
|
||||||
|
|
||||||
|
const EXECUTE_WEBHOOK_JOB_DEFINITION_SCHEMA = z.object({
|
||||||
|
event: z.nativeEnum(WebhookTriggerEvents),
|
||||||
|
webhookId: z.string(),
|
||||||
|
data: z.unknown(),
|
||||||
|
requestMetadata: ZRequestMetadataSchema.optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type TExecuteWebhookJobDefinition = z.infer<typeof EXECUTE_WEBHOOK_JOB_DEFINITION_SCHEMA>;
|
||||||
|
|
||||||
|
export const EXECUTE_WEBHOOK_JOB_DEFINITION = {
|
||||||
|
id: EXECUTE_WEBHOOK_JOB_DEFINITION_ID,
|
||||||
|
name: 'Execute Webhook',
|
||||||
|
version: '1.0.0',
|
||||||
|
trigger: {
|
||||||
|
name: EXECUTE_WEBHOOK_JOB_DEFINITION_ID,
|
||||||
|
schema: EXECUTE_WEBHOOK_JOB_DEFINITION_SCHEMA,
|
||||||
|
},
|
||||||
|
handler: async ({ payload, io }) => {
|
||||||
|
const handler = await import('./execute-webhook.handler');
|
||||||
|
|
||||||
|
await handler.run({ payload, io });
|
||||||
|
},
|
||||||
|
} as const satisfies JobDefinition<
|
||||||
|
typeof EXECUTE_WEBHOOK_JOB_DEFINITION_ID,
|
||||||
|
TExecuteWebhookJobDefinition
|
||||||
|
>;
|
||||||
@ -1,54 +0,0 @@
|
|||||||
import { Prisma, type Webhook, WebhookCallStatus, type WebhookTriggerEvents } from '@prisma/client';
|
|
||||||
|
|
||||||
import { prisma } from '@documenso/prisma';
|
|
||||||
|
|
||||||
export type ExecuteWebhookOptions = {
|
|
||||||
event: WebhookTriggerEvents;
|
|
||||||
webhook: Webhook;
|
|
||||||
data: unknown;
|
|
||||||
};
|
|
||||||
|
|
||||||
export const executeWebhook = async ({ event, webhook, data }: ExecuteWebhookOptions) => {
|
|
||||||
const { webhookUrl: url, secret } = webhook;
|
|
||||||
|
|
||||||
console.log('Executing webhook', { event, url });
|
|
||||||
|
|
||||||
const payload = {
|
|
||||||
event,
|
|
||||||
payload: data,
|
|
||||||
createdAt: new Date().toISOString(),
|
|
||||||
webhookEndpoint: url,
|
|
||||||
};
|
|
||||||
|
|
||||||
const response = await fetch(url, {
|
|
||||||
method: 'POST',
|
|
||||||
body: JSON.stringify(payload),
|
|
||||||
headers: {
|
|
||||||
'Content-Type': 'application/json',
|
|
||||||
'X-Documenso-Secret': secret ?? '',
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
const body = await response.text();
|
|
||||||
|
|
||||||
let responseBody: Prisma.InputJsonValue | Prisma.JsonNullValueInput = Prisma.JsonNull;
|
|
||||||
|
|
||||||
try {
|
|
||||||
responseBody = JSON.parse(body);
|
|
||||||
} catch (err) {
|
|
||||||
responseBody = body;
|
|
||||||
}
|
|
||||||
|
|
||||||
await prisma.webhookCall.create({
|
|
||||||
data: {
|
|
||||||
url,
|
|
||||||
event,
|
|
||||||
status: response.ok ? WebhookCallStatus.SUCCESS : WebhookCallStatus.FAILED,
|
|
||||||
requestBody: payload as Prisma.InputJsonValue,
|
|
||||||
responseCode: response.status,
|
|
||||||
responseBody,
|
|
||||||
responseHeaders: Object.fromEntries(response.headers.entries()),
|
|
||||||
webhookId: webhook.id,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
};
|
|
||||||
@ -1,6 +1,6 @@
|
|||||||
|
import { jobs } from '../../../jobs/client';
|
||||||
import { verify } from '../../crypto/verify';
|
import { verify } from '../../crypto/verify';
|
||||||
import { getAllWebhooksByEventTrigger } from '../get-all-webhooks-by-event-trigger';
|
import { getAllWebhooksByEventTrigger } from '../get-all-webhooks-by-event-trigger';
|
||||||
import { executeWebhook } from './execute-webhook';
|
|
||||||
import { ZTriggerWebhookBodySchema } from './schema';
|
import { ZTriggerWebhookBodySchema } from './schema';
|
||||||
|
|
||||||
export type HandlerTriggerWebhooksResponse =
|
export type HandlerTriggerWebhooksResponse =
|
||||||
@ -42,17 +42,20 @@ export const handlerTriggerWebhooks = async (req: Request) => {
|
|||||||
const allWebhooks = await getAllWebhooksByEventTrigger({ event, userId, teamId });
|
const allWebhooks = await getAllWebhooksByEventTrigger({ event, userId, teamId });
|
||||||
|
|
||||||
await Promise.allSettled(
|
await Promise.allSettled(
|
||||||
allWebhooks.map(async (webhook) =>
|
allWebhooks.map(async (webhook) => {
|
||||||
executeWebhook({
|
await jobs.triggerJob({
|
||||||
event,
|
name: 'internal.execute-webhook',
|
||||||
webhook,
|
payload: {
|
||||||
data,
|
event,
|
||||||
}),
|
webhookId: webhook.id,
|
||||||
),
|
data,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
return Response.json(
|
return Response.json(
|
||||||
{ success: true, message: 'Webhooks executed successfully' },
|
{ success: true, message: 'Webhooks queued for execution' },
|
||||||
{ status: 200 },
|
{ status: 200 },
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user