This commit is contained in:
Philipinho
2024-05-03 15:52:25 +01:00
parent 7f933addff
commit c1cd090252
7 changed files with 22 additions and 13 deletions

View File

@ -29,7 +29,7 @@ export class CollaborationGateway {
this.hocuspocus.handleConnection(client, request); this.hocuspocus.handleConnection(client, request);
} }
destroy() { async destroy(): Promise<void> {
this.hocuspocus.destroy(); await this.hocuspocus.destroy();
} }
} }

View File

@ -38,9 +38,9 @@ export class CollaborationModule implements OnModuleInit, OnModuleDestroy {
}); });
} }
onModuleDestroy(): any { async onModuleDestroy(): Promise<void> {
if (this.collaborationGateway) { if (this.collaborationGateway) {
this.collaborationGateway.destroy(); await this.collaborationGateway.destroy();
} }
if (this.collabWsAdapter) { if (this.collabWsAdapter) {
this.collabWsAdapter.destroy(); this.collabWsAdapter.destroy();

View File

@ -1,5 +1,5 @@
export interface MailMessage { export interface MailMessage {
from: string; from?: string;
to: string; to: string;
subject: string; subject: string;
text?: string; text?: string;

View File

@ -5,7 +5,7 @@ import {
} from './providers/mail.provider'; } from './providers/mail.provider';
import { MailModuleOptions } from './interfaces'; import { MailModuleOptions } from './interfaces';
import { MailService } from './mail.service'; import { MailService } from './mail.service';
import { EmailProcessor } from './processors/email.processor.'; import { EmailProcessor } from './processors/email.processor';
@Global() @Global()
@Module({ @Module({

View File

@ -15,12 +15,12 @@ export class MailService {
@InjectQueue(QueueName.EMAIL_QUEUE) private emailQueue: Queue, @InjectQueue(QueueName.EMAIL_QUEUE) private emailQueue: Queue,
) {} ) {}
async sendEmail(message: Omit<MailMessage, 'from'>): Promise<void> { async sendEmail(message: MailMessage): Promise<void> {
const sender = `${this.environmentService.getMailFromName()} <${this.environmentService.getMailFromAddress()}> `; const sender = `${this.environmentService.getMailFromName()} <${this.environmentService.getMailFromAddress()}> `;
await this.mailDriver.sendMail({ from: sender, ...message }); await this.mailDriver.sendMail({ from: sender, ...message });
} }
async sendToQueue(message: Omit<MailMessage, 'from'>): Promise<void> { async sendToQueue(message: MailMessage): Promise<void> {
await this.emailQueue.add(QueueJob.SEND_EMAIL, message); await this.emailQueue.add(QueueJob.SEND_EMAIL, message);
} }
} }

View File

@ -1,18 +1,18 @@
import { Injectable, Logger } from '@nestjs/common'; import { Logger, OnModuleDestroy } from '@nestjs/common';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { QueueName } from '../../queue/constants'; import { QueueName } from '../../queue/constants';
import { Job } from 'bullmq'; import { Job } from 'bullmq';
import { MailService } from '../mail.service'; import { MailService } from '../mail.service';
import { MailMessage } from '../interfaces/mail.message';
@Injectable()
@Processor(QueueName.EMAIL_QUEUE) @Processor(QueueName.EMAIL_QUEUE)
export class EmailProcessor extends WorkerHost { export class EmailProcessor extends WorkerHost implements OnModuleDestroy {
private readonly logger = new Logger(EmailProcessor.name); private readonly logger = new Logger(EmailProcessor.name);
constructor(private readonly mailService: MailService) { constructor(private readonly mailService: MailService) {
super(); super();
} }
async process(job: Job): Promise<void> { async process(job: Job<MailMessage, void>): Promise<void> {
try { try {
await this.mailService.sendEmail(job.data); await this.mailService.sendEmail(job.data);
} catch (err) { } catch (err) {
@ -27,7 +27,7 @@ export class EmailProcessor extends WorkerHost {
@OnWorkerEvent('failed') @OnWorkerEvent('failed')
onError(job: Job) { onError(job: Job) {
this.logger.warn( this.logger.error(
`Error processing ${job.name} job. Reason: ${job.failedReason}`, `Error processing ${job.name} job. Reason: ${job.failedReason}`,
); );
} }
@ -36,4 +36,10 @@ export class EmailProcessor extends WorkerHost {
onCompleted(job: Job) { onCompleted(job: Job) {
this.logger.debug(`Completed ${job.name} job`); this.logger.debug(`Completed ${job.name} job`);
} }
async onModuleDestroy(): Promise<void> {
if (this.worker) {
await this.worker.close();
}
}
} }

View File

@ -15,6 +15,9 @@ import { QueueName } from './constants';
host: redisConfig.host, host: redisConfig.host,
port: redisConfig.port, port: redisConfig.port,
password: redisConfig.password, password: redisConfig.password,
retryStrategy: function (times: number) {
return Math.max(Math.min(Math.exp(times), 20000), 1000);
},
}, },
defaultJobOptions: { defaultJobOptions: {
attempts: 3, attempts: 3,