notifications module - POC

This commit is contained in:
Philipinho
2025-07-10 15:45:11 -07:00
parent 29388636bf
commit e96cf0ed46
31 changed files with 3794 additions and 5 deletions

View File

@ -16,6 +16,7 @@ import { GroupModule } from './group/group.module';
import { CaslModule } from './casl/casl.module';
import { DomainMiddleware } from '../common/middlewares/domain.middleware';
import { ShareModule } from './share/share.module';
import { NotificationModule } from './notification/notification.module';
@Module({
imports: [
@ -30,6 +31,7 @@ import { ShareModule } from './share/share.module';
GroupModule,
CaslModule,
ShareModule,
NotificationModule,
],
})
export class CoreModule implements NestModule {

View File

@ -0,0 +1,276 @@
# Notification System Integration Guide
This guide explains how to integrate the notification system into existing services.
## Quick Start
### 1. Import NotificationService
```typescript
import { NotificationService } from '@/core/notification/services/notification.service';
import { NotificationType } from '@/core/notification/types/notification.types';
```
### 2. Inject the Service
```typescript
constructor(
private readonly notificationService: NotificationService,
// ... other dependencies
) {}
```
### 3. Create Notifications
```typescript
// Example: Notify user when mentioned in a comment
await this.notificationService.createNotification({
workspaceId: workspace.id,
recipientId: mentionedUserId,
actorId: currentUser.id,
type: NotificationType.MENTION_IN_COMMENT,
entityType: 'comment',
entityId: comment.id,
context: {
pageId: page.id,
pageTitle: page.title,
commentText: comment.content.substring(0, 100),
actorName: currentUser.name,
threadRootId: comment.parentCommentId || comment.id,
},
priority: NotificationPriority.HIGH,
groupKey: `comment:${comment.id}:mentions`,
deduplicationKey: `mention:${mentionedUserId}:comment:${comment.id}`,
});
```
## Integration Examples
### CommentService Integration
```typescript
// In comment.service.ts
import { NotificationService } from '@/core/notification/services/notification.service';
import { NotificationType, NotificationPriority } from '@/core/notification/types/notification.types';
export class CommentService {
constructor(
private readonly notificationService: NotificationService,
// ... other dependencies
) {}
async createComment(dto: CreateCommentDto, user: User): Promise<Comment> {
const comment = await this.commentRepo.create(dto);
// Notify page owner about new comment
if (page.creatorId !== user.id) {
await this.notificationService.createNotification({
workspaceId: workspace.id,
recipientId: page.creatorId,
actorId: user.id,
type: NotificationType.COMMENT_ON_PAGE,
entityType: 'comment',
entityId: comment.id,
context: {
pageId: page.id,
pageTitle: page.title,
commentText: comment.content.substring(0, 100),
actorName: user.name,
},
groupKey: `page:${page.id}:comments`,
});
}
// Check for mentions and notify mentioned users
const mentionedUserIds = this.extractMentions(comment.content);
for (const mentionedUserId of mentionedUserIds) {
await this.notificationService.createNotification({
workspaceId: workspace.id,
recipientId: mentionedUserId,
actorId: user.id,
type: NotificationType.MENTION_IN_COMMENT,
entityType: 'comment',
entityId: comment.id,
context: {
pageId: page.id,
pageTitle: page.title,
commentText: comment.content.substring(0, 100),
actorName: user.name,
threadRootId: comment.parentCommentId || comment.id,
},
priority: NotificationPriority.HIGH,
deduplicationKey: `mention:${mentionedUserId}:comment:${comment.id}`,
});
}
return comment;
}
async resolveComment(commentId: string, user: User): Promise<void> {
const comment = await this.commentRepo.findById(commentId);
// Notify comment creator that their comment was resolved
if (comment.creatorId !== user.id) {
await this.notificationService.createNotification({
workspaceId: workspace.id,
recipientId: comment.creatorId,
actorId: user.id,
type: NotificationType.COMMENT_RESOLVED,
entityType: 'comment',
entityId: comment.id,
context: {
pageId: page.id,
pageTitle: page.title,
resolverName: user.name,
},
});
}
}
}
```
### PageService Integration
```typescript
// In page.service.ts
async exportPage(pageId: string, format: string, user: User): Promise<void> {
// Start export process...
// When export is complete
await this.notificationService.createNotification({
workspaceId: workspace.id,
recipientId: user.id,
actorId: user.id, // System notification
type: NotificationType.EXPORT_COMPLETED,
entityType: 'page',
entityId: pageId,
context: {
pageTitle: page.title,
exportFormat: format,
downloadUrl: exportUrl,
expiresAt: expiryDate.toISOString(),
},
priority: NotificationPriority.LOW,
});
}
async updatePage(pageId: string, content: any, user: User): Promise<void> {
// Check for mentions in the content
const mentionedUserIds = this.extractMentionsFromContent(content);
for (const mentionedUserId of mentionedUserIds) {
await this.notificationService.createNotification({
workspaceId: workspace.id,
recipientId: mentionedUserId,
actorId: user.id,
type: NotificationType.MENTION_IN_PAGE,
entityType: 'page',
entityId: pageId,
context: {
pageTitle: page.title,
actorName: user.name,
mentionContext: this.extractMentionContext(content, mentionedUserId),
},
priority: NotificationPriority.HIGH,
deduplicationKey: `mention:${mentionedUserId}:page:${pageId}:${Date.now()}`,
});
}
}
```
### WsGateway Integration for Real-time Notifications
The notification system automatically sends real-time updates through WebSocket. The WsGateway is already injected into NotificationDeliveryService.
```typescript
// In ws.gateway.ts - Already implemented in NotificationDeliveryService
async sendNotificationToUser(userId: string, notification: any): Promise<void> {
const userSockets = await this.getUserSockets(userId);
for (const socketId of userSockets) {
this.server.to(socketId).emit('notification:new', {
id: notification.id,
type: notification.type,
entityType: notification.entityType,
entityId: notification.entityId,
context: notification.context,
createdAt: notification.createdAt,
readAt: notification.readAt,
});
}
}
```
## Notification Types
Available notification types:
- `MENTION_IN_PAGE` - User mentioned in a page
- `MENTION_IN_COMMENT` - User mentioned in a comment
- `COMMENT_ON_PAGE` - New comment on user's page
- `COMMENT_IN_THREAD` - Reply to user's comment
- `COMMENT_RESOLVED` - User's comment was resolved
- `EXPORT_COMPLETED` - Export job finished
## Best Practices
1. **Use Deduplication Keys**: Prevent duplicate notifications for the same event
```typescript
deduplicationKey: `mention:${userId}:comment:${commentId}`
```
2. **Set Appropriate Priority**:
- HIGH: Mentions, direct replies
- NORMAL: Comments on owned content
- LOW: System notifications, exports
3. **Group Related Notifications**: Use groupKey for notifications that should be batched
```typescript
groupKey: `page:${pageId}:comments`
```
4. **Include Relevant Context**: Provide enough information for email templates
```typescript
context: {
pageId: page.id,
pageTitle: page.title,
actorName: user.name,
// ... other relevant data
}
```
5. **Check User Preferences**: The notification service automatically checks user preferences, but you can pre-check if needed:
```typescript
const preferences = await notificationPreferenceService.getUserPreferences(userId, workspaceId);
if (preferences.emailEnabled) {
// Create notification
}
```
## Testing Notifications
Use the test endpoint to send test notifications:
```bash
curl -X POST http://localhost:3000/api/notifications/test \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"type": "MENTION_IN_PAGE",
"recipientId": "USER_ID"
}'
```
## Email Templates
Email templates are located in `/core/notification/templates/`. To add a new template:
1. Create a new React component in the templates directory
2. Update the email sending logic in NotificationDeliveryService
3. Test the template using the React Email preview server
## Monitoring
Monitor notification delivery through logs:
- Check for `NotificationService` logs for creation events
- Check for `NotificationDeliveryService` logs for delivery status
- Check for `NotificationBatchProcessor` logs for batch processing

View File

@ -0,0 +1,122 @@
import {
Controller,
Get,
Post,
Put,
Body,
Param,
Query,
UseGuards,
} from '@nestjs/common';
import { JwtAuthGuard } from '../../../common/guards/jwt-auth.guard';
import { AuthUser } from '../../../common/decorators/auth-user.decorator';
import { AuthWorkspace } from '../../../common/decorators/auth-workspace.decorator';
import { User, Workspace } from '@docmost/db/types/entity.types';
import { NotificationService } from '../services/notification.service';
import { NotificationPreferenceService } from '../services/notification-preference.service';
import { GetNotificationsDto } from '../dto/get-notifications.dto';
import { UpdateNotificationPreferencesDto } from '../dto/update-preference.dto';
import { NotificationType } from '../types/notification.types';
@Controller('notifications')
@UseGuards(JwtAuthGuard)
export class NotificationController {
constructor(
private readonly notificationService: NotificationService,
private readonly preferenceService: NotificationPreferenceService,
) {}
@Get()
async getNotifications(
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
@Query() query: GetNotificationsDto,
) {
const { grouped = true, status, limit = 20, offset = 0 } = query;
if (grouped) {
return await this.notificationService.getGroupedNotifications(
user.id,
workspace.id,
{ status, limit, offset },
);
}
return await this.notificationService.getNotifications(
user.id,
workspace.id,
{ status, limit, offset },
);
}
@Get('unread-count')
async getUnreadCount(@AuthUser() user: User) {
const count = await this.notificationService.getUnreadCount(user.id);
return { count };
}
@Post(':id/read')
async markAsRead(
@AuthUser() user: User,
@Param('id') notificationId: string,
) {
await this.notificationService.markAsRead(notificationId, user.id);
return { success: true };
}
@Post('mark-all-read')
async markAllAsRead(@AuthUser() user: User) {
await this.notificationService.markAllAsRead(user.id);
return { success: true };
}
@Get('preferences')
async getPreferences(
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
) {
return await this.preferenceService.getUserPreferences(
user.id,
workspace.id,
);
}
@Put('preferences')
async updatePreferences(
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
@Body() dto: UpdateNotificationPreferencesDto,
) {
return await this.preferenceService.updateUserPreferences(
user.id,
workspace.id,
dto,
);
}
@Get('preferences/stats')
async getNotificationStats(
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
) {
return await this.preferenceService.getNotificationStats(
user.id,
workspace.id,
);
}
@Post('test')
async sendTestNotification(
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
@Body() dto: { type: NotificationType },
) {
await this.notificationService.createTestNotification(
user.id,
workspace.id,
dto.type,
);
return { success: true, message: 'Test notification sent' };
}
}

View File

@ -0,0 +1,58 @@
import {
IsEnum,
IsNotEmpty,
IsObject,
IsOptional,
IsString,
IsUUID,
} from 'class-validator';
import {
NotificationType,
NotificationPriority,
} from '../types/notification.types';
export class CreateNotificationDto {
@IsUUID()
@IsNotEmpty()
workspaceId: string;
@IsUUID()
@IsNotEmpty()
recipientId: string;
@IsUUID()
@IsOptional()
actorId?: string;
@IsEnum(NotificationType)
@IsNotEmpty()
type: NotificationType;
@IsString()
@IsNotEmpty()
entityType: string;
@IsUUID()
@IsNotEmpty()
entityId: string;
@IsObject()
@IsNotEmpty()
context: Record<string, any>;
@IsEnum(NotificationPriority)
@IsOptional()
priority?: NotificationPriority;
@IsString()
@IsOptional()
groupKey?: string;
@IsString()
@IsOptional()
deduplicationKey?: string;
// For scheduling notifications (quiet hours, etc.)
@IsOptional()
scheduledFor?: Date;
}

View File

@ -0,0 +1,34 @@
import {
IsEnum,
IsOptional,
IsBoolean,
IsNumber,
Min,
Max,
} from 'class-validator';
import { Transform } from 'class-transformer';
import { NotificationStatus } from '../types/notification.types';
export class GetNotificationsDto {
@IsEnum(NotificationStatus)
@IsOptional()
status?: NotificationStatus;
@IsBoolean()
@Transform(({ value }) => value === 'true' || value === true)
@IsOptional()
grouped?: boolean = true;
@IsNumber()
@Transform(({ value }) => parseInt(value, 10))
@Min(1)
@Max(100)
@IsOptional()
limit?: number = 20;
@IsNumber()
@Transform(({ value }) => parseInt(value, 10))
@Min(0)
@IsOptional()
offset?: number = 0;
}

View File

@ -0,0 +1,75 @@
import {
IsBoolean,
IsEnum,
IsNumber,
IsObject,
IsOptional,
IsString,
Max,
Min,
Matches,
IsArray,
} from 'class-validator';
import {
EmailFrequency,
NotificationTypeSettings,
} from '../types/notification.types';
export class UpdateNotificationPreferencesDto {
@IsBoolean()
@IsOptional()
emailEnabled?: boolean;
@IsBoolean()
@IsOptional()
inAppEnabled?: boolean;
@IsObject()
@IsOptional()
notificationSettings?: Record<string, NotificationTypeSettings>;
@IsNumber()
@Min(5)
@Max(60)
@IsOptional()
batchWindowMinutes?: number;
@IsNumber()
@Min(1)
@Max(100)
@IsOptional()
maxBatchSize?: number;
@IsArray()
@IsString({ each: true })
@IsOptional()
batchTypes?: string[];
@IsEnum(EmailFrequency)
@IsOptional()
emailFrequency?: EmailFrequency;
@Matches(/^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$/)
@IsOptional()
digestTime?: string;
@IsBoolean()
@IsOptional()
quietHoursEnabled?: boolean;
@Matches(/^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$/)
@IsOptional()
quietHoursStart?: string;
@Matches(/^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$/)
@IsOptional()
quietHoursEnd?: string;
@IsString()
@IsOptional()
timezone?: string;
@IsBoolean()
@IsOptional()
weekendNotifications?: boolean;
}

View File

@ -0,0 +1,45 @@
import { Notification } from '@docmost/db/types/entity.types';
export class NotificationCreatedEvent {
constructor(
public readonly notification: Notification,
public readonly workspaceId: string,
) {}
}
export class NotificationReadEvent {
constructor(
public readonly notificationId: string,
public readonly userId: string,
) {}
}
export class NotificationAllReadEvent {
constructor(
public readonly userId: string,
public readonly notificationIds: string[],
) {}
}
export class NotificationBatchScheduledEvent {
constructor(
public readonly batchId: string,
public readonly scheduledFor: Date,
) {}
}
export class NotificationAggregatedEvent {
constructor(
public readonly aggregationId: string,
public readonly notificationIds: string[],
) {}
}
// Event names as constants
export const NOTIFICATION_EVENTS = {
CREATED: 'notification.created',
READ: 'notification.read',
ALL_READ: 'notification.allRead',
BATCH_SCHEDULED: 'notification.batchScheduled',
AGGREGATED: 'notification.aggregated',
} as const;

View File

@ -0,0 +1,32 @@
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { NotificationService } from './services/notification.service';
import { NotificationPreferenceService } from './services/notification-preference.service';
import { NotificationDeduplicationService } from './services/notification-deduplication.service';
import { NotificationDeliveryService } from './services/notification-delivery.service';
import { NotificationBatchingService } from './services/notification-batching.service';
import { NotificationAggregationService } from './services/notification-aggregation.service';
import { NotificationController } from './controllers/notification.controller';
import { NotificationBatchProcessor } from './queues/notification-batch.processor';
import { WsModule } from '../../ws/ws.module';
@Module({
imports: [
BullModule.registerQueue({
name: 'notification-batch',
}),
WsModule,
],
controllers: [NotificationController],
providers: [
NotificationService,
NotificationPreferenceService,
NotificationDeduplicationService,
NotificationDeliveryService,
NotificationBatchingService,
NotificationAggregationService,
NotificationBatchProcessor,
],
exports: [NotificationService, NotificationPreferenceService],
})
export class NotificationModule {}

View File

@ -0,0 +1,70 @@
import { Processor } from '@nestjs/bullmq';
import { WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { NotificationBatchingService } from '../services/notification-batching.service';
@Processor('notification-batch')
export class NotificationBatchProcessor extends WorkerHost {
private readonly logger = new Logger(NotificationBatchProcessor.name);
constructor(private readonly batchingService: NotificationBatchingService) {
super();
}
async process(job: Job<any, any, string>) {
if (job.name === 'process-batch') {
return this.processBatch(job);
} else if (job.name === 'check-pending-batches') {
return this.checkPendingBatches(job);
}
}
async processBatch(job: Job<{ batchId: string }>) {
this.logger.debug(`Processing notification batch: ${job.data.batchId}`);
try {
await this.batchingService.processBatch(job.data.batchId);
return { success: true, batchId: job.data.batchId };
} catch (error) {
this.logger.error(
`Failed to process batch ${job.data.batchId}:`,
error instanceof Error ? error.stack : String(error),
);
throw error;
}
}
async checkPendingBatches(job: Job) {
this.logger.debug('Checking for pending notification batches');
try {
const pendingBatches = await this.batchingService.getPendingBatches();
for (const batch of pendingBatches) {
// Calculate delay
const delay = Math.max(0, batch.scheduled_for.getTime() - Date.now());
// Add to queue with appropriate delay
await this.queue.add('process-batch', { batchId: batch.id }, { delay });
this.logger.debug(
`Scheduled batch ${batch.id} for processing in ${delay}ms`,
);
}
return { processedCount: pendingBatches.length };
} catch (error) {
this.logger.error(
'Failed to check pending batches:',
error instanceof Error ? error.stack : String(error),
);
throw error;
}
}
// Reference to the queue (injected by Bull)
private get queue() {
return (this as any).queue;
}
}

View File

@ -0,0 +1,259 @@
import { Injectable, Logger } from '@nestjs/common';
import { NotificationAggregationRepo } from '@docmost/db/repos/notification/notification-aggregation.repo';
import { NotificationRepo } from '@docmost/db/repos/notification/notification.repo';
import { Notification, NotificationAggregation } from '@docmost/db/types/entity.types';
import {
NotificationType,
AggregationType,
AggregatedNotificationMessage,
} from '../types/notification.types';
interface AggregationRule {
types: NotificationType[];
timeWindow: number;
minCount: number;
aggregationType: 'actor_based' | 'time_based' | 'count_based';
}
@Injectable()
export class NotificationAggregationService {
private readonly logger = new Logger(NotificationAggregationService.name);
private readonly aggregationRules: Map<NotificationType, AggregationRule> =
new Map([
[
NotificationType.COMMENT_ON_PAGE,
{
types: [NotificationType.COMMENT_ON_PAGE],
timeWindow: 3600000, // 1 hour
minCount: 2,
aggregationType: 'actor_based',
},
],
[
NotificationType.MENTION_IN_COMMENT,
{
types: [
NotificationType.MENTION_IN_COMMENT,
NotificationType.MENTION_IN_PAGE,
],
timeWindow: 1800000, // 30 minutes
minCount: 3,
aggregationType: 'count_based',
},
],
[
NotificationType.COMMENT_IN_THREAD,
{
types: [NotificationType.COMMENT_IN_THREAD],
timeWindow: 3600000, // 1 hour
minCount: 2,
aggregationType: 'actor_based',
},
],
]);
constructor(
private readonly aggregationRepo: NotificationAggregationRepo,
private readonly notificationRepo: NotificationRepo,
) {}
async aggregateNotifications(
recipientId: string,
type: NotificationType,
entityId: string,
timeWindow: number = 3600000, // 1 hour default
): Promise<NotificationAggregation | null> {
const aggregationKey = this.generateAggregationKey(
recipientId,
type,
entityId,
);
// Check if there's an existing aggregation within time window
const existing = await this.aggregationRepo.findByKey(aggregationKey);
if (existing && this.isWithinTimeWindow(existing.updatedAt, timeWindow)) {
return existing;
}
// Find recent notifications to aggregate
const recentNotifications = await this.notificationRepo.findRecent({
recipientId,
type,
entityId,
since: new Date(Date.now() - timeWindow),
});
const rule = this.aggregationRules.get(type);
if (!rule || recentNotifications.length < rule.minCount) {
return null;
}
// Create new aggregation
return await this.createAggregation(
aggregationKey,
recentNotifications,
type,
);
}
async updateAggregation(
aggregation: NotificationAggregation,
notification: Notification,
): Promise<void> {
await this.aggregationRepo.addNotificationToAggregation(
aggregation.aggregationKey,
notification.id,
notification.actorId || undefined,
);
this.logger.debug(
`Updated aggregation ${aggregation.id} with notification ${notification.id}`,
);
}
private async createAggregation(
key: string,
notifications: Notification[],
type: NotificationType,
): Promise<NotificationAggregation> {
const actors = [
...new Set(notifications.map((n) => n.actorId).filter(Boolean)),
];
const notificationIds = notifications.map((n) => n.id);
const summaryData = {
totalCount: notifications.length,
actorCount: actors.length,
firstActorId: actors[0],
recentActors: actors.slice(0, 3),
timeSpan: {
start: notifications[notifications.length - 1].createdAt.toISOString(),
end: notifications[0].createdAt.toISOString(),
},
};
const aggregation = await this.aggregationRepo.insertAggregation({
aggregationKey: key,
recipientId: notifications[0].recipientId,
aggregationType: this.getAggregationType(type),
entityType: notifications[0].entityType,
entityId: notifications[0].entityId,
actorIds: actors,
notificationIds: notificationIds,
summaryData: summaryData,
});
this.logger.log(
`Created aggregation ${aggregation.id} for ${notifications.length} notifications`,
);
return aggregation;
}
private generateAggregationKey(
recipientId: string,
type: NotificationType,
entityId: string,
): string {
return `${recipientId}:${type}:${entityId}`;
}
private isWithinTimeWindow(updatedAt: Date, timeWindow: number): boolean {
return Date.now() - updatedAt.getTime() < timeWindow;
}
private getAggregationType(type: NotificationType): AggregationType {
switch (type) {
case NotificationType.COMMENT_ON_PAGE:
case NotificationType.COMMENT_RESOLVED:
return AggregationType.COMMENTS_ON_PAGE;
case NotificationType.MENTION_IN_PAGE:
return AggregationType.MENTIONS_IN_PAGE;
case NotificationType.MENTION_IN_COMMENT:
return AggregationType.MENTIONS_IN_COMMENTS;
case NotificationType.COMMENT_IN_THREAD:
return AggregationType.THREAD_ACTIVITY;
default:
return AggregationType.COMMENTS_ON_PAGE;
}
}
async createAggregatedNotificationMessage(
aggregation: NotificationAggregation,
): Promise<AggregatedNotificationMessage> {
// TODO: Load actor information from user service
// For now, return a simplified version
const actors = aggregation.actorIds.slice(0, 3).map((id) => ({
id,
name: 'User', // TODO: Load actual user name
avatarUrl: undefined,
}));
const primaryActor = actors[0];
const otherActorsCount = aggregation.actorIds.length - 1;
let message: string;
let title: string;
switch (aggregation.aggregationType) {
case AggregationType.COMMENTS_ON_PAGE:
if (otherActorsCount === 0) {
title = `${primaryActor.name} commented on a page`;
message = 'View the comment';
} else if (otherActorsCount === 1) {
title = `${primaryActor.name} and 1 other commented on a page`;
message = 'View 2 comments';
} else {
title = `${primaryActor.name} and ${otherActorsCount} others commented on a page`;
message = `View ${aggregation.notificationIds.length} comments`;
}
break;
case AggregationType.MENTIONS_IN_PAGE:
case AggregationType.MENTIONS_IN_COMMENTS: {
const totalMentions = aggregation.notificationIds.length;
if (totalMentions === 1) {
title = `${primaryActor.name} mentioned you`;
message = 'View mention';
} else {
title = `You were mentioned ${totalMentions} times`;
message = `By ${primaryActor.name} and ${otherActorsCount} others`;
}
break;
}
default:
title = `${aggregation.notificationIds.length} new notifications`;
message = 'View all';
}
return {
id: aggregation.id,
title,
message,
actors,
totalCount: aggregation.notificationIds.length,
entityId: aggregation.entityId,
entityType: aggregation.entityType,
createdAt: aggregation.createdAt,
updatedAt: aggregation.updatedAt,
};
}
async cleanupOldAggregations(olderThan: Date): Promise<number> {
const deletedCount =
await this.aggregationRepo.deleteOldAggregations(olderThan);
if (deletedCount > 0) {
this.logger.log(`Cleaned up ${deletedCount} old aggregations`);
}
return deletedCount;
}
}

View File

@ -0,0 +1,215 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { NotificationRepo } from '../../../database/repos/notification/notification.repo';
import { NotificationBatchRepo } from '../../../database/repos/notification/notification-batch.repo';
import { NotificationPreferenceService } from './notification-preference.service';
import { Notification } from '@docmost/db/types/entity.types';
import { NotificationType, BatchType } from '../types/notification.types';
interface NotificationGroup {
type: NotificationType;
entityId: string;
entityType: string;
notifications: Notification[];
actors: Set<string>;
summary: string;
}
@Injectable()
export class NotificationBatchingService {
private readonly logger = new Logger(NotificationBatchingService.name);
constructor(
private readonly notificationRepo: NotificationRepo,
private readonly batchRepo: NotificationBatchRepo,
private readonly preferenceService: NotificationPreferenceService,
@InjectQueue('notification-batch') private readonly batchQueue: Queue,
) {}
async addToBatch(notification: Notification): Promise<void> {
try {
const preferences = await this.preferenceService.getUserPreferences(
notification.recipientId,
notification.workspaceId,
);
const batchKey = this.generateBatchKey(notification);
// Find or create batch
let batch = await this.batchRepo.findByBatchKey(
batchKey,
notification.recipientId,
true, // notSentOnly
);
if (!batch) {
// Create new batch
const scheduledFor = new Date();
scheduledFor.setMinutes(scheduledFor.getMinutes() + preferences.batchWindowMinutes);
batch = await this.batchRepo.insertBatch({
recipientId: notification.recipientId,
workspaceId: notification.workspaceId,
batchType: BatchType.SIMILAR_ACTIVITY,
batchKey: batchKey,
notificationCount: 1,
firstNotificationId: notification.id,
scheduledFor: scheduledFor,
});
// Schedule batch processing
await this.batchQueue.add(
'process-batch',
{ batchId: batch.id },
{
delay: preferences.batchWindowMinutes * 60 * 1000,
},
);
} else {
// Add to existing batch
await this.batchRepo.incrementNotificationCount(batch.id);
}
// Update notification with batch ID
await this.notificationRepo.updateNotification(notification.id, {
batchId: batch.id,
isBatched: true,
});
this.logger.debug(`Notification ${notification.id} added to batch ${batch.id}`);
} catch (error) {
this.logger.error(
`Failed to batch notification ${notification.id}: ${error instanceof Error ? error.message : String(error)}`,
error instanceof Error ? error.stack : undefined,
);
// Fall back to instant delivery on error
throw error;
}
}
private generateBatchKey(notification: Notification): string {
switch (notification.type) {
case NotificationType.COMMENT_ON_PAGE:
case NotificationType.COMMENT_RESOLVED: {
const context = notification.context as any;
return `page:${context?.pageId}:comments`;
}
case NotificationType.MENTION_IN_PAGE:
return `page:${notification.entityId}:mentions`;
case NotificationType.COMMENT_IN_THREAD: {
const mentionContext = notification.context as any;
return `thread:${mentionContext?.threadRootId}`;
}
default:
return `${notification.entityType}:${notification.entityId}:${notification.type}`;
}
}
async processBatch(batchId: string): Promise<void> {
const batch = await this.batchRepo.findById(batchId);
if (!batch || batch.sentAt) {
this.logger.debug(`Batch ${batchId} not found or already sent`);
return;
}
const notifications = await this.notificationRepo.findByBatchId(batchId);
if (notifications.length === 0) {
this.logger.debug(`No notifications found for batch ${batchId}`);
return;
}
// Group notifications by type for smart formatting
const grouped = this.groupNotificationsByType(notifications);
// Send batch email
await this.sendBatchEmail(batch.recipientId, batch.workspaceId, grouped);
// Mark batch as sent
await this.batchRepo.markAsSent(batchId);
// Update email sent timestamp for all notifications
const notificationIds = notifications.map(n => n.id);
await Promise.all(
notificationIds.map(id =>
this.notificationRepo.updateNotification(id, { emailSentAt: new Date() })
),
);
this.logger.log(`Batch ${batchId} processed with ${notifications.length} notifications`);
}
private groupNotificationsByType(notifications: Notification[]): NotificationGroup[] {
const groups = new Map<string, NotificationGroup>();
for (const notification of notifications) {
const key = `${notification.type}:${notification.entityId}`;
if (!groups.has(key)) {
groups.set(key, {
type: notification.type as NotificationType,
entityId: notification.entityId,
entityType: notification.entityType,
notifications: [],
actors: new Set(),
summary: '',
});
}
const group = groups.get(key)!;
group.notifications.push(notification);
if (notification.actorId) {
group.actors.add(notification.actorId);
}
}
// Generate summaries for each group
for (const group of groups.values()) {
group.summary = this.generateSummary(group.type, group.notifications);
}
return Array.from(groups.values());
}
private generateSummary(type: NotificationType, notifications: Notification[]): string {
const count = notifications.length;
const actors = new Set(notifications.map(n => n.actorId).filter(Boolean));
switch (type) {
case NotificationType.COMMENT_ON_PAGE:
if (count === 1) return 'commented on a page you follow';
return `and ${actors.size - 1} others commented on a page you follow`;
case NotificationType.MENTION_IN_COMMENT:
if (count === 1) return 'mentioned you in a comment';
return `mentioned you ${count} times in comments`;
case NotificationType.COMMENT_RESOLVED:
if (count === 1) return 'resolved a comment';
return `resolved ${count} comments`;
default:
return `${count} new activities`;
}
}
private async sendBatchEmail(
recipientId: string,
workspaceId: string,
groups: NotificationGroup[],
): Promise<void> {
// TODO: Implement email sending with batch template
// This will be implemented when we create email templates
this.logger.log(
`Sending batch email to ${recipientId} with ${groups.length} notification groups`,
);
}
async getPendingBatches(): Promise<any[]> {
return await this.batchRepo.getPendingBatches();
}
}

View File

@ -0,0 +1,151 @@
import { Injectable } from '@nestjs/common';
import { NotificationType } from '../types/notification.types';
import { CreateNotificationDto } from '../dto/create-notification.dto';
import { createHash } from 'crypto';
@Injectable()
export class NotificationDeduplicationService {
/**
* Generate a unique deduplication key based on notification type and context
*/
generateDeduplicationKey(params: CreateNotificationDto): string | null {
switch (params.type) {
case NotificationType.MENTION_IN_PAGE:
// Only one notification per mention in a page (until page is updated again)
return this.hash([
'mention',
'page',
params.entityId,
params.actorId,
params.recipientId,
]);
case NotificationType.MENTION_IN_COMMENT:
// One notification per comment mention
return this.hash([
'mention',
'comment',
params.entityId,
params.actorId,
params.recipientId,
]);
case NotificationType.COMMENT_ON_PAGE:
// Allow multiple notifications for different comments on the same page
return null; // No deduplication, rely on batching instead
case NotificationType.REPLY_TO_COMMENT:
// One notification per reply
return this.hash([
'reply',
params.entityId,
params.actorId,
params.recipientId,
]);
case NotificationType.COMMENT_RESOLVED:
// One notification per comment resolution
return this.hash([
'resolved',
params.context.commentId,
params.actorId,
params.recipientId,
]);
case NotificationType.EXPORT_COMPLETED:
case NotificationType.EXPORT_FAILED:
// One notification per export job
return this.hash([
'export',
params.context.jobId || params.entityId,
params.recipientId,
]);
case NotificationType.PAGE_SHARED:
// One notification per page share action
return this.hash([
'share',
params.entityId,
params.actorId,
params.recipientId,
Date.now().toString(), // Include timestamp to allow re-sharing
]);
default:
// For other types, generate a key based on common fields
return this.hash([
params.type,
params.entityId,
params.actorId,
params.recipientId,
]);
}
}
/**
* Check if a notification should be deduplicated based on recent activity
*/
shouldDeduplicate(type: NotificationType): boolean {
const deduplicatedTypes = [
NotificationType.MENTION_IN_PAGE,
NotificationType.MENTION_IN_COMMENT,
NotificationType.REPLY_TO_COMMENT,
NotificationType.COMMENT_RESOLVED,
NotificationType.EXPORT_COMPLETED,
NotificationType.EXPORT_FAILED,
];
return deduplicatedTypes.includes(type);
}
/**
* Get the time window for deduplication (in milliseconds)
*/
getDeduplicationWindow(type: NotificationType): number {
switch (type) {
case NotificationType.MENTION_IN_PAGE:
return 24 * 60 * 60 * 1000; // 24 hours
case NotificationType.MENTION_IN_COMMENT:
return 60 * 60 * 1000; // 1 hour
case NotificationType.EXPORT_COMPLETED:
case NotificationType.EXPORT_FAILED:
return 5 * 60 * 1000; // 5 minutes
default:
return 30 * 60 * 1000; // 30 minutes default
}
}
/**
* Create a hash from array of values
*/
private hash(values: (string | null | undefined)[]): string {
const filtered = values.filter((v) => v !== null && v !== undefined);
const input = filtered.join(':');
return createHash('sha256').update(input).digest('hex').substring(0, 32);
}
/**
* Generate a key for custom deduplication scenarios
*/
generateCustomKey(
type: string,
entityId: string,
recipientId: string,
additionalData?: Record<string, any>,
): string {
const baseValues = [type, entityId, recipientId];
if (additionalData) {
// Sort keys for consistent hashing
const sortedKeys = Object.keys(additionalData).sort();
for (const key of sortedKeys) {
baseValues.push(`${key}:${additionalData[key]}`);
}
}
return this.hash(baseValues);
}
}

View File

@ -0,0 +1,194 @@
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { QueueName } from '../../../integrations/queue/constants';
import { WsGateway } from '../../../ws/ws.gateway';
import { NotificationBatchingService } from './notification-batching.service';
import { NotificationPreferenceService } from './notification-preference.service';
import { NotificationRepo } from '@docmost/db/repos/notification/notification.repo';
import { Notification } from '@docmost/db/types/entity.types';
import {
NotificationCreatedEvent,
NotificationReadEvent,
NotificationAllReadEvent,
NOTIFICATION_EVENTS,
} from '../events/notification.events';
import { NotificationType, NotificationPriority } from '../types/notification.types';
@Injectable()
export class NotificationDeliveryService {
private readonly logger = new Logger(NotificationDeliveryService.name);
constructor(
@InjectQueue(QueueName.EMAIL_QUEUE) private readonly mailQueue: Queue,
private readonly wsGateway: WsGateway,
private readonly batchingService: NotificationBatchingService,
private readonly preferenceService: NotificationPreferenceService,
private readonly notificationRepo: NotificationRepo,
) {}
@OnEvent(NOTIFICATION_EVENTS.CREATED)
async handleNotificationCreated(event: NotificationCreatedEvent) {
const { notification, workspaceId } = event;
try {
const decision = await this.preferenceService.makeNotificationDecision(
notification.recipientId,
workspaceId,
notification.type as NotificationType,
notification.priority as NotificationPriority,
);
// In-app delivery (always immediate)
if (decision.channels.includes('in_app')) {
await this.deliverInApp(notification, workspaceId);
}
// Email delivery (may be batched)
if (decision.channels.includes('email')) {
if (decision.batchingEnabled) {
await this.batchingService.addToBatch(notification);
} else {
await this.deliverEmailInstant(notification);
}
}
} catch (error) {
this.logger.error(
`Failed to deliver notification ${notification.id}: ${error instanceof Error ? error.message : String(error)}`,
error instanceof Error ? error.stack : undefined,
);
}
}
private async deliverInApp(notification: Notification, workspaceId: string) {
try {
// Send notification via WebSocket to user's workspace room
const notificationData = {
id: notification.id,
type: notification.type,
status: notification.status,
priority: notification.priority,
actorId: notification.actorId,
entityType: notification.entityType,
entityId: notification.entityId,
context: notification.context,
createdAt: notification.createdAt,
};
// Emit to user-specific room
this.wsGateway.emitToUser(
notification.recipientId,
'notification:new',
notificationData,
);
// Update unread count
const unreadCount = await this.notificationRepo.getUnreadCount(
notification.recipientId,
);
this.wsGateway.emitToUser(
notification.recipientId,
'notification:unreadCount',
{ count: unreadCount },
);
// Update delivery status
await this.notificationRepo.updateNotification(notification.id, {
inAppDeliveredAt: new Date(),
});
this.logger.debug(`In-app notification delivered: ${notification.id}`);
} catch (error) {
this.logger.error(
`Failed to deliver in-app notification ${notification.id}: ${error instanceof Error ? error.message : String(error)}`,
error instanceof Error ? error.stack : undefined,
);
}
}
private async deliverEmailInstant(notification: Notification) {
try {
await this.mailQueue.add(
'send-notification-email',
{
notificationId: notification.id,
type: notification.type,
},
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
},
);
this.logger.debug(`Email notification queued: ${notification.id}`);
} catch (error) {
this.logger.error(
`Failed to queue email notification ${notification.id}: ${error instanceof Error ? error.message : String(error)}`,
error instanceof Error ? error.stack : undefined,
);
}
}
@OnEvent(NOTIFICATION_EVENTS.READ)
async handleNotificationRead(event: NotificationReadEvent) {
const { notificationId, userId } = event;
// Send real-time update to user
this.wsGateway.emitToUser(userId, 'notification:read', {
notificationId,
});
// Update unread count
const unreadCount = await this.notificationRepo.getUnreadCount(userId);
this.wsGateway.emitToUser(userId, 'notification:unreadCount', {
count: unreadCount,
});
}
@OnEvent(NOTIFICATION_EVENTS.ALL_READ)
async handleAllNotificationsRead(event: NotificationAllReadEvent) {
const { userId, notificationIds } = event;
// Send real-time update to user
this.wsGateway.emitToUser(userId, 'notification:allRead', {
notificationIds,
});
// Update unread count (should be 0)
this.wsGateway.emitToUser(userId, 'notification:unreadCount', { count: 0 });
}
/**
* Process email delivery for a notification
* Called by the mail queue processor
*/
async processEmailNotification(notificationId: string) {
const notification = await this.notificationRepo.findById(notificationId);
if (!notification) {
throw new Error(`Notification not found: ${notificationId}`);
}
// Check if already sent
if (notification.emailSentAt) {
this.logger.debug(
`Notification already sent via email: ${notificationId}`,
);
return;
}
// TODO: Load user and workspace data
// TODO: Render appropriate email template based on notification type
// TODO: Send email using mail service
// For now, just mark as sent
await this.notificationRepo.updateNotification(notificationId, {
emailSentAt: new Date(),
});
this.logger.log(`Email notification sent: ${notificationId}`);
}
}

View File

@ -0,0 +1,303 @@
import { Injectable, Logger } from '@nestjs/common';
import { NotificationPreferenceRepo } from '@docmost/db/repos/notification/notification-preference.repo';
import { UpdateNotificationPreferencesDto } from '../dto/update-preference.dto';
import { NotificationPreference } from '@docmost/db/types/entity.types';
import {
NotificationType,
NotificationPriority,
} from '../types/notification.types';
import {
addDays,
setHours,
setMinutes,
setSeconds,
getDay,
differenceInMilliseconds,
startOfDay,
addHours
} from 'date-fns';
interface NotificationDecision {
shouldNotify: boolean;
channels: ('email' | 'in_app')[];
delay?: number;
batchingEnabled: boolean;
}
@Injectable()
export class NotificationPreferenceService {
private readonly logger = new Logger(NotificationPreferenceService.name);
constructor(private readonly preferenceRepo: NotificationPreferenceRepo) {}
async getUserPreferences(
userId: string,
workspaceId: string,
): Promise<NotificationPreference> {
return await this.preferenceRepo.findOrCreate(userId, workspaceId);
}
async updateUserPreferences(
userId: string,
workspaceId: string,
updates: UpdateNotificationPreferencesDto,
): Promise<NotificationPreference> {
const existing = await this.getUserPreferences(userId, workspaceId);
// Merge notification settings if provided
let mergedSettings = existing.notificationSettings;
if (updates.notificationSettings) {
mergedSettings = {
...((existing.notificationSettings as Record<string, any>) || {}),
...(updates.notificationSettings || {}),
};
}
// Validate batch window
if (updates.batchWindowMinutes !== undefined) {
updates.batchWindowMinutes = Math.max(
5,
Math.min(60, updates.batchWindowMinutes),
);
}
const updated = await this.preferenceRepo.updatePreference(
userId,
workspaceId,
{
...updates,
notificationSettings: mergedSettings,
},
);
this.logger.log(`User ${userId} updated notification preferences`, {
userId,
workspaceId,
changes: updates,
});
return updated;
}
async shouldNotify(
recipientId: string,
type: NotificationType,
workspaceId: string,
): Promise<boolean> {
const preferences = await this.getUserPreferences(recipientId, workspaceId);
const decision = await this.makeNotificationDecision(
recipientId,
workspaceId,
type,
NotificationPriority.NORMAL,
);
return decision.shouldNotify;
}
async makeNotificationDecision(
userId: string,
workspaceId: string,
type: NotificationType,
priority: NotificationPriority = NotificationPriority.NORMAL,
): Promise<NotificationDecision> {
const preferences = await this.getUserPreferences(userId, workspaceId);
// Global check
if (!preferences.emailEnabled && !preferences.inAppEnabled) {
return {
shouldNotify: false,
channels: [],
batchingEnabled: false,
};
}
// Type-specific settings
const typeSettings = this.getTypeSettings(preferences, type);
const channels: ('email' | 'in_app')[] = [];
if (preferences.emailEnabled && typeSettings.email) channels.push('email');
if (preferences.inAppEnabled && typeSettings.in_app)
channels.push('in_app');
if (channels.length === 0) {
return {
shouldNotify: false,
channels: [],
batchingEnabled: false,
};
}
// Check quiet hours
const quietHoursDelay = this.calculateQuietHoursDelay(
preferences,
priority,
);
// Check weekend preferences
if (
!preferences.weekendNotifications &&
this.isWeekend(preferences.timezone)
) {
if (priority !== NotificationPriority.HIGH) {
const mondayDelay = this.getDelayUntilMonday(preferences.timezone);
return {
shouldNotify: true,
channels,
delay: mondayDelay,
batchingEnabled: true,
};
}
}
return {
shouldNotify: true,
channels,
delay: quietHoursDelay,
batchingEnabled:
typeSettings.batch && preferences.emailFrequency === 'smart',
};
}
private getTypeSettings(
preferences: NotificationPreference,
type: NotificationType,
): any {
const settings = preferences.notificationSettings as any;
return settings[type] || { email: true, in_app: true, batch: false };
}
private calculateQuietHoursDelay(
preferences: NotificationPreference,
priority: NotificationPriority,
): number | undefined {
if (
!preferences.quietHoursEnabled ||
priority === NotificationPriority.HIGH
) {
return undefined;
}
// TODO: Implement proper timezone conversion
const now = new Date();
const quietStart = this.parseTime(
preferences.quietHoursStart,
preferences.timezone,
);
const quietEnd = this.parseTime(
preferences.quietHoursEnd,
preferences.timezone,
);
if (this.isInQuietHours(now, quietStart, quietEnd)) {
return this.getDelayUntilEndOfQuietHours(now, quietEnd);
}
return undefined;
}
private parseTime(timeStr: string, timezone: string): Date {
const [hours, minutes, seconds] = timeStr.split(':').map(Number);
// TODO: Implement proper timezone conversion
const now = new Date();
return setSeconds(setMinutes(setHours(now, hours), minutes), seconds || 0);
}
private isInQuietHours(
now: Date,
start: Date,
end: Date,
): boolean {
const nowMinutes = now.getHours() * 60 + now.getMinutes();
const startMinutes = start.getHours() * 60 + start.getMinutes();
const endMinutes = end.getHours() * 60 + end.getMinutes();
if (startMinutes <= endMinutes) {
// Quiet hours don't cross midnight
return nowMinutes >= startMinutes && nowMinutes < endMinutes;
} else {
// Quiet hours cross midnight
return nowMinutes >= startMinutes || nowMinutes < endMinutes;
}
}
private getDelayUntilEndOfQuietHours(now: Date, end: Date): number {
let endTime = end;
// If end time is before current time, it means quiet hours end tomorrow
if (
end.getHours() < now.getHours() ||
(end.getHours() === now.getHours() && end.getMinutes() <= now.getMinutes())
) {
endTime = addDays(endTime, 1);
}
return differenceInMilliseconds(endTime, now);
}
private isWeekend(timezone: string): boolean {
// TODO: Implement proper timezone conversion
const now = new Date();
const dayOfWeek = getDay(now);
return dayOfWeek === 0 || dayOfWeek === 6; // 0 = Sunday, 6 = Saturday
}
private getDelayUntilMonday(timezone: string): number {
// TODO: Implement proper timezone conversion
const now = new Date();
const currentDay = getDay(now);
const daysUntilMonday = currentDay === 0 ? 1 : (8 - currentDay) % 7 || 7;
const nextMonday = addDays(now, daysUntilMonday);
const mondayMorning = addHours(startOfDay(nextMonday), 9); // 9 AM Monday
return differenceInMilliseconds(mondayMorning, now);
}
async getNotificationStats(
userId: string,
workspaceId: string,
): Promise<{
preferences: NotificationPreference;
stats: {
emailEnabled: boolean;
inAppEnabled: boolean;
quietHoursActive: boolean;
batchingEnabled: boolean;
typesDisabled: string[];
};
}> {
const preferences = await this.getUserPreferences(userId, workspaceId);
// TODO: Implement proper timezone conversion
const now = new Date();
const quietStart = this.parseTime(
preferences.quietHoursStart,
preferences.timezone,
);
const quietEnd = this.parseTime(
preferences.quietHoursEnd,
preferences.timezone,
);
const typesDisabled: string[] = [];
const settings = preferences.notificationSettings as any;
for (const [type, config] of Object.entries(settings)) {
const typeSettings = config as any;
if (!typeSettings.email && !typeSettings.in_app) {
typesDisabled.push(type);
}
}
return {
preferences,
stats: {
emailEnabled: preferences.emailEnabled,
inAppEnabled: preferences.inAppEnabled,
quietHoursActive:
preferences.quietHoursEnabled &&
this.isInQuietHours(now, quietStart, quietEnd),
batchingEnabled: preferences.emailFrequency !== 'instant',
typesDisabled,
},
};
}
}

View File

@ -0,0 +1,275 @@
import { Injectable, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { NotificationRepo } from '@docmost/db/repos/notification/notification.repo';
import { NotificationDeduplicationService } from './notification-deduplication.service';
import { NotificationPreferenceService } from './notification-preference.service';
import { CreateNotificationDto } from '../dto/create-notification.dto';
import { Notification } from '@docmost/db/types/entity.types';
import {
NotificationStatus,
NotificationPriority,
NotificationType,
} from '../types/notification.types';
import {
NotificationCreatedEvent,
NotificationReadEvent,
NotificationAllReadEvent,
NOTIFICATION_EVENTS,
} from '../events/notification.events';
@Injectable()
export class NotificationService {
private readonly logger = new Logger(NotificationService.name);
constructor(
private readonly notificationRepo: NotificationRepo,
private readonly eventEmitter: EventEmitter2,
private readonly deduplicationService: NotificationDeduplicationService,
private readonly preferenceService: NotificationPreferenceService,
) {}
async createNotification(
dto: CreateNotificationDto,
): Promise<Notification | null> {
try {
// Set default priority if not provided
const priority = dto.priority || NotificationPriority.NORMAL;
// Check user preferences first
const decision = await this.preferenceService.makeNotificationDecision(
dto.recipientId,
dto.workspaceId,
dto.type,
priority,
);
if (!decision.shouldNotify) {
this.logger.debug(
`Notification blocked by user preferences: ${dto.type} for ${dto.recipientId}`,
);
return null;
}
// Generate deduplication key
let deduplicationKey = dto.deduplicationKey;
if (
!deduplicationKey &&
this.deduplicationService.shouldDeduplicate(dto.type)
) {
deduplicationKey =
this.deduplicationService.generateDeduplicationKey(dto);
}
// Check if duplicate
if (
deduplicationKey &&
(await this.notificationRepo.existsByDeduplicationKey(deduplicationKey))
) {
this.logger.debug(
`Duplicate notification prevented: ${deduplicationKey}`,
);
return null;
}
// Generate group key if not provided
const groupKey = dto.groupKey || this.generateGroupKey(dto);
// Calculate expiration
const expiresAt = this.calculateExpiration(dto.type);
// Create notification
const notification = await this.notificationRepo.insertNotification({
workspaceId: dto.workspaceId,
recipientId: dto.recipientId,
actorId: dto.actorId || null,
type: dto.type,
status: NotificationStatus.UNREAD,
priority,
entityType: dto.entityType,
entityId: dto.entityId,
context: dto.context,
groupKey: groupKey,
groupCount: 1,
deduplicationKey: deduplicationKey,
batchId: null,
isBatched: false,
emailSentAt: null,
inAppDeliveredAt: null,
readAt: null,
expiresAt: expiresAt,
});
// Emit event for delivery processing
this.eventEmitter.emit(
NOTIFICATION_EVENTS.CREATED,
new NotificationCreatedEvent(notification, dto.workspaceId),
);
this.logger.debug(
`Notification created: ${notification.id} for user ${dto.recipientId}`,
);
return notification;
} catch (error) {
this.logger.error(
`Failed to create notification: ${error instanceof Error ? error.message : String(error)}`,
error instanceof Error ? error.stack : undefined,
);
throw error;
}
}
async getNotifications(
userId: string,
workspaceId: string,
options: {
status?: NotificationStatus;
limit?: number;
offset?: number;
} = {},
): Promise<Notification[]> {
return await this.notificationRepo.findByRecipient(userId, options);
}
async getGroupedNotifications(
userId: string,
workspaceId: string,
options: {
status?: NotificationStatus;
limit?: number;
offset?: number;
} = {},
): Promise<{
notifications: Notification[];
groups: Map<string, Notification[]>;
}> {
const notifications = await this.getNotifications(
userId,
workspaceId,
options,
);
// Group notifications by group_key
const groups = new Map<string, Notification[]>();
for (const notification of notifications) {
if (notification.groupKey) {
const group = groups.get(notification.groupKey) || [];
group.push(notification);
groups.set(notification.groupKey, group);
}
}
return { notifications, groups };
}
async markAsRead(notificationId: string, userId: string): Promise<void> {
const notification = await this.notificationRepo.findById(notificationId);
if (!notification || notification.recipientId !== userId) {
throw new Error('Notification not found or unauthorized');
}
if (notification.status === NotificationStatus.READ) {
return; // Already read
}
await this.notificationRepo.markAsRead(notificationId);
// Emit event for real-time update
this.eventEmitter.emit(
NOTIFICATION_EVENTS.READ,
new NotificationReadEvent(notificationId, userId),
);
this.logger.debug(`Notification marked as read: ${notificationId}`);
}
async markAllAsRead(userId: string): Promise<void> {
const unreadNotifications = await this.notificationRepo.findByRecipient(
userId,
{
status: NotificationStatus.UNREAD,
},
);
const ids = unreadNotifications.map((n) => n.id);
if (ids.length > 0) {
await this.notificationRepo.markManyAsRead(ids);
// Emit event for real-time update
this.eventEmitter.emit(
NOTIFICATION_EVENTS.ALL_READ,
new NotificationAllReadEvent(userId, ids),
);
this.logger.debug(
`Marked ${ids.length} notifications as read for user ${userId}`,
);
}
}
async getUnreadCount(userId: string): Promise<number> {
return await this.notificationRepo.getUnreadCount(userId);
}
async deleteExpiredNotifications(): Promise<number> {
const deletedCount = await this.notificationRepo.deleteExpired();
if (deletedCount > 0) {
this.logger.log(`Deleted ${deletedCount} expired notifications`);
}
return deletedCount;
}
private generateGroupKey(dto: CreateNotificationDto): string {
// Generate a group key based on notification type and entity
return `${dto.type}:${dto.entityType}:${dto.entityId}`;
}
private calculateExpiration(type: string): Date | null {
// Set expiration based on notification type
const expirationDays = {
[NotificationType.EXPORT_COMPLETED]: 7, // Expire after 7 days
[NotificationType.EXPORT_FAILED]: 3, // Expire after 3 days
[NotificationType.MENTION_IN_PAGE]: 30, // Expire after 30 days
[NotificationType.MENTION_IN_COMMENT]: 30,
[NotificationType.COMMENT_ON_PAGE]: 60, // Expire after 60 days
[NotificationType.REPLY_TO_COMMENT]: 60,
[NotificationType.COMMENT_IN_THREAD]: 60,
[NotificationType.COMMENT_RESOLVED]: 90, // Expire after 90 days
[NotificationType.PAGE_SHARED]: 90,
};
const days = expirationDays[type as NotificationType];
if (!days) {
return null; // No expiration
}
const expirationDate = new Date();
expirationDate.setDate(expirationDate.getDate() + days);
return expirationDate;
}
async createTestNotification(
userId: string,
workspaceId: string,
type: NotificationType,
): Promise<Notification | null> {
return await this.createNotification({
workspaceId,
recipientId: userId,
actorId: userId,
type,
entityType: 'test',
entityId: 'test-notification',
context: {
message: 'This is a test notification',
timestamp: new Date(),
},
priority: NotificationPriority.NORMAL,
});
}
}

View File

@ -0,0 +1,153 @@
import * as React from 'react';
import { Button, Section, Text, Link, Hr, Heading } from '@react-email/components';
import { MailBody } from '@docmost/transactional/partials/partials';
import { content, paragraph, button, h1 } from '@docmost/transactional/css/styles';
interface NotificationGroup {
type: string;
title: string;
summary: string;
count: number;
actors: string[];
url: string;
preview: string[];
}
interface BatchNotificationEmailProps {
recipientName: string;
groups: NotificationGroup[];
totalCount: number;
workspaceName: string;
settingsUrl: string;
viewAllUrl: string;
}
export const BatchNotificationEmail = ({
recipientName,
groups,
totalCount,
workspaceName,
settingsUrl,
viewAllUrl,
}: BatchNotificationEmailProps) => {
return (
<MailBody>
<Section style={content}>
<Text style={h1}>Hi {recipientName},</Text>
<Text style={paragraph}>
You have {totalCount} new notifications in {workspaceName}:
</Text>
{groups.map((group, index) => (
<Section key={index} style={notificationGroup}>
<Heading as="h3" style={groupTitle}>
{group.title}
</Heading>
<Text style={actorList}>
{formatActors(group.actors)} {group.summary}
</Text>
{group.preview.slice(0, 3).map((item, i) => (
<Text key={i} style={notificationItem}>
{item}
</Text>
))}
{group.count > 3 && (
<Text style={moreText}>
And {group.count - 3} more...
</Text>
)}
<Button href={group.url} style={viewButton}>
View All
</Button>
</Section>
))}
<Hr style={divider} />
<Button href={viewAllUrl} style={viewAllButton}>
View All Notifications
</Button>
<Text style={footerText}>
You received this because you have smart notifications enabled.{' '}
<Link href={settingsUrl} style={{ color: '#176ae5' }}>
Manage your preferences
</Link>
</Text>
</Section>
</MailBody>
);
};
function formatActors(actors: string[]): string {
if (actors.length === 0) return '';
if (actors.length === 1) return actors[0];
if (actors.length === 2) return `${actors[0]} and ${actors[1]}`;
return `${actors[0]}, ${actors[1]} and ${actors.length - 2} others`;
}
const notificationGroup: React.CSSProperties = {
backgroundColor: '#f9f9f9',
borderRadius: '4px',
padding: '16px',
marginBottom: '16px',
};
const groupTitle: React.CSSProperties = {
...paragraph,
fontSize: '16px',
fontWeight: 'bold',
marginBottom: '8px',
};
const actorList: React.CSSProperties = {
...paragraph,
marginBottom: '12px',
};
const notificationItem: React.CSSProperties = {
...paragraph,
marginLeft: '8px',
marginBottom: '4px',
color: '#666',
};
const moreText: React.CSSProperties = {
...paragraph,
fontStyle: 'italic',
color: '#999',
marginLeft: '8px',
marginBottom: '12px',
};
const viewButton: React.CSSProperties = {
...button,
width: 'auto',
padding: '8px 16px',
fontSize: '14px',
marginTop: '8px',
};
const viewAllButton: React.CSSProperties = {
...button,
width: 'auto',
padding: '12px 24px',
margin: '16px auto',
};
const divider: React.CSSProperties = {
borderColor: '#e0e0e0',
margin: '24px 0',
};
const footerText: React.CSSProperties = {
...paragraph,
fontSize: '12px',
color: '#666',
marginTop: '24px',
};

View File

@ -0,0 +1,72 @@
import * as React from 'react';
import { Button, Section, Text, Link } from '@react-email/components';
import { MailBody } from '../../../integrations/transactional/partials/partials';
import { content, paragraph, button, h1 } from '../../../integrations/transactional/css/styles';
interface CommentOnPageEmailProps {
recipientName: string;
actorName: string;
pageTitle: string;
commentExcerpt: string;
pageUrl: string;
workspaceName: string;
settingsUrl: string;
}
export const CommentOnPageEmail = ({
recipientName,
actorName,
pageTitle,
commentExcerpt,
pageUrl,
workspaceName,
settingsUrl,
}: CommentOnPageEmailProps) => {
return (
<MailBody>
<Section style={content}>
<Text style={h1}>Hi {recipientName},</Text>
<Text style={paragraph}>
{actorName} commented on "{pageTitle}":
</Text>
<Section style={commentSection}>
<Text style={commentText}>
{commentExcerpt}
</Text>
</Section>
<Button href={pageUrl} style={button}>
View Comment
</Button>
<Text style={footerText}>
This notification was sent from {workspaceName}.{' '}
<Link href={settingsUrl} style={{ color: '#176ae5' }}>
Manage your notification preferences
</Link>
</Text>
</Section>
</MailBody>
);
};
const commentSection: React.CSSProperties = {
backgroundColor: '#f5f5f5',
borderRadius: '4px',
padding: '16px',
margin: '16px 0',
};
const commentText: React.CSSProperties = {
...paragraph,
margin: 0,
};
const footerText: React.CSSProperties = {
...paragraph,
fontSize: '12px',
color: '#666',
marginTop: '24px',
};

View File

@ -0,0 +1,117 @@
import * as React from 'react';
import { Button, Section, Text, Link, Row, Column } from '@react-email/components';
import { MailBody } from '../../../integrations/transactional/partials/partials';
import { content, paragraph, button, h1 } from '../../../integrations/transactional/css/styles';
interface ExportCompletedEmailProps {
recipientName: string;
exportType: string;
entityName: string;
fileSize: string;
downloadUrl: string;
expiresAt: string;
workspaceName: string;
settingsUrl: string;
}
export const ExportCompletedEmail = ({
recipientName,
exportType,
entityName,
fileSize,
downloadUrl,
expiresAt,
workspaceName,
settingsUrl,
}: ExportCompletedEmailProps) => {
return (
<MailBody>
<Section style={content}>
<Text style={h1}>Export Complete!</Text>
<Text style={paragraph}>
Hi {recipientName},
</Text>
<Text style={paragraph}>
Your {exportType.toUpperCase()} export of "{entityName}" has been completed successfully.
</Text>
<Section style={exportDetails}>
<Row>
<Column style={detailLabel}>File Size:</Column>
<Column style={detailValue}>{fileSize}</Column>
</Row>
<Row>
<Column style={detailLabel}>Format:</Column>
<Column style={detailValue}>{exportType.toUpperCase()}</Column>
</Row>
<Row>
<Column style={detailLabel}>Expires:</Column>
<Column style={detailValue}>{expiresAt}</Column>
</Row>
</Section>
<Button href={downloadUrl} style={downloadButton}>
Download Export
</Button>
<Text style={warningText}>
This download link will expire on {expiresAt}.
Please download your file before then.
</Text>
<Text style={footerText}>
This notification was sent from {workspaceName}.{' '}
<Link href={settingsUrl} style={{ color: '#176ae5' }}>
Manage your notification preferences
</Link>
</Text>
</Section>
</MailBody>
);
};
const exportDetails: React.CSSProperties = {
backgroundColor: '#f5f5f5',
borderRadius: '4px',
padding: '16px',
margin: '16px 0',
};
const detailLabel: React.CSSProperties = {
...paragraph,
fontWeight: 'bold',
width: '120px',
paddingBottom: '8px',
};
const detailValue: React.CSSProperties = {
...paragraph,
paddingBottom: '8px',
};
const downloadButton: React.CSSProperties = {
...button,
backgroundColor: '#28a745',
width: 'auto',
padding: '12px 24px',
margin: '0 auto',
};
const warningText: React.CSSProperties = {
...paragraph,
backgroundColor: '#fff3cd',
border: '1px solid #ffeeba',
borderRadius: '4px',
color: '#856404',
padding: '12px',
marginTop: '16px',
};
const footerText: React.CSSProperties = {
...paragraph,
fontSize: '12px',
color: '#666',
marginTop: '24px',
};

View File

@ -0,0 +1,93 @@
import * as React from 'react';
import { Button, Section, Text, Link } from '@react-email/components';
import { MailBody } from '../../../integrations/transactional/partials/partials';
import { content, paragraph, button, h1 } from '../../../integrations/transactional/css/styles';
interface MentionInCommentEmailProps {
recipientName: string;
actorName: string;
pageTitle: string;
commentExcerpt: string;
mentionContext: string;
commentUrl: string;
workspaceName: string;
settingsUrl: string;
}
export const MentionInCommentEmail = ({
recipientName,
actorName,
pageTitle,
commentExcerpt,
mentionContext,
commentUrl,
workspaceName,
settingsUrl,
}: MentionInCommentEmailProps) => {
return (
<MailBody>
<Section style={content}>
<Text style={h1}>Hi {recipientName},</Text>
<Text style={paragraph}>
{actorName} mentioned you in a comment on "{pageTitle}":
</Text>
<Section style={commentSection}>
<Text style={commentAuthor}>{actorName} commented:</Text>
<Text style={commentText}>
{commentExcerpt}
</Text>
{mentionContext && (
<Text style={mentionHighlight}>
Context: ...{mentionContext}...
</Text>
)}
</Section>
<Button href={commentUrl} style={button}>
View Comment
</Button>
<Text style={footerText}>
This notification was sent from {workspaceName}.{' '}
<Link href={settingsUrl} style={{ color: '#176ae5' }}>
Manage your notification preferences
</Link>
</Text>
</Section>
</MailBody>
);
};
const commentSection: React.CSSProperties = {
backgroundColor: '#f5f5f5',
borderRadius: '4px',
padding: '16px',
margin: '16px 0',
};
const commentAuthor: React.CSSProperties = {
...paragraph,
fontWeight: 'bold',
marginBottom: '8px',
};
const commentText: React.CSSProperties = {
...paragraph,
margin: '0 0 8px 0',
};
const mentionHighlight: React.CSSProperties = {
...paragraph,
fontStyle: 'italic',
color: '#666',
margin: 0,
};
const footerText: React.CSSProperties = {
...paragraph,
fontSize: '12px',
color: '#666',
marginTop: '24px',
};

View File

@ -0,0 +1,73 @@
import * as React from 'react';
import { Button, Section, Text, Link } from '@react-email/components';
import { MailBody } from '../../../integrations/transactional/partials/partials';
import { content, paragraph, button, h1 } from '../../../integrations/transactional/css/styles';
interface MentionInPageEmailProps {
recipientName: string;
actorName: string;
pageTitle: string;
mentionContext: string;
pageUrl: string;
workspaceName: string;
settingsUrl: string;
}
export const MentionInPageEmail = ({
recipientName,
actorName,
pageTitle,
mentionContext,
pageUrl,
workspaceName,
settingsUrl,
}: MentionInPageEmailProps) => {
return (
<MailBody>
<Section style={content}>
<Text style={h1}>Hi {recipientName},</Text>
<Text style={paragraph}>
{actorName} mentioned you in the page "{pageTitle}":
</Text>
<Section style={mentionSection}>
<Text style={mentionText}>
...{mentionContext}...
</Text>
</Section>
<Button href={pageUrl} style={button}>
View Page
</Button>
<Text style={footerText}>
This notification was sent from {workspaceName}.{' '}
<Link href={settingsUrl} style={{ color: '#176ae5' }}>
Manage your notification preferences
</Link>
</Text>
</Section>
</MailBody>
);
};
const mentionSection: React.CSSProperties = {
backgroundColor: '#f5f5f5',
borderRadius: '4px',
padding: '16px',
margin: '16px 0',
};
const mentionText: React.CSSProperties = {
...paragraph,
fontStyle: 'italic',
margin: 0,
};
const footerText: React.CSSProperties = {
...paragraph,
fontSize: '12px',
color: '#666',
marginTop: '24px',
};

View File

@ -0,0 +1,220 @@
export enum NotificationType {
// Mentions
MENTION_IN_PAGE = 'mention_in_page',
MENTION_IN_COMMENT = 'mention_in_comment',
// Comments
COMMENT_ON_PAGE = 'comment_on_page',
REPLY_TO_COMMENT = 'reply_to_comment',
COMMENT_IN_THREAD = 'comment_in_thread',
COMMENT_RESOLVED = 'comment_resolved',
// Exports
EXPORT_COMPLETED = 'export_completed',
EXPORT_FAILED = 'export_failed',
// Pages
PAGE_SHARED = 'page_shared',
PAGE_UPDATED = 'page_updated',
// Tasks (Future)
TASK_ASSIGNED = 'task_assigned',
TASK_DUE = 'task_due',
TASK_COMPLETED = 'task_completed',
// System
SYSTEM_UPDATE = 'system_update',
SYSTEM_ANNOUNCEMENT = 'system_announcement',
}
export enum NotificationStatus {
UNREAD = 'unread',
READ = 'read',
ARCHIVED = 'archived',
}
export enum NotificationPriority {
HIGH = 'high',
NORMAL = 'normal',
LOW = 'low',
}
export enum EmailFrequency {
INSTANT = 'instant',
SMART = 'smart',
DIGEST_DAILY = 'digest_daily',
DIGEST_WEEKLY = 'digest_weekly',
}
export interface NotificationTypeSettings {
email: boolean;
in_app: boolean;
batch: boolean;
}
export enum BatchType {
SIMILAR_ACTIVITY = 'similar_activity',
DIGEST = 'digest',
GROUPED_MENTIONS = 'grouped_mentions',
}
export enum AggregationType {
COMMENTS_ON_PAGE = 'comments_on_page',
MENTIONS_IN_PAGE = 'mentions_in_page',
MENTIONS_IN_COMMENTS = 'mentions_in_comments',
THREAD_ACTIVITY = 'thread_activity',
}
export interface AggregationSummaryData {
total_count: number;
actor_count: number;
first_actor_id: string;
recent_actors: string[];
time_span: {
start: Date;
end: Date;
};
[key: string]: any; // Allow additional type-specific data
}
export interface AggregatedNotificationMessage {
id: string;
title: string;
message: string;
actors: Array<{
id: string;
name: string;
avatarUrl?: string;
}>;
totalCount: number;
entityId: string;
entityType: string;
createdAt: Date;
updatedAt: Date;
}
// Context interfaces for different notification types
export interface MentionInPageContext {
pageId: string;
pageTitle: string;
mentionContext: string;
mentionBy: string;
}
export interface MentionInCommentContext {
pageId: string;
pageTitle: string;
commentId: string;
commentText: string;
mentionBy: string;
}
export interface CommentOnPageContext {
pageId: string;
pageTitle: string;
commentId: string;
commentText: string;
commentBy: string;
}
export interface ReplyToCommentContext {
pageId: string;
pageTitle: string;
commentId: string;
parentCommentId: string;
commentText: string;
replyBy: string;
}
export interface CommentInThreadContext {
pageId: string;
pageTitle: string;
threadId: string;
commentId: string;
commentText: string;
commentBy: string;
}
export interface CommentResolvedContext {
pageId: string;
pageTitle: string;
threadId: string;
resolvedBy: string;
}
export interface ExportCompletedContext {
exportId: string;
exportType: string;
downloadUrl: string;
expiresAt: string;
}
export interface ExportFailedContext {
exportId: string;
exportType: string;
errorMessage: string;
}
export interface PageSharedContext {
pageId: string;
pageTitle: string;
sharedBy: string;
sharedWith: string[];
permissions: string[];
}
export interface PageUpdatedContext {
pageId: string;
pageTitle: string;
updatedBy: string;
changeType: 'content' | 'title' | 'permissions';
}
export interface TaskAssignedContext {
taskId: string;
taskTitle: string;
assignedBy: string;
dueDate?: string;
}
export interface TaskDueContext {
taskId: string;
taskTitle: string;
dueDate: string;
daysOverdue?: number;
}
export interface TaskCompletedContext {
taskId: string;
taskTitle: string;
completedBy: string;
}
export interface SystemUpdateContext {
updateType: string;
version?: string;
description: string;
}
export interface SystemAnnouncementContext {
message: string;
link?: string;
}
export type NotificationContext =
| MentionInPageContext
| MentionInCommentContext
| CommentOnPageContext
| ReplyToCommentContext
| CommentInThreadContext
| CommentResolvedContext
| ExportCompletedContext
| ExportFailedContext
| PageSharedContext
| PageUpdatedContext
| TaskAssignedContext
| TaskDueContext
| TaskCompletedContext
| SystemUpdateContext
| SystemAnnouncementContext
| Record<string, unknown>;

View File

@ -25,6 +25,10 @@ import { MigrationService } from '@docmost/db/services/migration.service';
import { UserTokenRepo } from './repos/user-token/user-token.repo';
import { BacklinkRepo } from '@docmost/db/repos/backlink/backlink.repo';
import { ShareRepo } from '@docmost/db/repos/share/share.repo';
import { NotificationRepo } from './repos/notification/notification.repo';
import { NotificationPreferenceRepo } from './repos/notification/notification-preference.repo';
import { NotificationBatchRepo } from './repos/notification/notification-batch.repo';
import { NotificationAggregationRepo } from './repos/notification/notification-aggregation.repo';
// https://github.com/brianc/node-postgres/issues/811
types.setTypeParser(types.builtins.INT8, (val) => Number(val));
@ -75,7 +79,11 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val));
AttachmentRepo,
UserTokenRepo,
BacklinkRepo,
ShareRepo
ShareRepo,
NotificationRepo,
NotificationPreferenceRepo,
NotificationBatchRepo,
NotificationAggregationRepo,
],
exports: [
WorkspaceRepo,
@ -90,7 +98,11 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val));
AttachmentRepo,
UserTokenRepo,
BacklinkRepo,
ShareRepo
ShareRepo,
NotificationRepo,
NotificationPreferenceRepo,
NotificationBatchRepo,
NotificationAggregationRepo,
],
})
export class DatabaseModule

View File

@ -0,0 +1,233 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
// Create notifications table
await db.schema
.createTable('notifications')
.addColumn('id', 'uuid', (col) =>
col.primaryKey().defaultTo(sql`gen_uuid_v7()`),
)
.addColumn('workspace_id', 'uuid', (col) =>
col.notNull().references('workspaces.id').onDelete('cascade'),
)
.addColumn('recipient_id', 'uuid', (col) =>
col.notNull().references('users.id').onDelete('cascade'),
)
.addColumn('actor_id', 'uuid', (col) =>
col.references('users.id').onDelete('set null'),
)
.addColumn('type', 'varchar(50)', (col) => col.notNull())
.addColumn('status', 'varchar(20)', (col) =>
col.notNull().defaultTo('unread'),
)
.addColumn('priority', 'varchar(20)', (col) =>
col.notNull().defaultTo('normal'),
)
.addColumn('entity_type', 'varchar(50)', (col) => col.notNull())
.addColumn('entity_id', 'uuid', (col) => col.notNull())
.addColumn('context', 'jsonb', (col) => col.notNull().defaultTo('{}'))
.addColumn('group_key', 'varchar(255)')
.addColumn('group_count', 'integer', (col) => col.defaultTo(1))
.addColumn('deduplication_key', 'varchar(255)')
.addColumn('batch_id', 'uuid')
.addColumn('is_batched', 'boolean', (col) => col.defaultTo(false))
.addColumn('email_sent_at', 'timestamp')
.addColumn('in_app_delivered_at', 'timestamp')
.addColumn('read_at', 'timestamp')
.addColumn('created_at', 'timestamp', (col) =>
col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`),
)
.addColumn('updated_at', 'timestamp', (col) =>
col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`),
)
.addColumn('expires_at', 'timestamp')
.execute();
// Create indexes for notifications
await db.schema
.createIndex('idx_notifications_recipient_status')
.on('notifications')
.columns(['recipient_id', 'status'])
.execute();
await db.schema
.createIndex('idx_notifications_group_key')
.on('notifications')
.columns(['group_key', 'created_at'])
.execute();
await db.schema
.createIndex('idx_notifications_batch_pending')
.on('notifications')
.columns(['batch_id', 'is_batched'])
.where('is_batched', '=', false)
.execute();
await db.schema
.createIndex('idx_notifications_expires')
.on('notifications')
.column('expires_at')
.where('expires_at', 'is not', null)
.execute();
await db.schema
.createIndex('idx_notifications_deduplication')
.unique()
.on('notifications')
.column('deduplication_key')
.where('deduplication_key', 'is not', null)
.execute();
// Create notification_preferences table
await db.schema
.createTable('notification_preferences')
.addColumn('id', 'uuid', (col) =>
col.primaryKey().defaultTo(sql`gen_uuid_v7()`),
)
.addColumn('user_id', 'uuid', (col) =>
col.notNull().references('users.id').onDelete('cascade'),
)
.addColumn('workspace_id', 'uuid', (col) =>
col.notNull().references('workspaces.id').onDelete('cascade'),
)
.addColumn('email_enabled', 'boolean', (col) =>
col.notNull().defaultTo(true),
)
.addColumn('in_app_enabled', 'boolean', (col) =>
col.notNull().defaultTo(true),
)
.addColumn('notification_settings', 'jsonb', (col) =>
col.notNull().defaultTo(sql`'{
"mention_in_page": {"email": true, "in_app": true, "batch": false},
"mention_in_comment": {"email": true, "in_app": true, "batch": false},
"comment_on_page": {"email": true, "in_app": true, "batch": true},
"reply_to_comment": {"email": true, "in_app": true, "batch": false},
"comment_in_thread": {"email": true, "in_app": true, "batch": true},
"comment_resolved": {"email": true, "in_app": true, "batch": true},
"export_completed": {"email": true, "in_app": true, "batch": false},
"page_shared": {"email": true, "in_app": true, "batch": true},
"task_assigned": {"email": true, "in_app": true, "batch": false}
}'::jsonb`),
)
.addColumn('batch_window_minutes', 'integer', (col) => col.defaultTo(15))
.addColumn('max_batch_size', 'integer', (col) => col.defaultTo(20))
.addColumn('batch_types', sql`text[]`, (col) =>
col.defaultTo(
sql`ARRAY['comment_on_page', 'comment_in_thread', 'comment_resolved']`,
),
)
.addColumn('email_frequency', 'varchar(20)', (col) =>
col.notNull().defaultTo('smart'),
)
.addColumn('digest_time', 'time', (col) => col.defaultTo('09:00:00'))
.addColumn('quiet_hours_enabled', 'boolean', (col) => col.defaultTo(false))
.addColumn('quiet_hours_start', 'time', (col) => col.defaultTo('18:00:00'))
.addColumn('quiet_hours_end', 'time', (col) => col.defaultTo('09:00:00'))
.addColumn('timezone', 'varchar(50)', (col) => col.defaultTo('UTC'))
.addColumn('weekend_notifications', 'boolean', (col) => col.defaultTo(true))
.addColumn('created_at', 'timestamp', (col) =>
col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`),
)
.addColumn('updated_at', 'timestamp', (col) =>
col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`),
)
.execute();
// Create unique index for user_workspace
await db.schema
.createIndex('idx_notification_preferences_user_workspace')
.unique()
.on('notification_preferences')
.columns(['user_id', 'workspace_id'])
.execute();
// Create notification_batches table
await db.schema
.createTable('notification_batches')
.addColumn('id', 'uuid', (col) =>
col.primaryKey().defaultTo(sql`gen_uuid_v7()`),
)
.addColumn('recipient_id', 'uuid', (col) =>
col.notNull().references('users.id').onDelete('cascade'),
)
.addColumn('workspace_id', 'uuid', (col) =>
col.notNull().references('workspaces.id').onDelete('cascade'),
)
.addColumn('batch_type', 'varchar(50)', (col) => col.notNull())
.addColumn('batch_key', 'varchar(255)', (col) => col.notNull())
.addColumn('notification_count', 'integer', (col) =>
col.notNull().defaultTo(0),
)
.addColumn('first_notification_id', 'uuid', (col) =>
col.references('notifications.id').onDelete('set null'),
)
.addColumn('scheduled_for', 'timestamp', (col) => col.notNull())
.addColumn('sent_at', 'timestamp')
.addColumn('created_at', 'timestamp', (col) =>
col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`),
)
.execute();
// Create indexes for notification_batches
await db.schema
.createIndex('idx_notification_batches_scheduled_pending')
.on('notification_batches')
.columns(['scheduled_for', 'sent_at'])
.where('sent_at', 'is', null)
.execute();
await db.schema
.createIndex('idx_notification_batches_batch_key')
.on('notification_batches')
.columns(['batch_key', 'recipient_id'])
.execute();
// Create notification_aggregations table
await db.schema
.createTable('notification_aggregations')
.addColumn('id', 'uuid', (col) =>
col.primaryKey().defaultTo(sql`gen_uuid_v7()`),
)
.addColumn('aggregation_key', 'varchar(255)', (col) => col.notNull())
.addColumn('recipient_id', 'uuid', (col) =>
col.notNull().references('users.id').onDelete('cascade'),
)
.addColumn('aggregation_type', 'varchar(50)', (col) => col.notNull())
.addColumn('entity_type', 'varchar(50)', (col) => col.notNull())
.addColumn('entity_id', 'uuid', (col) => col.notNull())
.addColumn('actor_ids', sql`uuid[]`, (col) =>
col.notNull().defaultTo(sql`'{}'`),
)
.addColumn('notification_ids', sql`uuid[]`, (col) =>
col.notNull().defaultTo(sql`'{}'`),
)
.addColumn('summary_data', 'jsonb', (col) => col.notNull().defaultTo('{}'))
.addColumn('created_at', 'timestamp', (col) =>
col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`),
)
.addColumn('updated_at', 'timestamp', (col) =>
col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`),
)
.execute();
// Create indexes for notification_aggregations
await db.schema
.createIndex('idx_notification_aggregations_key')
.unique()
.on('notification_aggregations')
.column('aggregation_key')
.execute();
await db.schema
.createIndex('idx_notification_aggregations_recipient_updated')
.on('notification_aggregations')
.columns(['recipient_id', 'updated_at'])
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropTable('notification_aggregations').ifExists().execute();
await db.schema.dropTable('notification_batches').ifExists().execute();
await db.schema.dropTable('notification_preferences').ifExists().execute();
await db.schema.dropTable('notifications').ifExists().execute();
}

View File

@ -0,0 +1,125 @@
import { Injectable } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import {
NotificationAggregation,
InsertableNotificationAggregation,
UpdatableNotificationAggregation
} from '@docmost/db/types/entity.types';
@Injectable()
export class NotificationAggregationRepo {
constructor(@InjectKysely() private readonly db: KyselyDB) {}
async insertAggregation(aggregation: InsertableNotificationAggregation): Promise<NotificationAggregation> {
return await this.db
.insertInto('notificationAggregations')
.values(aggregation)
.returningAll()
.executeTakeFirstOrThrow();
}
async findById(id: string): Promise<NotificationAggregation | undefined> {
return await this.db
.selectFrom('notificationAggregations')
.selectAll()
.where('id', '=', id)
.executeTakeFirst();
}
async findByKey(aggregationKey: string): Promise<NotificationAggregation | undefined> {
return await this.db
.selectFrom('notificationAggregations')
.selectAll()
.where('aggregationKey', '=', aggregationKey)
.executeTakeFirst();
}
async updateAggregation(
aggregationKey: string,
update: UpdatableNotificationAggregation
): Promise<NotificationAggregation> {
return await this.db
.updateTable('notificationAggregations')
.set({
...update,
updatedAt: new Date(),
})
.where('aggregationKey', '=', aggregationKey)
.returningAll()
.executeTakeFirstOrThrow();
}
async addNotificationToAggregation(
aggregationKey: string,
notificationId: string,
actorId?: string
): Promise<void> {
const aggregation = await this.findByKey(aggregationKey);
if (!aggregation) {
throw new Error(`Aggregation not found: ${aggregationKey}`);
}
const updates: UpdatableNotificationAggregation = {
notificationIds: [...aggregation.notificationIds, notificationId],
updatedAt: new Date(),
};
if (actorId && !aggregation.actorIds.includes(actorId)) {
updates.actorIds = [...aggregation.actorIds, actorId];
}
// Update summary data
updates.summaryData = {
...(aggregation.summaryData as Record<string, any> || {}),
totalCount: aggregation.notificationIds.length + 1,
actorCount: updates.actorIds?.length || aggregation.actorIds.length,
timeSpan: {
...((aggregation.summaryData as any).timeSpan || {}),
end: new Date(),
},
};
await this.updateAggregation(aggregationKey, updates);
}
async findRecentByRecipient(
recipientId: string,
limit = 10
): Promise<NotificationAggregation[]> {
return await this.db
.selectFrom('notificationAggregations')
.selectAll()
.where('recipientId', '=', recipientId)
.orderBy('updatedAt', 'desc')
.limit(limit)
.execute();
}
async deleteOldAggregations(olderThan: Date): Promise<number> {
const result = await this.db
.deleteFrom('notificationAggregations')
.where('updatedAt', '<', olderThan)
.execute();
return Number(result[0].numDeletedRows);
}
async getAggregationsByEntity(
entityType: string,
entityId: string,
recipientId?: string
): Promise<NotificationAggregation[]> {
let query = this.db
.selectFrom('notificationAggregations')
.selectAll()
.where('entityType', '=', entityType)
.where('entityId', '=', entityId);
if (recipientId) {
query = query.where('recipientId', '=', recipientId);
}
return await query.orderBy('updatedAt', 'desc').execute();
}
}

View File

@ -0,0 +1,110 @@
import { Injectable } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import {
NotificationBatch,
InsertableNotificationBatch,
UpdatableNotificationBatch
} from '@docmost/db/types/entity.types';
@Injectable()
export class NotificationBatchRepo {
constructor(@InjectKysely() private readonly db: KyselyDB) {}
async insertBatch(batch: InsertableNotificationBatch): Promise<NotificationBatch> {
return await this.db
.insertInto('notificationBatches')
.values(batch)
.returningAll()
.executeTakeFirstOrThrow();
}
async findById(id: string): Promise<NotificationBatch | undefined> {
return await this.db
.selectFrom('notificationBatches')
.selectAll()
.where('id', '=', id)
.executeTakeFirst();
}
async findByBatchKey(
batchKey: string,
recipientId: string,
notSentOnly = true
): Promise<NotificationBatch | undefined> {
let query = this.db
.selectFrom('notificationBatches')
.selectAll()
.where('batchKey', '=', batchKey)
.where('recipientId', '=', recipientId);
if (notSentOnly) {
query = query.where('sentAt', 'is', null);
}
return await query.executeTakeFirst();
}
async getPendingBatches(limit = 100): Promise<NotificationBatch[]> {
return await this.db
.selectFrom('notificationBatches')
.selectAll()
.where('sentAt', 'is', null)
.where('scheduledFor', '<=', new Date())
.orderBy('scheduledFor', 'asc')
.limit(limit)
.execute();
}
async updateBatch(id: string, update: UpdatableNotificationBatch): Promise<NotificationBatch> {
return await this.db
.updateTable('notificationBatches')
.set(update)
.where('id', '=', id)
.returningAll()
.executeTakeFirstOrThrow();
}
async markAsSent(id: string): Promise<void> {
await this.db
.updateTable('notificationBatches')
.set({
sentAt: new Date(),
})
.where('id', '=', id)
.execute();
}
async incrementNotificationCount(id: string): Promise<void> {
await this.db
.updateTable('notificationBatches')
.set((eb) => ({
notificationCount: eb('notificationCount', '+', 1),
}))
.where('id', '=', id)
.execute();
}
async deleteOldBatches(olderThan: Date): Promise<number> {
const result = await this.db
.deleteFrom('notificationBatches')
.where('sentAt', '<', olderThan)
.execute();
return Number(result[0].numDeletedRows);
}
async getScheduledBatchesForUser(
recipientId: string,
workspaceId: string
): Promise<NotificationBatch[]> {
return await this.db
.selectFrom('notificationBatches')
.selectAll()
.where('recipientId', '=', recipientId)
.where('workspaceId', '=', workspaceId)
.where('sentAt', 'is', null)
.orderBy('scheduledFor', 'asc')
.execute();
}
}

View File

@ -0,0 +1,134 @@
import { Injectable } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { sql } from 'kysely';
import {
NotificationPreference,
InsertableNotificationPreference,
UpdatableNotificationPreference
} from '@docmost/db/types/entity.types';
export const DEFAULT_NOTIFICATION_SETTINGS = {
mention_in_page: { email: true, in_app: true, batch: false },
mention_in_comment: { email: true, in_app: true, batch: false },
comment_on_page: { email: true, in_app: true, batch: true },
reply_to_comment: { email: true, in_app: true, batch: false },
comment_in_thread: { email: true, in_app: true, batch: true },
comment_resolved: { email: true, in_app: true, batch: true },
export_completed: { email: true, in_app: true, batch: false },
export_failed: { email: true, in_app: true, batch: false },
page_shared: { email: true, in_app: true, batch: true },
page_updated: { email: false, in_app: true, batch: true },
task_assigned: { email: true, in_app: true, batch: false },
task_due_soon: { email: true, in_app: true, batch: false },
};
@Injectable()
export class NotificationPreferenceRepo {
constructor(@InjectKysely() private readonly db: KyselyDB) {}
async insertPreference(preference: InsertableNotificationPreference): Promise<NotificationPreference> {
return await this.db
.insertInto('notificationPreferences')
.values(preference)
.returningAll()
.executeTakeFirstOrThrow();
}
async findByUserAndWorkspace(
userId: string,
workspaceId: string
): Promise<NotificationPreference | undefined> {
return await this.db
.selectFrom('notificationPreferences')
.selectAll()
.where('userId', '=', userId)
.where('workspaceId', '=', workspaceId)
.executeTakeFirst();
}
async findOrCreate(
userId: string,
workspaceId: string
): Promise<NotificationPreference> {
const existing = await this.findByUserAndWorkspace(userId, workspaceId);
if (existing) {
return existing;
}
return await this.insertPreference({
userId: userId,
workspaceId: workspaceId,
emailEnabled: true,
inAppEnabled: true,
notificationSettings: DEFAULT_NOTIFICATION_SETTINGS,
batchWindowMinutes: 15,
maxBatchSize: 20,
batchTypes: ['comment_on_page', 'comment_in_thread', 'comment_resolved'],
emailFrequency: 'smart',
digestTime: '09:00:00',
quietHoursEnabled: false,
quietHoursStart: '18:00:00',
quietHoursEnd: '09:00:00',
timezone: 'UTC',
weekendNotifications: true,
});
}
async updatePreference(
userId: string,
workspaceId: string,
update: UpdatableNotificationPreference
): Promise<NotificationPreference> {
return await this.db
.updateTable('notificationPreferences')
.set({
...update,
updatedAt: new Date(),
})
.where('userId', '=', userId)
.where('workspaceId', '=', workspaceId)
.returningAll()
.executeTakeFirstOrThrow();
}
async findUsersWithBatchingEnabled(
workspaceId: string,
notificationType: string
): Promise<NotificationPreference[]> {
return await this.db
.selectFrom('notificationPreferences')
.selectAll()
.where('workspaceId', '=', workspaceId)
.where('emailEnabled', '=', true)
.where('emailFrequency', '!=', 'instant')
.where(
sql<boolean>`notification_settings::jsonb->'${sql.raw(notificationType)}'->>'batch' = 'true'`
)
.execute();
}
async findUsersForDigest(
workspaceId: string,
currentTime: string,
timezone: string
): Promise<NotificationPreference[]> {
return await this.db
.selectFrom('notificationPreferences')
.selectAll()
.where('workspaceId', '=', workspaceId)
.where('emailFrequency', '=', 'daily')
.where('digestTime', '=', currentTime)
.where('timezone', '=', timezone)
.where('emailEnabled', '=', true)
.execute();
}
async deletePreference(userId: string, workspaceId: string): Promise<void> {
await this.db
.deleteFrom('notificationPreferences')
.where('userId', '=', userId)
.where('workspaceId', '=', workspaceId)
.execute();
}
}

View File

@ -0,0 +1,175 @@
import { Injectable } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { Notification, InsertableNotification, UpdatableNotification } from '@docmost/db/types/entity.types';
import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
@Injectable()
export class NotificationRepo {
constructor(@InjectKysely() private readonly db: KyselyDB) {}
async insertNotification(notification: InsertableNotification): Promise<Notification> {
return await this.db
.insertInto('notifications')
.values(notification)
.returningAll()
.executeTakeFirstOrThrow();
}
async findById(id: string): Promise<Notification | undefined> {
return await this.db
.selectFrom('notifications')
.selectAll()
.where('id', '=', id)
.executeTakeFirst();
}
async findByRecipient(
recipientId: string,
options: {
status?: string;
limit?: number;
offset?: number;
} = {}
): Promise<Notification[]> {
let query = this.db
.selectFrom('notifications')
.selectAll()
.where('recipientId', '=', recipientId)
.orderBy('createdAt', 'desc');
if (options.status) {
query = query.where('status', '=', options.status);
}
if (options.limit) {
query = query.limit(options.limit);
}
if (options.offset) {
query = query.offset(options.offset);
}
return await query.execute();
}
async findByBatchId(batchId: string): Promise<Notification[]> {
return await this.db
.selectFrom('notifications')
.selectAll()
.where('batchId', '=', batchId)
.orderBy('createdAt', 'desc')
.execute();
}
async findRecent(params: {
recipientId: string;
type: string;
entityId: string;
since: Date;
}): Promise<Notification[]> {
return await this.db
.selectFrom('notifications')
.selectAll()
.where('recipientId', '=', params.recipientId)
.where('type', '=', params.type)
.where('entityId', '=', params.entityId)
.where('createdAt', '>', params.since)
.orderBy('createdAt', 'desc')
.execute();
}
async existsByDeduplicationKey(key: string): Promise<boolean> {
const result = await this.db
.selectFrom('notifications')
.select(['id'])
.where('deduplicationKey', '=', key)
.executeTakeFirst();
return !!result;
}
async updateNotification(id: string, update: UpdatableNotification): Promise<Notification> {
return await this.db
.updateTable('notifications')
.set(update)
.where('id', '=', id)
.returningAll()
.executeTakeFirstOrThrow();
}
async markAsRead(id: string): Promise<void> {
await this.db
.updateTable('notifications')
.set({
status: 'read',
readAt: new Date(),
updatedAt: new Date(),
})
.where('id', '=', id)
.execute();
}
async markManyAsRead(ids: string[]): Promise<void> {
if (ids.length === 0) return;
await this.db
.updateTable('notifications')
.set({
status: 'read',
readAt: new Date(),
updatedAt: new Date(),
})
.where('id', 'in', ids)
.execute();
}
async getUnreadCount(recipientId: string): Promise<number> {
const result = await this.db
.selectFrom('notifications')
.select((eb) => eb.fn.countAll<number>().as('count'))
.where('recipientId', '=', recipientId)
.where('status', '=', 'unread')
.executeTakeFirst();
return result?.count || 0;
}
async deleteExpired(): Promise<number> {
const result = await this.db
.deleteFrom('notifications')
.where('expiresAt', '<', new Date())
.execute();
return Number(result[0].numDeletedRows);
}
async getNotificationsByGroupKey(
groupKey: string,
recipientId: string,
since: Date
): Promise<Notification[]> {
return await this.db
.selectFrom('notifications')
.selectAll()
.where('groupKey', '=', groupKey)
.where('recipientId', '=', recipientId)
.where('createdAt', '>', since)
.orderBy('createdAt', 'desc')
.execute();
}
async updateBatchId(notificationIds: string[], batchId: string): Promise<void> {
if (notificationIds.length === 0) return;
await this.db
.updateTable('notifications')
.set({
batchId: batchId,
isBatched: true,
updatedAt: new Date(),
})
.where('id', 'in', notificationIds)
.execute();
}
}

View File

@ -62,6 +62,7 @@ export interface AuthProviders {
deletedAt: Timestamp | null;
id: Generated<string>;
isEnabled: Generated<boolean>;
isGroupSyncEnabled: Generated<boolean>;
name: string;
oidcClientId: string | null;
oidcClientSecret: string | null;
@ -122,6 +123,7 @@ export interface Comments {
pageId: string;
parentCommentId: string | null;
resolvedAt: Timestamp | null;
resolvedById: string | null;
selection: string | null;
type: string | null;
workspaceId: string;
@ -165,6 +167,78 @@ export interface GroupUsers {
userId: string;
}
export interface NotificationAggregations {
actorIds: Generated<string[]>;
aggregationKey: string;
aggregationType: string;
createdAt: Generated<Timestamp>;
entityId: string;
entityType: string;
id: Generated<string>;
notificationIds: Generated<string[]>;
recipientId: string;
summaryData: Generated<Json>;
updatedAt: Generated<Timestamp>;
}
export interface NotificationBatches {
batchKey: string;
batchType: string;
createdAt: Generated<Timestamp>;
firstNotificationId: string | null;
id: Generated<string>;
notificationCount: Generated<number>;
recipientId: string;
scheduledFor: Timestamp;
sentAt: Timestamp | null;
workspaceId: string;
}
export interface NotificationPreferences {
batchTypes: Generated<string[] | null>;
batchWindowMinutes: Generated<number | null>;
createdAt: Generated<Timestamp>;
digestTime: Generated<string | null>;
emailEnabled: Generated<boolean>;
emailFrequency: Generated<string>;
id: Generated<string>;
inAppEnabled: Generated<boolean>;
maxBatchSize: Generated<number | null>;
notificationSettings: Generated<Json>;
quietHoursEnabled: Generated<boolean | null>;
quietHoursEnd: Generated<string | null>;
quietHoursStart: Generated<string | null>;
timezone: Generated<string | null>;
updatedAt: Generated<Timestamp>;
userId: string;
weekendNotifications: Generated<boolean | null>;
workspaceId: string;
}
export interface Notifications {
actorId: string | null;
batchId: string | null;
context: Generated<Json>;
createdAt: Generated<Timestamp>;
deduplicationKey: string | null;
emailSentAt: Timestamp | null;
entityId: string;
entityType: string;
expiresAt: Timestamp | null;
groupCount: Generated<number | null>;
groupKey: string | null;
id: Generated<string>;
inAppDeliveredAt: Timestamp | null;
isBatched: Generated<boolean | null>;
priority: Generated<string>;
readAt: Timestamp | null;
recipientId: string;
status: Generated<string>;
type: string;
updatedAt: Generated<Timestamp>;
workspaceId: string;
}
export interface PageHistory {
content: Json | null;
coverPhoto: string | null;
@ -324,6 +398,10 @@ export interface DB {
fileTasks: FileTasks;
groups: Groups;
groupUsers: GroupUsers;
notificationAggregations: NotificationAggregations;
notificationBatches: NotificationBatches;
notificationPreferences: NotificationPreferences;
notifications: Notifications;
pageHistory: PageHistory;
pages: Pages;
shares: Shares;

View File

@ -18,6 +18,10 @@ import {
AuthAccounts,
Shares,
FileTasks,
Notifications,
NotificationPreferences,
NotificationBatches,
NotificationAggregations,
} from './db';
// Workspace
@ -113,3 +117,23 @@ export type UpdatableShare = Updateable<Omit<Shares, 'id'>>;
export type FileTask = Selectable<FileTasks>;
export type InsertableFileTask = Insertable<FileTasks>;
export type UpdatableFileTask = Updateable<Omit<FileTasks, 'id'>>;
// Notification
export type Notification = Selectable<Notifications>;
export type InsertableNotification = Insertable<Notifications>;
export type UpdatableNotification = Updateable<Omit<Notifications, 'id'>>;
// NotificationPreference
export type NotificationPreference = Selectable<NotificationPreferences>;
export type InsertableNotificationPreference = Insertable<NotificationPreferences>;
export type UpdatableNotificationPreference = Updateable<Omit<NotificationPreferences, 'id'>>;
// NotificationBatch
export type NotificationBatch = Selectable<NotificationBatches>;
export type InsertableNotificationBatch = Insertable<NotificationBatches>;
export type UpdatableNotificationBatch = Updateable<Omit<NotificationBatches, 'id'>>;
// NotificationAggregation
export type NotificationAggregation = Selectable<NotificationAggregations>;
export type InsertableNotificationAggregation = Insertable<NotificationAggregations>;
export type UpdatableNotificationAggregation = Updateable<Omit<NotificationAggregations, 'id'>>;

View File

@ -19,6 +19,10 @@ import * as cookie from 'cookie';
export class WsGateway implements OnGatewayConnection, OnModuleDestroy {
@WebSocketServer()
server: Server;
// Map to track which sockets belong to which users
private userSocketMap = new Map<string, Set<string>>();
constructor(
private tokenService: TokenService,
private spaceMemberRepo: SpaceMemberRepo,
@ -26,7 +30,7 @@ export class WsGateway implements OnGatewayConnection, OnModuleDestroy {
async handleConnection(client: Socket, ...args: any[]): Promise<void> {
try {
const cookies = cookie.parse(client.handshake.headers.cookie);
const cookies = cookie.parse(client.handshake.headers.cookie || '');
const token: JwtPayload = await this.tokenService.verifyJwt(
cookies['authToken'],
JwtType.ACCESS,
@ -35,18 +39,41 @@ export class WsGateway implements OnGatewayConnection, OnModuleDestroy {
const userId = token.sub;
const workspaceId = token.workspaceId;
// Store user-socket mapping
if (!this.userSocketMap.has(userId)) {
this.userSocketMap.set(userId, new Set());
}
this.userSocketMap.get(userId)!.add(client.id);
// Store user info on socket for later use
(client as any).userId = userId;
(client as any).workspaceId = workspaceId;
const userSpaceIds = await this.spaceMemberRepo.getUserSpaceIds(userId);
const workspaceRoom = `workspace-${workspaceId}`;
const userRoom = `user-${userId}`;
const spaceRooms = userSpaceIds.map((id) => this.getSpaceRoomName(id));
client.join([workspaceRoom, ...spaceRooms]);
client.join([workspaceRoom, userRoom, ...spaceRooms]);
} catch (err) {
client.emit('Unauthorized');
client.disconnect();
}
}
handleDisconnect(client: Socket): void {
// Clean up user-socket mapping
const userId = (client as any).userId;
if (userId && this.userSocketMap.has(userId)) {
const userSockets = this.userSocketMap.get(userId)!;
userSockets.delete(client.id);
if (userSockets.size === 0) {
this.userSocketMap.delete(userId);
}
}
}
@SubscribeMessage('message')
handleMessage(client: Socket, data: any): void {
const spaceEvents = [
@ -85,4 +112,35 @@ export class WsGateway implements OnGatewayConnection, OnModuleDestroy {
getSpaceRoomName(spaceId: string): string {
return `space-${spaceId}`;
}
}
/**
* Emit an event to a specific user
*/
emitToUser(userId: string, event: string, data: any): void {
const userRoom = `user-${userId}`;
this.server.to(userRoom).emit(event, data);
}
/**
* Emit an event to a workspace
*/
emitToWorkspace(workspaceId: string, event: string, data: any): void {
const workspaceRoom = `workspace-${workspaceId}`;
this.server.to(workspaceRoom).emit(event, data);
}
/**
* Emit an event to a space
*/
emitToSpace(spaceId: string, event: string, data: any): void {
const spaceRoom = this.getSpaceRoomName(spaceId);
this.server.to(spaceRoom).emit(event, data);
}
/**
* Check if a user is currently connected
*/
isUserConnected(userId: string): boolean {
return this.userSocketMap.has(userId) && this.userSocketMap.get(userId)!.size > 0;
}
}

View File

@ -5,5 +5,6 @@ import { TokenModule } from '../core/auth/token.module';
@Module({
imports: [TokenModule],
providers: [WsGateway],
exports: [WsGateway],
})
export class WsModule {}