mirror of
https://github.com/docmost/docmost.git
synced 2025-11-12 15:12:39 +10:00
queue import attachments upload (#1353)
This commit is contained in:
@ -71,6 +71,7 @@
|
||||
"nestjs-kysely": "^1.2.0",
|
||||
"nodemailer": "^7.0.3",
|
||||
"openid-client": "^5.7.1",
|
||||
"p-limit": "^6.2.0",
|
||||
"passport-google-oauth20": "^2.0.0",
|
||||
"passport-jwt": "^4.0.1",
|
||||
"pg": "^8.16.0",
|
||||
|
||||
@ -14,10 +14,14 @@ import { AttachmentType } from '../../../core/attachment/attachment.constants';
|
||||
import { unwrapFromParagraph } from '../utils/import-formatter';
|
||||
import { resolveRelativeAttachmentPath } from '../utils/import.utils';
|
||||
import { load } from 'cheerio';
|
||||
import pLimit from 'p-limit';
|
||||
|
||||
@Injectable()
|
||||
export class ImportAttachmentService {
|
||||
private readonly logger = new Logger(ImportAttachmentService.name);
|
||||
private readonly CONCURRENT_UPLOADS = 3;
|
||||
private readonly MAX_RETRIES = 2;
|
||||
private readonly RETRY_DELAY = 2000;
|
||||
|
||||
constructor(
|
||||
private readonly storageService: StorageService,
|
||||
@ -41,7 +45,14 @@ export class ImportAttachmentService {
|
||||
attachmentCandidates,
|
||||
} = opts;
|
||||
|
||||
const attachmentTasks: Promise<void>[] = [];
|
||||
const attachmentTasks: (() => Promise<void>)[] = [];
|
||||
const limit = pLimit(this.CONCURRENT_UPLOADS);
|
||||
const uploadStats = {
|
||||
total: 0,
|
||||
completed: 0,
|
||||
failed: 0,
|
||||
failedFiles: [] as string[],
|
||||
};
|
||||
|
||||
/**
|
||||
* Cache keyed by the *relative* path that appears in the HTML.
|
||||
@ -74,30 +85,16 @@ export class ImportAttachmentService {
|
||||
|
||||
const apiFilePath = `/api/files/${attachmentId}/${fileNameWithExt}`;
|
||||
|
||||
attachmentTasks.push(
|
||||
(async () => {
|
||||
const fileStream = createReadStream(abs);
|
||||
await this.storageService.uploadStream(storageFilePath, fileStream);
|
||||
const stat = await fs.stat(abs);
|
||||
|
||||
await this.db
|
||||
.insertInto('attachments')
|
||||
.values({
|
||||
id: attachmentId,
|
||||
filePath: storageFilePath,
|
||||
fileName: fileNameWithExt,
|
||||
fileSize: stat.size,
|
||||
mimeType: getMimeType(fileNameWithExt),
|
||||
type: 'file',
|
||||
fileExt: ext,
|
||||
creatorId: fileTask.creatorId,
|
||||
workspaceId: fileTask.workspaceId,
|
||||
pageId,
|
||||
spaceId: fileTask.spaceId,
|
||||
})
|
||||
.execute();
|
||||
})(),
|
||||
);
|
||||
attachmentTasks.push(() => this.uploadWithRetry({
|
||||
abs,
|
||||
storageFilePath,
|
||||
attachmentId,
|
||||
fileNameWithExt,
|
||||
ext,
|
||||
pageId,
|
||||
fileTask,
|
||||
uploadStats,
|
||||
}));
|
||||
|
||||
return {
|
||||
attachmentId,
|
||||
@ -292,12 +289,113 @@ export class ImportAttachmentService {
|
||||
}
|
||||
|
||||
// wait for all uploads & DB inserts
|
||||
try {
|
||||
await Promise.all(attachmentTasks);
|
||||
} catch (err) {
|
||||
this.logger.log('Import attachment upload error', err);
|
||||
uploadStats.total = attachmentTasks.length;
|
||||
|
||||
if (uploadStats.total > 0) {
|
||||
this.logger.debug(`Starting upload of ${uploadStats.total} attachments...`);
|
||||
|
||||
try {
|
||||
await Promise.all(
|
||||
attachmentTasks.map(task => limit(task))
|
||||
);
|
||||
} catch (err) {
|
||||
this.logger.error('Import attachment upload error', err);
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
`Upload completed: ${uploadStats.completed}/${uploadStats.total} successful, ${uploadStats.failed} failed`
|
||||
);
|
||||
|
||||
if (uploadStats.failed > 0) {
|
||||
this.logger.warn(
|
||||
`Failed to upload ${uploadStats.failed} files:`,
|
||||
uploadStats.failedFiles
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return $.root().html() || '';
|
||||
}
|
||||
|
||||
private async uploadWithRetry(opts: {
|
||||
abs: string;
|
||||
storageFilePath: string;
|
||||
attachmentId: string;
|
||||
fileNameWithExt: string;
|
||||
ext: string;
|
||||
pageId: string;
|
||||
fileTask: FileTask;
|
||||
uploadStats: {
|
||||
total: number;
|
||||
completed: number;
|
||||
failed: number;
|
||||
failedFiles: string[];
|
||||
};
|
||||
}): Promise<void> {
|
||||
const {
|
||||
abs,
|
||||
storageFilePath,
|
||||
attachmentId,
|
||||
fileNameWithExt,
|
||||
ext,
|
||||
pageId,
|
||||
fileTask,
|
||||
uploadStats,
|
||||
} = opts;
|
||||
|
||||
let lastError: Error;
|
||||
|
||||
for (let attempt = 1; attempt <= this.MAX_RETRIES; attempt++) {
|
||||
try {
|
||||
const fileStream = createReadStream(abs);
|
||||
await this.storageService.uploadStream(storageFilePath, fileStream);
|
||||
const stat = await fs.stat(abs);
|
||||
|
||||
await this.db
|
||||
.insertInto('attachments')
|
||||
.values({
|
||||
id: attachmentId,
|
||||
filePath: storageFilePath,
|
||||
fileName: fileNameWithExt,
|
||||
fileSize: stat.size,
|
||||
mimeType: getMimeType(fileNameWithExt),
|
||||
type: 'file',
|
||||
fileExt: ext,
|
||||
creatorId: fileTask.creatorId,
|
||||
workspaceId: fileTask.workspaceId,
|
||||
pageId,
|
||||
spaceId: fileTask.spaceId,
|
||||
})
|
||||
.execute();
|
||||
|
||||
uploadStats.completed++;
|
||||
|
||||
if (uploadStats.completed % 10 === 0) {
|
||||
this.logger.debug(
|
||||
`Upload progress: ${uploadStats.completed}/${uploadStats.total}`
|
||||
);
|
||||
}
|
||||
|
||||
return;
|
||||
} catch (error) {
|
||||
lastError = error as Error;
|
||||
this.logger.warn(
|
||||
`Upload attempt ${attempt}/${this.MAX_RETRIES} failed for ${fileNameWithExt}: ${error instanceof Error ? error.message : String(error)}`
|
||||
);
|
||||
|
||||
if (attempt < this.MAX_RETRIES) {
|
||||
await new Promise(resolve =>
|
||||
setTimeout(resolve, this.RETRY_DELAY * attempt)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uploadStats.failed++;
|
||||
uploadStats.failedFiles.push(fileNameWithExt);
|
||||
this.logger.error(
|
||||
`Failed to upload ${fileNameWithExt} after ${this.MAX_RETRIES} attempts:`,
|
||||
lastError
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user