diff --git a/apps/server/src/ee b/apps/server/src/ee index d03a6a3f..fd34d418 160000 --- a/apps/server/src/ee +++ b/apps/server/src/ee @@ -1 +1 @@ -Subproject commit d03a6a3f2de77df4447b56135e1600243bd67173 +Subproject commit fd34d4183aaae765b27f95e49830c6ff2ac9aa1f diff --git a/apps/server/src/integrations/import/processors/file-task.processor.ts b/apps/server/src/integrations/import/processors/file-task.processor.ts index 38ef8dec..7b65f5ab 100644 --- a/apps/server/src/integrations/import/processors/file-task.processor.ts +++ b/apps/server/src/integrations/import/processors/file-task.processor.ts @@ -41,15 +41,32 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy { @OnWorkerEvent('failed') async onFailed(job: Job) { this.logger.error( - `Error processing ${job.name} job. Reason: ${job.failedReason}`, + `Error processing ${job.name} job. Import Task ID: ${job.data.fileTaskId}. Reason: ${job.failedReason}`, ); + await this.handleFailedJob(job); + } + + @OnWorkerEvent('stalled') + async onStalled(job: Job) { + this.logger.error( + `Job ${job.name} stalled. . Import Task ID: ${job.data.fileTaskId}.. Job ID: ${job.id}`, + ); + + // Set failedReason for stalled jobs since it's not automatically set + job.failedReason = 'Job stalled and was marked as failed'; + await this.handleFailedJob(job); + } + + private async handleFailedJob(job: Job) { try { const fileTaskId = job.data.fileTaskId; + const reason = job.failedReason || 'Unknown error'; + await this.fileTaskService.updateTaskStatus( fileTaskId, FileTaskStatus.Failed, - job.failedReason, + reason, ); const fileTask = await this.fileTaskService.getFileTask(fileTaskId); @@ -62,10 +79,22 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy { } @OnWorkerEvent('completed') - onCompleted(job: Job) { + async onCompleted(job: Job) { this.logger.log( `Completed ${job.name} job for File task ID ${job.data.fileTaskId}`, ); + + try { + const fileTask = await this.fileTaskService.getFileTask( + job.data.fileTaskId, + ); + if (fileTask) { + await this.storageService.delete(fileTask.filePath); + this.logger.debug(`Deleted imported zip file: ${fileTask.filePath}`); + } + } catch (err) { + this.logger.error(`Failed to delete imported zip file:`, err); + } } async onModuleDestroy(): Promise { diff --git a/apps/server/src/integrations/import/services/import-attachment.service.ts b/apps/server/src/integrations/import/services/import-attachment.service.ts index 92780395..04a18fda 100644 --- a/apps/server/src/integrations/import/services/import-attachment.service.ts +++ b/apps/server/src/integrations/import/services/import-attachment.service.ts @@ -35,7 +35,7 @@ interface DrawioPair { @Injectable() export class ImportAttachmentService { private readonly logger = new Logger(ImportAttachmentService.name); - private readonly CONCURRENT_UPLOADS = 5; + private readonly CONCURRENT_UPLOADS = 1; private readonly MAX_RETRIES = 2; private readonly RETRY_DELAY = 2000; @@ -139,7 +139,9 @@ export class ImportAttachmentService { const stream = Readable.from(svgBuffer); // Upload to storage - await this.storageService.uploadStream(storageFilePath, stream); + await this.storageService.uploadStream(storageFilePath, stream, { + recreateClient: true, + }); // Insert into database await this.db @@ -802,7 +804,10 @@ export class ImportAttachmentService { for (let attempt = 1; attempt <= this.MAX_RETRIES; attempt++) { try { const fileStream = createReadStream(abs); - await this.storageService.uploadStream(storageFilePath, fileStream); + await this.storageService.uploadStream(storageFilePath, fileStream, { + recreateClient: true, + }); + const stat = await fs.stat(abs); await this.db diff --git a/apps/server/src/integrations/import/utils/import-formatter.ts b/apps/server/src/integrations/import/utils/import-formatter.ts index da30a862..54fc50dd 100644 --- a/apps/server/src/integrations/import/utils/import-formatter.ts +++ b/apps/server/src/integrations/import/utils/import-formatter.ts @@ -214,6 +214,9 @@ export function notionFormatter($: CheerioAPI, $root: Cheerio) { $fig.replaceWith($newAnchor); }); + // remove user icons + $root.find('span.user img.user-icon').remove(); + // remove toc $root.find('nav.table_of_contents').remove(); } diff --git a/apps/server/src/integrations/storage/drivers/local.driver.ts b/apps/server/src/integrations/storage/drivers/local.driver.ts index caa6ab36..5171066c 100644 --- a/apps/server/src/integrations/storage/drivers/local.driver.ts +++ b/apps/server/src/integrations/storage/drivers/local.driver.ts @@ -28,7 +28,7 @@ export class LocalDriver implements StorageDriver { } } - async uploadStream(filePath: string, file: Readable): Promise { + async uploadStream(filePath: string, file: Readable, options?: { recreateClient?: boolean }): Promise { try { const fullPath = this._fullPath(filePath); await fs.mkdir(dirname(fullPath), { recursive: true }); diff --git a/apps/server/src/integrations/storage/drivers/s3.driver.ts b/apps/server/src/integrations/storage/drivers/s3.driver.ts index f9852347..f6d48677 100644 --- a/apps/server/src/integrations/storage/drivers/s3.driver.ts +++ b/apps/server/src/integrations/storage/drivers/s3.driver.ts @@ -41,12 +41,26 @@ export class S3Driver implements StorageDriver { } } - async uploadStream(filePath: string, file: Readable): Promise { + async uploadStream( + filePath: string, + file: Readable, + options?: { recreateClient?: boolean }, + ): Promise { + let clientToUse = this.s3Client; + let shouldDestroyClient = false; + + // optionally recreate client to avoid socket hang errors + // (during multi-attachments imports) + if (options?.recreateClient) { + clientToUse = new S3Client(this.config as any); + shouldDestroyClient = true; + } + try { const contentType = getMimeType(filePath); const upload = new Upload({ - client: this.s3Client, + client: clientToUse, params: { Bucket: this.config.bucket, Key: filePath, @@ -58,6 +72,10 @@ export class S3Driver implements StorageDriver { await upload.done(); } catch (err) { throw new Error(`Failed to upload file: ${(err as Error).message}`); + } finally { + if (shouldDestroyClient && clientToUse) { + clientToUse.destroy(); + } } } diff --git a/apps/server/src/integrations/storage/interfaces/storage-driver.interface.ts b/apps/server/src/integrations/storage/interfaces/storage-driver.interface.ts index c4511a80..22a86d2b 100644 --- a/apps/server/src/integrations/storage/interfaces/storage-driver.interface.ts +++ b/apps/server/src/integrations/storage/interfaces/storage-driver.interface.ts @@ -3,7 +3,7 @@ import { Readable } from 'stream'; export interface StorageDriver { upload(filePath: string, file: Buffer): Promise; - uploadStream(filePath: string, file: Readable): Promise; + uploadStream(filePath: string, file: Readable, options?: { recreateClient?: boolean }): Promise; copy(fromFilePath: string, toFilePath: string): Promise; diff --git a/apps/server/src/integrations/storage/storage.service.ts b/apps/server/src/integrations/storage/storage.service.ts index e9577ee9..d796351b 100644 --- a/apps/server/src/integrations/storage/storage.service.ts +++ b/apps/server/src/integrations/storage/storage.service.ts @@ -15,8 +15,8 @@ export class StorageService { this.logger.debug(`File uploaded successfully. Path: ${filePath}`); } - async uploadStream(filePath: string, fileContent: Readable) { - await this.storageDriver.uploadStream(filePath, fileContent); + async uploadStream(filePath: string, fileContent: Readable, options?: { recreateClient?: boolean }) { + await this.storageDriver.uploadStream(filePath, fileContent, options); this.logger.debug(`File uploaded successfully. Path: ${filePath}`); }