diff --git a/apps/server/package.json b/apps/server/package.json index 63fa0be1..4106ba89 100644 --- a/apps/server/package.json +++ b/apps/server/package.json @@ -71,6 +71,7 @@ "nestjs-kysely": "^1.2.0", "nodemailer": "^7.0.3", "openid-client": "^5.7.1", + "p-limit": "^6.2.0", "passport-google-oauth20": "^2.0.0", "passport-jwt": "^4.0.1", "pg": "^8.16.0", diff --git a/apps/server/src/integrations/import/services/import-attachment.service.ts b/apps/server/src/integrations/import/services/import-attachment.service.ts index cd9039e2..b9a488a9 100644 --- a/apps/server/src/integrations/import/services/import-attachment.service.ts +++ b/apps/server/src/integrations/import/services/import-attachment.service.ts @@ -14,10 +14,14 @@ import { AttachmentType } from '../../../core/attachment/attachment.constants'; import { unwrapFromParagraph } from '../utils/import-formatter'; import { resolveRelativeAttachmentPath } from '../utils/import.utils'; import { load } from 'cheerio'; +import pLimit from 'p-limit'; @Injectable() export class ImportAttachmentService { private readonly logger = new Logger(ImportAttachmentService.name); + private readonly CONCURRENT_UPLOADS = 3; + private readonly MAX_RETRIES = 2; + private readonly RETRY_DELAY = 2000; constructor( private readonly storageService: StorageService, @@ -41,7 +45,14 @@ export class ImportAttachmentService { attachmentCandidates, } = opts; - const attachmentTasks: Promise[] = []; + const attachmentTasks: (() => Promise)[] = []; + const limit = pLimit(this.CONCURRENT_UPLOADS); + const uploadStats = { + total: 0, + completed: 0, + failed: 0, + failedFiles: [] as string[], + }; /** * Cache keyed by the *relative* path that appears in the HTML. @@ -74,30 +85,16 @@ export class ImportAttachmentService { const apiFilePath = `/api/files/${attachmentId}/${fileNameWithExt}`; - attachmentTasks.push( - (async () => { - const fileStream = createReadStream(abs); - await this.storageService.uploadStream(storageFilePath, fileStream); - const stat = await fs.stat(abs); - - await this.db - .insertInto('attachments') - .values({ - id: attachmentId, - filePath: storageFilePath, - fileName: fileNameWithExt, - fileSize: stat.size, - mimeType: getMimeType(fileNameWithExt), - type: 'file', - fileExt: ext, - creatorId: fileTask.creatorId, - workspaceId: fileTask.workspaceId, - pageId, - spaceId: fileTask.spaceId, - }) - .execute(); - })(), - ); + attachmentTasks.push(() => this.uploadWithRetry({ + abs, + storageFilePath, + attachmentId, + fileNameWithExt, + ext, + pageId, + fileTask, + uploadStats, + })); return { attachmentId, @@ -292,12 +289,113 @@ export class ImportAttachmentService { } // wait for all uploads & DB inserts - try { - await Promise.all(attachmentTasks); - } catch (err) { - this.logger.log('Import attachment upload error', err); + uploadStats.total = attachmentTasks.length; + + if (uploadStats.total > 0) { + this.logger.debug(`Starting upload of ${uploadStats.total} attachments...`); + + try { + await Promise.all( + attachmentTasks.map(task => limit(task)) + ); + } catch (err) { + this.logger.error('Import attachment upload error', err); + } + + this.logger.debug( + `Upload completed: ${uploadStats.completed}/${uploadStats.total} successful, ${uploadStats.failed} failed` + ); + + if (uploadStats.failed > 0) { + this.logger.warn( + `Failed to upload ${uploadStats.failed} files:`, + uploadStats.failedFiles + ); + } } return $.root().html() || ''; } + + private async uploadWithRetry(opts: { + abs: string; + storageFilePath: string; + attachmentId: string; + fileNameWithExt: string; + ext: string; + pageId: string; + fileTask: FileTask; + uploadStats: { + total: number; + completed: number; + failed: number; + failedFiles: string[]; + }; + }): Promise { + const { + abs, + storageFilePath, + attachmentId, + fileNameWithExt, + ext, + pageId, + fileTask, + uploadStats, + } = opts; + + let lastError: Error; + + for (let attempt = 1; attempt <= this.MAX_RETRIES; attempt++) { + try { + const fileStream = createReadStream(abs); + await this.storageService.uploadStream(storageFilePath, fileStream); + const stat = await fs.stat(abs); + + await this.db + .insertInto('attachments') + .values({ + id: attachmentId, + filePath: storageFilePath, + fileName: fileNameWithExt, + fileSize: stat.size, + mimeType: getMimeType(fileNameWithExt), + type: 'file', + fileExt: ext, + creatorId: fileTask.creatorId, + workspaceId: fileTask.workspaceId, + pageId, + spaceId: fileTask.spaceId, + }) + .execute(); + + uploadStats.completed++; + + if (uploadStats.completed % 10 === 0) { + this.logger.debug( + `Upload progress: ${uploadStats.completed}/${uploadStats.total}` + ); + } + + return; + } catch (error) { + lastError = error as Error; + this.logger.warn( + `Upload attempt ${attempt}/${this.MAX_RETRIES} failed for ${fileNameWithExt}: ${error instanceof Error ? error.message : String(error)}` + ); + + if (attempt < this.MAX_RETRIES) { + await new Promise(resolve => + setTimeout(resolve, this.RETRY_DELAY * attempt) + ); + } + } + } + + uploadStats.failed++; + uploadStats.failedFiles.push(fileNameWithExt); + this.logger.error( + `Failed to upload ${fileNameWithExt} after ${this.MAX_RETRIES} attempts:`, + lastError + ); + } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b8c0c88c..304171e7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -534,6 +534,9 @@ importers: openid-client: specifier: ^5.7.1 version: 5.7.1 + p-limit: + specifier: ^6.2.0 + version: 6.2.0 passport-google-oauth20: specifier: ^2.0.0 version: 2.0.0 @@ -7637,6 +7640,10 @@ packages: resolution: {integrity: sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==} engines: {node: '>=10'} + p-limit@6.2.0: + resolution: {integrity: sha512-kuUqqHNUqoIWp/c467RI4X6mmyuojY5jGutNU0wVTmEOOfcuwLqyMVoAi9MKi2Ak+5i9+nhmrK4ufZE8069kHA==} + engines: {node: '>=18'} + p-locate@4.1.0: resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==} engines: {node: '>=8'} @@ -9567,6 +9574,10 @@ packages: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} + yocto-queue@1.2.1: + resolution: {integrity: sha512-AyeEbWOu/TAXdxlV9wmGcR0+yh2j3vYPGOECcIj2S7MkrLyC7ne+oye2BKTItt0ii2PHk4cDy+95+LshzbXnGg==} + engines: {node: '>=12.20'} + yoctocolors-cjs@2.1.2: resolution: {integrity: sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA==} engines: {node: '>=18'} @@ -18193,6 +18204,10 @@ snapshots: dependencies: yocto-queue: 0.1.0 + p-limit@6.2.0: + dependencies: + yocto-queue: 1.2.1 + p-locate@4.1.0: dependencies: p-limit: 2.3.0 @@ -20183,6 +20198,8 @@ snapshots: yocto-queue@0.1.0: {} + yocto-queue@1.2.1: {} + yoctocolors-cjs@2.1.2: {} zeed-dom@0.15.1: