Files
docmost/apps/server/src/collaboration/extensions/persistence.extension.ts
Philip Okugbe 6d024fc3de feat: bulk page imports (#1219)
* refactor imports - WIP

* Add readstream

* WIP

* fix attachmentId render

* fix attachmentId render

* turndown video tag

* feat: add stream upload support and improve file handling

- Add stream upload functionality to storage drivers
- Improve ZIP file extraction with better encoding handling
- Fix attachment ID rendering issues
- Add AWS S3 upload stream support
- Update dependencies for better compatibility

* WIP

* notion formatter

* move embed parser to editor-ext package

* import embeds

* utility files

* cleanup

* Switch from happy-dom to cheerio
* Refine code

* WIP

* bug fixes and UI

* sync

* WIP

* sync

* keep import modal mounted

* Show modal during upload

* WIP

* WIP
2025-06-09 04:29:27 +01:00

191 lines
5.4 KiB
TypeScript

import {
afterUnloadDocumentPayload,
Extension,
onChangePayload,
onLoadDocumentPayload,
onStoreDocumentPayload,
} from '@hocuspocus/server';
import * as Y from 'yjs';
import { Injectable, Logger } from '@nestjs/common';
import { TiptapTransformer } from '@hocuspocus/transformer';
import { getPageId, jsonToText, tiptapExtensions } from '../collaboration.util';
import { PageRepo } from '@docmost/db/repos/page/page.repo';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { executeTx } from '@docmost/db/utils';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { InjectQueue } from '@nestjs/bullmq';
import { QueueJob, QueueName } from '../../integrations/queue/constants';
import { Queue } from 'bullmq';
import {
extractMentions,
extractPageMentions,
} from '../../common/helpers/prosemirror/utils';
import { isDeepStrictEqual } from 'node:util';
import { IPageBacklinkJob } from '../../integrations/queue/constants/queue.interface';
import { Page } from '@docmost/db/types/entity.types';
/**
 * Hocuspocus extension that loads collaborative documents from the page
 * repository and writes them back when Hocuspocus asks for them to be stored.
 *
 * It also keeps an in-memory record, per document name, of the user ids seen
 * in `onChange`, so they can be merged into the page's `contributorIds` on the
 * next store.
 */
@Injectable()
export class PersistenceExtension implements Extension {
  private readonly logger = new Logger(PersistenceExtension.name);

  /** Document name -> ids of users seen in `onChange` since the last store. */
  private contributors: Map<string, Set<string>> = new Map();

  constructor(
    private readonly pageRepo: PageRepo,
    @InjectKysely() private readonly db: KyselyDB,
    private eventEmitter: EventEmitter2,
    @InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue,
  ) {}

  /**
   * Supplies the initial Y.Doc for a document that has no content yet.
   *
   * @returns `undefined` when the in-memory document already has content in
   * its 'default' field or the page does not exist; otherwise a Y.Doc built
   * from the stored ydoc state, from the stored JSON content, or a fresh one.
   */
  async onLoadDocument(data: onLoadDocumentPayload) {
    const { documentName, document } = data;
    const pageId = getPageId(documentName);

    // The document already holds content; nothing to load.
    if (!document.isEmpty('default')) {
      return;
    }

    const page = await this.pageRepo.findById(pageId, {
      includeContent: true,
      includeYdoc: true,
    });

    if (!page) {
      this.logger.warn('page not found');
      return;
    }

    if (page.ydoc) {
      this.logger.debug(`ydoc loaded from db: ${pageId}`);

      const doc = new Y.Doc();
      const dbState = new Uint8Array(page.ydoc);
      Y.applyUpdate(doc, dbState);
      return doc;
    }

    // No ydoc state in the db: convert the JSON in page.content to a Y.Doc.
    if (page.content) {
      this.logger.debug(`converting json to ydoc: ${pageId}`);

      // The original also called Y.encodeStateAsUpdate(ydoc) here and threw
      // the result away; that wasted encoding pass has been removed.
      return TiptapTransformer.toYdoc(page.content, 'default', tiptapExtensions);
    }

    this.logger.debug(`creating fresh ydoc: ${pageId}`);
    return new Y.Doc();
  }

  /**
   * Persists the current document state (Tiptap JSON, plain text and encoded
   * Y.Doc state) to the page row inside a transaction.
   *
   * Only when the row was actually updated does it emit
   * `collab.page.updated` and enqueue a `PAGE_BACKLINKS` job. Database errors
   * are logged and not rethrown.
   */
  async onStoreDocument(data: onStoreDocumentPayload) {
    const { documentName, document, context } = data;
    const pageId = getPageId(documentName);

    const tiptapJson = TiptapTransformer.fromYdoc(document, 'default');
    const ydocState = Buffer.from(Y.encodeStateAsUpdate(document));

    // Text extraction is best effort: the page is still saved without it.
    let textContent = null;
    try {
      textContent = jsonToText(tiptapJson);
    } catch (err) {
      this.logger.warn('jsonToText' + err?.['message']);
    }

    // Stays non-null only when the page row was updated successfully.
    let page: Page = null;

    try {
      await executeTx(this.db, async (trx) => {
        // NOTE(review): withLock presumably locks the row for the duration of
        // the transaction - confirm against PageRepo.findById.
        page = await this.pageRepo.findById(pageId, {
          withLock: true,
          includeContent: true,
          trx,
        });

        if (!page) {
          this.logger.error(`Page with id ${pageId} not found`);
          return;
        }

        // Unchanged content: skip the write and the follow-up notifications.
        if (isDeepStrictEqual(tiptapJson, page.content)) {
          page = null;
          return;
        }

        const contributorIds = this.collectContributorIds(documentName, page);

        await this.pageRepo.updatePage(
          {
            content: tiptapJson,
            textContent: textContent,
            ydoc: ydocState,
            lastUpdatedById: context.user.id,
            contributorIds: contributorIds,
          },
          pageId,
          trx,
        );

        this.logger.debug(`Page updated: ${pageId} - SlugId: ${page.slugId}`);
      });
    } catch (err) {
      this.logger.error(`Failed to update page ${pageId}`, err);
      // The update did not succeed, so do not announce it below. Without this
      // reset the page fetched inside the transaction would still be set and
      // an update event / backlinks job would go out for a failed save.
      page = null;
    }

    if (page) {
      this.eventEmitter.emit('collab.page.updated', {
        page: {
          ...page,
          content: tiptapJson,
          lastUpdatedById: context.user.id,
        },
      });

      const mentions = extractMentions(tiptapJson);
      const pageMentions = extractPageMentions(mentions);

      await this.generalQueue.add(QueueJob.PAGE_BACKLINKS, {
        pageId: pageId,
        workspaceId: page.workspaceId,
        mentions: pageMentions,
      } as IPageBacklinkJob);
    }
  }

  /**
   * Records the id of the user attached to the change's context as a
   * contributor of the document. Changes without a user id are ignored.
   */
  async onChange(data: onChangePayload) {
    const documentName = data.documentName;
    // Guard both levels: a context without a user must not throw here.
    const userId = data.context?.user?.id;
    if (!userId) return;

    let editors = this.contributors.get(documentName);
    if (!editors) {
      editors = new Set();
      this.contributors.set(documentName, editors);
    }
    editors.add(userId);
  }

  /** Drops the contributor record of a document once it has been unloaded. */
  async afterUnloadDocument(data: afterUnloadDocumentPayload) {
    const documentName = data.documentName;
    this.contributors.delete(documentName);
  }

  /**
   * Merges the contributors recorded for `documentName` with the page's
   * existing contributors and its creator, then clears the record.
   *
   * @returns the de-duplicated contributor ids, or `undefined` when no
   * contributors were recorded or merging failed, in which case
   * `contributorIds` is passed to the update as `undefined`.
   */
  private collectContributorIds(
    documentName: string,
    page: Page,
  ): string[] | undefined {
    const recorded = this.contributors.get(documentName);
    if (!recorded) {
      // Nothing was recorded since the last store (previously this case was
      // reached through a swallowed TypeError).
      return undefined;
    }

    try {
      const merged = Array.from(
        new Set([
          ...(page.contributorIds ?? []),
          ...recorded,
          page.creatorId,
        ]),
      );
      // Cleared before the update is awaited, so changes arriving meanwhile
      // start a new record instead of being dropped with this one.
      this.contributors.delete(documentName);
      return merged;
    } catch (err) {
      // Contributor tracking is best effort and must not block the save.
      const reason = err instanceof Error ? err.message : String(err);
      this.logger.warn(`Failed to merge contributors for ${documentName}: ${reason}`);
      return undefined;
    }
  }
}