From e96cf0ed4634da052330ba46f23aa368e0f7c89d Mon Sep 17 00:00:00 2001 From: Philipinho <16838612+Philipinho@users.noreply.github.com> Date: Thu, 10 Jul 2025 15:45:11 -0700 Subject: [PATCH] notifications module - POC --- apps/server/src/core/core.module.ts | 2 + .../core/notification/INTEGRATION_GUIDE.md | 276 ++++++++++++++++ .../controllers/notification.controller.ts | 122 +++++++ .../dto/create-notification.dto.ts | 58 ++++ .../notification/dto/get-notifications.dto.ts | 34 ++ .../notification/dto/update-preference.dto.ts | 75 +++++ .../events/notification.events.ts | 45 +++ .../core/notification/notification.module.ts | 32 ++ .../queues/notification-batch.processor.ts | 70 ++++ .../notification-aggregation.service.ts | 259 +++++++++++++++ .../services/notification-batching.service.ts | 215 +++++++++++++ .../notification-deduplication.service.ts | 151 +++++++++ .../services/notification-delivery.service.ts | 194 +++++++++++ .../notification-preference.service.ts | 303 ++++++++++++++++++ .../services/notification.service.ts | 275 ++++++++++++++++ .../templates/batch-notification.template.tsx | 153 +++++++++ .../templates/comment-on-page.template.tsx | 72 +++++ .../templates/export-completed.template.tsx | 117 +++++++ .../templates/mention-in-comment.template.tsx | 93 ++++++ .../templates/mention-in-page.template.tsx | 73 +++++ .../notification/types/notification.types.ts | 220 +++++++++++++ apps/server/src/database/database.module.ts | 16 +- ...50710T092527-create-notification-tables.ts | 233 ++++++++++++++ .../notification-aggregation.repo.ts | 125 ++++++++ .../notification/notification-batch.repo.ts | 110 +++++++ .../notification-preference.repo.ts | 134 ++++++++ .../repos/notification/notification.repo.ts | 175 ++++++++++ apps/server/src/database/types/db.d.ts | 78 +++++ .../server/src/database/types/entity.types.ts | 24 ++ apps/server/src/ws/ws.gateway.ts | 64 +++- apps/server/src/ws/ws.module.ts | 1 + 31 files changed, 3794 insertions(+), 5 deletions(-) create mode 100644 apps/server/src/core/notification/INTEGRATION_GUIDE.md create mode 100644 apps/server/src/core/notification/controllers/notification.controller.ts create mode 100644 apps/server/src/core/notification/dto/create-notification.dto.ts create mode 100644 apps/server/src/core/notification/dto/get-notifications.dto.ts create mode 100644 apps/server/src/core/notification/dto/update-preference.dto.ts create mode 100644 apps/server/src/core/notification/events/notification.events.ts create mode 100644 apps/server/src/core/notification/notification.module.ts create mode 100644 apps/server/src/core/notification/queues/notification-batch.processor.ts create mode 100644 apps/server/src/core/notification/services/notification-aggregation.service.ts create mode 100644 apps/server/src/core/notification/services/notification-batching.service.ts create mode 100644 apps/server/src/core/notification/services/notification-deduplication.service.ts create mode 100644 apps/server/src/core/notification/services/notification-delivery.service.ts create mode 100644 apps/server/src/core/notification/services/notification-preference.service.ts create mode 100644 apps/server/src/core/notification/services/notification.service.ts create mode 100644 apps/server/src/core/notification/templates/batch-notification.template.tsx create mode 100644 apps/server/src/core/notification/templates/comment-on-page.template.tsx create mode 100644 apps/server/src/core/notification/templates/export-completed.template.tsx create mode 100644 apps/server/src/core/notification/templates/mention-in-comment.template.tsx create mode 100644 apps/server/src/core/notification/templates/mention-in-page.template.tsx create mode 100644 apps/server/src/core/notification/types/notification.types.ts create mode 100644 apps/server/src/database/migrations/20250710T092527-create-notification-tables.ts create mode 100644 apps/server/src/database/repos/notification/notification-aggregation.repo.ts create mode 100644 apps/server/src/database/repos/notification/notification-batch.repo.ts create mode 100644 apps/server/src/database/repos/notification/notification-preference.repo.ts create mode 100644 apps/server/src/database/repos/notification/notification.repo.ts diff --git a/apps/server/src/core/core.module.ts b/apps/server/src/core/core.module.ts index f7f4f785..ed2aa5c4 100644 --- a/apps/server/src/core/core.module.ts +++ b/apps/server/src/core/core.module.ts @@ -16,6 +16,7 @@ import { GroupModule } from './group/group.module'; import { CaslModule } from './casl/casl.module'; import { DomainMiddleware } from '../common/middlewares/domain.middleware'; import { ShareModule } from './share/share.module'; +import { NotificationModule } from './notification/notification.module'; @Module({ imports: [ @@ -30,6 +31,7 @@ import { ShareModule } from './share/share.module'; GroupModule, CaslModule, ShareModule, + NotificationModule, ], }) export class CoreModule implements NestModule { diff --git a/apps/server/src/core/notification/INTEGRATION_GUIDE.md b/apps/server/src/core/notification/INTEGRATION_GUIDE.md new file mode 100644 index 00000000..4acd551b --- /dev/null +++ b/apps/server/src/core/notification/INTEGRATION_GUIDE.md @@ -0,0 +1,276 @@ +# Notification System Integration Guide + +This guide explains how to integrate the notification system into existing services. + +## Quick Start + +### 1. Import NotificationService + +```typescript +import { NotificationService } from '@/core/notification/services/notification.service'; +import { NotificationType } from '@/core/notification/types/notification.types'; +``` + +### 2. Inject the Service + +```typescript +constructor( + private readonly notificationService: NotificationService, + // ... other dependencies +) {} +``` + +### 3. Create Notifications + +```typescript +// Example: Notify user when mentioned in a comment +await this.notificationService.createNotification({ + workspaceId: workspace.id, + recipientId: mentionedUserId, + actorId: currentUser.id, + type: NotificationType.MENTION_IN_COMMENT, + entityType: 'comment', + entityId: comment.id, + context: { + pageId: page.id, + pageTitle: page.title, + commentText: comment.content.substring(0, 100), + actorName: currentUser.name, + threadRootId: comment.parentCommentId || comment.id, + }, + priority: NotificationPriority.HIGH, + groupKey: `comment:${comment.id}:mentions`, + deduplicationKey: `mention:${mentionedUserId}:comment:${comment.id}`, +}); +``` + +## Integration Examples + +### CommentService Integration + +```typescript +// In comment.service.ts +import { NotificationService } from '@/core/notification/services/notification.service'; +import { NotificationType, NotificationPriority } from '@/core/notification/types/notification.types'; + +export class CommentService { + constructor( + private readonly notificationService: NotificationService, + // ... other dependencies + ) {} + + async createComment(dto: CreateCommentDto, user: User): Promise { + const comment = await this.commentRepo.create(dto); + + // Notify page owner about new comment + if (page.creatorId !== user.id) { + await this.notificationService.createNotification({ + workspaceId: workspace.id, + recipientId: page.creatorId, + actorId: user.id, + type: NotificationType.COMMENT_ON_PAGE, + entityType: 'comment', + entityId: comment.id, + context: { + pageId: page.id, + pageTitle: page.title, + commentText: comment.content.substring(0, 100), + actorName: user.name, + }, + groupKey: `page:${page.id}:comments`, + }); + } + + // Check for mentions and notify mentioned users + const mentionedUserIds = this.extractMentions(comment.content); + for (const mentionedUserId of mentionedUserIds) { + await this.notificationService.createNotification({ + workspaceId: workspace.id, + recipientId: mentionedUserId, + actorId: user.id, + type: NotificationType.MENTION_IN_COMMENT, + entityType: 'comment', + entityId: comment.id, + context: { + pageId: page.id, + pageTitle: page.title, + commentText: comment.content.substring(0, 100), + actorName: user.name, + threadRootId: comment.parentCommentId || comment.id, + }, + priority: NotificationPriority.HIGH, + deduplicationKey: `mention:${mentionedUserId}:comment:${comment.id}`, + }); + } + + return comment; + } + + async resolveComment(commentId: string, user: User): Promise { + const comment = await this.commentRepo.findById(commentId); + + // Notify comment creator that their comment was resolved + if (comment.creatorId !== user.id) { + await this.notificationService.createNotification({ + workspaceId: workspace.id, + recipientId: comment.creatorId, + actorId: user.id, + type: NotificationType.COMMENT_RESOLVED, + entityType: 'comment', + entityId: comment.id, + context: { + pageId: page.id, + pageTitle: page.title, + resolverName: user.name, + }, + }); + } + } +} +``` + +### PageService Integration + +```typescript +// In page.service.ts +async exportPage(pageId: string, format: string, user: User): Promise { + // Start export process... + + // When export is complete + await this.notificationService.createNotification({ + workspaceId: workspace.id, + recipientId: user.id, + actorId: user.id, // System notification + type: NotificationType.EXPORT_COMPLETED, + entityType: 'page', + entityId: pageId, + context: { + pageTitle: page.title, + exportFormat: format, + downloadUrl: exportUrl, + expiresAt: expiryDate.toISOString(), + }, + priority: NotificationPriority.LOW, + }); +} + +async updatePage(pageId: string, content: any, user: User): Promise { + // Check for mentions in the content + const mentionedUserIds = this.extractMentionsFromContent(content); + + for (const mentionedUserId of mentionedUserIds) { + await this.notificationService.createNotification({ + workspaceId: workspace.id, + recipientId: mentionedUserId, + actorId: user.id, + type: NotificationType.MENTION_IN_PAGE, + entityType: 'page', + entityId: pageId, + context: { + pageTitle: page.title, + actorName: user.name, + mentionContext: this.extractMentionContext(content, mentionedUserId), + }, + priority: NotificationPriority.HIGH, + deduplicationKey: `mention:${mentionedUserId}:page:${pageId}:${Date.now()}`, + }); + } +} +``` + +### WsGateway Integration for Real-time Notifications + +The notification system automatically sends real-time updates through WebSocket. The WsGateway is already injected into NotificationDeliveryService. + +```typescript +// In ws.gateway.ts - Already implemented in NotificationDeliveryService +async sendNotificationToUser(userId: string, notification: any): Promise { + const userSockets = await this.getUserSockets(userId); + + for (const socketId of userSockets) { + this.server.to(socketId).emit('notification:new', { + id: notification.id, + type: notification.type, + entityType: notification.entityType, + entityId: notification.entityId, + context: notification.context, + createdAt: notification.createdAt, + readAt: notification.readAt, + }); + } +} +``` + +## Notification Types + +Available notification types: +- `MENTION_IN_PAGE` - User mentioned in a page +- `MENTION_IN_COMMENT` - User mentioned in a comment +- `COMMENT_ON_PAGE` - New comment on user's page +- `COMMENT_IN_THREAD` - Reply to user's comment +- `COMMENT_RESOLVED` - User's comment was resolved +- `EXPORT_COMPLETED` - Export job finished + +## Best Practices + +1. **Use Deduplication Keys**: Prevent duplicate notifications for the same event + ```typescript + deduplicationKey: `mention:${userId}:comment:${commentId}` + ``` + +2. **Set Appropriate Priority**: + - HIGH: Mentions, direct replies + - NORMAL: Comments on owned content + - LOW: System notifications, exports + +3. **Group Related Notifications**: Use groupKey for notifications that should be batched + ```typescript + groupKey: `page:${pageId}:comments` + ``` + +4. **Include Relevant Context**: Provide enough information for email templates + ```typescript + context: { + pageId: page.id, + pageTitle: page.title, + actorName: user.name, + // ... other relevant data + } + ``` + +5. **Check User Preferences**: The notification service automatically checks user preferences, but you can pre-check if needed: + ```typescript + const preferences = await notificationPreferenceService.getUserPreferences(userId, workspaceId); + if (preferences.emailEnabled) { + // Create notification + } + ``` + +## Testing Notifications + +Use the test endpoint to send test notifications: + +```bash +curl -X POST http://localhost:3000/api/notifications/test \ + -H "Authorization: Bearer YOUR_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "type": "MENTION_IN_PAGE", + "recipientId": "USER_ID" + }' +``` + +## Email Templates + +Email templates are located in `/core/notification/templates/`. To add a new template: + +1. Create a new React component in the templates directory +2. Update the email sending logic in NotificationDeliveryService +3. Test the template using the React Email preview server + +## Monitoring + +Monitor notification delivery through logs: +- Check for `NotificationService` logs for creation events +- Check for `NotificationDeliveryService` logs for delivery status +- Check for `NotificationBatchProcessor` logs for batch processing \ No newline at end of file diff --git a/apps/server/src/core/notification/controllers/notification.controller.ts b/apps/server/src/core/notification/controllers/notification.controller.ts new file mode 100644 index 00000000..d666769b --- /dev/null +++ b/apps/server/src/core/notification/controllers/notification.controller.ts @@ -0,0 +1,122 @@ +import { + Controller, + Get, + Post, + Put, + Body, + Param, + Query, + UseGuards, +} from '@nestjs/common'; +import { JwtAuthGuard } from '../../../common/guards/jwt-auth.guard'; +import { AuthUser } from '../../../common/decorators/auth-user.decorator'; +import { AuthWorkspace } from '../../../common/decorators/auth-workspace.decorator'; +import { User, Workspace } from '@docmost/db/types/entity.types'; +import { NotificationService } from '../services/notification.service'; +import { NotificationPreferenceService } from '../services/notification-preference.service'; +import { GetNotificationsDto } from '../dto/get-notifications.dto'; +import { UpdateNotificationPreferencesDto } from '../dto/update-preference.dto'; +import { NotificationType } from '../types/notification.types'; + +@Controller('notifications') +@UseGuards(JwtAuthGuard) +export class NotificationController { + constructor( + private readonly notificationService: NotificationService, + private readonly preferenceService: NotificationPreferenceService, + ) {} + + @Get() + async getNotifications( + @AuthUser() user: User, + @AuthWorkspace() workspace: Workspace, + @Query() query: GetNotificationsDto, + ) { + const { grouped = true, status, limit = 20, offset = 0 } = query; + + if (grouped) { + return await this.notificationService.getGroupedNotifications( + user.id, + workspace.id, + { status, limit, offset }, + ); + } + + return await this.notificationService.getNotifications( + user.id, + workspace.id, + { status, limit, offset }, + ); + } + + @Get('unread-count') + async getUnreadCount(@AuthUser() user: User) { + const count = await this.notificationService.getUnreadCount(user.id); + return { count }; + } + + @Post(':id/read') + async markAsRead( + @AuthUser() user: User, + @Param('id') notificationId: string, + ) { + await this.notificationService.markAsRead(notificationId, user.id); + return { success: true }; + } + + @Post('mark-all-read') + async markAllAsRead(@AuthUser() user: User) { + await this.notificationService.markAllAsRead(user.id); + return { success: true }; + } + + @Get('preferences') + async getPreferences( + @AuthUser() user: User, + @AuthWorkspace() workspace: Workspace, + ) { + return await this.preferenceService.getUserPreferences( + user.id, + workspace.id, + ); + } + + @Put('preferences') + async updatePreferences( + @AuthUser() user: User, + @AuthWorkspace() workspace: Workspace, + @Body() dto: UpdateNotificationPreferencesDto, + ) { + return await this.preferenceService.updateUserPreferences( + user.id, + workspace.id, + dto, + ); + } + + @Get('preferences/stats') + async getNotificationStats( + @AuthUser() user: User, + @AuthWorkspace() workspace: Workspace, + ) { + return await this.preferenceService.getNotificationStats( + user.id, + workspace.id, + ); + } + + @Post('test') + async sendTestNotification( + @AuthUser() user: User, + @AuthWorkspace() workspace: Workspace, + @Body() dto: { type: NotificationType }, + ) { + await this.notificationService.createTestNotification( + user.id, + workspace.id, + dto.type, + ); + + return { success: true, message: 'Test notification sent' }; + } +} diff --git a/apps/server/src/core/notification/dto/create-notification.dto.ts b/apps/server/src/core/notification/dto/create-notification.dto.ts new file mode 100644 index 00000000..0933e89c --- /dev/null +++ b/apps/server/src/core/notification/dto/create-notification.dto.ts @@ -0,0 +1,58 @@ +import { + IsEnum, + IsNotEmpty, + IsObject, + IsOptional, + IsString, + IsUUID, +} from 'class-validator'; +import { + NotificationType, + NotificationPriority, +} from '../types/notification.types'; + +export class CreateNotificationDto { + @IsUUID() + @IsNotEmpty() + workspaceId: string; + + @IsUUID() + @IsNotEmpty() + recipientId: string; + + @IsUUID() + @IsOptional() + actorId?: string; + + @IsEnum(NotificationType) + @IsNotEmpty() + type: NotificationType; + + @IsString() + @IsNotEmpty() + entityType: string; + + @IsUUID() + @IsNotEmpty() + entityId: string; + + @IsObject() + @IsNotEmpty() + context: Record; + + @IsEnum(NotificationPriority) + @IsOptional() + priority?: NotificationPriority; + + @IsString() + @IsOptional() + groupKey?: string; + + @IsString() + @IsOptional() + deduplicationKey?: string; + + // For scheduling notifications (quiet hours, etc.) + @IsOptional() + scheduledFor?: Date; +} diff --git a/apps/server/src/core/notification/dto/get-notifications.dto.ts b/apps/server/src/core/notification/dto/get-notifications.dto.ts new file mode 100644 index 00000000..1872c88b --- /dev/null +++ b/apps/server/src/core/notification/dto/get-notifications.dto.ts @@ -0,0 +1,34 @@ +import { + IsEnum, + IsOptional, + IsBoolean, + IsNumber, + Min, + Max, +} from 'class-validator'; +import { Transform } from 'class-transformer'; +import { NotificationStatus } from '../types/notification.types'; + +export class GetNotificationsDto { + @IsEnum(NotificationStatus) + @IsOptional() + status?: NotificationStatus; + + @IsBoolean() + @Transform(({ value }) => value === 'true' || value === true) + @IsOptional() + grouped?: boolean = true; + + @IsNumber() + @Transform(({ value }) => parseInt(value, 10)) + @Min(1) + @Max(100) + @IsOptional() + limit?: number = 20; + + @IsNumber() + @Transform(({ value }) => parseInt(value, 10)) + @Min(0) + @IsOptional() + offset?: number = 0; +} diff --git a/apps/server/src/core/notification/dto/update-preference.dto.ts b/apps/server/src/core/notification/dto/update-preference.dto.ts new file mode 100644 index 00000000..1dc13fa7 --- /dev/null +++ b/apps/server/src/core/notification/dto/update-preference.dto.ts @@ -0,0 +1,75 @@ +import { + IsBoolean, + IsEnum, + IsNumber, + IsObject, + IsOptional, + IsString, + Max, + Min, + Matches, + IsArray, +} from 'class-validator'; +import { + EmailFrequency, + NotificationTypeSettings, +} from '../types/notification.types'; + +export class UpdateNotificationPreferencesDto { + @IsBoolean() + @IsOptional() + emailEnabled?: boolean; + + @IsBoolean() + @IsOptional() + inAppEnabled?: boolean; + + @IsObject() + @IsOptional() + notificationSettings?: Record; + + @IsNumber() + @Min(5) + @Max(60) + @IsOptional() + batchWindowMinutes?: number; + + @IsNumber() + @Min(1) + @Max(100) + @IsOptional() + maxBatchSize?: number; + + @IsArray() + @IsString({ each: true }) + @IsOptional() + batchTypes?: string[]; + + @IsEnum(EmailFrequency) + @IsOptional() + emailFrequency?: EmailFrequency; + + @Matches(/^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$/) + @IsOptional() + digestTime?: string; + + @IsBoolean() + @IsOptional() + quietHoursEnabled?: boolean; + + @Matches(/^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$/) + @IsOptional() + quietHoursStart?: string; + + @Matches(/^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$/) + @IsOptional() + quietHoursEnd?: string; + + @IsString() + @IsOptional() + timezone?: string; + + @IsBoolean() + @IsOptional() + weekendNotifications?: boolean; +} diff --git a/apps/server/src/core/notification/events/notification.events.ts b/apps/server/src/core/notification/events/notification.events.ts new file mode 100644 index 00000000..bb7dab7b --- /dev/null +++ b/apps/server/src/core/notification/events/notification.events.ts @@ -0,0 +1,45 @@ +import { Notification } from '@docmost/db/types/entity.types'; + +export class NotificationCreatedEvent { + constructor( + public readonly notification: Notification, + public readonly workspaceId: string, + ) {} +} + +export class NotificationReadEvent { + constructor( + public readonly notificationId: string, + public readonly userId: string, + ) {} +} + +export class NotificationAllReadEvent { + constructor( + public readonly userId: string, + public readonly notificationIds: string[], + ) {} +} + +export class NotificationBatchScheduledEvent { + constructor( + public readonly batchId: string, + public readonly scheduledFor: Date, + ) {} +} + +export class NotificationAggregatedEvent { + constructor( + public readonly aggregationId: string, + public readonly notificationIds: string[], + ) {} +} + +// Event names as constants +export const NOTIFICATION_EVENTS = { + CREATED: 'notification.created', + READ: 'notification.read', + ALL_READ: 'notification.allRead', + BATCH_SCHEDULED: 'notification.batchScheduled', + AGGREGATED: 'notification.aggregated', +} as const; diff --git a/apps/server/src/core/notification/notification.module.ts b/apps/server/src/core/notification/notification.module.ts new file mode 100644 index 00000000..0fbd2354 --- /dev/null +++ b/apps/server/src/core/notification/notification.module.ts @@ -0,0 +1,32 @@ +import { Module } from '@nestjs/common'; +import { BullModule } from '@nestjs/bullmq'; +import { NotificationService } from './services/notification.service'; +import { NotificationPreferenceService } from './services/notification-preference.service'; +import { NotificationDeduplicationService } from './services/notification-deduplication.service'; +import { NotificationDeliveryService } from './services/notification-delivery.service'; +import { NotificationBatchingService } from './services/notification-batching.service'; +import { NotificationAggregationService } from './services/notification-aggregation.service'; +import { NotificationController } from './controllers/notification.controller'; +import { NotificationBatchProcessor } from './queues/notification-batch.processor'; +import { WsModule } from '../../ws/ws.module'; + +@Module({ + imports: [ + BullModule.registerQueue({ + name: 'notification-batch', + }), + WsModule, + ], + controllers: [NotificationController], + providers: [ + NotificationService, + NotificationPreferenceService, + NotificationDeduplicationService, + NotificationDeliveryService, + NotificationBatchingService, + NotificationAggregationService, + NotificationBatchProcessor, + ], + exports: [NotificationService, NotificationPreferenceService], +}) +export class NotificationModule {} diff --git a/apps/server/src/core/notification/queues/notification-batch.processor.ts b/apps/server/src/core/notification/queues/notification-batch.processor.ts new file mode 100644 index 00000000..4efeb17b --- /dev/null +++ b/apps/server/src/core/notification/queues/notification-batch.processor.ts @@ -0,0 +1,70 @@ +import { Processor } from '@nestjs/bullmq'; +import { WorkerHost } from '@nestjs/bullmq'; +import { Injectable, Logger } from '@nestjs/common'; +import { Job } from 'bullmq'; +import { NotificationBatchingService } from '../services/notification-batching.service'; + +@Processor('notification-batch') +export class NotificationBatchProcessor extends WorkerHost { + private readonly logger = new Logger(NotificationBatchProcessor.name); + + constructor(private readonly batchingService: NotificationBatchingService) { + super(); + } + + async process(job: Job) { + if (job.name === 'process-batch') { + return this.processBatch(job); + } else if (job.name === 'check-pending-batches') { + return this.checkPendingBatches(job); + } + } + + async processBatch(job: Job<{ batchId: string }>) { + this.logger.debug(`Processing notification batch: ${job.data.batchId}`); + + try { + await this.batchingService.processBatch(job.data.batchId); + return { success: true, batchId: job.data.batchId }; + } catch (error) { + this.logger.error( + `Failed to process batch ${job.data.batchId}:`, + error instanceof Error ? error.stack : String(error), + ); + throw error; + } + } + + async checkPendingBatches(job: Job) { + this.logger.debug('Checking for pending notification batches'); + + try { + const pendingBatches = await this.batchingService.getPendingBatches(); + + for (const batch of pendingBatches) { + // Calculate delay + const delay = Math.max(0, batch.scheduled_for.getTime() - Date.now()); + + // Add to queue with appropriate delay + await this.queue.add('process-batch', { batchId: batch.id }, { delay }); + + this.logger.debug( + `Scheduled batch ${batch.id} for processing in ${delay}ms`, + ); + } + + return { processedCount: pendingBatches.length }; + } catch (error) { + this.logger.error( + 'Failed to check pending batches:', + error instanceof Error ? error.stack : String(error), + ); + throw error; + } + } + + // Reference to the queue (injected by Bull) + private get queue() { + return (this as any).queue; + } +} diff --git a/apps/server/src/core/notification/services/notification-aggregation.service.ts b/apps/server/src/core/notification/services/notification-aggregation.service.ts new file mode 100644 index 00000000..c67c9a8d --- /dev/null +++ b/apps/server/src/core/notification/services/notification-aggregation.service.ts @@ -0,0 +1,259 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { NotificationAggregationRepo } from '@docmost/db/repos/notification/notification-aggregation.repo'; +import { NotificationRepo } from '@docmost/db/repos/notification/notification.repo'; +import { Notification, NotificationAggregation } from '@docmost/db/types/entity.types'; +import { + NotificationType, + AggregationType, + AggregatedNotificationMessage, +} from '../types/notification.types'; + +interface AggregationRule { + types: NotificationType[]; + timeWindow: number; + minCount: number; + aggregationType: 'actor_based' | 'time_based' | 'count_based'; +} + +@Injectable() +export class NotificationAggregationService { + private readonly logger = new Logger(NotificationAggregationService.name); + + private readonly aggregationRules: Map = + new Map([ + [ + NotificationType.COMMENT_ON_PAGE, + { + types: [NotificationType.COMMENT_ON_PAGE], + timeWindow: 3600000, // 1 hour + minCount: 2, + aggregationType: 'actor_based', + }, + ], + [ + NotificationType.MENTION_IN_COMMENT, + { + types: [ + NotificationType.MENTION_IN_COMMENT, + NotificationType.MENTION_IN_PAGE, + ], + timeWindow: 1800000, // 30 minutes + minCount: 3, + aggregationType: 'count_based', + }, + ], + [ + NotificationType.COMMENT_IN_THREAD, + { + types: [NotificationType.COMMENT_IN_THREAD], + timeWindow: 3600000, // 1 hour + minCount: 2, + aggregationType: 'actor_based', + }, + ], + ]); + + constructor( + private readonly aggregationRepo: NotificationAggregationRepo, + private readonly notificationRepo: NotificationRepo, + ) {} + + async aggregateNotifications( + recipientId: string, + type: NotificationType, + entityId: string, + timeWindow: number = 3600000, // 1 hour default + ): Promise { + const aggregationKey = this.generateAggregationKey( + recipientId, + type, + entityId, + ); + + // Check if there's an existing aggregation within time window + const existing = await this.aggregationRepo.findByKey(aggregationKey); + + if (existing && this.isWithinTimeWindow(existing.updatedAt, timeWindow)) { + return existing; + } + + // Find recent notifications to aggregate + const recentNotifications = await this.notificationRepo.findRecent({ + recipientId, + type, + entityId, + since: new Date(Date.now() - timeWindow), + }); + + const rule = this.aggregationRules.get(type); + if (!rule || recentNotifications.length < rule.minCount) { + return null; + } + + // Create new aggregation + return await this.createAggregation( + aggregationKey, + recentNotifications, + type, + ); + } + + async updateAggregation( + aggregation: NotificationAggregation, + notification: Notification, + ): Promise { + await this.aggregationRepo.addNotificationToAggregation( + aggregation.aggregationKey, + notification.id, + notification.actorId || undefined, + ); + + this.logger.debug( + `Updated aggregation ${aggregation.id} with notification ${notification.id}`, + ); + } + + private async createAggregation( + key: string, + notifications: Notification[], + type: NotificationType, + ): Promise { + const actors = [ + ...new Set(notifications.map((n) => n.actorId).filter(Boolean)), + ]; + const notificationIds = notifications.map((n) => n.id); + + const summaryData = { + totalCount: notifications.length, + actorCount: actors.length, + firstActorId: actors[0], + recentActors: actors.slice(0, 3), + timeSpan: { + start: notifications[notifications.length - 1].createdAt.toISOString(), + end: notifications[0].createdAt.toISOString(), + }, + }; + + const aggregation = await this.aggregationRepo.insertAggregation({ + aggregationKey: key, + recipientId: notifications[0].recipientId, + aggregationType: this.getAggregationType(type), + entityType: notifications[0].entityType, + entityId: notifications[0].entityId, + actorIds: actors, + notificationIds: notificationIds, + summaryData: summaryData, + }); + + this.logger.log( + `Created aggregation ${aggregation.id} for ${notifications.length} notifications`, + ); + + return aggregation; + } + + private generateAggregationKey( + recipientId: string, + type: NotificationType, + entityId: string, + ): string { + return `${recipientId}:${type}:${entityId}`; + } + + private isWithinTimeWindow(updatedAt: Date, timeWindow: number): boolean { + return Date.now() - updatedAt.getTime() < timeWindow; + } + + private getAggregationType(type: NotificationType): AggregationType { + switch (type) { + case NotificationType.COMMENT_ON_PAGE: + case NotificationType.COMMENT_RESOLVED: + return AggregationType.COMMENTS_ON_PAGE; + + case NotificationType.MENTION_IN_PAGE: + return AggregationType.MENTIONS_IN_PAGE; + + case NotificationType.MENTION_IN_COMMENT: + return AggregationType.MENTIONS_IN_COMMENTS; + + case NotificationType.COMMENT_IN_THREAD: + return AggregationType.THREAD_ACTIVITY; + + default: + return AggregationType.COMMENTS_ON_PAGE; + } + } + + async createAggregatedNotificationMessage( + aggregation: NotificationAggregation, + ): Promise { + // TODO: Load actor information from user service + // For now, return a simplified version + const actors = aggregation.actorIds.slice(0, 3).map((id) => ({ + id, + name: 'User', // TODO: Load actual user name + avatarUrl: undefined, + })); + + const primaryActor = actors[0]; + const otherActorsCount = aggregation.actorIds.length - 1; + + let message: string; + let title: string; + + switch (aggregation.aggregationType) { + case AggregationType.COMMENTS_ON_PAGE: + if (otherActorsCount === 0) { + title = `${primaryActor.name} commented on a page`; + message = 'View the comment'; + } else if (otherActorsCount === 1) { + title = `${primaryActor.name} and 1 other commented on a page`; + message = 'View 2 comments'; + } else { + title = `${primaryActor.name} and ${otherActorsCount} others commented on a page`; + message = `View ${aggregation.notificationIds.length} comments`; + } + break; + + case AggregationType.MENTIONS_IN_PAGE: + case AggregationType.MENTIONS_IN_COMMENTS: { + const totalMentions = aggregation.notificationIds.length; + if (totalMentions === 1) { + title = `${primaryActor.name} mentioned you`; + message = 'View mention'; + } else { + title = `You were mentioned ${totalMentions} times`; + message = `By ${primaryActor.name} and ${otherActorsCount} others`; + } + break; + } + + default: + title = `${aggregation.notificationIds.length} new notifications`; + message = 'View all'; + } + + return { + id: aggregation.id, + title, + message, + actors, + totalCount: aggregation.notificationIds.length, + entityId: aggregation.entityId, + entityType: aggregation.entityType, + createdAt: aggregation.createdAt, + updatedAt: aggregation.updatedAt, + }; + } + + async cleanupOldAggregations(olderThan: Date): Promise { + const deletedCount = + await this.aggregationRepo.deleteOldAggregations(olderThan); + + if (deletedCount > 0) { + this.logger.log(`Cleaned up ${deletedCount} old aggregations`); + } + + return deletedCount; + } +} \ No newline at end of file diff --git a/apps/server/src/core/notification/services/notification-batching.service.ts b/apps/server/src/core/notification/services/notification-batching.service.ts new file mode 100644 index 00000000..d30762c7 --- /dev/null +++ b/apps/server/src/core/notification/services/notification-batching.service.ts @@ -0,0 +1,215 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; +import { NotificationRepo } from '../../../database/repos/notification/notification.repo'; +import { NotificationBatchRepo } from '../../../database/repos/notification/notification-batch.repo'; +import { NotificationPreferenceService } from './notification-preference.service'; +import { Notification } from '@docmost/db/types/entity.types'; +import { NotificationType, BatchType } from '../types/notification.types'; + +interface NotificationGroup { + type: NotificationType; + entityId: string; + entityType: string; + notifications: Notification[]; + actors: Set; + summary: string; +} + +@Injectable() +export class NotificationBatchingService { + private readonly logger = new Logger(NotificationBatchingService.name); + + constructor( + private readonly notificationRepo: NotificationRepo, + private readonly batchRepo: NotificationBatchRepo, + private readonly preferenceService: NotificationPreferenceService, + @InjectQueue('notification-batch') private readonly batchQueue: Queue, + ) {} + + async addToBatch(notification: Notification): Promise { + try { + const preferences = await this.preferenceService.getUserPreferences( + notification.recipientId, + notification.workspaceId, + ); + + const batchKey = this.generateBatchKey(notification); + + // Find or create batch + let batch = await this.batchRepo.findByBatchKey( + batchKey, + notification.recipientId, + true, // notSentOnly + ); + + if (!batch) { + // Create new batch + const scheduledFor = new Date(); + scheduledFor.setMinutes(scheduledFor.getMinutes() + preferences.batchWindowMinutes); + + batch = await this.batchRepo.insertBatch({ + recipientId: notification.recipientId, + workspaceId: notification.workspaceId, + batchType: BatchType.SIMILAR_ACTIVITY, + batchKey: batchKey, + notificationCount: 1, + firstNotificationId: notification.id, + scheduledFor: scheduledFor, + }); + + // Schedule batch processing + await this.batchQueue.add( + 'process-batch', + { batchId: batch.id }, + { + delay: preferences.batchWindowMinutes * 60 * 1000, + }, + ); + } else { + // Add to existing batch + await this.batchRepo.incrementNotificationCount(batch.id); + } + + // Update notification with batch ID + await this.notificationRepo.updateNotification(notification.id, { + batchId: batch.id, + isBatched: true, + }); + + this.logger.debug(`Notification ${notification.id} added to batch ${batch.id}`); + } catch (error) { + this.logger.error( + `Failed to batch notification ${notification.id}: ${error instanceof Error ? error.message : String(error)}`, + error instanceof Error ? error.stack : undefined, + ); + // Fall back to instant delivery on error + throw error; + } + } + + private generateBatchKey(notification: Notification): string { + switch (notification.type) { + case NotificationType.COMMENT_ON_PAGE: + case NotificationType.COMMENT_RESOLVED: { + const context = notification.context as any; + return `page:${context?.pageId}:comments`; + } + + case NotificationType.MENTION_IN_PAGE: + return `page:${notification.entityId}:mentions`; + + case NotificationType.COMMENT_IN_THREAD: { + const mentionContext = notification.context as any; + return `thread:${mentionContext?.threadRootId}`; + } + + default: + return `${notification.entityType}:${notification.entityId}:${notification.type}`; + } + } + + async processBatch(batchId: string): Promise { + const batch = await this.batchRepo.findById(batchId); + if (!batch || batch.sentAt) { + this.logger.debug(`Batch ${batchId} not found or already sent`); + return; + } + + const notifications = await this.notificationRepo.findByBatchId(batchId); + + if (notifications.length === 0) { + this.logger.debug(`No notifications found for batch ${batchId}`); + return; + } + + // Group notifications by type for smart formatting + const grouped = this.groupNotificationsByType(notifications); + + // Send batch email + await this.sendBatchEmail(batch.recipientId, batch.workspaceId, grouped); + + // Mark batch as sent + await this.batchRepo.markAsSent(batchId); + + // Update email sent timestamp for all notifications + const notificationIds = notifications.map(n => n.id); + await Promise.all( + notificationIds.map(id => + this.notificationRepo.updateNotification(id, { emailSentAt: new Date() }) + ), + ); + + this.logger.log(`Batch ${batchId} processed with ${notifications.length} notifications`); + } + + private groupNotificationsByType(notifications: Notification[]): NotificationGroup[] { + const groups = new Map(); + + for (const notification of notifications) { + const key = `${notification.type}:${notification.entityId}`; + + if (!groups.has(key)) { + groups.set(key, { + type: notification.type as NotificationType, + entityId: notification.entityId, + entityType: notification.entityType, + notifications: [], + actors: new Set(), + summary: '', + }); + } + + const group = groups.get(key)!; + group.notifications.push(notification); + if (notification.actorId) { + group.actors.add(notification.actorId); + } + } + + // Generate summaries for each group + for (const group of groups.values()) { + group.summary = this.generateSummary(group.type, group.notifications); + } + + return Array.from(groups.values()); + } + + private generateSummary(type: NotificationType, notifications: Notification[]): string { + const count = notifications.length; + const actors = new Set(notifications.map(n => n.actorId).filter(Boolean)); + + switch (type) { + case NotificationType.COMMENT_ON_PAGE: + if (count === 1) return 'commented on a page you follow'; + return `and ${actors.size - 1} others commented on a page you follow`; + + case NotificationType.MENTION_IN_COMMENT: + if (count === 1) return 'mentioned you in a comment'; + return `mentioned you ${count} times in comments`; + + case NotificationType.COMMENT_RESOLVED: + if (count === 1) return 'resolved a comment'; + return `resolved ${count} comments`; + + default: + return `${count} new activities`; + } + } + + private async sendBatchEmail( + recipientId: string, + workspaceId: string, + groups: NotificationGroup[], + ): Promise { + // TODO: Implement email sending with batch template + // This will be implemented when we create email templates + this.logger.log( + `Sending batch email to ${recipientId} with ${groups.length} notification groups`, + ); + } + + async getPendingBatches(): Promise { + return await this.batchRepo.getPendingBatches(); + } +} \ No newline at end of file diff --git a/apps/server/src/core/notification/services/notification-deduplication.service.ts b/apps/server/src/core/notification/services/notification-deduplication.service.ts new file mode 100644 index 00000000..79ce0221 --- /dev/null +++ b/apps/server/src/core/notification/services/notification-deduplication.service.ts @@ -0,0 +1,151 @@ +import { Injectable } from '@nestjs/common'; +import { NotificationType } from '../types/notification.types'; +import { CreateNotificationDto } from '../dto/create-notification.dto'; +import { createHash } from 'crypto'; + +@Injectable() +export class NotificationDeduplicationService { + /** + * Generate a unique deduplication key based on notification type and context + */ + generateDeduplicationKey(params: CreateNotificationDto): string | null { + switch (params.type) { + case NotificationType.MENTION_IN_PAGE: + // Only one notification per mention in a page (until page is updated again) + return this.hash([ + 'mention', + 'page', + params.entityId, + params.actorId, + params.recipientId, + ]); + + case NotificationType.MENTION_IN_COMMENT: + // One notification per comment mention + return this.hash([ + 'mention', + 'comment', + params.entityId, + params.actorId, + params.recipientId, + ]); + + case NotificationType.COMMENT_ON_PAGE: + // Allow multiple notifications for different comments on the same page + return null; // No deduplication, rely on batching instead + + case NotificationType.REPLY_TO_COMMENT: + // One notification per reply + return this.hash([ + 'reply', + params.entityId, + params.actorId, + params.recipientId, + ]); + + case NotificationType.COMMENT_RESOLVED: + // One notification per comment resolution + return this.hash([ + 'resolved', + params.context.commentId, + params.actorId, + params.recipientId, + ]); + + case NotificationType.EXPORT_COMPLETED: + case NotificationType.EXPORT_FAILED: + // One notification per export job + return this.hash([ + 'export', + params.context.jobId || params.entityId, + params.recipientId, + ]); + + case NotificationType.PAGE_SHARED: + // One notification per page share action + return this.hash([ + 'share', + params.entityId, + params.actorId, + params.recipientId, + Date.now().toString(), // Include timestamp to allow re-sharing + ]); + + default: + // For other types, generate a key based on common fields + return this.hash([ + params.type, + params.entityId, + params.actorId, + params.recipientId, + ]); + } + } + + /** + * Check if a notification should be deduplicated based on recent activity + */ + shouldDeduplicate(type: NotificationType): boolean { + const deduplicatedTypes = [ + NotificationType.MENTION_IN_PAGE, + NotificationType.MENTION_IN_COMMENT, + NotificationType.REPLY_TO_COMMENT, + NotificationType.COMMENT_RESOLVED, + NotificationType.EXPORT_COMPLETED, + NotificationType.EXPORT_FAILED, + ]; + + return deduplicatedTypes.includes(type); + } + + /** + * Get the time window for deduplication (in milliseconds) + */ + getDeduplicationWindow(type: NotificationType): number { + switch (type) { + case NotificationType.MENTION_IN_PAGE: + return 24 * 60 * 60 * 1000; // 24 hours + + case NotificationType.MENTION_IN_COMMENT: + return 60 * 60 * 1000; // 1 hour + + case NotificationType.EXPORT_COMPLETED: + case NotificationType.EXPORT_FAILED: + return 5 * 60 * 1000; // 5 minutes + + default: + return 30 * 60 * 1000; // 30 minutes default + } + } + + /** + * Create a hash from array of values + */ + private hash(values: (string | null | undefined)[]): string { + const filtered = values.filter((v) => v !== null && v !== undefined); + const input = filtered.join(':'); + return createHash('sha256').update(input).digest('hex').substring(0, 32); + } + + /** + * Generate a key for custom deduplication scenarios + */ + generateCustomKey( + type: string, + entityId: string, + recipientId: string, + additionalData?: Record, + ): string { + const baseValues = [type, entityId, recipientId]; + + if (additionalData) { + // Sort keys for consistent hashing + const sortedKeys = Object.keys(additionalData).sort(); + for (const key of sortedKeys) { + baseValues.push(`${key}:${additionalData[key]}`); + } + } + + return this.hash(baseValues); + } +} diff --git a/apps/server/src/core/notification/services/notification-delivery.service.ts b/apps/server/src/core/notification/services/notification-delivery.service.ts new file mode 100644 index 00000000..0af1464a --- /dev/null +++ b/apps/server/src/core/notification/services/notification-delivery.service.ts @@ -0,0 +1,194 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; +import { QueueName } from '../../../integrations/queue/constants'; +import { WsGateway } from '../../../ws/ws.gateway'; +import { NotificationBatchingService } from './notification-batching.service'; +import { NotificationPreferenceService } from './notification-preference.service'; +import { NotificationRepo } from '@docmost/db/repos/notification/notification.repo'; +import { Notification } from '@docmost/db/types/entity.types'; +import { + NotificationCreatedEvent, + NotificationReadEvent, + NotificationAllReadEvent, + NOTIFICATION_EVENTS, +} from '../events/notification.events'; +import { NotificationType, NotificationPriority } from '../types/notification.types'; + +@Injectable() +export class NotificationDeliveryService { + private readonly logger = new Logger(NotificationDeliveryService.name); + + constructor( + @InjectQueue(QueueName.EMAIL_QUEUE) private readonly mailQueue: Queue, + private readonly wsGateway: WsGateway, + private readonly batchingService: NotificationBatchingService, + private readonly preferenceService: NotificationPreferenceService, + private readonly notificationRepo: NotificationRepo, + ) {} + + @OnEvent(NOTIFICATION_EVENTS.CREATED) + async handleNotificationCreated(event: NotificationCreatedEvent) { + const { notification, workspaceId } = event; + + try { + const decision = await this.preferenceService.makeNotificationDecision( + notification.recipientId, + workspaceId, + notification.type as NotificationType, + notification.priority as NotificationPriority, + ); + + // In-app delivery (always immediate) + if (decision.channels.includes('in_app')) { + await this.deliverInApp(notification, workspaceId); + } + + // Email delivery (may be batched) + if (decision.channels.includes('email')) { + if (decision.batchingEnabled) { + await this.batchingService.addToBatch(notification); + } else { + await this.deliverEmailInstant(notification); + } + } + } catch (error) { + this.logger.error( + `Failed to deliver notification ${notification.id}: ${error instanceof Error ? error.message : String(error)}`, + error instanceof Error ? error.stack : undefined, + ); + } + } + + private async deliverInApp(notification: Notification, workspaceId: string) { + try { + // Send notification via WebSocket to user's workspace room + const notificationData = { + id: notification.id, + type: notification.type, + status: notification.status, + priority: notification.priority, + actorId: notification.actorId, + entityType: notification.entityType, + entityId: notification.entityId, + context: notification.context, + createdAt: notification.createdAt, + }; + + // Emit to user-specific room + this.wsGateway.emitToUser( + notification.recipientId, + 'notification:new', + notificationData, + ); + + // Update unread count + const unreadCount = await this.notificationRepo.getUnreadCount( + notification.recipientId, + ); + this.wsGateway.emitToUser( + notification.recipientId, + 'notification:unreadCount', + { count: unreadCount }, + ); + + // Update delivery status + await this.notificationRepo.updateNotification(notification.id, { + inAppDeliveredAt: new Date(), + }); + + this.logger.debug(`In-app notification delivered: ${notification.id}`); + } catch (error) { + this.logger.error( + `Failed to deliver in-app notification ${notification.id}: ${error instanceof Error ? error.message : String(error)}`, + error instanceof Error ? error.stack : undefined, + ); + } + } + + private async deliverEmailInstant(notification: Notification) { + try { + await this.mailQueue.add( + 'send-notification-email', + { + notificationId: notification.id, + type: notification.type, + }, + { + attempts: 3, + backoff: { + type: 'exponential', + delay: 2000, + }, + }, + ); + + this.logger.debug(`Email notification queued: ${notification.id}`); + } catch (error) { + this.logger.error( + `Failed to queue email notification ${notification.id}: ${error instanceof Error ? error.message : String(error)}`, + error instanceof Error ? error.stack : undefined, + ); + } + } + + @OnEvent(NOTIFICATION_EVENTS.READ) + async handleNotificationRead(event: NotificationReadEvent) { + const { notificationId, userId } = event; + + // Send real-time update to user + this.wsGateway.emitToUser(userId, 'notification:read', { + notificationId, + }); + + // Update unread count + const unreadCount = await this.notificationRepo.getUnreadCount(userId); + this.wsGateway.emitToUser(userId, 'notification:unreadCount', { + count: unreadCount, + }); + } + + @OnEvent(NOTIFICATION_EVENTS.ALL_READ) + async handleAllNotificationsRead(event: NotificationAllReadEvent) { + const { userId, notificationIds } = event; + + // Send real-time update to user + this.wsGateway.emitToUser(userId, 'notification:allRead', { + notificationIds, + }); + + // Update unread count (should be 0) + this.wsGateway.emitToUser(userId, 'notification:unreadCount', { count: 0 }); + } + + /** + * Process email delivery for a notification + * Called by the mail queue processor + */ + async processEmailNotification(notificationId: string) { + const notification = await this.notificationRepo.findById(notificationId); + if (!notification) { + throw new Error(`Notification not found: ${notificationId}`); + } + + // Check if already sent + if (notification.emailSentAt) { + this.logger.debug( + `Notification already sent via email: ${notificationId}`, + ); + return; + } + + // TODO: Load user and workspace data + // TODO: Render appropriate email template based on notification type + // TODO: Send email using mail service + + // For now, just mark as sent + await this.notificationRepo.updateNotification(notificationId, { + emailSentAt: new Date(), + }); + + this.logger.log(`Email notification sent: ${notificationId}`); + } +} \ No newline at end of file diff --git a/apps/server/src/core/notification/services/notification-preference.service.ts b/apps/server/src/core/notification/services/notification-preference.service.ts new file mode 100644 index 00000000..f49a3b15 --- /dev/null +++ b/apps/server/src/core/notification/services/notification-preference.service.ts @@ -0,0 +1,303 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { NotificationPreferenceRepo } from '@docmost/db/repos/notification/notification-preference.repo'; +import { UpdateNotificationPreferencesDto } from '../dto/update-preference.dto'; +import { NotificationPreference } from '@docmost/db/types/entity.types'; +import { + NotificationType, + NotificationPriority, +} from '../types/notification.types'; +import { + addDays, + setHours, + setMinutes, + setSeconds, + getDay, + differenceInMilliseconds, + startOfDay, + addHours +} from 'date-fns'; + +interface NotificationDecision { + shouldNotify: boolean; + channels: ('email' | 'in_app')[]; + delay?: number; + batchingEnabled: boolean; +} + +@Injectable() +export class NotificationPreferenceService { + private readonly logger = new Logger(NotificationPreferenceService.name); + + constructor(private readonly preferenceRepo: NotificationPreferenceRepo) {} + + async getUserPreferences( + userId: string, + workspaceId: string, + ): Promise { + return await this.preferenceRepo.findOrCreate(userId, workspaceId); + } + + async updateUserPreferences( + userId: string, + workspaceId: string, + updates: UpdateNotificationPreferencesDto, + ): Promise { + const existing = await this.getUserPreferences(userId, workspaceId); + + // Merge notification settings if provided + let mergedSettings = existing.notificationSettings; + if (updates.notificationSettings) { + mergedSettings = { + ...((existing.notificationSettings as Record) || {}), + ...(updates.notificationSettings || {}), + }; + } + + // Validate batch window + if (updates.batchWindowMinutes !== undefined) { + updates.batchWindowMinutes = Math.max( + 5, + Math.min(60, updates.batchWindowMinutes), + ); + } + + const updated = await this.preferenceRepo.updatePreference( + userId, + workspaceId, + { + ...updates, + notificationSettings: mergedSettings, + }, + ); + + this.logger.log(`User ${userId} updated notification preferences`, { + userId, + workspaceId, + changes: updates, + }); + + return updated; + } + + async shouldNotify( + recipientId: string, + type: NotificationType, + workspaceId: string, + ): Promise { + const preferences = await this.getUserPreferences(recipientId, workspaceId); + const decision = await this.makeNotificationDecision( + recipientId, + workspaceId, + type, + NotificationPriority.NORMAL, + ); + + return decision.shouldNotify; + } + + async makeNotificationDecision( + userId: string, + workspaceId: string, + type: NotificationType, + priority: NotificationPriority = NotificationPriority.NORMAL, + ): Promise { + const preferences = await this.getUserPreferences(userId, workspaceId); + + // Global check + if (!preferences.emailEnabled && !preferences.inAppEnabled) { + return { + shouldNotify: false, + channels: [], + batchingEnabled: false, + }; + } + + // Type-specific settings + const typeSettings = this.getTypeSettings(preferences, type); + + const channels: ('email' | 'in_app')[] = []; + if (preferences.emailEnabled && typeSettings.email) channels.push('email'); + if (preferences.inAppEnabled && typeSettings.in_app) + channels.push('in_app'); + + if (channels.length === 0) { + return { + shouldNotify: false, + channels: [], + batchingEnabled: false, + }; + } + + // Check quiet hours + const quietHoursDelay = this.calculateQuietHoursDelay( + preferences, + priority, + ); + + // Check weekend preferences + if ( + !preferences.weekendNotifications && + this.isWeekend(preferences.timezone) + ) { + if (priority !== NotificationPriority.HIGH) { + const mondayDelay = this.getDelayUntilMonday(preferences.timezone); + return { + shouldNotify: true, + channels, + delay: mondayDelay, + batchingEnabled: true, + }; + } + } + + return { + shouldNotify: true, + channels, + delay: quietHoursDelay, + batchingEnabled: + typeSettings.batch && preferences.emailFrequency === 'smart', + }; + } + + private getTypeSettings( + preferences: NotificationPreference, + type: NotificationType, + ): any { + const settings = preferences.notificationSettings as any; + return settings[type] || { email: true, in_app: true, batch: false }; + } + + private calculateQuietHoursDelay( + preferences: NotificationPreference, + priority: NotificationPriority, + ): number | undefined { + if ( + !preferences.quietHoursEnabled || + priority === NotificationPriority.HIGH + ) { + return undefined; + } + + // TODO: Implement proper timezone conversion + const now = new Date(); + const quietStart = this.parseTime( + preferences.quietHoursStart, + preferences.timezone, + ); + const quietEnd = this.parseTime( + preferences.quietHoursEnd, + preferences.timezone, + ); + + if (this.isInQuietHours(now, quietStart, quietEnd)) { + return this.getDelayUntilEndOfQuietHours(now, quietEnd); + } + + return undefined; + } + + private parseTime(timeStr: string, timezone: string): Date { + const [hours, minutes, seconds] = timeStr.split(':').map(Number); + // TODO: Implement proper timezone conversion + const now = new Date(); + return setSeconds(setMinutes(setHours(now, hours), minutes), seconds || 0); + } + + private isInQuietHours( + now: Date, + start: Date, + end: Date, + ): boolean { + const nowMinutes = now.getHours() * 60 + now.getMinutes(); + const startMinutes = start.getHours() * 60 + start.getMinutes(); + const endMinutes = end.getHours() * 60 + end.getMinutes(); + + if (startMinutes <= endMinutes) { + // Quiet hours don't cross midnight + return nowMinutes >= startMinutes && nowMinutes < endMinutes; + } else { + // Quiet hours cross midnight + return nowMinutes >= startMinutes || nowMinutes < endMinutes; + } + } + + private getDelayUntilEndOfQuietHours(now: Date, end: Date): number { + let endTime = end; + + // If end time is before current time, it means quiet hours end tomorrow + if ( + end.getHours() < now.getHours() || + (end.getHours() === now.getHours() && end.getMinutes() <= now.getMinutes()) + ) { + endTime = addDays(endTime, 1); + } + + return differenceInMilliseconds(endTime, now); + } + + private isWeekend(timezone: string): boolean { + // TODO: Implement proper timezone conversion + const now = new Date(); + const dayOfWeek = getDay(now); + return dayOfWeek === 0 || dayOfWeek === 6; // 0 = Sunday, 6 = Saturday + } + + private getDelayUntilMonday(timezone: string): number { + // TODO: Implement proper timezone conversion + const now = new Date(); + const currentDay = getDay(now); + const daysUntilMonday = currentDay === 0 ? 1 : (8 - currentDay) % 7 || 7; + const nextMonday = addDays(now, daysUntilMonday); + const mondayMorning = addHours(startOfDay(nextMonday), 9); // 9 AM Monday + return differenceInMilliseconds(mondayMorning, now); + } + + async getNotificationStats( + userId: string, + workspaceId: string, + ): Promise<{ + preferences: NotificationPreference; + stats: { + emailEnabled: boolean; + inAppEnabled: boolean; + quietHoursActive: boolean; + batchingEnabled: boolean; + typesDisabled: string[]; + }; + }> { + const preferences = await this.getUserPreferences(userId, workspaceId); + // TODO: Implement proper timezone conversion + const now = new Date(); + const quietStart = this.parseTime( + preferences.quietHoursStart, + preferences.timezone, + ); + const quietEnd = this.parseTime( + preferences.quietHoursEnd, + preferences.timezone, + ); + + const typesDisabled: string[] = []; + const settings = preferences.notificationSettings as any; + + for (const [type, config] of Object.entries(settings)) { + const typeSettings = config as any; + if (!typeSettings.email && !typeSettings.in_app) { + typesDisabled.push(type); + } + } + + return { + preferences, + stats: { + emailEnabled: preferences.emailEnabled, + inAppEnabled: preferences.inAppEnabled, + quietHoursActive: + preferences.quietHoursEnabled && + this.isInQuietHours(now, quietStart, quietEnd), + batchingEnabled: preferences.emailFrequency !== 'instant', + typesDisabled, + }, + }; + } +} diff --git a/apps/server/src/core/notification/services/notification.service.ts b/apps/server/src/core/notification/services/notification.service.ts new file mode 100644 index 00000000..7956f3b2 --- /dev/null +++ b/apps/server/src/core/notification/services/notification.service.ts @@ -0,0 +1,275 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { NotificationRepo } from '@docmost/db/repos/notification/notification.repo'; +import { NotificationDeduplicationService } from './notification-deduplication.service'; +import { NotificationPreferenceService } from './notification-preference.service'; +import { CreateNotificationDto } from '../dto/create-notification.dto'; +import { Notification } from '@docmost/db/types/entity.types'; +import { + NotificationStatus, + NotificationPriority, + NotificationType, +} from '../types/notification.types'; +import { + NotificationCreatedEvent, + NotificationReadEvent, + NotificationAllReadEvent, + NOTIFICATION_EVENTS, +} from '../events/notification.events'; + +@Injectable() +export class NotificationService { + private readonly logger = new Logger(NotificationService.name); + + constructor( + private readonly notificationRepo: NotificationRepo, + private readonly eventEmitter: EventEmitter2, + private readonly deduplicationService: NotificationDeduplicationService, + private readonly preferenceService: NotificationPreferenceService, + ) {} + + async createNotification( + dto: CreateNotificationDto, + ): Promise { + try { + // Set default priority if not provided + const priority = dto.priority || NotificationPriority.NORMAL; + + // Check user preferences first + const decision = await this.preferenceService.makeNotificationDecision( + dto.recipientId, + dto.workspaceId, + dto.type, + priority, + ); + + if (!decision.shouldNotify) { + this.logger.debug( + `Notification blocked by user preferences: ${dto.type} for ${dto.recipientId}`, + ); + return null; + } + + // Generate deduplication key + let deduplicationKey = dto.deduplicationKey; + if ( + !deduplicationKey && + this.deduplicationService.shouldDeduplicate(dto.type) + ) { + deduplicationKey = + this.deduplicationService.generateDeduplicationKey(dto); + } + + // Check if duplicate + if ( + deduplicationKey && + (await this.notificationRepo.existsByDeduplicationKey(deduplicationKey)) + ) { + this.logger.debug( + `Duplicate notification prevented: ${deduplicationKey}`, + ); + return null; + } + + // Generate group key if not provided + const groupKey = dto.groupKey || this.generateGroupKey(dto); + + // Calculate expiration + const expiresAt = this.calculateExpiration(dto.type); + + // Create notification + const notification = await this.notificationRepo.insertNotification({ + workspaceId: dto.workspaceId, + recipientId: dto.recipientId, + actorId: dto.actorId || null, + type: dto.type, + status: NotificationStatus.UNREAD, + priority, + entityType: dto.entityType, + entityId: dto.entityId, + context: dto.context, + groupKey: groupKey, + groupCount: 1, + deduplicationKey: deduplicationKey, + batchId: null, + isBatched: false, + emailSentAt: null, + inAppDeliveredAt: null, + readAt: null, + expiresAt: expiresAt, + }); + + // Emit event for delivery processing + this.eventEmitter.emit( + NOTIFICATION_EVENTS.CREATED, + new NotificationCreatedEvent(notification, dto.workspaceId), + ); + + this.logger.debug( + `Notification created: ${notification.id} for user ${dto.recipientId}`, + ); + + return notification; + } catch (error) { + this.logger.error( + `Failed to create notification: ${error instanceof Error ? error.message : String(error)}`, + error instanceof Error ? error.stack : undefined, + ); + throw error; + } + } + + async getNotifications( + userId: string, + workspaceId: string, + options: { + status?: NotificationStatus; + limit?: number; + offset?: number; + } = {}, + ): Promise { + return await this.notificationRepo.findByRecipient(userId, options); + } + + async getGroupedNotifications( + userId: string, + workspaceId: string, + options: { + status?: NotificationStatus; + limit?: number; + offset?: number; + } = {}, + ): Promise<{ + notifications: Notification[]; + groups: Map; + }> { + const notifications = await this.getNotifications( + userId, + workspaceId, + options, + ); + + // Group notifications by group_key + const groups = new Map(); + + for (const notification of notifications) { + if (notification.groupKey) { + const group = groups.get(notification.groupKey) || []; + group.push(notification); + groups.set(notification.groupKey, group); + } + } + + return { notifications, groups }; + } + + async markAsRead(notificationId: string, userId: string): Promise { + const notification = await this.notificationRepo.findById(notificationId); + + if (!notification || notification.recipientId !== userId) { + throw new Error('Notification not found or unauthorized'); + } + + if (notification.status === NotificationStatus.READ) { + return; // Already read + } + + await this.notificationRepo.markAsRead(notificationId); + + // Emit event for real-time update + this.eventEmitter.emit( + NOTIFICATION_EVENTS.READ, + new NotificationReadEvent(notificationId, userId), + ); + + this.logger.debug(`Notification marked as read: ${notificationId}`); + } + + async markAllAsRead(userId: string): Promise { + const unreadNotifications = await this.notificationRepo.findByRecipient( + userId, + { + status: NotificationStatus.UNREAD, + }, + ); + + const ids = unreadNotifications.map((n) => n.id); + + if (ids.length > 0) { + await this.notificationRepo.markManyAsRead(ids); + + // Emit event for real-time update + this.eventEmitter.emit( + NOTIFICATION_EVENTS.ALL_READ, + new NotificationAllReadEvent(userId, ids), + ); + + this.logger.debug( + `Marked ${ids.length} notifications as read for user ${userId}`, + ); + } + } + + async getUnreadCount(userId: string): Promise { + return await this.notificationRepo.getUnreadCount(userId); + } + + async deleteExpiredNotifications(): Promise { + const deletedCount = await this.notificationRepo.deleteExpired(); + + if (deletedCount > 0) { + this.logger.log(`Deleted ${deletedCount} expired notifications`); + } + + return deletedCount; + } + + private generateGroupKey(dto: CreateNotificationDto): string { + // Generate a group key based on notification type and entity + return `${dto.type}:${dto.entityType}:${dto.entityId}`; + } + + private calculateExpiration(type: string): Date | null { + // Set expiration based on notification type + const expirationDays = { + [NotificationType.EXPORT_COMPLETED]: 7, // Expire after 7 days + [NotificationType.EXPORT_FAILED]: 3, // Expire after 3 days + [NotificationType.MENTION_IN_PAGE]: 30, // Expire after 30 days + [NotificationType.MENTION_IN_COMMENT]: 30, + [NotificationType.COMMENT_ON_PAGE]: 60, // Expire after 60 days + [NotificationType.REPLY_TO_COMMENT]: 60, + [NotificationType.COMMENT_IN_THREAD]: 60, + [NotificationType.COMMENT_RESOLVED]: 90, // Expire after 90 days + [NotificationType.PAGE_SHARED]: 90, + }; + + const days = expirationDays[type as NotificationType]; + if (!days) { + return null; // No expiration + } + + const expirationDate = new Date(); + expirationDate.setDate(expirationDate.getDate() + days); + return expirationDate; + } + + async createTestNotification( + userId: string, + workspaceId: string, + type: NotificationType, + ): Promise { + return await this.createNotification({ + workspaceId, + recipientId: userId, + actorId: userId, + type, + entityType: 'test', + entityId: 'test-notification', + context: { + message: 'This is a test notification', + timestamp: new Date(), + }, + priority: NotificationPriority.NORMAL, + }); + } +} diff --git a/apps/server/src/core/notification/templates/batch-notification.template.tsx b/apps/server/src/core/notification/templates/batch-notification.template.tsx new file mode 100644 index 00000000..903ebc85 --- /dev/null +++ b/apps/server/src/core/notification/templates/batch-notification.template.tsx @@ -0,0 +1,153 @@ +import * as React from 'react'; +import { Button, Section, Text, Link, Hr, Heading } from '@react-email/components'; +import { MailBody } from '@docmost/transactional/partials/partials'; +import { content, paragraph, button, h1 } from '@docmost/transactional/css/styles'; + +interface NotificationGroup { + type: string; + title: string; + summary: string; + count: number; + actors: string[]; + url: string; + preview: string[]; +} + +interface BatchNotificationEmailProps { + recipientName: string; + groups: NotificationGroup[]; + totalCount: number; + workspaceName: string; + settingsUrl: string; + viewAllUrl: string; +} + +export const BatchNotificationEmail = ({ + recipientName, + groups, + totalCount, + workspaceName, + settingsUrl, + viewAllUrl, +}: BatchNotificationEmailProps) => { + return ( + +
+ Hi {recipientName}, + + + You have {totalCount} new notifications in {workspaceName}: + + + {groups.map((group, index) => ( +
+ + {group.title} + + + + {formatActors(group.actors)} {group.summary} + + + {group.preview.slice(0, 3).map((item, i) => ( + + • {item} + + ))} + + {group.count > 3 && ( + + And {group.count - 3} more... + + )} + + +
+ ))} + +
+ + + + + You received this because you have smart notifications enabled.{' '} + + Manage your preferences + + +
+
+ ); +}; + +function formatActors(actors: string[]): string { + if (actors.length === 0) return ''; + if (actors.length === 1) return actors[0]; + if (actors.length === 2) return `${actors[0]} and ${actors[1]}`; + return `${actors[0]}, ${actors[1]} and ${actors.length - 2} others`; +} + +const notificationGroup: React.CSSProperties = { + backgroundColor: '#f9f9f9', + borderRadius: '4px', + padding: '16px', + marginBottom: '16px', +}; + +const groupTitle: React.CSSProperties = { + ...paragraph, + fontSize: '16px', + fontWeight: 'bold', + marginBottom: '8px', +}; + +const actorList: React.CSSProperties = { + ...paragraph, + marginBottom: '12px', +}; + +const notificationItem: React.CSSProperties = { + ...paragraph, + marginLeft: '8px', + marginBottom: '4px', + color: '#666', +}; + +const moreText: React.CSSProperties = { + ...paragraph, + fontStyle: 'italic', + color: '#999', + marginLeft: '8px', + marginBottom: '12px', +}; + +const viewButton: React.CSSProperties = { + ...button, + width: 'auto', + padding: '8px 16px', + fontSize: '14px', + marginTop: '8px', +}; + +const viewAllButton: React.CSSProperties = { + ...button, + width: 'auto', + padding: '12px 24px', + margin: '16px auto', +}; + +const divider: React.CSSProperties = { + borderColor: '#e0e0e0', + margin: '24px 0', +}; + +const footerText: React.CSSProperties = { + ...paragraph, + fontSize: '12px', + color: '#666', + marginTop: '24px', +}; \ No newline at end of file diff --git a/apps/server/src/core/notification/templates/comment-on-page.template.tsx b/apps/server/src/core/notification/templates/comment-on-page.template.tsx new file mode 100644 index 00000000..16bc00ba --- /dev/null +++ b/apps/server/src/core/notification/templates/comment-on-page.template.tsx @@ -0,0 +1,72 @@ +import * as React from 'react'; +import { Button, Section, Text, Link } from '@react-email/components'; +import { MailBody } from '../../../integrations/transactional/partials/partials'; +import { content, paragraph, button, h1 } from '../../../integrations/transactional/css/styles'; + +interface CommentOnPageEmailProps { + recipientName: string; + actorName: string; + pageTitle: string; + commentExcerpt: string; + pageUrl: string; + workspaceName: string; + settingsUrl: string; +} + +export const CommentOnPageEmail = ({ + recipientName, + actorName, + pageTitle, + commentExcerpt, + pageUrl, + workspaceName, + settingsUrl, +}: CommentOnPageEmailProps) => { + return ( + +
+ Hi {recipientName}, + + + {actorName} commented on "{pageTitle}": + + +
+ + {commentExcerpt} + +
+ + + + + This notification was sent from {workspaceName}.{' '} + + Manage your notification preferences + + +
+
+ ); +}; + +const commentSection: React.CSSProperties = { + backgroundColor: '#f5f5f5', + borderRadius: '4px', + padding: '16px', + margin: '16px 0', +}; + +const commentText: React.CSSProperties = { + ...paragraph, + margin: 0, +}; + +const footerText: React.CSSProperties = { + ...paragraph, + fontSize: '12px', + color: '#666', + marginTop: '24px', +}; \ No newline at end of file diff --git a/apps/server/src/core/notification/templates/export-completed.template.tsx b/apps/server/src/core/notification/templates/export-completed.template.tsx new file mode 100644 index 00000000..c83c79ef --- /dev/null +++ b/apps/server/src/core/notification/templates/export-completed.template.tsx @@ -0,0 +1,117 @@ +import * as React from 'react'; +import { Button, Section, Text, Link, Row, Column } from '@react-email/components'; +import { MailBody } from '../../../integrations/transactional/partials/partials'; +import { content, paragraph, button, h1 } from '../../../integrations/transactional/css/styles'; + +interface ExportCompletedEmailProps { + recipientName: string; + exportType: string; + entityName: string; + fileSize: string; + downloadUrl: string; + expiresAt: string; + workspaceName: string; + settingsUrl: string; +} + +export const ExportCompletedEmail = ({ + recipientName, + exportType, + entityName, + fileSize, + downloadUrl, + expiresAt, + workspaceName, + settingsUrl, +}: ExportCompletedEmailProps) => { + return ( + +
+ Export Complete! + + + Hi {recipientName}, + + + + Your {exportType.toUpperCase()} export of "{entityName}" has been completed successfully. + + +
+ + File Size: + {fileSize} + + + Format: + {exportType.toUpperCase()} + + + Expires: + {expiresAt} + +
+ + + + + ⚠️ This download link will expire on {expiresAt}. + Please download your file before then. + + + + This notification was sent from {workspaceName}.{' '} + + Manage your notification preferences + + +
+
+ ); +}; + +const exportDetails: React.CSSProperties = { + backgroundColor: '#f5f5f5', + borderRadius: '4px', + padding: '16px', + margin: '16px 0', +}; + +const detailLabel: React.CSSProperties = { + ...paragraph, + fontWeight: 'bold', + width: '120px', + paddingBottom: '8px', +}; + +const detailValue: React.CSSProperties = { + ...paragraph, + paddingBottom: '8px', +}; + +const downloadButton: React.CSSProperties = { + ...button, + backgroundColor: '#28a745', + width: 'auto', + padding: '12px 24px', + margin: '0 auto', +}; + +const warningText: React.CSSProperties = { + ...paragraph, + backgroundColor: '#fff3cd', + border: '1px solid #ffeeba', + borderRadius: '4px', + color: '#856404', + padding: '12px', + marginTop: '16px', +}; + +const footerText: React.CSSProperties = { + ...paragraph, + fontSize: '12px', + color: '#666', + marginTop: '24px', +}; \ No newline at end of file diff --git a/apps/server/src/core/notification/templates/mention-in-comment.template.tsx b/apps/server/src/core/notification/templates/mention-in-comment.template.tsx new file mode 100644 index 00000000..89cf2762 --- /dev/null +++ b/apps/server/src/core/notification/templates/mention-in-comment.template.tsx @@ -0,0 +1,93 @@ +import * as React from 'react'; +import { Button, Section, Text, Link } from '@react-email/components'; +import { MailBody } from '../../../integrations/transactional/partials/partials'; +import { content, paragraph, button, h1 } from '../../../integrations/transactional/css/styles'; + +interface MentionInCommentEmailProps { + recipientName: string; + actorName: string; + pageTitle: string; + commentExcerpt: string; + mentionContext: string; + commentUrl: string; + workspaceName: string; + settingsUrl: string; +} + +export const MentionInCommentEmail = ({ + recipientName, + actorName, + pageTitle, + commentExcerpt, + mentionContext, + commentUrl, + workspaceName, + settingsUrl, +}: MentionInCommentEmailProps) => { + return ( + +
+ Hi {recipientName}, + + + {actorName} mentioned you in a comment on "{pageTitle}": + + +
+ {actorName} commented: + + {commentExcerpt} + + {mentionContext && ( + + Context: ...{mentionContext}... + + )} +
+ + + + + This notification was sent from {workspaceName}.{' '} + + Manage your notification preferences + + +
+
+ ); +}; + +const commentSection: React.CSSProperties = { + backgroundColor: '#f5f5f5', + borderRadius: '4px', + padding: '16px', + margin: '16px 0', +}; + +const commentAuthor: React.CSSProperties = { + ...paragraph, + fontWeight: 'bold', + marginBottom: '8px', +}; + +const commentText: React.CSSProperties = { + ...paragraph, + margin: '0 0 8px 0', +}; + +const mentionHighlight: React.CSSProperties = { + ...paragraph, + fontStyle: 'italic', + color: '#666', + margin: 0, +}; + +const footerText: React.CSSProperties = { + ...paragraph, + fontSize: '12px', + color: '#666', + marginTop: '24px', +}; \ No newline at end of file diff --git a/apps/server/src/core/notification/templates/mention-in-page.template.tsx b/apps/server/src/core/notification/templates/mention-in-page.template.tsx new file mode 100644 index 00000000..b52064a6 --- /dev/null +++ b/apps/server/src/core/notification/templates/mention-in-page.template.tsx @@ -0,0 +1,73 @@ +import * as React from 'react'; +import { Button, Section, Text, Link } from '@react-email/components'; +import { MailBody } from '../../../integrations/transactional/partials/partials'; +import { content, paragraph, button, h1 } from '../../../integrations/transactional/css/styles'; + +interface MentionInPageEmailProps { + recipientName: string; + actorName: string; + pageTitle: string; + mentionContext: string; + pageUrl: string; + workspaceName: string; + settingsUrl: string; +} + +export const MentionInPageEmail = ({ + recipientName, + actorName, + pageTitle, + mentionContext, + pageUrl, + workspaceName, + settingsUrl, +}: MentionInPageEmailProps) => { + return ( + +
+ Hi {recipientName}, + + + {actorName} mentioned you in the page "{pageTitle}": + + +
+ + ...{mentionContext}... + +
+ + + + + This notification was sent from {workspaceName}.{' '} + + Manage your notification preferences + + +
+
+ ); +}; + +const mentionSection: React.CSSProperties = { + backgroundColor: '#f5f5f5', + borderRadius: '4px', + padding: '16px', + margin: '16px 0', +}; + +const mentionText: React.CSSProperties = { + ...paragraph, + fontStyle: 'italic', + margin: 0, +}; + +const footerText: React.CSSProperties = { + ...paragraph, + fontSize: '12px', + color: '#666', + marginTop: '24px', +}; \ No newline at end of file diff --git a/apps/server/src/core/notification/types/notification.types.ts b/apps/server/src/core/notification/types/notification.types.ts new file mode 100644 index 00000000..8322fd93 --- /dev/null +++ b/apps/server/src/core/notification/types/notification.types.ts @@ -0,0 +1,220 @@ +export enum NotificationType { + // Mentions + MENTION_IN_PAGE = 'mention_in_page', + MENTION_IN_COMMENT = 'mention_in_comment', + + // Comments + COMMENT_ON_PAGE = 'comment_on_page', + REPLY_TO_COMMENT = 'reply_to_comment', + COMMENT_IN_THREAD = 'comment_in_thread', + COMMENT_RESOLVED = 'comment_resolved', + + // Exports + EXPORT_COMPLETED = 'export_completed', + EXPORT_FAILED = 'export_failed', + + // Pages + PAGE_SHARED = 'page_shared', + PAGE_UPDATED = 'page_updated', + + // Tasks (Future) + TASK_ASSIGNED = 'task_assigned', + TASK_DUE = 'task_due', + TASK_COMPLETED = 'task_completed', + + // System + SYSTEM_UPDATE = 'system_update', + SYSTEM_ANNOUNCEMENT = 'system_announcement', +} + +export enum NotificationStatus { + UNREAD = 'unread', + READ = 'read', + ARCHIVED = 'archived', +} + +export enum NotificationPriority { + HIGH = 'high', + NORMAL = 'normal', + LOW = 'low', +} + +export enum EmailFrequency { + INSTANT = 'instant', + SMART = 'smart', + DIGEST_DAILY = 'digest_daily', + DIGEST_WEEKLY = 'digest_weekly', +} + +export interface NotificationTypeSettings { + email: boolean; + in_app: boolean; + batch: boolean; +} + +export enum BatchType { + SIMILAR_ACTIVITY = 'similar_activity', + DIGEST = 'digest', + GROUPED_MENTIONS = 'grouped_mentions', +} + +export enum AggregationType { + COMMENTS_ON_PAGE = 'comments_on_page', + MENTIONS_IN_PAGE = 'mentions_in_page', + MENTIONS_IN_COMMENTS = 'mentions_in_comments', + THREAD_ACTIVITY = 'thread_activity', +} + +export interface AggregationSummaryData { + total_count: number; + actor_count: number; + first_actor_id: string; + recent_actors: string[]; + time_span: { + start: Date; + end: Date; + }; + [key: string]: any; // Allow additional type-specific data +} + +export interface AggregatedNotificationMessage { + id: string; + title: string; + message: string; + actors: Array<{ + id: string; + name: string; + avatarUrl?: string; + }>; + totalCount: number; + entityId: string; + entityType: string; + createdAt: Date; + updatedAt: Date; +} + +// Context interfaces for different notification types +export interface MentionInPageContext { + pageId: string; + pageTitle: string; + mentionContext: string; + mentionBy: string; +} + +export interface MentionInCommentContext { + pageId: string; + pageTitle: string; + commentId: string; + commentText: string; + mentionBy: string; +} + +export interface CommentOnPageContext { + pageId: string; + pageTitle: string; + commentId: string; + commentText: string; + commentBy: string; +} + +export interface ReplyToCommentContext { + pageId: string; + pageTitle: string; + commentId: string; + parentCommentId: string; + commentText: string; + replyBy: string; +} + +export interface CommentInThreadContext { + pageId: string; + pageTitle: string; + threadId: string; + commentId: string; + commentText: string; + commentBy: string; +} + +export interface CommentResolvedContext { + pageId: string; + pageTitle: string; + threadId: string; + resolvedBy: string; +} + +export interface ExportCompletedContext { + exportId: string; + exportType: string; + downloadUrl: string; + expiresAt: string; +} + +export interface ExportFailedContext { + exportId: string; + exportType: string; + errorMessage: string; +} + +export interface PageSharedContext { + pageId: string; + pageTitle: string; + sharedBy: string; + sharedWith: string[]; + permissions: string[]; +} + +export interface PageUpdatedContext { + pageId: string; + pageTitle: string; + updatedBy: string; + changeType: 'content' | 'title' | 'permissions'; +} + +export interface TaskAssignedContext { + taskId: string; + taskTitle: string; + assignedBy: string; + dueDate?: string; +} + +export interface TaskDueContext { + taskId: string; + taskTitle: string; + dueDate: string; + daysOverdue?: number; +} + +export interface TaskCompletedContext { + taskId: string; + taskTitle: string; + completedBy: string; +} + +export interface SystemUpdateContext { + updateType: string; + version?: string; + description: string; +} + +export interface SystemAnnouncementContext { + message: string; + link?: string; +} + +export type NotificationContext = + | MentionInPageContext + | MentionInCommentContext + | CommentOnPageContext + | ReplyToCommentContext + | CommentInThreadContext + | CommentResolvedContext + | ExportCompletedContext + | ExportFailedContext + | PageSharedContext + | PageUpdatedContext + | TaskAssignedContext + | TaskDueContext + | TaskCompletedContext + | SystemUpdateContext + | SystemAnnouncementContext + | Record; \ No newline at end of file diff --git a/apps/server/src/database/database.module.ts b/apps/server/src/database/database.module.ts index 68c35dd3..73ea55d1 100644 --- a/apps/server/src/database/database.module.ts +++ b/apps/server/src/database/database.module.ts @@ -25,6 +25,10 @@ import { MigrationService } from '@docmost/db/services/migration.service'; import { UserTokenRepo } from './repos/user-token/user-token.repo'; import { BacklinkRepo } from '@docmost/db/repos/backlink/backlink.repo'; import { ShareRepo } from '@docmost/db/repos/share/share.repo'; +import { NotificationRepo } from './repos/notification/notification.repo'; +import { NotificationPreferenceRepo } from './repos/notification/notification-preference.repo'; +import { NotificationBatchRepo } from './repos/notification/notification-batch.repo'; +import { NotificationAggregationRepo } from './repos/notification/notification-aggregation.repo'; // https://github.com/brianc/node-postgres/issues/811 types.setTypeParser(types.builtins.INT8, (val) => Number(val)); @@ -75,7 +79,11 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val)); AttachmentRepo, UserTokenRepo, BacklinkRepo, - ShareRepo + ShareRepo, + NotificationRepo, + NotificationPreferenceRepo, + NotificationBatchRepo, + NotificationAggregationRepo, ], exports: [ WorkspaceRepo, @@ -90,7 +98,11 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val)); AttachmentRepo, UserTokenRepo, BacklinkRepo, - ShareRepo + ShareRepo, + NotificationRepo, + NotificationPreferenceRepo, + NotificationBatchRepo, + NotificationAggregationRepo, ], }) export class DatabaseModule diff --git a/apps/server/src/database/migrations/20250710T092527-create-notification-tables.ts b/apps/server/src/database/migrations/20250710T092527-create-notification-tables.ts new file mode 100644 index 00000000..1d5fd3b8 --- /dev/null +++ b/apps/server/src/database/migrations/20250710T092527-create-notification-tables.ts @@ -0,0 +1,233 @@ +import { Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + // Create notifications table + await db.schema + .createTable('notifications') + .addColumn('id', 'uuid', (col) => + col.primaryKey().defaultTo(sql`gen_uuid_v7()`), + ) + .addColumn('workspace_id', 'uuid', (col) => + col.notNull().references('workspaces.id').onDelete('cascade'), + ) + .addColumn('recipient_id', 'uuid', (col) => + col.notNull().references('users.id').onDelete('cascade'), + ) + .addColumn('actor_id', 'uuid', (col) => + col.references('users.id').onDelete('set null'), + ) + .addColumn('type', 'varchar(50)', (col) => col.notNull()) + .addColumn('status', 'varchar(20)', (col) => + col.notNull().defaultTo('unread'), + ) + .addColumn('priority', 'varchar(20)', (col) => + col.notNull().defaultTo('normal'), + ) + .addColumn('entity_type', 'varchar(50)', (col) => col.notNull()) + .addColumn('entity_id', 'uuid', (col) => col.notNull()) + .addColumn('context', 'jsonb', (col) => col.notNull().defaultTo('{}')) + .addColumn('group_key', 'varchar(255)') + .addColumn('group_count', 'integer', (col) => col.defaultTo(1)) + .addColumn('deduplication_key', 'varchar(255)') + .addColumn('batch_id', 'uuid') + .addColumn('is_batched', 'boolean', (col) => col.defaultTo(false)) + .addColumn('email_sent_at', 'timestamp') + .addColumn('in_app_delivered_at', 'timestamp') + .addColumn('read_at', 'timestamp') + .addColumn('created_at', 'timestamp', (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .addColumn('updated_at', 'timestamp', (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .addColumn('expires_at', 'timestamp') + .execute(); + + // Create indexes for notifications + await db.schema + .createIndex('idx_notifications_recipient_status') + .on('notifications') + .columns(['recipient_id', 'status']) + .execute(); + + await db.schema + .createIndex('idx_notifications_group_key') + .on('notifications') + .columns(['group_key', 'created_at']) + .execute(); + + await db.schema + .createIndex('idx_notifications_batch_pending') + .on('notifications') + .columns(['batch_id', 'is_batched']) + .where('is_batched', '=', false) + .execute(); + + await db.schema + .createIndex('idx_notifications_expires') + .on('notifications') + .column('expires_at') + .where('expires_at', 'is not', null) + .execute(); + + await db.schema + .createIndex('idx_notifications_deduplication') + .unique() + .on('notifications') + .column('deduplication_key') + .where('deduplication_key', 'is not', null) + .execute(); + + // Create notification_preferences table + await db.schema + .createTable('notification_preferences') + .addColumn('id', 'uuid', (col) => + col.primaryKey().defaultTo(sql`gen_uuid_v7()`), + ) + .addColumn('user_id', 'uuid', (col) => + col.notNull().references('users.id').onDelete('cascade'), + ) + .addColumn('workspace_id', 'uuid', (col) => + col.notNull().references('workspaces.id').onDelete('cascade'), + ) + .addColumn('email_enabled', 'boolean', (col) => + col.notNull().defaultTo(true), + ) + .addColumn('in_app_enabled', 'boolean', (col) => + col.notNull().defaultTo(true), + ) + .addColumn('notification_settings', 'jsonb', (col) => + col.notNull().defaultTo(sql`'{ + "mention_in_page": {"email": true, "in_app": true, "batch": false}, + "mention_in_comment": {"email": true, "in_app": true, "batch": false}, + "comment_on_page": {"email": true, "in_app": true, "batch": true}, + "reply_to_comment": {"email": true, "in_app": true, "batch": false}, + "comment_in_thread": {"email": true, "in_app": true, "batch": true}, + "comment_resolved": {"email": true, "in_app": true, "batch": true}, + "export_completed": {"email": true, "in_app": true, "batch": false}, + "page_shared": {"email": true, "in_app": true, "batch": true}, + "task_assigned": {"email": true, "in_app": true, "batch": false} + }'::jsonb`), + ) + .addColumn('batch_window_minutes', 'integer', (col) => col.defaultTo(15)) + .addColumn('max_batch_size', 'integer', (col) => col.defaultTo(20)) + .addColumn('batch_types', sql`text[]`, (col) => + col.defaultTo( + sql`ARRAY['comment_on_page', 'comment_in_thread', 'comment_resolved']`, + ), + ) + .addColumn('email_frequency', 'varchar(20)', (col) => + col.notNull().defaultTo('smart'), + ) + .addColumn('digest_time', 'time', (col) => col.defaultTo('09:00:00')) + .addColumn('quiet_hours_enabled', 'boolean', (col) => col.defaultTo(false)) + .addColumn('quiet_hours_start', 'time', (col) => col.defaultTo('18:00:00')) + .addColumn('quiet_hours_end', 'time', (col) => col.defaultTo('09:00:00')) + .addColumn('timezone', 'varchar(50)', (col) => col.defaultTo('UTC')) + .addColumn('weekend_notifications', 'boolean', (col) => col.defaultTo(true)) + .addColumn('created_at', 'timestamp', (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .addColumn('updated_at', 'timestamp', (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .execute(); + + // Create unique index for user_workspace + await db.schema + .createIndex('idx_notification_preferences_user_workspace') + .unique() + .on('notification_preferences') + .columns(['user_id', 'workspace_id']) + .execute(); + + // Create notification_batches table + await db.schema + .createTable('notification_batches') + .addColumn('id', 'uuid', (col) => + col.primaryKey().defaultTo(sql`gen_uuid_v7()`), + ) + .addColumn('recipient_id', 'uuid', (col) => + col.notNull().references('users.id').onDelete('cascade'), + ) + .addColumn('workspace_id', 'uuid', (col) => + col.notNull().references('workspaces.id').onDelete('cascade'), + ) + .addColumn('batch_type', 'varchar(50)', (col) => col.notNull()) + .addColumn('batch_key', 'varchar(255)', (col) => col.notNull()) + .addColumn('notification_count', 'integer', (col) => + col.notNull().defaultTo(0), + ) + .addColumn('first_notification_id', 'uuid', (col) => + col.references('notifications.id').onDelete('set null'), + ) + .addColumn('scheduled_for', 'timestamp', (col) => col.notNull()) + .addColumn('sent_at', 'timestamp') + .addColumn('created_at', 'timestamp', (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .execute(); + + // Create indexes for notification_batches + await db.schema + .createIndex('idx_notification_batches_scheduled_pending') + .on('notification_batches') + .columns(['scheduled_for', 'sent_at']) + .where('sent_at', 'is', null) + .execute(); + + await db.schema + .createIndex('idx_notification_batches_batch_key') + .on('notification_batches') + .columns(['batch_key', 'recipient_id']) + .execute(); + + // Create notification_aggregations table + await db.schema + .createTable('notification_aggregations') + .addColumn('id', 'uuid', (col) => + col.primaryKey().defaultTo(sql`gen_uuid_v7()`), + ) + .addColumn('aggregation_key', 'varchar(255)', (col) => col.notNull()) + .addColumn('recipient_id', 'uuid', (col) => + col.notNull().references('users.id').onDelete('cascade'), + ) + .addColumn('aggregation_type', 'varchar(50)', (col) => col.notNull()) + .addColumn('entity_type', 'varchar(50)', (col) => col.notNull()) + .addColumn('entity_id', 'uuid', (col) => col.notNull()) + .addColumn('actor_ids', sql`uuid[]`, (col) => + col.notNull().defaultTo(sql`'{}'`), + ) + .addColumn('notification_ids', sql`uuid[]`, (col) => + col.notNull().defaultTo(sql`'{}'`), + ) + .addColumn('summary_data', 'jsonb', (col) => col.notNull().defaultTo('{}')) + .addColumn('created_at', 'timestamp', (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .addColumn('updated_at', 'timestamp', (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .execute(); + + // Create indexes for notification_aggregations + await db.schema + .createIndex('idx_notification_aggregations_key') + .unique() + .on('notification_aggregations') + .column('aggregation_key') + .execute(); + + await db.schema + .createIndex('idx_notification_aggregations_recipient_updated') + .on('notification_aggregations') + .columns(['recipient_id', 'updated_at']) + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable('notification_aggregations').ifExists().execute(); + await db.schema.dropTable('notification_batches').ifExists().execute(); + await db.schema.dropTable('notification_preferences').ifExists().execute(); + await db.schema.dropTable('notifications').ifExists().execute(); +} diff --git a/apps/server/src/database/repos/notification/notification-aggregation.repo.ts b/apps/server/src/database/repos/notification/notification-aggregation.repo.ts new file mode 100644 index 00000000..f09a684b --- /dev/null +++ b/apps/server/src/database/repos/notification/notification-aggregation.repo.ts @@ -0,0 +1,125 @@ +import { Injectable } from '@nestjs/common'; +import { InjectKysely } from 'nestjs-kysely'; +import { KyselyDB } from '@docmost/db/types/kysely.types'; +import { + NotificationAggregation, + InsertableNotificationAggregation, + UpdatableNotificationAggregation +} from '@docmost/db/types/entity.types'; + +@Injectable() +export class NotificationAggregationRepo { + constructor(@InjectKysely() private readonly db: KyselyDB) {} + + async insertAggregation(aggregation: InsertableNotificationAggregation): Promise { + return await this.db + .insertInto('notificationAggregations') + .values(aggregation) + .returningAll() + .executeTakeFirstOrThrow(); + } + + async findById(id: string): Promise { + return await this.db + .selectFrom('notificationAggregations') + .selectAll() + .where('id', '=', id) + .executeTakeFirst(); + } + + async findByKey(aggregationKey: string): Promise { + return await this.db + .selectFrom('notificationAggregations') + .selectAll() + .where('aggregationKey', '=', aggregationKey) + .executeTakeFirst(); + } + + async updateAggregation( + aggregationKey: string, + update: UpdatableNotificationAggregation + ): Promise { + return await this.db + .updateTable('notificationAggregations') + .set({ + ...update, + updatedAt: new Date(), + }) + .where('aggregationKey', '=', aggregationKey) + .returningAll() + .executeTakeFirstOrThrow(); + } + + async addNotificationToAggregation( + aggregationKey: string, + notificationId: string, + actorId?: string + ): Promise { + const aggregation = await this.findByKey(aggregationKey); + if (!aggregation) { + throw new Error(`Aggregation not found: ${aggregationKey}`); + } + + const updates: UpdatableNotificationAggregation = { + notificationIds: [...aggregation.notificationIds, notificationId], + updatedAt: new Date(), + }; + + if (actorId && !aggregation.actorIds.includes(actorId)) { + updates.actorIds = [...aggregation.actorIds, actorId]; + } + + // Update summary data + updates.summaryData = { + ...(aggregation.summaryData as Record || {}), + totalCount: aggregation.notificationIds.length + 1, + actorCount: updates.actorIds?.length || aggregation.actorIds.length, + timeSpan: { + ...((aggregation.summaryData as any).timeSpan || {}), + end: new Date(), + }, + }; + + await this.updateAggregation(aggregationKey, updates); + } + + async findRecentByRecipient( + recipientId: string, + limit = 10 + ): Promise { + return await this.db + .selectFrom('notificationAggregations') + .selectAll() + .where('recipientId', '=', recipientId) + .orderBy('updatedAt', 'desc') + .limit(limit) + .execute(); + } + + async deleteOldAggregations(olderThan: Date): Promise { + const result = await this.db + .deleteFrom('notificationAggregations') + .where('updatedAt', '<', olderThan) + .execute(); + + return Number(result[0].numDeletedRows); + } + + async getAggregationsByEntity( + entityType: string, + entityId: string, + recipientId?: string + ): Promise { + let query = this.db + .selectFrom('notificationAggregations') + .selectAll() + .where('entityType', '=', entityType) + .where('entityId', '=', entityId); + + if (recipientId) { + query = query.where('recipientId', '=', recipientId); + } + + return await query.orderBy('updatedAt', 'desc').execute(); + } +} \ No newline at end of file diff --git a/apps/server/src/database/repos/notification/notification-batch.repo.ts b/apps/server/src/database/repos/notification/notification-batch.repo.ts new file mode 100644 index 00000000..52d8229c --- /dev/null +++ b/apps/server/src/database/repos/notification/notification-batch.repo.ts @@ -0,0 +1,110 @@ +import { Injectable } from '@nestjs/common'; +import { InjectKysely } from 'nestjs-kysely'; +import { KyselyDB } from '@docmost/db/types/kysely.types'; +import { + NotificationBatch, + InsertableNotificationBatch, + UpdatableNotificationBatch +} from '@docmost/db/types/entity.types'; + +@Injectable() +export class NotificationBatchRepo { + constructor(@InjectKysely() private readonly db: KyselyDB) {} + + async insertBatch(batch: InsertableNotificationBatch): Promise { + return await this.db + .insertInto('notificationBatches') + .values(batch) + .returningAll() + .executeTakeFirstOrThrow(); + } + + async findById(id: string): Promise { + return await this.db + .selectFrom('notificationBatches') + .selectAll() + .where('id', '=', id) + .executeTakeFirst(); + } + + async findByBatchKey( + batchKey: string, + recipientId: string, + notSentOnly = true + ): Promise { + let query = this.db + .selectFrom('notificationBatches') + .selectAll() + .where('batchKey', '=', batchKey) + .where('recipientId', '=', recipientId); + + if (notSentOnly) { + query = query.where('sentAt', 'is', null); + } + + return await query.executeTakeFirst(); + } + + async getPendingBatches(limit = 100): Promise { + return await this.db + .selectFrom('notificationBatches') + .selectAll() + .where('sentAt', 'is', null) + .where('scheduledFor', '<=', new Date()) + .orderBy('scheduledFor', 'asc') + .limit(limit) + .execute(); + } + + async updateBatch(id: string, update: UpdatableNotificationBatch): Promise { + return await this.db + .updateTable('notificationBatches') + .set(update) + .where('id', '=', id) + .returningAll() + .executeTakeFirstOrThrow(); + } + + async markAsSent(id: string): Promise { + await this.db + .updateTable('notificationBatches') + .set({ + sentAt: new Date(), + }) + .where('id', '=', id) + .execute(); + } + + async incrementNotificationCount(id: string): Promise { + await this.db + .updateTable('notificationBatches') + .set((eb) => ({ + notificationCount: eb('notificationCount', '+', 1), + })) + .where('id', '=', id) + .execute(); + } + + async deleteOldBatches(olderThan: Date): Promise { + const result = await this.db + .deleteFrom('notificationBatches') + .where('sentAt', '<', olderThan) + .execute(); + + return Number(result[0].numDeletedRows); + } + + async getScheduledBatchesForUser( + recipientId: string, + workspaceId: string + ): Promise { + return await this.db + .selectFrom('notificationBatches') + .selectAll() + .where('recipientId', '=', recipientId) + .where('workspaceId', '=', workspaceId) + .where('sentAt', 'is', null) + .orderBy('scheduledFor', 'asc') + .execute(); + } +} \ No newline at end of file diff --git a/apps/server/src/database/repos/notification/notification-preference.repo.ts b/apps/server/src/database/repos/notification/notification-preference.repo.ts new file mode 100644 index 00000000..984ef739 --- /dev/null +++ b/apps/server/src/database/repos/notification/notification-preference.repo.ts @@ -0,0 +1,134 @@ +import { Injectable } from '@nestjs/common'; +import { InjectKysely } from 'nestjs-kysely'; +import { KyselyDB } from '@docmost/db/types/kysely.types'; +import { sql } from 'kysely'; +import { + NotificationPreference, + InsertableNotificationPreference, + UpdatableNotificationPreference +} from '@docmost/db/types/entity.types'; + +export const DEFAULT_NOTIFICATION_SETTINGS = { + mention_in_page: { email: true, in_app: true, batch: false }, + mention_in_comment: { email: true, in_app: true, batch: false }, + comment_on_page: { email: true, in_app: true, batch: true }, + reply_to_comment: { email: true, in_app: true, batch: false }, + comment_in_thread: { email: true, in_app: true, batch: true }, + comment_resolved: { email: true, in_app: true, batch: true }, + export_completed: { email: true, in_app: true, batch: false }, + export_failed: { email: true, in_app: true, batch: false }, + page_shared: { email: true, in_app: true, batch: true }, + page_updated: { email: false, in_app: true, batch: true }, + task_assigned: { email: true, in_app: true, batch: false }, + task_due_soon: { email: true, in_app: true, batch: false }, +}; + +@Injectable() +export class NotificationPreferenceRepo { + constructor(@InjectKysely() private readonly db: KyselyDB) {} + + async insertPreference(preference: InsertableNotificationPreference): Promise { + return await this.db + .insertInto('notificationPreferences') + .values(preference) + .returningAll() + .executeTakeFirstOrThrow(); + } + + async findByUserAndWorkspace( + userId: string, + workspaceId: string + ): Promise { + return await this.db + .selectFrom('notificationPreferences') + .selectAll() + .where('userId', '=', userId) + .where('workspaceId', '=', workspaceId) + .executeTakeFirst(); + } + + async findOrCreate( + userId: string, + workspaceId: string + ): Promise { + const existing = await this.findByUserAndWorkspace(userId, workspaceId); + if (existing) { + return existing; + } + + return await this.insertPreference({ + userId: userId, + workspaceId: workspaceId, + emailEnabled: true, + inAppEnabled: true, + notificationSettings: DEFAULT_NOTIFICATION_SETTINGS, + batchWindowMinutes: 15, + maxBatchSize: 20, + batchTypes: ['comment_on_page', 'comment_in_thread', 'comment_resolved'], + emailFrequency: 'smart', + digestTime: '09:00:00', + quietHoursEnabled: false, + quietHoursStart: '18:00:00', + quietHoursEnd: '09:00:00', + timezone: 'UTC', + weekendNotifications: true, + }); + } + + async updatePreference( + userId: string, + workspaceId: string, + update: UpdatableNotificationPreference + ): Promise { + return await this.db + .updateTable('notificationPreferences') + .set({ + ...update, + updatedAt: new Date(), + }) + .where('userId', '=', userId) + .where('workspaceId', '=', workspaceId) + .returningAll() + .executeTakeFirstOrThrow(); + } + + async findUsersWithBatchingEnabled( + workspaceId: string, + notificationType: string + ): Promise { + return await this.db + .selectFrom('notificationPreferences') + .selectAll() + .where('workspaceId', '=', workspaceId) + .where('emailEnabled', '=', true) + .where('emailFrequency', '!=', 'instant') + .where( + sql`notification_settings::jsonb->'${sql.raw(notificationType)}'->>'batch' = 'true'` + ) + .execute(); + } + + async findUsersForDigest( + workspaceId: string, + currentTime: string, + timezone: string + ): Promise { + return await this.db + .selectFrom('notificationPreferences') + .selectAll() + .where('workspaceId', '=', workspaceId) + .where('emailFrequency', '=', 'daily') + .where('digestTime', '=', currentTime) + .where('timezone', '=', timezone) + .where('emailEnabled', '=', true) + .execute(); + } + + async deletePreference(userId: string, workspaceId: string): Promise { + await this.db + .deleteFrom('notificationPreferences') + .where('userId', '=', userId) + .where('workspaceId', '=', workspaceId) + .execute(); + } +} \ No newline at end of file diff --git a/apps/server/src/database/repos/notification/notification.repo.ts b/apps/server/src/database/repos/notification/notification.repo.ts new file mode 100644 index 00000000..492c240f --- /dev/null +++ b/apps/server/src/database/repos/notification/notification.repo.ts @@ -0,0 +1,175 @@ +import { Injectable } from '@nestjs/common'; +import { InjectKysely } from 'nestjs-kysely'; +import { KyselyDB } from '@docmost/db/types/kysely.types'; +import { Notification, InsertableNotification, UpdatableNotification } from '@docmost/db/types/entity.types'; +import { PaginationOptions } from '@docmost/db/pagination/pagination-options'; + +@Injectable() +export class NotificationRepo { + constructor(@InjectKysely() private readonly db: KyselyDB) {} + + async insertNotification(notification: InsertableNotification): Promise { + return await this.db + .insertInto('notifications') + .values(notification) + .returningAll() + .executeTakeFirstOrThrow(); + } + + async findById(id: string): Promise { + return await this.db + .selectFrom('notifications') + .selectAll() + .where('id', '=', id) + .executeTakeFirst(); + } + + async findByRecipient( + recipientId: string, + options: { + status?: string; + limit?: number; + offset?: number; + } = {} + ): Promise { + let query = this.db + .selectFrom('notifications') + .selectAll() + .where('recipientId', '=', recipientId) + .orderBy('createdAt', 'desc'); + + if (options.status) { + query = query.where('status', '=', options.status); + } + + if (options.limit) { + query = query.limit(options.limit); + } + + if (options.offset) { + query = query.offset(options.offset); + } + + return await query.execute(); + } + + async findByBatchId(batchId: string): Promise { + return await this.db + .selectFrom('notifications') + .selectAll() + .where('batchId', '=', batchId) + .orderBy('createdAt', 'desc') + .execute(); + } + + async findRecent(params: { + recipientId: string; + type: string; + entityId: string; + since: Date; + }): Promise { + return await this.db + .selectFrom('notifications') + .selectAll() + .where('recipientId', '=', params.recipientId) + .where('type', '=', params.type) + .where('entityId', '=', params.entityId) + .where('createdAt', '>', params.since) + .orderBy('createdAt', 'desc') + .execute(); + } + + async existsByDeduplicationKey(key: string): Promise { + const result = await this.db + .selectFrom('notifications') + .select(['id']) + .where('deduplicationKey', '=', key) + .executeTakeFirst(); + + return !!result; + } + + async updateNotification(id: string, update: UpdatableNotification): Promise { + return await this.db + .updateTable('notifications') + .set(update) + .where('id', '=', id) + .returningAll() + .executeTakeFirstOrThrow(); + } + + async markAsRead(id: string): Promise { + await this.db + .updateTable('notifications') + .set({ + status: 'read', + readAt: new Date(), + updatedAt: new Date(), + }) + .where('id', '=', id) + .execute(); + } + + async markManyAsRead(ids: string[]): Promise { + if (ids.length === 0) return; + + await this.db + .updateTable('notifications') + .set({ + status: 'read', + readAt: new Date(), + updatedAt: new Date(), + }) + .where('id', 'in', ids) + .execute(); + } + + async getUnreadCount(recipientId: string): Promise { + const result = await this.db + .selectFrom('notifications') + .select((eb) => eb.fn.countAll().as('count')) + .where('recipientId', '=', recipientId) + .where('status', '=', 'unread') + .executeTakeFirst(); + + return result?.count || 0; + } + + async deleteExpired(): Promise { + const result = await this.db + .deleteFrom('notifications') + .where('expiresAt', '<', new Date()) + .execute(); + + return Number(result[0].numDeletedRows); + } + + async getNotificationsByGroupKey( + groupKey: string, + recipientId: string, + since: Date + ): Promise { + return await this.db + .selectFrom('notifications') + .selectAll() + .where('groupKey', '=', groupKey) + .where('recipientId', '=', recipientId) + .where('createdAt', '>', since) + .orderBy('createdAt', 'desc') + .execute(); + } + + async updateBatchId(notificationIds: string[], batchId: string): Promise { + if (notificationIds.length === 0) return; + + await this.db + .updateTable('notifications') + .set({ + batchId: batchId, + isBatched: true, + updatedAt: new Date(), + }) + .where('id', 'in', notificationIds) + .execute(); + } +} \ No newline at end of file diff --git a/apps/server/src/database/types/db.d.ts b/apps/server/src/database/types/db.d.ts index b49f15b0..58da39d0 100644 --- a/apps/server/src/database/types/db.d.ts +++ b/apps/server/src/database/types/db.d.ts @@ -62,6 +62,7 @@ export interface AuthProviders { deletedAt: Timestamp | null; id: Generated; isEnabled: Generated; + isGroupSyncEnabled: Generated; name: string; oidcClientId: string | null; oidcClientSecret: string | null; @@ -122,6 +123,7 @@ export interface Comments { pageId: string; parentCommentId: string | null; resolvedAt: Timestamp | null; + resolvedById: string | null; selection: string | null; type: string | null; workspaceId: string; @@ -165,6 +167,78 @@ export interface GroupUsers { userId: string; } +export interface NotificationAggregations { + actorIds: Generated; + aggregationKey: string; + aggregationType: string; + createdAt: Generated; + entityId: string; + entityType: string; + id: Generated; + notificationIds: Generated; + recipientId: string; + summaryData: Generated; + updatedAt: Generated; +} + +export interface NotificationBatches { + batchKey: string; + batchType: string; + createdAt: Generated; + firstNotificationId: string | null; + id: Generated; + notificationCount: Generated; + recipientId: string; + scheduledFor: Timestamp; + sentAt: Timestamp | null; + workspaceId: string; +} + +export interface NotificationPreferences { + batchTypes: Generated; + batchWindowMinutes: Generated; + createdAt: Generated; + digestTime: Generated; + emailEnabled: Generated; + emailFrequency: Generated; + id: Generated; + inAppEnabled: Generated; + maxBatchSize: Generated; + notificationSettings: Generated; + quietHoursEnabled: Generated; + quietHoursEnd: Generated; + quietHoursStart: Generated; + timezone: Generated; + updatedAt: Generated; + userId: string; + weekendNotifications: Generated; + workspaceId: string; +} + +export interface Notifications { + actorId: string | null; + batchId: string | null; + context: Generated; + createdAt: Generated; + deduplicationKey: string | null; + emailSentAt: Timestamp | null; + entityId: string; + entityType: string; + expiresAt: Timestamp | null; + groupCount: Generated; + groupKey: string | null; + id: Generated; + inAppDeliveredAt: Timestamp | null; + isBatched: Generated; + priority: Generated; + readAt: Timestamp | null; + recipientId: string; + status: Generated; + type: string; + updatedAt: Generated; + workspaceId: string; +} + export interface PageHistory { content: Json | null; coverPhoto: string | null; @@ -324,6 +398,10 @@ export interface DB { fileTasks: FileTasks; groups: Groups; groupUsers: GroupUsers; + notificationAggregations: NotificationAggregations; + notificationBatches: NotificationBatches; + notificationPreferences: NotificationPreferences; + notifications: Notifications; pageHistory: PageHistory; pages: Pages; shares: Shares; diff --git a/apps/server/src/database/types/entity.types.ts b/apps/server/src/database/types/entity.types.ts index db2c2823..2ddd87cb 100644 --- a/apps/server/src/database/types/entity.types.ts +++ b/apps/server/src/database/types/entity.types.ts @@ -18,6 +18,10 @@ import { AuthAccounts, Shares, FileTasks, + Notifications, + NotificationPreferences, + NotificationBatches, + NotificationAggregations, } from './db'; // Workspace @@ -113,3 +117,23 @@ export type UpdatableShare = Updateable>; export type FileTask = Selectable; export type InsertableFileTask = Insertable; export type UpdatableFileTask = Updateable>; + +// Notification +export type Notification = Selectable; +export type InsertableNotification = Insertable; +export type UpdatableNotification = Updateable>; + +// NotificationPreference +export type NotificationPreference = Selectable; +export type InsertableNotificationPreference = Insertable; +export type UpdatableNotificationPreference = Updateable>; + +// NotificationBatch +export type NotificationBatch = Selectable; +export type InsertableNotificationBatch = Insertable; +export type UpdatableNotificationBatch = Updateable>; + +// NotificationAggregation +export type NotificationAggregation = Selectable; +export type InsertableNotificationAggregation = Insertable; +export type UpdatableNotificationAggregation = Updateable>; diff --git a/apps/server/src/ws/ws.gateway.ts b/apps/server/src/ws/ws.gateway.ts index eeaec897..2df44073 100644 --- a/apps/server/src/ws/ws.gateway.ts +++ b/apps/server/src/ws/ws.gateway.ts @@ -19,6 +19,10 @@ import * as cookie from 'cookie'; export class WsGateway implements OnGatewayConnection, OnModuleDestroy { @WebSocketServer() server: Server; + + // Map to track which sockets belong to which users + private userSocketMap = new Map>(); + constructor( private tokenService: TokenService, private spaceMemberRepo: SpaceMemberRepo, @@ -26,7 +30,7 @@ export class WsGateway implements OnGatewayConnection, OnModuleDestroy { async handleConnection(client: Socket, ...args: any[]): Promise { try { - const cookies = cookie.parse(client.handshake.headers.cookie); + const cookies = cookie.parse(client.handshake.headers.cookie || ''); const token: JwtPayload = await this.tokenService.verifyJwt( cookies['authToken'], JwtType.ACCESS, @@ -35,18 +39,41 @@ export class WsGateway implements OnGatewayConnection, OnModuleDestroy { const userId = token.sub; const workspaceId = token.workspaceId; + // Store user-socket mapping + if (!this.userSocketMap.has(userId)) { + this.userSocketMap.set(userId, new Set()); + } + this.userSocketMap.get(userId)!.add(client.id); + + // Store user info on socket for later use + (client as any).userId = userId; + (client as any).workspaceId = workspaceId; + const userSpaceIds = await this.spaceMemberRepo.getUserSpaceIds(userId); const workspaceRoom = `workspace-${workspaceId}`; + const userRoom = `user-${userId}`; const spaceRooms = userSpaceIds.map((id) => this.getSpaceRoomName(id)); - client.join([workspaceRoom, ...spaceRooms]); + client.join([workspaceRoom, userRoom, ...spaceRooms]); } catch (err) { client.emit('Unauthorized'); client.disconnect(); } } + handleDisconnect(client: Socket): void { + // Clean up user-socket mapping + const userId = (client as any).userId; + if (userId && this.userSocketMap.has(userId)) { + const userSockets = this.userSocketMap.get(userId)!; + userSockets.delete(client.id); + if (userSockets.size === 0) { + this.userSocketMap.delete(userId); + } + } + } + @SubscribeMessage('message') handleMessage(client: Socket, data: any): void { const spaceEvents = [ @@ -85,4 +112,35 @@ export class WsGateway implements OnGatewayConnection, OnModuleDestroy { getSpaceRoomName(spaceId: string): string { return `space-${spaceId}`; } -} + + /** + * Emit an event to a specific user + */ + emitToUser(userId: string, event: string, data: any): void { + const userRoom = `user-${userId}`; + this.server.to(userRoom).emit(event, data); + } + + /** + * Emit an event to a workspace + */ + emitToWorkspace(workspaceId: string, event: string, data: any): void { + const workspaceRoom = `workspace-${workspaceId}`; + this.server.to(workspaceRoom).emit(event, data); + } + + /** + * Emit an event to a space + */ + emitToSpace(spaceId: string, event: string, data: any): void { + const spaceRoom = this.getSpaceRoomName(spaceId); + this.server.to(spaceRoom).emit(event, data); + } + + /** + * Check if a user is currently connected + */ + isUserConnected(userId: string): boolean { + return this.userSocketMap.has(userId) && this.userSocketMap.get(userId)!.size > 0; + } +} \ No newline at end of file diff --git a/apps/server/src/ws/ws.module.ts b/apps/server/src/ws/ws.module.ts index aa2d9b7c..d48cff94 100644 --- a/apps/server/src/ws/ws.module.ts +++ b/apps/server/src/ws/ws.module.ts @@ -5,5 +5,6 @@ import { TokenModule } from '../core/auth/token.module'; @Module({ imports: [TokenModule], providers: [WsGateway], + exports: [WsGateway], }) export class WsModule {}