From 6540291055ea4788329bb7252ea153c37da6c77c Mon Sep 17 00:00:00 2001 From: Ephraim Duncan <55143799+ephraimduncan@users.noreply.github.com> Date: Thu, 24 Apr 2025 06:00:53 +0000 Subject: [PATCH] feat: migrate webhook execution to background jobs (#1694) --- packages/lib/jobs/client.ts | 2 + .../internal/execute-webhook.handler.ts | 74 +++++++++++++++++++ .../definitions/internal/execute-webhook.ts | 34 +++++++++ .../webhooks/trigger/execute-webhook.ts | 54 -------------- .../server-only/webhooks/trigger/handler.ts | 21 +++--- 5 files changed, 122 insertions(+), 63 deletions(-) create mode 100644 packages/lib/jobs/definitions/internal/execute-webhook.handler.ts create mode 100644 packages/lib/jobs/definitions/internal/execute-webhook.ts delete mode 100644 packages/lib/server-only/webhooks/trigger/execute-webhook.ts diff --git a/packages/lib/jobs/client.ts b/packages/lib/jobs/client.ts index 713d928d8..8b857b66f 100644 --- a/packages/lib/jobs/client.ts +++ b/packages/lib/jobs/client.ts @@ -9,6 +9,7 @@ import { SEND_TEAM_DELETED_EMAIL_JOB_DEFINITION } from './definitions/emails/sen import { SEND_TEAM_MEMBER_JOINED_EMAIL_JOB_DEFINITION } from './definitions/emails/send-team-member-joined-email'; import { SEND_TEAM_MEMBER_LEFT_EMAIL_JOB_DEFINITION } from './definitions/emails/send-team-member-left-email'; import { BULK_SEND_TEMPLATE_JOB_DEFINITION } from './definitions/internal/bulk-send-template'; +import { EXECUTE_WEBHOOK_JOB_DEFINITION } from './definitions/internal/execute-webhook'; import { SEAL_DOCUMENT_JOB_DEFINITION } from './definitions/internal/seal-document'; /** @@ -27,6 +28,7 @@ export const jobsClient = new JobClient([ SEND_RECIPIENT_SIGNED_EMAIL_JOB_DEFINITION, SEND_DOCUMENT_CANCELLED_EMAILS_JOB_DEFINITION, BULK_SEND_TEMPLATE_JOB_DEFINITION, + EXECUTE_WEBHOOK_JOB_DEFINITION, ] as const); export const jobs = jobsClient; diff --git a/packages/lib/jobs/definitions/internal/execute-webhook.handler.ts b/packages/lib/jobs/definitions/internal/execute-webhook.handler.ts new file mode 100644 index 000000000..a357ea51f --- /dev/null +++ b/packages/lib/jobs/definitions/internal/execute-webhook.handler.ts @@ -0,0 +1,74 @@ +import { Prisma, WebhookCallStatus } from '@prisma/client'; + +import { prisma } from '@documenso/prisma'; + +import type { JobRunIO } from '../../client/_internal/job'; +import type { TExecuteWebhookJobDefinition } from './execute-webhook'; + +export const run = async ({ + payload, + io, +}: { + payload: TExecuteWebhookJobDefinition; + io: JobRunIO; +}) => { + const { event, webhookId, data } = payload; + + const webhook = await prisma.webhook.findUniqueOrThrow({ + where: { + id: webhookId, + }, + }); + + const { webhookUrl: url, secret } = webhook; + + await io.runTask('execute-webhook', async () => { + const payloadData = { + event, + payload: data, + createdAt: new Date().toISOString(), + webhookEndpoint: url, + }; + + const response = await fetch(url, { + method: 'POST', + body: JSON.stringify(payloadData), + headers: { + 'Content-Type': 'application/json', + 'X-Documenso-Secret': secret ?? '', + }, + }); + + const body = await response.text(); + + let responseBody: Prisma.InputJsonValue | Prisma.JsonNullValueInput = Prisma.JsonNull; + + try { + responseBody = JSON.parse(body); + } catch (err) { + responseBody = body; + } + + await prisma.webhookCall.create({ + data: { + url, + event, + status: response.ok ? WebhookCallStatus.SUCCESS : WebhookCallStatus.FAILED, + requestBody: payloadData as Prisma.InputJsonValue, + responseCode: response.status, + responseBody, + responseHeaders: Object.fromEntries(response.headers.entries()), + webhookId: webhook.id, + }, + }); + + if (!response.ok) { + throw new Error(`Webhook execution failed with status ${response.status}`); + } + + return { + success: response.ok, + status: response.status, + }; + }); +}; diff --git a/packages/lib/jobs/definitions/internal/execute-webhook.ts b/packages/lib/jobs/definitions/internal/execute-webhook.ts new file mode 100644 index 000000000..af320034c --- /dev/null +++ b/packages/lib/jobs/definitions/internal/execute-webhook.ts @@ -0,0 +1,34 @@ +import { WebhookTriggerEvents } from '@prisma/client'; +import { z } from 'zod'; + +import { ZRequestMetadataSchema } from '../../../universal/extract-request-metadata'; +import { type JobDefinition } from '../../client/_internal/job'; + +const EXECUTE_WEBHOOK_JOB_DEFINITION_ID = 'internal.execute-webhook'; + +const EXECUTE_WEBHOOK_JOB_DEFINITION_SCHEMA = z.object({ + event: z.nativeEnum(WebhookTriggerEvents), + webhookId: z.string(), + data: z.unknown(), + requestMetadata: ZRequestMetadataSchema.optional(), +}); + +export type TExecuteWebhookJobDefinition = z.infer; + +export const EXECUTE_WEBHOOK_JOB_DEFINITION = { + id: EXECUTE_WEBHOOK_JOB_DEFINITION_ID, + name: 'Execute Webhook', + version: '1.0.0', + trigger: { + name: EXECUTE_WEBHOOK_JOB_DEFINITION_ID, + schema: EXECUTE_WEBHOOK_JOB_DEFINITION_SCHEMA, + }, + handler: async ({ payload, io }) => { + const handler = await import('./execute-webhook.handler'); + + await handler.run({ payload, io }); + }, +} as const satisfies JobDefinition< + typeof EXECUTE_WEBHOOK_JOB_DEFINITION_ID, + TExecuteWebhookJobDefinition +>; diff --git a/packages/lib/server-only/webhooks/trigger/execute-webhook.ts b/packages/lib/server-only/webhooks/trigger/execute-webhook.ts deleted file mode 100644 index df94d9f1a..000000000 --- a/packages/lib/server-only/webhooks/trigger/execute-webhook.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { Prisma, type Webhook, WebhookCallStatus, type WebhookTriggerEvents } from '@prisma/client'; - -import { prisma } from '@documenso/prisma'; - -export type ExecuteWebhookOptions = { - event: WebhookTriggerEvents; - webhook: Webhook; - data: unknown; -}; - -export const executeWebhook = async ({ event, webhook, data }: ExecuteWebhookOptions) => { - const { webhookUrl: url, secret } = webhook; - - console.log('Executing webhook', { event, url }); - - const payload = { - event, - payload: data, - createdAt: new Date().toISOString(), - webhookEndpoint: url, - }; - - const response = await fetch(url, { - method: 'POST', - body: JSON.stringify(payload), - headers: { - 'Content-Type': 'application/json', - 'X-Documenso-Secret': secret ?? '', - }, - }); - - const body = await response.text(); - - let responseBody: Prisma.InputJsonValue | Prisma.JsonNullValueInput = Prisma.JsonNull; - - try { - responseBody = JSON.parse(body); - } catch (err) { - responseBody = body; - } - - await prisma.webhookCall.create({ - data: { - url, - event, - status: response.ok ? WebhookCallStatus.SUCCESS : WebhookCallStatus.FAILED, - requestBody: payload as Prisma.InputJsonValue, - responseCode: response.status, - responseBody, - responseHeaders: Object.fromEntries(response.headers.entries()), - webhookId: webhook.id, - }, - }); -}; diff --git a/packages/lib/server-only/webhooks/trigger/handler.ts b/packages/lib/server-only/webhooks/trigger/handler.ts index 4889e54ed..55d287bac 100644 --- a/packages/lib/server-only/webhooks/trigger/handler.ts +++ b/packages/lib/server-only/webhooks/trigger/handler.ts @@ -1,6 +1,6 @@ +import { jobs } from '../../../jobs/client'; import { verify } from '../../crypto/verify'; import { getAllWebhooksByEventTrigger } from '../get-all-webhooks-by-event-trigger'; -import { executeWebhook } from './execute-webhook'; import { ZTriggerWebhookBodySchema } from './schema'; export type HandlerTriggerWebhooksResponse = @@ -42,17 +42,20 @@ export const handlerTriggerWebhooks = async (req: Request) => { const allWebhooks = await getAllWebhooksByEventTrigger({ event, userId, teamId }); await Promise.allSettled( - allWebhooks.map(async (webhook) => - executeWebhook({ - event, - webhook, - data, - }), - ), + allWebhooks.map(async (webhook) => { + await jobs.triggerJob({ + name: 'internal.execute-webhook', + payload: { + event, + webhookId: webhook.id, + data, + }, + }); + }), ); return Response.json( - { success: true, message: 'Webhooks executed successfully' }, + { success: true, message: 'Webhooks queued for execution' }, { status: 200 }, ); };