diff --git a/apps/server/src/collaboration/collaboration.gateway.ts b/apps/server/src/collaboration/collaboration.gateway.ts index a12894e8..40369926 100644 --- a/apps/server/src/collaboration/collaboration.gateway.ts +++ b/apps/server/src/collaboration/collaboration.gateway.ts @@ -29,7 +29,7 @@ export class CollaborationGateway { this.hocuspocus.handleConnection(client, request); } - destroy() { - this.hocuspocus.destroy(); + async destroy(): Promise { + await this.hocuspocus.destroy(); } } diff --git a/apps/server/src/collaboration/collaboration.module.ts b/apps/server/src/collaboration/collaboration.module.ts index 7b48edc7..54e5ff7f 100644 --- a/apps/server/src/collaboration/collaboration.module.ts +++ b/apps/server/src/collaboration/collaboration.module.ts @@ -38,9 +38,9 @@ export class CollaborationModule implements OnModuleInit, OnModuleDestroy { }); } - onModuleDestroy(): any { + async onModuleDestroy(): Promise { if (this.collaborationGateway) { - this.collaborationGateway.destroy(); + await this.collaborationGateway.destroy(); } if (this.collabWsAdapter) { this.collabWsAdapter.destroy(); diff --git a/apps/server/src/integrations/mail/interfaces/mail.message.ts b/apps/server/src/integrations/mail/interfaces/mail.message.ts index c6952dd5..a6a3d1c3 100644 --- a/apps/server/src/integrations/mail/interfaces/mail.message.ts +++ b/apps/server/src/integrations/mail/interfaces/mail.message.ts @@ -1,5 +1,5 @@ export interface MailMessage { - from: string; + from?: string; to: string; subject: string; text?: string; diff --git a/apps/server/src/integrations/mail/mail.module.ts b/apps/server/src/integrations/mail/mail.module.ts index 6fb902cd..8ce04a63 100644 --- a/apps/server/src/integrations/mail/mail.module.ts +++ b/apps/server/src/integrations/mail/mail.module.ts @@ -5,7 +5,7 @@ import { } from './providers/mail.provider'; import { MailModuleOptions } from './interfaces'; import { MailService } from './mail.service'; -import { EmailProcessor } from './processors/email.processor.'; +import { EmailProcessor } from './processors/email.processor'; @Global() @Module({ diff --git a/apps/server/src/integrations/mail/mail.service.ts b/apps/server/src/integrations/mail/mail.service.ts index 6af69868..517ac067 100644 --- a/apps/server/src/integrations/mail/mail.service.ts +++ b/apps/server/src/integrations/mail/mail.service.ts @@ -15,12 +15,12 @@ export class MailService { @InjectQueue(QueueName.EMAIL_QUEUE) private emailQueue: Queue, ) {} - async sendEmail(message: Omit): Promise { + async sendEmail(message: MailMessage): Promise { const sender = `${this.environmentService.getMailFromName()} <${this.environmentService.getMailFromAddress()}> `; await this.mailDriver.sendMail({ from: sender, ...message }); } - async sendToQueue(message: Omit): Promise { + async sendToQueue(message: MailMessage): Promise { await this.emailQueue.add(QueueJob.SEND_EMAIL, message); } } diff --git a/apps/server/src/integrations/mail/processors/email.processor..ts b/apps/server/src/integrations/mail/processors/email.processor.ts similarity index 68% rename from apps/server/src/integrations/mail/processors/email.processor..ts rename to apps/server/src/integrations/mail/processors/email.processor.ts index e845ff2d..699bab54 100644 --- a/apps/server/src/integrations/mail/processors/email.processor..ts +++ b/apps/server/src/integrations/mail/processors/email.processor.ts @@ -1,18 +1,18 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Logger, OnModuleDestroy } from '@nestjs/common'; import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; import { QueueName } from '../../queue/constants'; import { Job } from 'bullmq'; import { MailService } from '../mail.service'; +import { MailMessage } from '../interfaces/mail.message'; -@Injectable() @Processor(QueueName.EMAIL_QUEUE) -export class EmailProcessor extends WorkerHost { +export class EmailProcessor extends WorkerHost implements OnModuleDestroy { private readonly logger = new Logger(EmailProcessor.name); constructor(private readonly mailService: MailService) { super(); } - async process(job: Job): Promise { + async process(job: Job): Promise { try { await this.mailService.sendEmail(job.data); } catch (err) { @@ -27,7 +27,7 @@ export class EmailProcessor extends WorkerHost { @OnWorkerEvent('failed') onError(job: Job) { - this.logger.warn( + this.logger.error( `Error processing ${job.name} job. Reason: ${job.failedReason}`, ); } @@ -36,4 +36,10 @@ export class EmailProcessor extends WorkerHost { onCompleted(job: Job) { this.logger.debug(`Completed ${job.name} job`); } + + async onModuleDestroy(): Promise { + if (this.worker) { + await this.worker.close(); + } + } } diff --git a/apps/server/src/integrations/queue/queue.module.ts b/apps/server/src/integrations/queue/queue.module.ts index d09c9ec2..4ba9257c 100644 --- a/apps/server/src/integrations/queue/queue.module.ts +++ b/apps/server/src/integrations/queue/queue.module.ts @@ -15,6 +15,9 @@ import { QueueName } from './constants'; host: redisConfig.host, port: redisConfig.port, password: redisConfig.password, + retryStrategy: function (times: number) { + return Math.max(Math.min(Math.exp(times), 20000), 1000); + }, }, defaultJobOptions: { attempts: 3,