diff --git a/packages/lib/jobs/client/bullmq.ts b/packages/lib/jobs/client/bullmq.ts index dce4b6352..3897de93c 100644 --- a/packages/lib/jobs/client/bullmq.ts +++ b/packages/lib/jobs/client/bullmq.ts @@ -233,13 +233,17 @@ export class BullMQJobProvider extends BaseJobProvider { backgroundJobId?: string; }; + let payload = jobData.payload; + if (definition.trigger.schema) { - const result = definition.trigger.schema.safeParse(jobData.payload); + const result = definition.trigger.schema.safeParse(payload); if (!result.success) { console.error(`[JOBS]: Payload validation failed for ${definitionId}`, result.error); throw new Error(`Payload validation failed for ${definitionId}`); } + + payload = result.data; } const backgroundJobId = jobData.backgroundJobId; @@ -260,11 +264,11 @@ export class BullMQJobProvider extends BaseJobProvider { .catch(() => null); } - console.log(`[JOBS]: Processing job ${definitionId} with payload`, jobData.payload); + console.log(`[JOBS]: Processing job ${definitionId} with payload`, payload); try { await definition.handler({ - payload: jobData.payload, + payload, io: this.createJobRunIO(backgroundJobId ?? job.id ?? definitionId), }); diff --git a/packages/lib/jobs/client/local.ts b/packages/lib/jobs/client/local.ts index 12fdcaba6..4ad9ab6f0 100644 --- a/packages/lib/jobs/client/local.ts +++ b/packages/lib/jobs/client/local.ts @@ -260,15 +260,19 @@ export class LocalJobProvider extends BaseJobProvider { return c.text('Unauthorized', 401); } + let payload = options.payload; + if (definition.trigger.schema) { - const result = definition.trigger.schema.safeParse(options.payload); + const result = definition.trigger.schema.safeParse(payload); if (!result.success) { return c.text('Bad request', 400); } + + payload = result.data; } - console.log(`[JOBS]: Triggering job ${options.name} with payload`, options.payload); + console.log(`[JOBS]: Triggering job ${options.name} with payload`, payload); let backgroundJob = await prisma.backgroundJob .update({ @@ -292,7 +296,7 @@ export class LocalJobProvider extends BaseJobProvider { try { await definition.handler({ - payload: options.payload, + payload, io: this.createJobRunIO(jobId), });