mirror of https://github.com/docmost/docmost.git
synced 2025-11-10 07:12:04 +10:00

Compare commits: 3 commits on fix/tiptap ... e2b8899569

| Author | SHA1 | Date |
|---|---|---|
| | e2b8899569 | |
| | f6e3230eec | |
| | 625bdc7024 | |
@ -80,7 +80,9 @@
     "sanitize-filename-ts": "^1.0.2",
     "socket.io": "^4.8.1",
     "stripe": "^17.5.0",
-    "ws": "^8.18.0"
+    "tmp-promise": "^3.0.3",
+    "ws": "^8.18.0",
+    "yauzl": "^3.2.0"
   },
   "devDependencies": {
     "@eslint/js": "^9.20.0",
@ -99,6 +101,7 @@
     "@types/pg": "^8.11.11",
     "@types/supertest": "^6.0.2",
     "@types/ws": "^8.5.14",
+    "@types/yauzl": "^2.10.3",
     "eslint": "^9.20.1",
     "eslint-config-prettier": "^10.0.1",
     "globals": "^15.15.0",
@ -0,0 +1,45 @@
import { Kysely, sql } from 'kysely';

export async function up(db: Kysely<any>): Promise<void> {
  await db.schema
    .createTable('file_tasks')
    .addColumn('id', 'uuid', (col) =>
      col.primaryKey().defaultTo(sql`gen_uuid_v7()`),
    )
    // type: import or export
    .addColumn('type', 'varchar', (col) => col)
    // source: generic, notion, confluence
    .addColumn('source', 'varchar', (col) => col)
    // status: pending | processing | success | failed
    .addColumn('status', 'varchar', (col) => col)
    .addColumn('file_name', 'varchar', (col) => col.notNull())
    .addColumn('file_path', 'varchar', (col) => col.notNull())
    .addColumn('file_size', 'int8', (col) => col)
    .addColumn('file_ext', 'varchar', (col) => col)
    .addColumn('creator_id', 'uuid', (col) => col.references('users.id'))
    .addColumn('space_id', 'uuid', (col) =>
      col.references('spaces.id').onDelete('cascade'),
    )
    .addColumn('workspace_id', 'uuid', (col) =>
      col.references('workspaces.id').onDelete('cascade').notNull(),
    )
    .addColumn('created_at', 'timestamptz', (col) =>
      col.notNull().defaultTo(sql`now()`),
    )
    .addColumn('updated_at', 'timestamptz', (col) =>
      col.notNull().defaultTo(sql`now()`),
    )
    .addColumn('completed_at', 'timestamptz', (col) => col)
    .addColumn('deleted_at', 'timestamptz', (col) => col)
    .execute();
}

export async function down(db: Kysely<any>): Promise<void> {
  await db.schema.dropTable('file_tasks').execute();
}
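The migration above is plain Kysely, so it can be applied with Kysely's bundled Migrator. A minimal sketch, assuming a Postgres connection string in DATABASE_URL and a ./migrations folder (both placeholders, not docmost's actual bootstrap):

```ts
import * as path from 'path';
import { promises as fs } from 'fs';
import { Kysely, Migrator, FileMigrationProvider, PostgresDialect } from 'kysely';
import { Pool } from 'pg';

async function migrateToLatest(): Promise<void> {
  // placeholder connection config; docmost wires its own
  const db = new Kysely<any>({
    dialect: new PostgresDialect({
      pool: new Pool({ connectionString: process.env.DATABASE_URL }),
    }),
  });

  const migrator = new Migrator({
    db,
    provider: new FileMigrationProvider({
      fs,
      path,
      // assumed folder layout
      migrationFolder: path.join(__dirname, 'migrations'),
    }),
  });

  // runs every not-yet-applied migration's up() in order
  const { error, results } = await migrator.migrateToLatest();
  results?.forEach((it) => console.log(`${it.migrationName}: ${it.status}`));
  if (error) throw error;
  await db.destroy();
}
```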
19  apps/server/src/database/types/db.d.ts  vendored
@ -122,6 +122,24 @@ export interface Comments {
   workspaceId: string;
 }
 
+export interface FileTasks {
+  completedAt: Timestamp | null;
+  createdAt: Generated<Timestamp>;
+  creatorId: string | null;
+  deletedAt: Timestamp | null;
+  fileExt: string | null;
+  fileName: string;
+  filePath: string;
+  fileSize: Int8 | null;
+  id: Generated<string>;
+  source: string | null;
+  spaceId: string | null;
+  status: string | null;
+  type: string | null;
+  updatedAt: Generated<Timestamp>;
+  workspaceId: string;
+}
+
 export interface Groups {
   createdAt: Generated<Timestamp>;
   creatorId: string | null;
@ -298,6 +316,7 @@ export interface DB {
   backlinks: Backlinks;
   billing: Billing;
   comments: Comments;
+  fileTasks: FileTasks;
   groups: Groups;
   groupUsers: GroupUsers;
   pageHistory: PageHistory;
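With fileTasks registered on the DB interface, queries against the new table are type-checked end to end. A small sketch, assuming a Kysely<DB> instance:

```ts
import { Kysely } from 'kysely';
import { DB } from './db';

// Find import tasks still waiting to be processed; column names and
// result types are checked against the generated FileTasks interface.
async function findPendingImports(db: Kysely<DB>, workspaceId: string) {
  return db
    .selectFrom('fileTasks')
    .select(['id', 'fileName', 'status', 'createdAt'])
    .where('workspaceId', '=', workspaceId)
    .where('type', '=', 'import')
    .where('status', '=', 'pending')
    .orderBy('createdAt', 'desc')
    .execute();
}
```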
@ -17,6 +17,7 @@ import {
   AuthProviders,
   AuthAccounts,
   Shares,
+  FileTasks,
 } from './db';
 
 // Workspace
@ -107,3 +108,8 @@ export type UpdatableAuthAccount = Updateable<Omit<AuthAccounts, 'id'>>;
 export type Share = Selectable<Shares>;
 export type InsertableShare = Insertable<Shares>;
 export type UpdatableShare = Updateable<Omit<Shares, 'id'>>;
+
+// File Task
+export type FileTask = Selectable<FileTasks>;
+export type InsertableFileTask = Insertable<FileTasks>;
+export type UpdatableFileTask = Updateable<Omit<FileTasks, 'id'>>;
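A short illustration of what the three wrappers buy (the literal values are hypothetical): Insertable makes Generated columns optional on write, and Updateable<Omit<..., 'id'>> keeps partial updates from touching the primary key:

```ts
import { InsertableFileTask, UpdatableFileTask } from './entity.types';

// Insertable<FileTasks> makes Generated<> columns (id, createdAt,
// updatedAt) and nullable columns optional, so a new row only needs
// the genuinely required inputs:
const newTask: InsertableFileTask = {
  fileName: 'export',
  filePath: 'workspace-1/imports/task-1/export.zip',
  workspaceId: 'workspace-1',
};

// Updateable<Omit<FileTasks, 'id'>> makes every column optional and
// removes id, so a status patch cannot rewrite the primary key:
const statusPatch: UpdatableFileTask = {
  status: 'processing',
};
```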
225  apps/server/src/integrations/import/file-task.service.ts  Normal file
@ -0,0 +1,225 @@
import { Injectable, Logger } from '@nestjs/common';
import * as path from 'path';
import { jsonToText } from '../../collaboration/collaboration.util';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { extractZip, FileTaskStatus } from './file.utils';
import { StorageService } from '../storage/storage.service';
import * as tmp from 'tmp-promise';
import { pipeline } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';
import { ImportService } from './import.service';
import { promises as fs } from 'fs';
import { generateSlugId } from '../../common/helpers';
import { v7 } from 'uuid';
import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
import { FileTask, InsertablePage } from '@docmost/db/types/entity.types';

@Injectable()
export class FileTaskService {
  private readonly logger = new Logger(FileTaskService.name);

  constructor(
    private readonly storageService: StorageService,
    private readonly importService: ImportService,
    @InjectKysely() private readonly db: KyselyDB,
  ) {}

  async processZipImport(fileTaskId: string): Promise<void> {
    const fileTask = await this.db
      .selectFrom('fileTasks')
      .selectAll()
      .where('id', '=', fileTaskId)
      .executeTakeFirst();

    if (!fileTask) {
      this.logger.log(`File task with ID ${fileTaskId} not found`);
      return;
    }

    const { path: tmpZipPath, cleanup: cleanupTmpFile } = await tmp.file({
      prefix: 'docmost-import',
      postfix: '.zip',
      discardDescriptor: true,
    });

    const { path: tmpExtractDir, cleanup: cleanupTmpDir } = await tmp.dir({
      prefix: 'docmost-extract-',
      unsafeCleanup: true,
    });

    const fileStream = await this.storageService.readStream(fileTask.filePath);
    await pipeline(fileStream, createWriteStream(tmpZipPath));

    await extractZip(tmpZipPath, tmpExtractDir);

    // TODO: internal link mentions, backlinks, attachments
    try {
      await this.updateTaskStatus(fileTaskId, FileTaskStatus.Processing);

      await this.processGenericImport({ extractDir: tmpExtractDir, fileTask });
      await this.updateTaskStatus(fileTaskId, FileTaskStatus.Success);
    } catch (error) {
      await this.updateTaskStatus(fileTaskId, FileTaskStatus.Failed);
    } finally {
      await cleanupTmpFile();
      await cleanupTmpDir();
    }
  }

  async processGenericImport(opts: {
    extractDir: string;
    fileTask: FileTask;
  }): Promise<void> {
    const { extractDir, fileTask } = opts;

    const allFiles = await this.collectMarkdownAndHtmlFiles(extractDir);

    const pagesMap = new Map<
      string,
      {
        id: string;
        slugId: string;
        name: string;
        content: string;
        position?: string | null;
        parentPageId: string | null;
        fileExtension: string;
        filePath: string;
      }
    >();

    for (const absPath of allFiles) {
      const relPath = path
        .relative(extractDir, absPath)
        .split(path.sep)
        .join('/'); // normalize to forward slashes
      const ext = path.extname(relPath).toLowerCase();
      const content = await fs.readFile(absPath, 'utf-8');

      pagesMap.set(relPath, {
        id: v7(),
        slugId: generateSlugId(),
        name: path.basename(relPath, ext),
        content,
        parentPageId: null,
        fileExtension: ext,
        filePath: relPath,
      });
    }

    // parent/child linking: a page's parent is the nearest ancestor
    // directory that has a matching .md or .html file beside it
    pagesMap.forEach((page, filePath) => {
      const segments = filePath.split('/');
      segments.pop();
      let parentPage = null;
      while (segments.length) {
        const tryMd = segments.join('/') + '.md';
        const tryHtml = segments.join('/') + '.html';
        if (pagesMap.has(tryMd)) {
          parentPage = pagesMap.get(tryMd)!;
          break;
        }
        if (pagesMap.has(tryHtml)) {
          parentPage = pagesMap.get(tryHtml)!;
          break;
        }
        segments.pop();
      }
      if (parentPage) page.parentPageId = parentPage.id;
    });

    // generate position keys: group pages by parent, sort each group
    // by name, then chain fractional keys so siblings stay ordered
    const siblingsMap = new Map<string | null, any[]>();
    pagesMap.forEach((page) => {
      const sibs = siblingsMap.get(page.parentPageId) || [];
      sibs.push(page);
      siblingsMap.set(page.parentPageId, sibs);
    });
    siblingsMap.forEach((sibs) => {
      sibs.sort((a, b) => a.name.localeCompare(b.name));
      let prevPos: string | null = null;
      for (const page of sibs) {
        page.position = generateJitteredKeyBetween(prevPos, null);
        prevPos = page.position;
      }
    });

    const filePathToPageMetaMap = new Map<
      string,
      { id: string; title: string; slugId: string }
    >();
    pagesMap.forEach((page) => {
      filePathToPageMetaMap.set(page.filePath, {
        id: page.id,
        title: page.name,
        slugId: page.slugId,
      });
    });

    const insertablePages: InsertablePage[] = await Promise.all(
      Array.from(pagesMap.values()).map(async (page) => {
        const pmState = await this.importService.markdownOrHtmlToProsemirror(
          page.content,
          page.fileExtension,
        );
        const { title, prosemirrorJson } =
          this.importService.extractTitleAndRemoveHeading(pmState);

        /*
        const rewDoc =
          await this.importService.convertInternalLinksToMentionsPM(
            jsonToNode(prosemirrorJson),
            page.filePath,
            filePathToPageMetaMap,
          );
        */
        const proseJson = prosemirrorJson; // rewDoc.toJSON();

        return {
          id: page.id,
          slugId: page.slugId,
          title: title || page.name,
          content: proseJson,
          textContent: jsonToText(proseJson),
          ydoc: await this.importService.createYdoc(proseJson),
          position: page.position!,
          spaceId: fileTask.spaceId,
          workspaceId: fileTask.workspaceId,
          creatorId: fileTask.creatorId,
          lastUpdatedById: fileTask.creatorId,
          parentPageId: page.parentPageId,
        };
      }),
    );

    await this.db.insertInto('pages').values(insertablePages).execute();
  }

  async collectMarkdownAndHtmlFiles(dir: string): Promise<string[]> {
    const results: string[] = [];

    async function walk(current: string) {
      const entries = await fs.readdir(current, { withFileTypes: true });
      for (const ent of entries) {
        const fullPath = path.join(current, ent.name);
        if (ent.isDirectory()) {
          await walk(fullPath);
        } else if (
          ['.md', '.html'].includes(path.extname(ent.name).toLowerCase())
        ) {
          results.push(fullPath);
        }
      }
    }

    await walk(dir);
    return results;
  }

  async updateTaskStatus(fileTaskId: string, status: FileTaskStatus) {
    await this.db
      .updateTable('fileTasks')
      .set({ status: status })
      .where('id', '=', fileTaskId)
      .execute();
  }
}
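The position-key step above leans on fractional indexing: siblings are sorted by name and each gets a key strictly after the previous one. A standalone sketch of that ordering, using the same generateJitteredKeyBetween helper:

```ts
import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';

// Assign each sibling a key strictly greater than the previous one.
// Keys sort lexicographically, so a later insert can land between any
// two neighbors without renumbering the rest of the list.
const names = ['Guide.md', 'Intro.md', 'Setup.md'];
let prev: string | null = null;
for (const name of names) {
  const pos = generateJitteredKeyBetween(prev, null); // e.g. 'a0...', 'a1...'
  console.log(name, pos);
  prev = pos;
}
```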
77  apps/server/src/integrations/import/file.utils.ts  Normal file
@ -0,0 +1,77 @@
import * as yauzl from 'yauzl';
import * as path from 'path';
import * as fs from 'node:fs';

export enum FileTaskType {
  Import = 'import',
  Export = 'export',
}

export enum FileImportType {
  Generic = 'generic',
  Notion = 'notion',
  Confluence = 'confluence',
}

export enum FileTaskStatus {
  Pending = 'pending',
  Processing = 'processing',
  Success = 'success',
  Failed = 'failed',
}

export function getFileTaskFolderPath(
  type: FileTaskType,
  workspaceId: string,
): string {
  switch (type) {
    case FileTaskType.Import:
      return `${workspaceId}/imports`;
    case FileTaskType.Export:
      return `${workspaceId}/exports`;
  }
}

export function extractZip(source: string, target: string) {
  // adapted from https://github.com/Surfer-Org
  return new Promise((resolve, reject) => {
    yauzl.open(source, { lazyEntries: true }, (err, zipfile) => {
      if (err) return reject(err);

      zipfile.readEntry();
      zipfile.on('entry', (entry) => {
        const fullPath = path.join(target, entry.fileName);
        const directory = path.dirname(fullPath);

        if (/\/$/.test(entry.fileName)) {
          // directory entry
          try {
            fs.mkdirSync(fullPath, { recursive: true });
            zipfile.readEntry();
          } catch (err) {
            reject(err);
          }
        } else {
          // file entry
          try {
            fs.mkdirSync(directory, { recursive: true });
            zipfile.openReadStream(entry, (err, readStream) => {
              if (err) return reject(err);
              const writeStream = fs.createWriteStream(fullPath);
              readStream.on('end', () => {
                writeStream.end();
                zipfile.readEntry();
              });
              readStream.pipe(writeStream);
            });
          } catch (err) {
            reject(err);
          }
        }
      });

      zipfile.on('end', resolve);
      zipfile.on('error', reject);
    });
  });
}
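Note that extractZip joins entry.fileName onto the target directory without validating it, so it should only be fed archives the caller already trusts. A minimal usage sketch, pairing it with tmp-promise the way the service does:

```ts
import * as tmp from 'tmp-promise';
import { extractZip } from './file.utils';

// Extract an uploaded archive into a throwaway directory that is
// removed (recursively, via unsafeCleanup) by cleanup().
async function extractToTempDir(zipPath: string): Promise<void> {
  const { path: dir, cleanup } = await tmp.dir({
    prefix: 'docmost-extract-',
    unsafeCleanup: true,
  });
  try {
    await extractZip(zipPath, dir);
    // ... walk `dir` and process the extracted files here
  } finally {
    await cleanup();
  }
}
```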
@ -83,4 +83,57 @@ export class ImportController {

    return this.importService.importPage(file, user.id, spaceId, workspace.id);
  }

  @UseInterceptors(FileInterceptor)
  @UseGuards(JwtAuthGuard)
  @HttpCode(HttpStatus.OK)
  // temporary naming
  @Post('pages/import-zip')
  async importZip(
    @Req() req: any,
    @AuthUser() user: User,
    @AuthWorkspace() workspace: Workspace,
  ) {
    const validFileExtensions = ['.zip'];

    const maxFileSize = bytes('100mb');

    let file = null;
    try {
      file = await req.file({
        limits: { fileSize: maxFileSize, fields: 3, files: 1 },
      });
    } catch (err: any) {
      this.logger.error(err.message);
      if (err?.statusCode === 413) {
        throw new BadRequestException(
          `File too large. Exceeds the 100mb import limit`,
        );
      }
    }

    if (!file) {
      throw new BadRequestException('Failed to upload file');
    }

    if (
      !validFileExtensions.includes(path.extname(file.filename).toLowerCase())
    ) {
      throw new BadRequestException('Invalid import file type.');
    }

    const spaceId = file.fields?.spaceId?.value;
    const source = file.fields?.source?.value;

    if (!spaceId) {
      throw new BadRequestException('spaceId is required');
    }

    const ability = await this.spaceAbility.createForUser(user, spaceId);
    if (ability.cannot(SpaceCaslAction.Edit, SpaceCaslSubject.Page)) {
      throw new ForbiddenException();
    }

    return this.importService.importZip(
      file,
      source,
      user.id,
      spaceId,
      workspace.id,
    );
  }
}
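From the client side the endpoint expects one multipart request carrying the archive plus spaceId and source fields. A hedged sketch, assuming the controller is mounted under /api and that auth travels via cookie (both assumptions for illustration):

```ts
// Hypothetical browser-side helper; the route prefix and credential
// handling are assumptions, not taken from this diff.
async function uploadImportZip(zip: File, spaceId: string): Promise<void> {
  const form = new FormData();
  form.append('spaceId', spaceId);
  form.append('source', 'generic'); // FileImportType.Generic
  form.append('file', zip);

  const res = await fetch('/api/pages/import-zip', {
    method: 'POST',
    body: form,
    credentials: 'include',
  });
  if (!res.ok) {
    throw new Error(`Import failed: ${res.status}`);
  }
}
```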
@ -1,9 +1,13 @@
 import { Module } from '@nestjs/common';
 import { ImportService } from './import.service';
 import { ImportController } from './import.controller';
 import { StorageModule } from '../storage/storage.module';
+import { FileTaskService } from './file-task.service';
+import { FileTaskProcessor } from './processors/file-task.processor';
 
 @Module({
-  providers: [ImportService],
+  providers: [ImportService, FileTaskService, FileTaskProcessor],
   controllers: [ImportController],
   imports: [StorageModule],
 })
 export class ImportModule {}
@ -4,7 +4,8 @@ import { MultipartFile } from '@fastify/multipart';
 import { sanitize } from 'sanitize-filename-ts';
 import * as path from 'path';
 import {
-  htmlToJson, jsonToText,
+  htmlToJson,
+  jsonToText,
   tiptapExtensions,
 } from '../../collaboration/collaboration.util';
 import { InjectKysely } from 'nestjs-kysely';
@ -13,7 +14,20 @@ import { generateSlugId } from '../../common/helpers';
 import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
 import { TiptapTransformer } from '@hocuspocus/transformer';
 import * as Y from 'yjs';
-import { markdownToHtml } from "@docmost/editor-ext";
+import { markdownToHtml } from '@docmost/editor-ext';
+import {
+  FileTaskStatus,
+  FileTaskType,
+  getFileTaskFolderPath,
+} from './file.utils';
+import { v7, v7 as uuid7 } from 'uuid';
+import { StorageService } from '../storage/storage.service';
+import { InjectQueue } from '@nestjs/bullmq';
+import { Queue } from 'bullmq';
+import { QueueJob, QueueName } from '../queue/constants';
+import { Node as PMNode } from '@tiptap/pm/model';
+import { EditorState, Transaction } from '@tiptap/pm/state';
+import { getSchema } from '@tiptap/core';
 
 @Injectable()
 export class ImportService {
@ -21,7 +35,10 @@ export class ImportService {
 
   constructor(
     private readonly pageRepo: PageRepo,
+    private readonly storageService: StorageService,
     @InjectKysely() private readonly db: KyselyDB,
+    @InjectQueue(QueueName.FILE_TASK_QUEUE)
+    private readonly fileTaskQueue: Queue,
   ) {}
 
   async importPage(
@ -113,7 +130,7 @@ export class ImportService {
 
   async createYdoc(prosemirrorJson: any): Promise<Buffer | null> {
     if (prosemirrorJson) {
-      this.logger.debug(`Converting prosemirror json state to ydoc`);
+      // this.logger.debug(`Converting prosemirror json state to ydoc`);
 
       const ydoc = TiptapTransformer.toYdoc(
         prosemirrorJson,
@ -161,4 +178,141 @@ export class ImportService {
    return generateJitteredKeyBetween(null, null);
  }

  async importZip(
    filePromise: Promise<MultipartFile>,
    source: string,
    userId: string,
    spaceId: string,
    workspaceId: string,
  ): Promise<void> {
    const file = await filePromise;
    const fileBuffer = await file.toBuffer();
    const fileExtension = path.extname(file.filename).toLowerCase();
    const fileName = sanitize(
      path.basename(file.filename, fileExtension).slice(0, 255),
    );

    const fileTaskId = uuid7();
    const filePath = `${getFileTaskFolderPath(FileTaskType.Import, workspaceId)}/${fileTaskId}/${fileName}`;

    // upload file
    await this.storageService.upload(filePath, fileBuffer);

    // store in fileTasks table
    await this.db
      .insertInto('fileTasks')
      .values({
        id: fileTaskId,
        type: FileTaskType.Import,
        source: source,
        status: FileTaskStatus.Pending,
        fileName: fileName,
        filePath: filePath,
        fileSize: 0,
        fileExt: 'zip',
        creatorId: userId,
        spaceId: spaceId,
        workspaceId: workspaceId,
      })
      .execute();

    // enqueue only the task ID; the processor resolves the row itself
    await this.fileTaskQueue.add(QueueJob.IMPORT_TASK, {
      fileTaskId: fileTaskId,
    });
    // TODO: return task info

    // lifecycle: the processor flips the status to 'processing' when it
    // picks the job up, then to 'success' or 'failed' when done
  }

  async markdownOrHtmlToProsemirror(
    fileContent: string,
    fileExtension: string,
  ): Promise<any> {
    let prosemirrorState = '';
    if (fileExtension === '.md') {
      prosemirrorState = await this.processMarkdown(fileContent);
    } else if (fileExtension.endsWith('.html')) {
      prosemirrorState = await this.processHTML(fileContent);
    }
    return prosemirrorState;
  }

  async convertInternalLinksToMentionsPM(
    doc: PMNode,
    currentFilePath: string,
    filePathToPageMetaMap: Map<
      string,
      { id: string; title: string; slugId: string }
    >,
  ): Promise<PMNode> {
    const schema = getSchema(tiptapExtensions);
    const state = EditorState.create({ doc, schema });
    let tr: Transaction = state.tr;

    const normalizePath = (p: string) => p.replace(/\\/g, '/');

    // Collect replacements from the original doc.
    const replacements: Array<{
      from: number;
      to: number;
      mentionNode: PMNode;
    }> = [];

    doc.descendants((node, pos) => {
      if (!node.isText || !node.marks?.length) return;

      // Look for the link mark
      const linkMark = node.marks.find(
        (mark) => mark.type.name === 'link' && mark.attrs?.href,
      );
      if (!linkMark) return;

      // Compute the range for the entire text node.
      const from = pos;
      const to = pos + node.nodeSize;

      // Resolve the path and get page meta.
      const resolvedPath = normalizePath(
        path.join(path.dirname(currentFilePath), linkMark.attrs.href),
      );
      const pageMeta = filePathToPageMetaMap.get(resolvedPath);
      if (!pageMeta) return;

      // Create the mention node with all required attributes.
      const mentionNode = schema.nodes.mention.create({
        id: v7(),
        entityType: 'page',
        entityId: pageMeta.id,
        label: node.text || pageMeta.title,
        slugId: pageMeta.slugId,
        creatorId: 'not available', // required by the mention schema
      });

      replacements.push({ from, to, mentionNode });
    });

    // Apply replacements in reverse order so earlier positions stay valid.
    for (let i = replacements.length - 1; i >= 0; i--) {
      const { from, to, mentionNode } = replacements[i];
      try {
        tr = tr.replaceWith(from, to, mentionNode);
      } catch (err) {
        this.logger.error(`Failed to insert mention: ${err}`);
      }
    }

    // Return the updated document if any change was made.
    return tr.docChanged ? state.apply(tr).doc : doc;
  }
}
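convertInternalLinksToMentionsPM resolves each link's href relative to the file that contains it before looking it up in filePathToPageMetaMap. A tiny sketch of that path arithmetic, with hypothetical file names:

```ts
import * as path from 'path';

const normalizePath = (p: string) => p.replace(/\\/g, '/');

// Resolve an href the way convertInternalLinksToMentionsPM does:
// relative to the directory of the file that contains the link.
function resolveInternalLink(currentFilePath: string, href: string): string {
  return normalizePath(path.join(path.dirname(currentFilePath), href));
}

// Hypothetical archive layout:
resolveInternalLink('docs/guide.md', 'setup.md');    // 'docs/setup.md'
resolveInternalLink('docs/guide.md', '../intro.md'); // 'intro.md'
```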
@ -0,0 +1,51 @@
import { Logger, OnModuleDestroy } from '@nestjs/common';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { QueueJob, QueueName } from 'src/integrations/queue/constants';
import { FileTaskService } from '../file-task.service';

@Processor(QueueName.FILE_TASK_QUEUE)
export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
  private readonly logger = new Logger(FileTaskProcessor.name);
  constructor(private readonly fileTaskService: FileTaskService) {
    super();
  }

  async process(job: Job<any, void>): Promise<void> {
    switch (job.name) {
      case QueueJob.IMPORT_TASK:
        this.logger.debug(`import task ${job.data.fileTaskId}`);
        await this.fileTaskService.processZipImport(job.data.fileTaskId);
        break;
      case QueueJob.EXPORT_TASK:
        this.logger.debug(`export task ${job.data.fileTaskId}`);
    }
  }

  @OnWorkerEvent('active')
  onActive(job: Job) {
    this.logger.debug(`Processing ${job.name} job`);
  }

  @OnWorkerEvent('failed')
  onError(job: Job) {
    this.logger.error(
      `Error processing ${job.name} job. Reason: ${job.failedReason}`,
    );
  }

  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    this.logger.debug(`Completed ${job.name} job`);
  }

  async onModuleDestroy(): Promise<void> {
    if (this.worker) {
      await this.worker.close();
    }
  }
}
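The producer side is a plain BullMQ queue.add. The commit enqueues with default options; the retry settings below are an illustrative assumption, not part of the diff:

```ts
import { Queue } from 'bullmq';

// The diff enqueues with default job options; attempts/backoff here
// are an assumption shown for illustration only.
async function enqueueImport(queue: Queue, fileTaskId: string): Promise<void> {
  await queue.add(
    'import-task', // QueueJob.IMPORT_TASK
    { fileTaskId },
    {
      attempts: 3,
      backoff: { type: 'exponential', delay: 5_000 },
      removeOnComplete: true,
    },
  );
}
```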
@ -3,6 +3,7 @@ export enum QueueName {
   ATTACHMENT_QUEUE = '{attachment-queue}',
   GENERAL_QUEUE = '{general-queue}',
   BILLING_QUEUE = '{billing-queue}',
+  FILE_TASK_QUEUE = '{file-task-queue}',
 }
 
 export enum QueueJob {
@ -19,4 +20,7 @@ export enum QueueJob {
   TRIAL_ENDED = 'trial-ended',
   WELCOME_EMAIL = 'welcome-email',
   FIRST_PAYMENT_EMAIL = 'first-payment-email',
+
+  IMPORT_TASK = 'import-task',
+  EXPORT_TASK = 'export-task',
 }
@ -49,6 +49,9 @@ import { BacklinksProcessor } from './processors/backlinks.processor';
     BullModule.registerQueue({
       name: QueueName.BILLING_QUEUE,
     }),
+    BullModule.registerQueue({
+      name: QueueName.FILE_TASK_QUEUE,
+    }),
   ],
   exports: [BullModule],
   providers: [BacklinksProcessor],
@ -5,6 +5,8 @@ import {
 } from '../interfaces';
 import { join } from 'path';
 import * as fs from 'fs-extra';
+import { Readable } from 'stream';
+import { createReadStream } from 'node:fs';
 
 export class LocalDriver implements StorageDriver {
   private readonly config: LocalStorageConfig;
@ -43,6 +45,14 @@ export class LocalDriver implements StorageDriver {
     }
   }
 
+  async readStream(filePath: string): Promise<Readable> {
+    try {
+      return createReadStream(this._fullPath(filePath));
+    } catch (err) {
+      throw new Error(`Failed to read file: ${(err as Error).message}`);
+    }
+  }
+
   async exists(filePath: string): Promise<boolean> {
     try {
       return await fs.pathExists(this._fullPath(filePath));
@ -71,6 +71,21 @@ export class S3Driver implements StorageDriver {
     }
   }
 
+  async readStream(filePath: string): Promise<Readable> {
+    try {
+      const command = new GetObjectCommand({
+        Bucket: this.config.bucket,
+        Key: filePath,
+      });
+
+      const response = await this.s3Client.send(command);
+
+      return response.Body as Readable;
+    } catch (err) {
+      throw new Error(`Failed to read file from S3: ${(err as Error).message}`);
+    }
+  }
+
   async exists(filePath: string): Promise<boolean> {
     try {
       const command = new HeadObjectCommand({
@ -1,3 +1,5 @@
+import { Readable } from 'stream';
+
 export interface StorageDriver {
   upload(filePath: string, file: Buffer): Promise<void>;
 
@ -5,6 +7,9 @@ export interface StorageDriver {
 
   read(filePath: string): Promise<Buffer>;
 
+  readStream(filePath: string): Promise<Readable>;
+
+
   exists(filePath: string): Promise<boolean>;
 
   getUrl(filePath: string): string;
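Since readStream is now part of the StorageDriver contract, test doubles have to provide it too. A hypothetical in-memory driver covering only the members visible in this diff (the real interface may require more):

```ts
import { Readable } from 'stream';

// Minimal in-memory stand-in for tests; covers only the members
// shown in this diff, so it deliberately does not claim to
// implement the full StorageDriver interface.
class InMemoryDriver {
  private files = new Map<string, Buffer>();

  async upload(filePath: string, file: Buffer): Promise<void> {
    this.files.set(filePath, file);
  }

  async read(filePath: string): Promise<Buffer> {
    const buf = this.files.get(filePath);
    if (!buf) throw new Error(`Not found: ${filePath}`);
    return buf;
  }

  async readStream(filePath: string): Promise<Readable> {
    // wrap the buffer in a single-chunk stream
    return Readable.from([await this.read(filePath)]);
  }

  async exists(filePath: string): Promise<boolean> {
    return this.files.has(filePath);
  }

  getUrl(filePath: string): string {
    return `memory://${filePath}`;
  }
}
```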
@ -1,6 +1,7 @@
 import { Inject, Injectable, Logger } from '@nestjs/common';
 import { STORAGE_DRIVER_TOKEN } from './constants/storage.constants';
 import { StorageDriver } from './interfaces';
+import { Readable } from 'stream';
 
 @Injectable()
 export class StorageService {
@ -23,6 +24,10 @@ export class StorageService {
     return this.storageDriver.read(filePath);
   }
 
+  async readStream(filePath: string): Promise<Readable> {
+    return this.storageDriver.readStream(filePath);
+  }
+
   async exists(filePath: string): Promise<boolean> {
     return this.storageDriver.exists(filePath);
   }
42  pnpm-lock.yaml  generated
@ -552,9 +552,15 @@ importers:
       stripe:
         specifier: ^17.5.0
         version: 17.5.0
+      tmp-promise:
+        specifier: ^3.0.3
+        version: 3.0.3
       ws:
         specifier: ^8.18.0
         version: 8.18.0
+      yauzl:
+        specifier: ^3.2.0
+        version: 3.2.0
     devDependencies:
       '@eslint/js':
         specifier: ^9.20.0
@ -604,6 +610,9 @@ importers:
       '@types/ws':
         specifier: ^8.5.14
         version: 8.5.14
+      '@types/yauzl':
+        specifier: ^2.10.3
+        version: 2.10.3
       eslint:
         specifier: ^9.20.1
         version: 9.20.1(jiti@1.21.0)
@ -4093,6 +4102,9 @@ packages:
   '@types/yargs@17.0.32':
     resolution: {integrity: sha512-xQ67Yc/laOG5uMfX/093MRlGGCIBzZMarVa+gfNKJxWAIgykYpVGkBdbqEzGDDfCrVUj6Hiff4mTZ5BA6TmAog==}
 
+  '@types/yauzl@2.10.3':
+    resolution: {integrity: sha512-oJoftv0LSuaDZE3Le4DbKX+KS9G36NzOeSap90UIK0yMA/NhKJhqlSGtNDORNRaIbQfzjXDrQa0ytJ6mNRGz/Q==}
+
   '@typescript-eslint/eslint-plugin@8.17.0':
     resolution: {integrity: sha512-HU1KAdW3Tt8zQkdvNoIijfWDMvdSweFYm4hWh+KwhPstv+sCmWb89hCIP8msFm9N1R/ooh9honpSuvqKWlYy3w==}
     engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0}
@ -4610,6 +4622,9 @@ packages:
   bser@2.1.1:
     resolution: {integrity: sha512-gQxTNE/GAfIIrmHLUE3oJyp5FO6HRBfhjnw4/wMmA63ZGDJnWBmgY/lyQBpnDUkGmAhbSe39tx2d/iTOAfglwQ==}
 
+  buffer-crc32@0.2.13:
+    resolution: {integrity: sha512-VO9Ht/+p3SN7SKWqcrgEzjGbRSJYTx+Q1pTQC0wrWqHx0vpJraQ6GtHx8tvcg1rlK1byhU5gccxgOgj7B0TDkQ==}
+
   buffer-equal-constant-time@1.0.1:
     resolution: {integrity: sha512-zRpUiDwd/xk6ADqPMATG8vc9VPrkck7T07OIx0gnjmJAnHnTVXNQG3vfvWNuiZIkwu9KrKdA1iJKfsfTVxE6NA==}
 
@ -7196,6 +7211,9 @@ packages:
     resolution: {integrity: sha512-nri2TO5JE3/mRryik9LlHFT53cgHfRK0Lt0BAZQXku/AW3E6XLt2GaY8siWi7dvW/m1z0ecn+J+bpDa9ZN3IsQ==}
     engines: {node: '>=18'}
 
+  pend@1.2.0:
+    resolution: {integrity: sha512-F3asv42UuXchdzt+xXqfW1OGlVBe+mxa2mqI0pg5yAHZPvFmY3Y6drSf/GQ1A86WgWEN9Kzh/WrgKa6iGcHXLg==}
+
   pg-cloudflare@1.1.1:
     resolution: {integrity: sha512-xWPagP/4B6BgFO+EKz3JONXv3YDgvkbVrGw2mTo3D6tVDQRh1e7cqVGvyR3BE+eQgAvx1XhW/iEASj4/jCWl3Q==}
 
@ -8248,6 +8266,9 @@ packages:
     resolution: {integrity: sha512-QNtgIqSUb9o2CoUjX9T5TwaIvUUJFU1+12PJkgt42DFV2yf9J6549yTF2uGloQsJ/JOC8X+gIB81ind97hRiIQ==}
     hasBin: true
 
+  tmp-promise@3.0.3:
+    resolution: {integrity: sha512-RwM7MoPojPxsOBYnyd2hy0bxtIlVrihNs9pj5SUvY8Zz1sQcQG2tG1hSr8PDxfgEB8RNKDhqbIlroIarSNDNsQ==}
+
   tmp@0.0.33:
     resolution: {integrity: sha512-jRCJlojKnZ3addtTOjdIqoRuPEKBvNXcGYqzO6zWZX8KfKEpnGY5jfggJQ3EjKuu8D4bJRr0y+cYJFmYbImXGw==}
     engines: {node: '>=0.6.0'}
@ -8904,6 +8925,10 @@ packages:
     resolution: {integrity: sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==}
     engines: {node: '>=12'}
 
+  yauzl@3.2.0:
+    resolution: {integrity: sha512-Ow9nuGZE+qp1u4JIPvg+uCiUr7xGQWdff7JQSk5VGYTAZMDe2q8lxJ10ygv10qmSj031Ty/6FNJpLO4o1Sgc+w==}
+    engines: {node: '>=12'}
+
   yjs@13.6.20:
     resolution: {integrity: sha512-Z2YZI+SYqK7XdWlloI3lhMiKnCdFCVC4PchpdO+mCYwtiTwncjUbnRK9R1JmkNfdmHyDXuWN3ibJAt0wsqTbLQ==}
     engines: {node: '>=16.0.0', npm: '>=8.0.0'}
@ -13261,6 +13286,10 @@ snapshots:
     dependencies:
       '@types/yargs-parser': 21.0.3
 
+  '@types/yauzl@2.10.3':
+    dependencies:
+      '@types/node': 22.13.4
+
   '@typescript-eslint/eslint-plugin@8.17.0(@typescript-eslint/parser@8.17.0(eslint@9.15.0(jiti@1.21.0))(typescript@5.7.2))(eslint@9.15.0(jiti@1.21.0))(typescript@5.7.2)':
     dependencies:
       '@eslint-community/regexpp': 4.12.1
@ -13959,6 +13988,8 @@ snapshots:
     dependencies:
       node-int64: 0.4.0
 
+  buffer-crc32@0.2.13: {}
+
   buffer-equal-constant-time@1.0.1: {}
 
   buffer-from@1.1.2: {}
@ -16994,6 +17025,8 @@ snapshots:
 
   peek-readable@7.0.0: {}
 
+  pend@1.2.0: {}
+
   pg-cloudflare@1.1.1:
     optional: true
 
@ -18163,6 +18196,10 @@ snapshots:
     dependencies:
       tldts-core: 6.1.72
 
+  tmp-promise@3.0.3:
+    dependencies:
+      tmp: 0.2.1
+
   tmp@0.0.33:
     dependencies:
       os-tmpdir: 1.0.2
@ -18759,6 +18796,11 @@ snapshots:
       y18n: 5.0.8
       yargs-parser: 21.1.1
 
+  yauzl@3.2.0:
+    dependencies:
+      buffer-crc32: 0.2.13
+      pend: 1.2.0
+
   yjs@13.6.20:
     dependencies:
       lib0: 0.2.98