queue attachments upload

Philipinho
2025-07-08 22:47:14 -07:00
parent f80004817c
commit a8f95605ef
3 changed files with 145 additions and 29 deletions
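The change defers each attachment upload into a task function, drains the tasks through p-limit with a concurrency of three, and makes up to two attempts per upload with a linearly growing delay between them, counting failures instead of aborting the whole import. A minimal sketch of that pattern in isolation (uploadOne is a hypothetical placeholder for the stream-to-storage plus attachment insert done in the service below, not code from this commit):

import pLimit from 'p-limit';

// Hypothetical stand-in for the real work: stream the file to storage, then insert the attachment row.
async function uploadOne(filePath: string): Promise<void> {
  // upload + DB insert would go here
}

// Retry with a linearly increasing delay, mirroring uploadWithRetry in the diff.
async function uploadWithRetry(
  filePath: string,
  maxRetries = 2,
  retryDelayMs = 2000,
): Promise<void> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      await uploadOne(filePath);
      return;
    } catch (err) {
      if (attempt === maxRetries) {
        // The service records the failure in uploadStats rather than rethrowing.
        console.warn(`giving up on ${filePath}`, err);
        return;
      }
      await new Promise((resolve) => setTimeout(resolve, retryDelayMs * attempt));
    }
  }
}

// Build the task list as functions first, then let p-limit run at most three at a time.
async function uploadAll(filePaths: string[]): Promise<void> {
  const limit = pLimit(3);
  const tasks = filePaths.map((filePath) => () => uploadWithRetry(filePath));
  await Promise.all(tasks.map((task) => limit(task)));
}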


@@ -71,6 +71,7 @@
"nestjs-kysely": "^1.2.0",
"nodemailer": "^7.0.3",
"openid-client": "^5.7.1",
"p-limit": "^6.2.0",
"passport-google-oauth20": "^2.0.0",
"passport-jwt": "^4.0.1",
"pg": "^8.16.0",


@@ -14,10 +14,14 @@ import { AttachmentType } from '../../../core/attachment/attachment.constants';
import { unwrapFromParagraph } from '../utils/import-formatter';
import { resolveRelativeAttachmentPath } from '../utils/import.utils';
import { load } from 'cheerio';
import pLimit from 'p-limit';
@Injectable()
export class ImportAttachmentService {
private readonly logger = new Logger(ImportAttachmentService.name);
private readonly CONCURRENT_UPLOADS = 3;
private readonly MAX_RETRIES = 2;
private readonly RETRY_DELAY = 2000;
constructor(
private readonly storageService: StorageService,
@@ -41,7 +45,14 @@ export class ImportAttachmentService {
attachmentCandidates,
} = opts;
const attachmentTasks: Promise<void>[] = [];
const attachmentTasks: (() => Promise<void>)[] = [];
const limit = pLimit(this.CONCURRENT_UPLOADS);
const uploadStats = {
total: 0,
completed: 0,
failed: 0,
failedFiles: [] as string[],
};
/**
* Cache keyed by the *relative* path that appears in the HTML.
@@ -74,30 +85,16 @@
const apiFilePath = `/api/files/${attachmentId}/${fileNameWithExt}`;
attachmentTasks.push(
(async () => {
const fileStream = createReadStream(abs);
await this.storageService.uploadStream(storageFilePath, fileStream);
const stat = await fs.stat(abs);
await this.db
.insertInto('attachments')
.values({
id: attachmentId,
filePath: storageFilePath,
fileName: fileNameWithExt,
fileSize: stat.size,
mimeType: getMimeType(fileNameWithExt),
type: 'file',
fileExt: ext,
creatorId: fileTask.creatorId,
workspaceId: fileTask.workspaceId,
pageId,
spaceId: fileTask.spaceId,
})
.execute();
})(),
);
attachmentTasks.push(() => this.uploadWithRetry({
abs,
storageFilePath,
attachmentId,
fileNameWithExt,
ext,
pageId,
fileTask,
uploadStats,
}));
return {
attachmentId,
@@ -292,12 +289,113 @@ export class ImportAttachmentService {
}
// wait for all uploads & DB inserts
try {
await Promise.all(attachmentTasks);
} catch (err) {
this.logger.log('Import attachment upload error', err);
uploadStats.total = attachmentTasks.length;
if (uploadStats.total > 0) {
this.logger.debug(`Starting upload of ${uploadStats.total} attachments...`);
try {
await Promise.all(
attachmentTasks.map(task => limit(task))
);
} catch (err) {
this.logger.error('Import attachment upload error', err);
}
this.logger.debug(
`Upload completed: ${uploadStats.completed}/${uploadStats.total} successful, ${uploadStats.failed} failed`
);
if (uploadStats.failed > 0) {
this.logger.warn(
`Failed to upload ${uploadStats.failed} files:`,
uploadStats.failedFiles
);
}
}
return $.root().html() || '';
}
private async uploadWithRetry(opts: {
abs: string;
storageFilePath: string;
attachmentId: string;
fileNameWithExt: string;
ext: string;
pageId: string;
fileTask: FileTask;
uploadStats: {
total: number;
completed: number;
failed: number;
failedFiles: string[];
};
}): Promise<void> {
const {
abs,
storageFilePath,
attachmentId,
fileNameWithExt,
ext,
pageId,
fileTask,
uploadStats,
} = opts;
let lastError: Error;
for (let attempt = 1; attempt <= this.MAX_RETRIES; attempt++) {
try {
const fileStream = createReadStream(abs);
await this.storageService.uploadStream(storageFilePath, fileStream);
const stat = await fs.stat(abs);
await this.db
.insertInto('attachments')
.values({
id: attachmentId,
filePath: storageFilePath,
fileName: fileNameWithExt,
fileSize: stat.size,
mimeType: getMimeType(fileNameWithExt),
type: 'file',
fileExt: ext,
creatorId: fileTask.creatorId,
workspaceId: fileTask.workspaceId,
pageId,
spaceId: fileTask.spaceId,
})
.execute();
uploadStats.completed++;
if (uploadStats.completed % 10 === 0) {
this.logger.debug(
`Upload progress: ${uploadStats.completed}/${uploadStats.total}`
);
}
return;
} catch (error) {
lastError = error as Error;
this.logger.warn(
`Upload attempt ${attempt}/${this.MAX_RETRIES} failed for ${fileNameWithExt}: ${error instanceof Error ? error.message : String(error)}`
);
if (attempt < this.MAX_RETRIES) {
await new Promise(resolve =>
setTimeout(resolve, this.RETRY_DELAY * attempt)
);
}
}
}
uploadStats.failed++;
uploadStats.failedFiles.push(fileNameWithExt);
this.logger.error(
`Failed to upload ${fileNameWithExt} after ${this.MAX_RETRIES} attempts:`,
lastError
);
}
}

pnpm-lock.yaml (generated, 17 lines changed)

@@ -534,6 +534,9 @@ importers:
openid-client:
specifier: ^5.7.1
version: 5.7.1
p-limit:
specifier: ^6.2.0
version: 6.2.0
passport-google-oauth20:
specifier: ^2.0.0
version: 2.0.0
@@ -7637,6 +7640,10 @@ packages:
resolution: {integrity: sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==}
engines: {node: '>=10'}
p-limit@6.2.0:
resolution: {integrity: sha512-kuUqqHNUqoIWp/c467RI4X6mmyuojY5jGutNU0wVTmEOOfcuwLqyMVoAi9MKi2Ak+5i9+nhmrK4ufZE8069kHA==}
engines: {node: '>=18'}
p-locate@4.1.0:
resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==}
engines: {node: '>=8'}
@@ -9567,6 +9574,10 @@ packages:
resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==}
engines: {node: '>=10'}
yocto-queue@1.2.1:
resolution: {integrity: sha512-AyeEbWOu/TAXdxlV9wmGcR0+yh2j3vYPGOECcIj2S7MkrLyC7ne+oye2BKTItt0ii2PHk4cDy+95+LshzbXnGg==}
engines: {node: '>=12.20'}
yoctocolors-cjs@2.1.2:
resolution: {integrity: sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA==}
engines: {node: '>=18'}
@@ -18193,6 +18204,10 @@ snapshots:
dependencies:
yocto-queue: 0.1.0
p-limit@6.2.0:
dependencies:
yocto-queue: 1.2.1
p-locate@4.1.0:
dependencies:
p-limit: 2.3.0
@@ -20183,6 +20198,8 @@ snapshots:
yocto-queue@0.1.0: {}
yocto-queue@1.2.1: {}
yoctocolors-cjs@2.1.2: {}
zeed-dom@0.15.1: