diff --git a/apps/server/src/database/migrations/20250521T154949-file_tasks.ts b/apps/server/src/database/migrations/20250521T154949-file_tasks.ts
new file mode 100644
index 00000000..61a1d5f7
--- /dev/null
+++ b/apps/server/src/database/migrations/20250521T154949-file_tasks.ts
@@ -0,0 +1,46 @@
+import { Kysely, sql } from 'kysely';
+
+/**
+ * Creates the `file_tasks` table used to track file import/export tasks.
+ */
+export async function up(db: Kysely<any>): Promise<void> {
+  await db.schema
+    .createTable('file_tasks')
+    .addColumn('id', 'uuid', (col) =>
+      col.primaryKey().defaultTo(sql`gen_uuid_v7()`),
+    )
+    // task type: import | export
+    .addColumn('type', 'varchar')
+    // import source: generic | notion | confluence
+    .addColumn('source', 'varchar')
+    // task status: pending | processing | success | failed
+    .addColumn('status', 'varchar')
+    // uploaded file metadata
+    .addColumn('file_name', 'varchar', (col) => col.notNull())
+    .addColumn('file_path', 'varchar', (col) => col.notNull())
+    .addColumn('file_size', 'int8')
+    .addColumn('file_ext', 'varchar')
+    // ownership
+    .addColumn('creator_id', 'uuid', (col) => col.references('users.id'))
+    .addColumn('space_id', 'uuid', (col) =>
+      col.references('spaces.id').onDelete('cascade'),
+    )
+    .addColumn('workspace_id', 'uuid', (col) =>
+      col.references('workspaces.id').onDelete('cascade').notNull(),
+    )
+    // lifecycle timestamps
+    .addColumn('created_at', 'timestamptz', (col) =>
+      col.notNull().defaultTo(sql`now()`),
+    )
+    .addColumn('updated_at', 'timestamptz', (col) =>
+      col.notNull().defaultTo(sql`now()`),
+    )
+    .addColumn('completed_at', 'timestamptz')
+    .addColumn('deleted_at', 'timestamptz')
+    .execute();
+}
+
+/** Drops the `file_tasks` table. */
+export async function down(db: Kysely<any>): Promise<void> {
+  await db.schema.dropTable('file_tasks').execute();
+}
diff --git a/apps/server/src/database/types/db.d.ts b/apps/server/src/database/types/db.d.ts
index 8c4cbd57..69f352f8 100644
--- a/apps/server/src/database/types/db.d.ts
+++ b/apps/server/src/database/types/db.d.ts
@@ -122,6 +122,24 @@ export interface Comments {
   workspaceId: string;
 }
 
+export interface FileTasks {
+  completedAt: Timestamp | null;
+  createdAt: Generated<Timestamp>;
+  creatorId: string | null;
+  deletedAt: Timestamp | null;
+  fileExt: string | null;
+  fileName: string;
+  filePath: string;
+  fileSize: Int8 | null;
+  id: Generated<string>;
+  source: string | null;
+  spaceId: string | null;
+  status: string | null;
+  type: string | null;
+  updatedAt: Generated<Timestamp>;
+  workspaceId: string;
+}
+
 export interface Groups {
   createdAt: Generated<Timestamp>;
   creatorId: string | null;
@@ -298,6 +316,7 @@ export interface DB {
   backlinks: Backlinks;
   billing: Billing;
   comments: Comments;
+  fileTasks: FileTasks;
   groups: Groups;
   groupUsers: GroupUsers;
   pageHistory: PageHistory;
diff --git a/apps/server/src/database/types/entity.types.ts b/apps/server/src/database/types/entity.types.ts
index 6cb55a11..db2c2823 100644
--- a/apps/server/src/database/types/entity.types.ts
+++ b/apps/server/src/database/types/entity.types.ts
@@ -17,6 +17,7 @@ import {
   AuthProviders,
   AuthAccounts,
   Shares,
+  FileTasks,
 } from './db';
 
 // Workspace
@@ -107,3 +108,8 @@ export type UpdatableAuthAccount = Updateable<Omit<AuthAccounts, 'id'>>;
 export type Share = Selectable<Shares>;
 export type InsertableShare = Insertable<Shares>;
 export type UpdatableShare = Updateable<Omit<Shares, 'id'>>;
+
+// File Task
+export type FileTask = Selectable<FileTasks>;
+export type InsertableFileTask = Insertable<FileTasks>;
+export type UpdatableFileTask = Updateable<Omit<FileTasks, 'id'>>;
diff --git a/apps/server/src/integrations/import/file-task.service.ts b/apps/server/src/integrations/import/file-task.service.ts
new file mode 100644
index 00000000..cca0833b
--- /dev/null
+++ b/apps/server/src/integrations/import/file-task.service.ts
@@ -0,0 +1,73 @@
+import { BadRequestException, Injectable, Logger } from '@nestjs/common';
+import { PageRepo } from '@docmost/db/repos/page/page.repo';
+import { MultipartFile } from '@fastify/multipart';
+import { sanitize } from 'sanitize-filename-ts';
+import * as path from 'path';
+import {
+  htmlToJson,
+  jsonToText,
+  tiptapExtensions,
+} from '../../collaboration/collaboration.util';
+import { InjectKysely } from 'nestjs-kysely';
+import { KyselyDB } from '@docmost/db/types/kysely.types';
+import { generateSlugId } from '../../common/helpers';
+import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
+import { TiptapTransformer } from '@hocuspocus/transformer';
+import * as Y from 'yjs';
+import { markdownToHtml } from '@docmost/editor-ext';
+import {
+  FileTaskStatus,
+  FileTaskType,
+  getFileTaskFolderPath,
+} from './file.utils';
+import { v7 as uuid7 } from 'uuid';
+import { StorageService } from '../storage/storage.service';
+import { InjectQueue } from '@nestjs/bullmq';
+import { Queue } from 'bullmq';
+import { QueueJob, QueueName } from '../queue/constants';
+
+/** Runs queued file tasks (currently only the start of zip imports). */
+@Injectable()
+export class FileTaskService {
+  private readonly logger = new Logger(FileTaskService.name);
+
+  constructor(
+    private readonly storageService: StorageService,
+    @InjectKysely() private readonly db: KyselyDB,
+  ) {}
+
+  /**
+   * Marks the file task as processing and reads its uploaded file from
+   * storage. Logs and returns when the task does not exist.
+   */
+  async processZIpImport(fileTaskId: string): Promise<void> {
+    this.logger.debug(`Processing zip import: ${fileTaskId}`);
+
+    const fileTask = await this.db
+      .selectFrom('fileTasks')
+      .selectAll()
+      .where('id', '=', fileTaskId)
+      .executeTakeFirst();
+
+    if (!fileTask) {
+      this.logger.log(`File task with ID ${fileTaskId} not found`);
+      return;
+    }
+
+    // Scope the update to this task; without the filter every row in the
+    // table would be flipped to processing.
+    await this.db
+      .updateTable('fileTasks')
+      .set({ status: FileTaskStatus.Processing })
+      .where('id', '=', fileTaskId)
+      .execute();
+
+    // TODO: extract the archive and import its contents
+    const file = await this.storageService.read(fileTask.filePath);
+  }
+
+  /** Placeholder for the generic import flow; not implemented yet. */
+  async processGenericImport(fileTaskId: string): Promise<void> {
+    // TODO: implement
+  }
+}
diff --git a/apps/server/src/integrations/import/file.utils.ts b/apps/server/src/integrations/import/file.utils.ts
new file mode 100644
index 00000000..7f80d000
--- /dev/null
+++ b/apps/server/src/integrations/import/file.utils.ts
@@ -0,0 +1,35 @@
+/** Kind of work a file task performs. */
+export enum FileTaskType {
+  Import = 'import',
+  Export = 'export',
+}
+
+/** Source a file import originates from. */
+export enum FileImportType {
+  Generic = 'generic',
+  Notion = 'notion',
+  Confluence = 'confluence',
+}
+
+/** Lifecycle state of a file task. */
+export enum FileTaskStatus {
+  Pending = 'pending',
+  Processing = 'processing',
+  Success = 'success',
+  Failed = 'failed',
+}
+
+/**
+ * Returns the storage folder for a workspace's file tasks of the given type.
+ */
+export function getFileTaskFolderPath(
+  type: FileTaskType,
+  workspaceId: string,
+): string {
+  switch (type) {
+    case FileTaskType.Import:
+      return `${workspaceId}/imports`;
+    case FileTaskType.Export:
+      return `${workspaceId}/exports`;
+  }
+}
diff --git a/apps/server/src/integrations/import/import.controller.ts b/apps/server/src/integrations/import/import.controller.ts
index 975301af..a223b6df 100644
--- a/apps/server/src/integrations/import/import.controller.ts
+++ b/apps/server/src/integrations/import/import.controller.ts
@@ -83,4 +83,68 @@ export class ImportController {
 
     return this.importService.importPage(file, user.id, spaceId, workspace.id);
   }
+
+  /**
+   * Accepts a single `.zip` upload (up to 100mb), checks that the user may
+   * edit pages in the target space, and passes the file to the import
+   * service. `spaceId` is required; `source` is forwarded as received.
+   */
+  @UseInterceptors(FileInterceptor)
+  @UseGuards(JwtAuthGuard)
+  @HttpCode(HttpStatus.OK)
+  // temporary naming
+  @Post('pages/import-zip')
+  async importZip(
+    @Req() req: any,
+    @AuthUser() user: User,
+    @AuthWorkspace() workspace: Workspace,
+  ) {
+    const validFileExtensions = ['.zip'];
+
+    const maxFileSize = bytes('100mb');
+
+    let file = null;
+    try {
+      file = await req.file({
+        limits: { fileSize: maxFileSize, fields: 3, files: 1 },
+      });
+    } catch (err: any) {
+      this.logger.error(err.message);
+      if (err?.statusCode === 413) {
+        throw new BadRequestException(
+          `File too large. Exceeds the 100mb import limit`,
+        );
+      }
+    }
+
+    if (!file) {
+      throw new BadRequestException('Failed to upload file');
+    }
+
+    if (
+      !validFileExtensions.includes(path.extname(file.filename).toLowerCase())
+    ) {
+      throw new BadRequestException('Invalid import file type.');
+    }
+
+    const spaceId = file.fields?.spaceId?.value;
+    const source = file.fields?.source?.value;
+
+    if (!spaceId) {
+      throw new BadRequestException('spaceId not found');
+    }
+
+    const ability = await this.spaceAbility.createForUser(user, spaceId);
+    if (ability.cannot(SpaceCaslAction.Edit, SpaceCaslSubject.Page)) {
+      throw new ForbiddenException();
+    }
+
+    return this.importService.importZip(
+      file,
+      source,
+      user.id,
+      spaceId,
+      workspace.id,
+    );
+  }
 }
diff --git a/apps/server/src/integrations/import/import.module.ts b/apps/server/src/integrations/import/import.module.ts
index 60498808..b22d6405 100644
--- a/apps/server/src/integrations/import/import.module.ts
+++ b/apps/server/src/integrations/import/import.module.ts
@@ -1,9 +1,13 @@
 import { Module } from '@nestjs/common';
 import { ImportService } from './import.service';
 import { ImportController } from './import.controller';
+import { StorageModule } from '../storage/storage.module';
+import { FileTaskService } from './file-task.service';
+import { FileTaskProcessor } from './processors/file-task.processor';
 
 @Module({
-  providers: [ImportService],
+  providers: [ImportService, FileTaskService, FileTaskProcessor],
   controllers: [ImportController],
+  imports: [StorageModule],
 })
 export class ImportModule {}
diff --git a/apps/server/src/integrations/import/import.service.ts b/apps/server/src/integrations/import/import.service.ts
index f77df0dc..10260b6a 100644
--- a/apps/server/src/integrations/import/import.service.ts
+++ b/apps/server/src/integrations/import/import.service.ts
@@ -4,7 +4,8 @@ import { MultipartFile } from '@fastify/multipart';
 import { sanitize } from 'sanitize-filename-ts';
 import * as path from 'path';
 import {
-  htmlToJson, jsonToText,
+  htmlToJson,
+  jsonToText,
   tiptapExtensions,
 } from '../../collaboration/collaboration.util';
 import { InjectKysely } from 'nestjs-kysely';
@@ -13,7 +14,17 @@ import { generateSlugId } from '../../common/helpers';
 import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
 import { TiptapTransformer } from '@hocuspocus/transformer';
 import * as Y from 'yjs';
-import { markdownToHtml } from "@docmost/editor-ext";
+import { markdownToHtml } from '@docmost/editor-ext';
+import {
+  FileTaskStatus,
+  FileTaskType,
+  getFileTaskFolderPath,
+} from './file.utils';
+import { v7 as uuid7 } from 'uuid';
+import { StorageService } from '../storage/storage.service';
+import { InjectQueue } from '@nestjs/bullmq';
+import { Queue } from 'bullmq';
+import { QueueJob, QueueName } from '../queue/constants';
 
 @Injectable()
 export class ImportService {
@@ -21,7 +32,10 @@ export class ImportService {
 
   constructor(
     private readonly pageRepo: PageRepo,
+    private readonly storageService: StorageService,
     @InjectKysely() private readonly db: KyselyDB,
+    @InjectQueue(QueueName.FILE_TASK_QUEUE)
+    private readonly fileTaskQueue: Queue,
   ) {}
 
   async importPage(
@@ -161,4 +175,58 @@ export class ImportService {
       return generateJitteredKeyBetween(null, null);
     }
   }
+
+  /**
+   * Stores an uploaded zip archive, records it as a pending import task and
+   * queues the task for background processing.
+   *
+   * The queue processor is expected to move the task to `processing` and
+   * then to `success` or `failed`.
+   */
+  async importZip(
+    filePromise: Promise<MultipartFile>,
+    source: string,
+    userId: string,
+    spaceId: string,
+    workspaceId: string,
+  ): Promise<void> {
+    const file = await filePromise;
+    const fileBuffer = await file.toBuffer();
+    // strip the extension exactly as uploaded: basename() matches it verbatim
+    const rawExtension = path.extname(file.filename);
+    const fileExtension = rawExtension.toLowerCase();
+    const fileName = sanitize(
+      path.basename(file.filename, rawExtension).slice(0, 255),
+    );
+
+    const fileTaskId = uuid7();
+    const folderPath = getFileTaskFolderPath(FileTaskType.Import, workspaceId);
+    // keep the extension so the stored object is recognisable as an archive
+    const filePath = `${folderPath}/${fileTaskId}/${fileName}${fileExtension}`;
+
+    await this.storageService.upload(filePath, fileBuffer);
+
+    await this.db
+      .insertInto('fileTasks')
+      .values({
+        id: fileTaskId,
+        type: FileTaskType.Import,
+        source: source,
+        status: FileTaskStatus.Pending,
+        fileName: fileName,
+        filePath: filePath,
+        // real upload size rather than a placeholder 0
+        fileSize: fileBuffer.length,
+        fileExt: 'zip',
+        creatorId: userId,
+        spaceId: spaceId,
+        workspaceId: workspaceId,
+      })
+      .execute();
+
+    // the job carries only the task id
+    await this.fileTaskQueue.add(QueueJob.IMPORT_TASK, {
+      fileTaskId: fileTaskId,
+    });
+  }
 }
diff --git a/apps/server/src/integrations/import/processors/file-task.processor.ts b/apps/server/src/integrations/import/processors/file-task.processor.ts
new file mode 100644
index 00000000..2783828c
--- /dev/null
+++ b/apps/server/src/integrations/import/processors/file-task.processor.ts
@@ -0,0 +1,54 @@
+import { Logger, OnModuleDestroy } from '@nestjs/common';
+import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
+import { Job } from 'bullmq';
+import { QueueJob, QueueName } from 'src/integrations/queue/constants';
+import { FileTaskService } from '../file-task.service';
+
+/** Worker that consumes jobs from the file task queue. */
+@Processor(QueueName.FILE_TASK_QUEUE)
+export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
+  private readonly logger = new Logger(FileTaskProcessor.name);
+  constructor(private readonly fileTaskService: FileTaskService) {
+    super();
+  }
+
+  /**
+   * Dispatches a job by name. Errors are left to propagate to the worker.
+   */
+  async process(job: Job<{ fileTaskId: string }, void>): Promise<void> {
+    switch (job.name) {
+      case QueueJob.IMPORT_TASK:
+        this.logger.debug(`import task ${job.data.fileTaskId}`);
+        await this.fileTaskService.processZIpImport(job.data.fileTaskId);
+        break;
+      case QueueJob.EXPORT_TASK:
+        // export handling is not implemented yet
+        this.logger.debug(`export task ${job.data.fileTaskId}`);
+        break;
+    }
+  }
+
+  @OnWorkerEvent('active')
+  onActive(job: Job) {
+    this.logger.debug(`Processing ${job.name} job`);
+  }
+
+  @OnWorkerEvent('failed')
+  onError(job: Job) {
+    this.logger.error(
+      `Error processing ${job.name} job. Reason: ${job.failedReason}`,
+    );
+  }
+
+  @OnWorkerEvent('completed')
+  onCompleted(job: Job) {
+    this.logger.debug(`Completed ${job.name} job`);
+  }
+
+  /** Closes the worker on shutdown. */
+  async onModuleDestroy(): Promise<void> {
+    if (this.worker) {
+      await this.worker.close();
+    }
+  }
+}
diff --git a/apps/server/src/integrations/queue/constants/queue.constants.ts b/apps/server/src/integrations/queue/constants/queue.constants.ts
index 61d7163b..150c098e 100644
--- a/apps/server/src/integrations/queue/constants/queue.constants.ts
+++ b/apps/server/src/integrations/queue/constants/queue.constants.ts
@@ -3,6 +3,7 @@ export enum QueueName {
   ATTACHMENT_QUEUE = '{attachment-queue}',
   GENERAL_QUEUE = '{general-queue}',
   BILLING_QUEUE = '{billing-queue}',
+  FILE_TASK_QUEUE = '{file-task-queue}',
 }
 
 export enum QueueJob {
@@ -19,4 +20,7 @@ export enum QueueJob {
   TRIAL_ENDED = 'trial-ended',
   WELCOME_EMAIL = 'welcome-email',
   FIRST_PAYMENT_EMAIL = 'first-payment-email',
+
+  IMPORT_TASK = 'import-task',
+  EXPORT_TASK = 'export-task',
 }
diff --git a/apps/server/src/integrations/queue/queue.module.ts b/apps/server/src/integrations/queue/queue.module.ts
index 3531a204..c8f7f79c 100644
--- a/apps/server/src/integrations/queue/queue.module.ts
+++ b/apps/server/src/integrations/queue/queue.module.ts
@@ -49,6 +49,9 @@ import { BacklinksProcessor } from './processors/backlinks.processor';
     BullModule.registerQueue({
       name: QueueName.BILLING_QUEUE,
     }),
+    BullModule.registerQueue({
+      name: QueueName.FILE_TASK_QUEUE,
+    }),
   ],
   exports: [BullModule],
   providers: [BacklinksProcessor],