Implement BullMQ for background job processing

* new REDIS_URL environment variable
This commit is contained in:
Philipinho
2024-05-03 02:56:03 +01:00
parent 19a1f5e12d
commit 7f933addff
15 changed files with 314 additions and 6 deletions

View File

@ -31,6 +31,7 @@
"@casl/ability": "^6.7.1",
"@fastify/multipart": "^8.2.0",
"@fastify/static": "^7.0.3",
"@nestjs/bullmq": "^10.1.1",
"@nestjs/common": "^10.3.8",
"@nestjs/config": "^3.2.2",
"@nestjs/core": "^10.3.8",
@ -44,6 +45,7 @@
"@react-email/render": "^0.0.13",
"@types/pg": "^8.11.5",
"bcrypt": "^5.1.1",
"bullmq": "^5.7.8",
"bytes": "^3.1.2",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.1",

View File

@ -11,6 +11,7 @@ import { DatabaseModule } from '@docmost/db/database.module';
import * as fs from 'fs';
import { StorageModule } from './integrations/storage/storage.module';
import { MailModule } from './integrations/mail/mail.module';
import { QueueModule } from './integrations/queue/queue.module';
const clientDistPath = join(__dirname, '..', '..', 'client/dist');
@ -32,6 +33,7 @@ function getServeStaticModule() {
EnvironmentModule,
CollaborationModule,
WsModule,
QueueModule,
...getServeStaticModule(),
StorageModule.forRootAsync({
imports: [EnvironmentModule],

View File

@ -24,3 +24,16 @@ export async function comparePasswordHash(
export function getRandomInt(min = 4, max = 5) {
return Math.floor(Math.random() * (max - min + 1)) + min;
}
export type RedisConfig = {
host: string;
port: number;
password?: string;
};
export function parseRedisUrl(redisUrl: string): RedisConfig {
// format - redis[s]://[[username][:password]@][host][:port][/db-number]
const { hostname, port, password } = new URL(redisUrl);
const portInt = parseInt(port, 10);
return { host: hostname, port: portInt, password };
}

View File

@ -102,4 +102,11 @@ export class EnvironmentService {
getPostmarkToken(): string {
return this.configService.get<string>('POSTMARK_TOKEN');
}
getRedisUrl(): string {
return this.configService.get<string>(
'REDIS_URL',
'redis://@127.0.0.1:6379',
);
}
}

View File

@ -25,6 +25,7 @@ export class PostmarkDriver implements MailDriver {
this.logger.debug(`Sent mail to ${message.to}`);
} catch (err) {
this.logger.warn(`Failed to send mail to ${message.to}: ${err}`);
throw err;
}
}
}

View File

@ -27,6 +27,7 @@ export class SmtpDriver implements MailDriver {
this.logger.debug(`Sent mail to ${message.to}`);
} catch (err) {
this.logger.warn(`Failed to send mail to ${message.to}: ${err}`);
throw err;
}
}
}

View File

@ -5,9 +5,12 @@ import {
} from './providers/mail.provider';
import { MailModuleOptions } from './interfaces';
import { MailService } from './mail.service';
import { EmailProcessor } from './processors/email.processor.';
@Global()
@Module({})
@Module({
providers: [EmailProcessor],
})
export class MailModule {
static forRootAsync(options: MailModuleOptions): DynamicModule {
return {

View File

@ -3,16 +3,24 @@ import { MAIL_DRIVER_TOKEN } from './mail.constants';
import { MailDriver } from './drivers/interfaces/mail-driver.interface';
import { MailMessage } from './interfaces/mail.message';
import { EnvironmentService } from '../environment/environment.service';
import { InjectQueue } from '@nestjs/bullmq';
import { QueueName, QueueJob } from '../queue/constants';
import { Queue } from 'bullmq';
@Injectable()
export class MailService {
constructor(
@Inject(MAIL_DRIVER_TOKEN) private mailDriver: MailDriver,
private readonly environmentService: EnvironmentService,
@InjectQueue(QueueName.EMAIL_QUEUE) private emailQueue: Queue,
) {}
async sendMail(message: Omit<MailMessage, 'from'>): Promise<void> {
async sendEmail(message: Omit<MailMessage, 'from'>): Promise<void> {
const sender = `${this.environmentService.getMailFromName()} <${this.environmentService.getMailFromAddress()}> `;
await this.mailDriver.sendMail({ from: sender, ...message });
}
async sendToQueue(message: Omit<MailMessage, 'from'>): Promise<void> {
await this.emailQueue.add(QueueJob.SEND_EMAIL, message);
}
}

View File

@ -0,0 +1,39 @@
import { Injectable, Logger } from '@nestjs/common';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { QueueName } from '../../queue/constants';
import { Job } from 'bullmq';
import { MailService } from '../mail.service';
@Injectable()
@Processor(QueueName.EMAIL_QUEUE)
export class EmailProcessor extends WorkerHost {
private readonly logger = new Logger(EmailProcessor.name);
constructor(private readonly mailService: MailService) {
super();
}
async process(job: Job): Promise<void> {
try {
await this.mailService.sendEmail(job.data);
} catch (err) {
throw err;
}
}
@OnWorkerEvent('active')
onActive(job: Job) {
this.logger.debug(`Processing ${job.name} job`);
}
@OnWorkerEvent('failed')
onError(job: Job) {
this.logger.warn(
`Error processing ${job.name} job. Reason: ${job.failedReason}`,
);
}
@OnWorkerEvent('completed')
onCompleted(job: Job) {
this.logger.debug(`Completed ${job.name} job`);
}
}

View File

@ -0,0 +1 @@
export * from './queue.constants';

View File

@ -0,0 +1,7 @@
export enum QueueName {
EMAIL_QUEUE = '{email-queue}',
}
export enum QueueJob {
SEND_EMAIL = 'send-email',
}

View File

@ -0,0 +1,36 @@
import { Global, Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { EnvironmentService } from '../environment/environment.service';
import { parseRedisUrl } from '../../helpers';
import { QueueName } from './constants';
@Global()
@Module({
imports: [
BullModule.forRootAsync({
useFactory: (environmentService: EnvironmentService) => {
const redisConfig = parseRedisUrl(environmentService.getRedisUrl());
return {
connection: {
host: redisConfig.host,
port: redisConfig.port,
password: redisConfig.password,
},
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 10000,
},
},
};
},
inject: [EnvironmentService],
}),
BullModule.registerQueue({
name: QueueName.EMAIL_QUEUE,
}),
],
exports: [BullModule],
})
export class QueueModule {}

View File

@ -41,11 +41,11 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val));
log: (event: LogEvent) => {
if (environmentService.getEnv() !== 'development') return;
if (event.level === 'query') {
console.log(event.query.sql);
// console.log(event.query.sql);
//if (event.query.parameters.length > 0) {
//console.log('parameters: ' + event.query.parameters);
//}
console.log('time: ' + event.queryDurationMillis);
// console.log('time: ' + event.queryDurationMillis);
}
},
}),