feat embeddings sync

This commit is contained in:
Philipinho
2025-10-15 10:23:59 +01:00
parent 807603f3a2
commit b5b31cc48c
10 changed files with 159 additions and 23 deletions

View File

@ -169,6 +169,10 @@ export class PersistenceExtension implements Extension {
workspaceId: page.workspaceId,
mentions: pageMentions,
} as IPageBacklinkJob);
await this.aiQueue.add(QueueJob.PAGE_CONTENT_UPDATED, {
pageIds: [pageId],
});
}
}
@ -182,22 +186,10 @@ export class PersistenceExtension implements Extension {
}
this.contributors.get(documentName).add(userId);
console.log('embedd me')
const pageId = getPageId(documentName);
await this.aiQueue.add(QueueJob.GENERATE_PAGE_EMBEDDINGS, {
pageId: pageId,
});
}
async afterUnloadDocument(data: afterUnloadDocumentPayload) {
const documentName = data.documentName;
const pageId = getPageId(documentName);
this.contributors.delete(documentName);
// should only queue embed after unload// should delay so we dont embed always
}
}

View File

@ -2,7 +2,17 @@ export enum EventName {
COLLAB_PAGE_UPDATED = 'collab.page.updated',
PAGE_CREATED = 'page.created',
PAGE_UPDATED = 'page.updated',
PAGE_CONTENT_UPDATED = 'page-content-updated',
PAGE_MOVED_TO_SPACE = 'page-moved-to-space',
PAGE_DELETED = 'page.deleted',
PAGE_SOFT_DELETED = 'page.soft_deleted',
PAGE_RESTORED = 'page.restored',
SPACE_CREATED = 'space.created',
SPACE_UPDATED = 'space.updated',
SPACE_DELETED = 'space.deleted',
WORKSPACE_CREATED = 'workspace.created',
WORKSPACE_UPDATED = 'workspace.updated',
WORKSPACE_DELETED = 'workspace.deleted',
}

View File

@ -51,6 +51,7 @@ export class PageService {
@InjectKysely() private readonly db: KyselyDB,
private readonly storageService: StorageService,
@InjectQueue(QueueName.ATTACHMENT_QUEUE) private attachmentQueue: Queue,
@InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue,
private eventEmitter: EventEmitter2,
) {}
@ -255,6 +256,10 @@ export class PageService {
pageIds,
trx,
);
await this.aiQueue.add(QueueJob.PAGE_MOVED_TO_SPACE, {
pageId: pageIds,
});
}
});
}

View File

@ -4,6 +4,7 @@ import { EventName } from '../../common/events/event.contants';
import { InjectQueue } from '@nestjs/bullmq';
import { QueueJob, QueueName } from '../../integrations/queue/constants';
import { Queue } from 'bullmq';
import { EnvironmentService } from '../../integrations/environment/environment.service';
export class PageEvent {
pageIds: string[];
@ -14,36 +15,65 @@ export class PageListener {
private readonly logger = new Logger(PageListener.name);
constructor(
private readonly environmentService: EnvironmentService,
@InjectQueue(QueueName.SEARCH_QUEUE) private searchQueue: Queue,
@InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue,
) {}
@OnEvent(EventName.PAGE_CREATED)
async handlePageCreated(event: PageEvent) {
const { pageIds } = event;
await this.searchQueue.add(QueueJob.PAGE_CREATED, { pageIds });
if (this.isTypesense()) {
await this.searchQueue.add(QueueJob.PAGE_CREATED, { pageIds });
}
if (this.environmentService.isAISearchEnabled()) {
await this.aiQueue.add(QueueJob.PAGE_CREATED, { pageIds });
}
}
@OnEvent(EventName.PAGE_UPDATED)
async handlePageUpdated(event: PageEvent) {
const { pageIds } = event;
await this.searchQueue.add(QueueJob.PAGE_UPDATED, { pageIds });
if (this.isTypesense()) {
await this.searchQueue.add(QueueJob.PAGE_UPDATED, { pageIds });
}
}
@OnEvent(EventName.PAGE_DELETED)
async handlePageDeleted(event: PageEvent) {
const { pageIds } = event;
await this.searchQueue.add(QueueJob.PAGE_DELETED, { pageIds });
if (this.isTypesense()) {
await this.searchQueue.add(QueueJob.PAGE_DELETED, { pageIds });
}
await this.aiQueue.add(QueueJob.PAGE_DELETED, { pageIds });
}
@OnEvent(EventName.PAGE_SOFT_DELETED)
async handlePageSoftDeleted(event: PageEvent) {
const { pageIds } = event;
await this.searchQueue.add(QueueJob.PAGE_SOFT_DELETED, { pageIds });
if (this.isTypesense()) {
await this.searchQueue.add(QueueJob.PAGE_SOFT_DELETED, { pageIds });
}
await this.aiQueue.add(QueueJob.PAGE_SOFT_DELETED, { pageIds });
}
@OnEvent(EventName.PAGE_RESTORED)
async handlePageRestored(event: PageEvent) {
const { pageIds } = event;
await this.searchQueue.add(QueueJob.PAGE_RESTORED, { pageIds });
if (this.isTypesense()) {
await this.searchQueue.add(QueueJob.PAGE_RESTORED, { pageIds });
}
if (this.environmentService.isAISearchEnabled()) {
await this.aiQueue.add(QueueJob.PAGE_RESTORED, { pageIds });
}
}
isTypesense(): boolean {
return this.environmentService.getSearchDriver() === 'typesense';
}
}

View File

@ -0,0 +1,38 @@
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { EventName } from '../../common/events/event.contants';
import { InjectQueue } from '@nestjs/bullmq';
import { QueueJob, QueueName } from '../../integrations/queue/constants';
import { Queue } from 'bullmq';
import { EnvironmentService } from '../../integrations/environment/environment.service';
export class SpaceEvent {
spaceId: string;
}
@Injectable()
export class SpaceListener {
private readonly logger = new Logger(SpaceListener.name);
constructor(
private readonly environmentService: EnvironmentService,
@InjectQueue(QueueName.SEARCH_QUEUE) private searchQueue: Queue,
@InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue,
) {}
@OnEvent(EventName.SPACE_DELETED)
async handleSpaceDeleted(event: SpaceEvent) {
const { spaceId } = event;
if (this.isTypesense()) {
await this.searchQueue.add(QueueJob.SPACE_DELETED, { spaceId });
}
if (this.environmentService.isAISearchEnabled()) {
await this.aiQueue.add(QueueJob.SPACE_DELETED, { spaceId });
}
}
isTypesense(): boolean {
return this.environmentService.getSearchDriver() === 'typesense';
}
}

View File

@ -0,0 +1,36 @@
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { EventName } from '../../common/events/event.contants';
import { InjectQueue } from '@nestjs/bullmq';
import { QueueJob, QueueName } from '../../integrations/queue/constants';
import { Queue } from 'bullmq';
import { EnvironmentService } from '../../integrations/environment/environment.service';
export class WorkspaceEvent {
workspaceId: string;
}
@Injectable()
export class WorkspaceListener {
private readonly logger = new Logger(WorkspaceListener.name);
constructor(
private readonly environmentService: EnvironmentService,
@InjectQueue(QueueName.SEARCH_QUEUE) private searchQueue: Queue,
@InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue,
) {}
@OnEvent(EventName.WORKSPACE_DELETED)
async handlePageDeleted(event: WorkspaceEvent) {
const { workspaceId } = event;
if (this.isTypesense()) {
await this.searchQueue.add(QueueJob.WORKSPACE_DELETED, { workspaceId });
}
await this.aiQueue.add(QueueJob.WORKSPACE_DELETED, { workspaceId });
}
isTypesense(): boolean {
return this.environmentService.getSearchDriver() === 'typesense';
}
}

View File

@ -12,10 +12,15 @@ import { PaginationOptions } from '../../pagination/pagination-options';
import { executeWithPagination } from '@docmost/db/pagination/pagination';
import { DB } from '@docmost/db/types/db';
import { validate as isValidUUID } from 'uuid';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { EventName } from '../../../common/events/event.contants';
@Injectable()
export class SpaceRepo {
constructor(@InjectKysely() private readonly db: KyselyDB) {}
constructor(
@InjectKysely() private readonly db: KyselyDB,
private eventEmitter: EventEmitter2,
) {}
async findById(
spaceId: string,
@ -110,7 +115,11 @@ export class SpaceRepo {
if (pagination.query) {
query = query.where((eb) =>
eb(sql`f_unaccent(name)`, 'ilike', sql`f_unaccent(${'%' + pagination.query + '%'})`).or(
eb(
sql`f_unaccent(name)`,
'ilike',
sql`f_unaccent(${'%' + pagination.query + '%'})`,
).or(
sql`f_unaccent(description)`,
'ilike',
sql`f_unaccent(${'%' + pagination.query + '%'})`,
@ -155,5 +164,9 @@ export class SpaceRepo {
.where('id', '=', spaceId)
.where('workspaceId', '=', workspaceId)
.execute();
this.eventEmitter.emit(EventName.SPACE_DELETED, {
spaceId,
});
}
}

View File

@ -293,7 +293,10 @@ export class EnvironmentService {
return this.configService.get<string>('ENABLE_AI');
}
isAIVectorSearchEnabled(): string {
return this.configService.get<string>('AI_VECTOR_SEARCH');
isAISearchEnabled(): boolean {
const config = this.configService
.get<string>('AI_SEARCH', 'false')
.toLowerCase();
return config === 'true';
}
}

View File

@ -14,7 +14,6 @@ export enum QueueJob {
ATTACHMENT_INDEX_CONTENT = 'attachment-index-content',
ATTACHMENT_INDEXING = 'attachment-indexing',
DELETE_PAGE_ATTACHMENTS = 'delete-page-attachments',
PAGE_CONTENT_UPDATE = 'page-content-update',
DELETE_USER_AVATARS = 'delete-user-avatars',
@ -40,11 +39,21 @@ export enum QueueJob {
TYPESENSE_FLUSH = 'typesense-flush',
PAGE_CREATED = 'page-created',
PAGE_CONTENT_UPDATED = 'page-content-updated',
PAGE_MOVED_TO_SPACE = 'page-moved-to-space',
PAGE_UPDATED = 'page-updated',
PAGE_SOFT_DELETED = 'page-soft-deleted',
PAGE_RESTORED = 'page-restored',
PAGE_DELETED = 'page-deleted',
SPACE_CREATED = 'space-created',
SPACE_UPDATED = 'space-updated',
SPACE_DELETED = 'space-deleted',
WORKSPACE_CREATED = 'workspace-created',
WORKSPACE_SPACE_UPDATED = 'workspace-updated',
WORKSPACE_DELETED = 'workspace-deleted',
GENERATE_PAGE_EMBEDDINGS = 'generate-page-embeddings',
DELETE_PAGE_EMBEDDINGS = 'delete-page-embeddings',
}