wip: background tasks

This commit is contained in:
Mythie
2024-05-15 18:55:05 +10:00
parent 1647f7c4a0
commit 108054a133
23 changed files with 752 additions and 29 deletions

View File

@ -19,8 +19,8 @@
"micro": "^10.0.1",
"next": "14.0.3",
"next-auth": "4.24.5",
"react": "18.2.0",
"react": "18.3.1",
"ts-pattern": "^5.0.5",
"zod": "^3.22.4"
}
}
}

View File

@ -0,0 +1,3 @@
import { JobClient } from './client/client';
export const jobsClient = JobClient.getInstance();

View File

@ -0,0 +1,41 @@
import { z } from 'zod';
export const ZTriggerJobOptionsSchema = z.object({
id: z.string().optional(),
name: z.string(),
payload: z.unknown().refine((x) => x !== undefined, { message: 'payload is required' }),
timestamp: z.number().optional(),
});
// The Omit is a temporary workaround for a "bug" in the zod library
// @see: https://github.com/colinhacks/zod/issues/2966
export type TriggerJobOptions = Omit<z.infer<typeof ZTriggerJobOptionsSchema>, 'payload'> & {
payload: unknown;
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type JobDefinition<T = any> = {
id: string;
name: string;
version: string;
enabled?: boolean;
trigger: {
name: string;
schema?: z.ZodSchema<T>;
};
handler: (options: { payload: T; io: JobRunIO }) => Promise<Json | void>;
};
export interface JobRunIO {
// stableRun<T extends Json | void>(cacheKey: string, callback: (io: JobRunIO) => T | Promise<T>): Promise<T>;
stableRun<T extends Json | void>(cacheKey: string, callback: () => Promise<T>): Promise<T>;
triggerJob(cacheKey: string, options: TriggerJobOptions): Promise<unknown>;
wait(cacheKey: string, ms: number): Promise<void>;
logger: {
info(...args: unknown[]): void;
error(...args: unknown[]): void;
debug(...args: unknown[]): void;
warn(...args: unknown[]): void;
log(...args: unknown[]): void;
};
}

View File

@ -0,0 +1,14 @@
/**
* Below type is borrowed from Trigger.dev's SDK, it may be moved elsewhere later.
*/
type JsonPrimitive = string | number | boolean | null | undefined | Date | symbol;
type JsonArray = Json[];
type JsonRecord<T> = {
[Property in keyof T]: Json;
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type Json<T = any> = JsonPrimitive | JsonArray | JsonRecord<T>;

View File

@ -0,0 +1,19 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import type { JobDefinition, TriggerJobOptions } from './_internal/job';
export abstract class BaseJobProvider {
// eslint-disable-next-line @typescript-eslint/require-await
public async triggerJob(_options: TriggerJobOptions): Promise<void> {
throw new Error('Not implemented');
}
// eslint-disable-next-line @typescript-eslint/require-await
public defineJob<T>(_job: JobDefinition<T>): void {
throw new Error('Not implemented');
}
public getApiHandler(): (req: NextApiRequest, res: NextApiResponse) => Promise<void> {
throw new Error('Not implemented');
}
}

View File

@ -0,0 +1,38 @@
import type { JobDefinition, TriggerJobOptions } from './_internal/job';
import type { BaseJobProvider as JobClientProvider } from './base';
import { LocalJobProvider } from './local';
import { TriggerJobProvider } from './trigger';
export class JobClient {
private static _instance: JobClient;
private _provider: JobClientProvider;
private constructor() {
if (process.env.NEXT_PRIVATE_JOBS_PROVIDER === 'trigger') {
this._provider = TriggerJobProvider.getInstance();
}
this._provider = LocalJobProvider.getInstance();
}
public static getInstance() {
if (!this._instance) {
this._instance = new JobClient();
}
return this._instance;
}
public async triggerJob(options: TriggerJobOptions) {
return this._provider.triggerJob(options);
}
public defineJob<T>(job: JobDefinition<T>) {
return this._provider.defineJob(job);
}
public getApiHandler() {
return this._provider.getApiHandler();
}
}

View File

@ -0,0 +1,124 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { json } from 'micro';
import { NEXT_PUBLIC_WEBAPP_URL } from '../../constants/app';
import { sign } from '../../server-only/crypto/sign';
import { verify } from '../../server-only/crypto/verify';
import type { JobDefinition, JobRunIO, TriggerJobOptions } from './_internal/job';
import { BaseJobProvider } from './base';
export class LocalJobProvider extends BaseJobProvider {
private static _instance: LocalJobProvider;
private _jobDefinitions: Record<string, JobDefinition> = {};
private constructor() {
super();
}
static getInstance() {
if (!this._instance) {
this._instance = new LocalJobProvider();
}
return this._instance;
}
public defineJob<T>(definition: JobDefinition<T>) {
this._jobDefinitions[definition.id] = {
...definition,
enabled: definition.enabled ?? true,
};
}
public async triggerJob(options: TriggerJobOptions) {
const signature = sign(options);
await Promise.race([
fetch(`${NEXT_PUBLIC_WEBAPP_URL()}/api/jobs/trigger`, {
method: 'POST',
body: JSON.stringify(options),
headers: {
'Content-Type': 'application/json',
'X-Job-Signature': signature,
},
}),
new Promise((resolve) => {
setTimeout(resolve, 150);
}),
]);
}
public getApiHandler() {
return async (req: NextApiRequest, res: NextApiResponse) => {
if (req.method === 'POST') {
const signature = req.headers['x-job-signature'];
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const options = (await json(req)) as TriggerJobOptions;
const definition = this._jobDefinitions[options.name];
if (typeof signature !== 'string' || typeof options !== 'object') {
res.status(400).send('Bad request');
return;
}
if (!definition) {
res.status(404).send('Job not found');
return;
}
if (definition && !definition.enabled) {
console.log('Attempted to trigger a disabled job', options.name);
res.status(404).send('Job not found');
return;
}
if (!signature || !verify(options, signature)) {
res.status(401).send('Unauthorized');
return;
}
if (definition.trigger.schema) {
const result = definition.trigger.schema.safeParse(options.payload);
if (!result.success) {
res.status(400).send('Bad request');
return;
}
}
console.log(`[JOBS]: Triggering job ${options.name} with payload`, options.payload);
await definition.handler({
payload: options.payload,
io: this.createJobRunIO(options.name),
});
res.status(200).send('OK');
} else {
res.status(405).send('Method not allowed');
}
};
}
private createJobRunIO(jobId: string): JobRunIO {
return {
stableRun: async (_cacheKey, callback) => await callback(),
triggerJob: async (_cacheKey, payload) => await this.triggerJob(payload),
logger: {
debug: (...args) => console.debug(`[${jobId}]`, ...args),
error: (...args) => console.error(`[${jobId}]`, ...args),
info: (...args) => console.info(`[${jobId}]`, ...args),
log: (...args) => console.log(`[${jobId}]`, ...args),
warn: (...args) => console.warn(`[${jobId}]`, ...args),
},
// eslint-disable-next-line @typescript-eslint/require-await
wait: async () => {
throw new Error('Not implemented');
},
};
}
}

View File

@ -0,0 +1,75 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { createPagesRoute } from '@trigger.dev/nextjs';
import type { IO } from '@trigger.dev/sdk';
import { TriggerClient, eventTrigger } from '@trigger.dev/sdk';
import type { JobDefinition, JobRunIO, TriggerJobOptions } from './_internal/job';
import { BaseJobProvider } from './base';
export class TriggerJobProvider extends BaseJobProvider {
private static _instance: TriggerJobProvider;
private _client: TriggerClient;
private constructor(options: { client: TriggerClient }) {
super();
this._client = options.client;
}
static getInstance() {
if (!this._instance) {
const client = new TriggerClient({
id: 'documenso-app',
apiKey: process.env.NEXT_PRIVATE_TRIGGER_API_KEY,
apiUrl: process.env.NEXT_PRIVATE_TRIGGER_API_URL,
});
this._instance = new TriggerJobProvider({ client });
}
return this._instance;
}
public defineJob<T>(job: JobDefinition<T>): void {
this._client.defineJob({
id: job.id,
name: job.name,
version: job.version,
trigger: eventTrigger({
name: job.trigger.name,
schema: job.trigger.schema,
}),
run: async (payload, io) => job.handler({ payload, io: this.convertTriggerIoToJobRunIo(io) }),
});
}
public async triggerJob(_options: TriggerJobOptions): Promise<void> {
await this._client.sendEvent({
id: _options.id,
name: _options.name,
payload: _options.payload,
timestamp: _options.timestamp ? new Date(_options.timestamp) : undefined,
});
}
public getApiHandler(): (req: NextApiRequest, res: NextApiResponse) => Promise<void> {
const { handler } = createPagesRoute(this._client);
return handler;
}
private convertTriggerIoToJobRunIo(io: IO) {
return {
wait: io.wait,
logger: io.logger,
stableRun: async (cacheKey, callback) => io.runTask(cacheKey, callback),
triggerJob: async (cacheKey, payload) =>
io.sendEvent(cacheKey, {
...payload,
timestamp: payload.timestamp ? new Date(payload.timestamp) : undefined,
}),
} satisfies JobRunIO;
}
}

View File

@ -0,0 +1 @@
export * from './send-confirmation-email';

View File

@ -0,0 +1,23 @@
import { z } from 'zod';
import { sendConfirmationToken } from '../../server-only/user/send-confirmation-token';
import { jobsClient } from '../client';
jobsClient.defineJob({
id: 'send.confirmation.email',
name: 'Send Confirmation Email',
version: '1-0-0',
trigger: {
name: 'send.confirmation.email',
schema: z.object({
email: z.string().email(),
force: z.boolean().optional(),
}),
},
handler: async ({ payload }) => {
await sendConfirmationToken({
email: payload.email,
force: payload.force,
});
},
});

View File

@ -14,11 +14,11 @@ import { prisma } from '@documenso/prisma';
import { IdentityProvider, UserSecurityAuditLogType } from '@documenso/prisma/client';
import { AppError, AppErrorCode } from '../errors/app-error';
import { jobsClient } from '../jobs/client';
import { isTwoFactorAuthenticationEnabled } from '../server-only/2fa/is-2fa-availble';
import { validateTwoFactorAuthentication } from '../server-only/2fa/validate-2fa';
import { getMostRecentVerificationTokenByUserId } from '../server-only/user/get-most-recent-verification-token-by-user-id';
import { getUserByEmail } from '../server-only/user/get-user-by-email';
import { sendConfirmationToken } from '../server-only/user/send-confirmation-token';
import type { TAuthenticationResponseJSONSchema } from '../types/webauthn';
import { ZAuthenticationResponseJSONSchema } from '../types/webauthn';
import { extractNextAuthRequestMetadata } from '../universal/extract-request-metadata';
@ -108,7 +108,12 @@ export const NEXT_AUTH_OPTIONS: AuthOptions = {
mostRecentToken.expires.valueOf() <= Date.now() ||
DateTime.fromJSDate(mostRecentToken.createdAt).diffNow('minutes').minutes > -5
) {
await sendConfirmationToken({ email });
await jobsClient.triggerJob({
name: 'send.confirmation.email',
payload: {
email: user.email,
},
});
}
throw new Error(ErrorCode.UNVERIFIED_EMAIL);

View File

@ -32,6 +32,8 @@
"@pdf-lib/fontkit": "^1.1.1",
"@scure/base": "^1.1.3",
"@sindresorhus/slugify": "^2.2.1",
"@trigger.dev/nextjs": "^2.3.18",
"@trigger.dev/sdk": "^2.3.18",
"@upstash/redis": "^1.20.6",
"@vvo/tzdb": "^6.117.0",
"kysely": "^0.26.3",
@ -43,7 +45,7 @@
"pdf-lib": "^1.17.1",
"pg": "^8.11.3",
"playwright": "1.43.0",
"react": "18.2.0",
"react": "18.3.1",
"remeda": "^1.27.1",
"stripe": "^12.7.0",
"ts-pattern": "^5.0.5",

View File

@ -2,7 +2,7 @@ import { DateTime } from 'luxon';
import { prisma } from '@documenso/prisma';
import { sendConfirmationToken } from './send-confirmation-token';
import { jobsClient } from '../../jobs/client';
export type VerifyEmailProps = {
token: string;
@ -40,7 +40,12 @@ export const verifyEmail = async ({ token }: VerifyEmailProps) => {
!mostRecentToken ||
DateTime.now().minus({ hours: 1 }).toJSDate() > mostRecentToken.createdAt
) {
await sendConfirmationToken({ email: verificationToken.user.email });
await jobsClient.triggerJob({
name: 'send.confirmation.email',
payload: {
email: verificationToken.user.email,
},
});
}
return valid;

View File

@ -5,6 +5,7 @@ import { env } from 'next-runtime-env';
import { IS_BILLING_ENABLED } from '@documenso/lib/constants/app';
import { AppError, AppErrorCode } from '@documenso/lib/errors/app-error';
import { jobsClient } from '@documenso/lib/jobs/client';
import { ErrorCode } from '@documenso/lib/next-auth/error-codes';
import { createPasskey } from '@documenso/lib/server-only/auth/create-passkey';
import { createPasskeyAuthenticationOptions } from '@documenso/lib/server-only/auth/create-passkey-authentication-options';
@ -15,7 +16,6 @@ import { findPasskeys } from '@documenso/lib/server-only/auth/find-passkeys';
import { compareSync } from '@documenso/lib/server-only/auth/hash';
import { updatePasskey } from '@documenso/lib/server-only/auth/update-passkey';
import { createUser } from '@documenso/lib/server-only/user/create-user';
import { sendConfirmationToken } from '@documenso/lib/server-only/user/send-confirmation-token';
import { extractNextApiRequestMetadata } from '@documenso/lib/universal/extract-request-metadata';
import { authenticatedProcedure, procedure, router } from '../trpc';
@ -52,7 +52,12 @@ export const authRouter = router({
const user = await createUser({ name, email, password, signature, url });
await sendConfirmationToken({ email: user.email });
await jobsClient.triggerJob({
name: 'send.confirmation.email',
payload: {
email: user.email,
},
});
return user;
} catch (err) {

View File

@ -2,13 +2,13 @@ import { TRPCError } from '@trpc/server';
import { IS_BILLING_ENABLED } from '@documenso/lib/constants/app';
import { AppError, AppErrorCode } from '@documenso/lib/errors/app-error';
import { jobsClient } from '@documenso/lib/jobs/client';
import { getSubscriptionsByUserId } from '@documenso/lib/server-only/subscription/get-subscriptions-by-user-id';
import { deleteUser } from '@documenso/lib/server-only/user/delete-user';
import { findUserSecurityAuditLogs } from '@documenso/lib/server-only/user/find-user-security-audit-logs';
import { forgotPassword } from '@documenso/lib/server-only/user/forgot-password';
import { getUserById } from '@documenso/lib/server-only/user/get-user-by-id';
import { resetPassword } from '@documenso/lib/server-only/user/reset-password';
import { sendConfirmationToken } from '@documenso/lib/server-only/user/send-confirmation-token';
import { updatePassword } from '@documenso/lib/server-only/user/update-password';
import { updateProfile } from '@documenso/lib/server-only/user/update-profile';
import { updatePublicProfile } from '@documenso/lib/server-only/user/update-public-profile';
@ -200,7 +200,12 @@ export const profileRouter = router({
try {
const { email } = input;
return await sendConfirmationToken({ email });
await jobsClient.triggerJob({
name: 'send.confirmation.email',
payload: {
email,
},
});
} catch (err) {
console.error(err);

View File

@ -68,6 +68,14 @@ declare namespace NodeJS {
//
NEXT_PRIVATE_BROWSERLESS_URL?: string;
NEXT_PRIVATE_JOBS_PROVIDER?: 'trigger' | 'local';
/**
* Trigger.dev environment variables
*/
NEXT_PRIVATE_TRIGGER_API_KEY?: string;
NEXT_PRIVATE_TRIGGER_API_URL?: string;
/**
* Vercel environment variables
*/

View File

@ -22,7 +22,7 @@
"@types/luxon": "^3.3.2",
"@types/react": "18.2.18",
"@types/react-dom": "18.2.7",
"react": "18.2.0",
"react": "18.3.1",
"typescript": "5.2.2"
},
"dependencies": {