This commit is contained in:
Philipinho
2025-06-08 18:55:57 -07:00
parent 097e30e992
commit cbaf3394c0
11 changed files with 166 additions and 74 deletions

View File

@ -5,3 +5,14 @@ export class FileTaskIdDto {
@IsUUID()
fileTaskId: string;
}
export type ImportPageNode = {
id: string;
slugId: string;
name: string;
content: string;
position?: string | null;
parentPageId: string | null;
fileExtension: string;
filePath: string;
};

View File

@ -6,6 +6,7 @@ import { FileTaskService } from './services/file-task.service';
import { FileTaskProcessor } from './processors/file-task.processor';
import { ImportAttachmentService } from './services/import-attachment.service';
import { FileTaskController } from './file-task.controller';
import { PageModule } from '../../core/page/page.module';
@Module({
providers: [
@ -16,6 +17,6 @@ import { FileTaskController } from './file-task.controller';
],
exports: [ImportService, ImportAttachmentService],
controllers: [ImportController, FileTaskController],
imports: [StorageModule],
imports: [StorageModule, PageModule],
})
export class ImportModule {}

View File

@ -44,35 +44,23 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
`Error processing ${job.name} job. Reason: ${job.failedReason}`,
);
const MAX_JOB_ATTEMPTS = 3;
const fileTaskId = job.data.fileTaskId;
if (job.attemptsMade >= MAX_JOB_ATTEMPTS) {
this.logger.error(`Max import attempts reached for Task ${fileTaskId}.`);
try {
const fileTaskId = job.data.fileTaskId;
await this.fileTaskService.updateTaskStatus(
fileTaskId,
FileTaskStatus.Failed,
job.failedReason,
);
try {
const fileTask = await this.fileTaskService.getFileTask(fileTaskId);
if (fileTask) {
await this.storageService.delete(fileTask.filePath);
}
} catch (err) {
this.logger.error(err);
const fileTask = await this.fileTaskService.getFileTask(fileTaskId);
if (fileTask) {
await this.storageService.delete(fileTask.filePath);
}
} catch (err) {
this.logger.error(err);
}
}
@OnWorkerEvent('stalled')
async onStalled(job: Job) {
this.logger.error(
`Stalled processing ${job.name} job. Reason: ${job.failedReason}`,
);
}
@OnWorkerEvent('completed')
onCompleted(job: Job) {
this.logger.log(

View File

@ -29,6 +29,8 @@ import { executeTx } from '@docmost/db/utils';
import { BacklinkRepo } from '@docmost/db/repos/backlink/backlink.repo';
import { ImportAttachmentService } from './import-attachment.service';
import { ModuleRef } from '@nestjs/core';
import { PageService } from '../../../core/page/services/page.service';
import { ImportPageNode } from '../dto/file-task-dto';
@Injectable()
export class FileTaskService {
@ -37,6 +39,7 @@ export class FileTaskService {
constructor(
private readonly storageService: StorageService,
private readonly importService: ImportService,
private readonly pageService: PageService,
private readonly backlinkRepo: BacklinkRepo,
@InjectKysely() private readonly db: KyselyDB,
private readonly importAttachmentService: ImportAttachmentService,
@ -55,6 +58,10 @@ export class FileTaskService {
return;
}
if (fileTask.status === FileTaskStatus.Failed) {
return;
}
if (fileTask.status === FileTaskStatus.Success) {
this.logger.log('Imported task already processed.');
return;
@ -118,6 +125,8 @@ export class FileTaskService {
}
try {
await this.updateTaskStatus(fileTaskId, FileTaskStatus.Success, null);
await cleanupTmpFile();
await cleanupTmpDir();
// delete stored file on success
await this.storageService.delete(fileTask.filePath);
} catch (err) {
@ -142,19 +151,7 @@ export class FileTaskService {
const allFiles = await collectMarkdownAndHtmlFiles(extractDir);
const attachmentCandidates = await buildAttachmentCandidates(extractDir);
const pagesMap = new Map<
string,
{
id: string;
slugId: string;
name: string;
content: string;
position?: string | null;
parentPageId: string | null;
fileExtension: string;
filePath: string;
}
>();
const pagesMap = new Map<string, ImportPageNode>();
for (const absPath of allFiles) {
const relPath = path
@ -201,14 +198,42 @@ export class FileTaskService {
});
// generate position keys
const siblingsMap = new Map<string | null, typeof Array.prototype>();
const siblingsMap = new Map<string | null, ImportPageNode[]>();
pagesMap.forEach((page) => {
const sibs = siblingsMap.get(page.parentPageId) || [];
sibs.push(page);
siblingsMap.set(page.parentPageId, sibs);
const group = siblingsMap.get(page.parentPageId) ?? [];
group.push(page);
siblingsMap.set(page.parentPageId, group);
});
siblingsMap.forEach((sibs) => {
// get root pages
const rootSibs = siblingsMap.get(null);
if (rootSibs?.length) {
rootSibs.sort((a, b) => a.name.localeCompare(b.name));
// get first position key from the server
const nextPosition = await this.pageService.nextPagePosition(
fileTask.spaceId,
);
let prevPos: string | null = null;
rootSibs.forEach((page, idx) => {
if (idx === 0) {
page.position = nextPosition;
} else {
page.position = generateJitteredKeyBetween(prevPos, null);
}
prevPos = page.position;
});
}
// non-root buckets (children & deeper levels)
siblingsMap.forEach((sibs, parentId) => {
if (parentId === null) return; // root already done
sibs.sort((a, b) => a.name.localeCompare(b.name));
let prevPos: string | null = null;
for (const page of sibs) {
page.position = generateJitteredKeyBetween(prevPos, null);
@ -216,6 +241,7 @@ export class FileTaskService {
}
});
// internal page links
const filePathToPageMetaMap = new Map<
string,
{ id: string; title: string; slugId: string }

View File

@ -260,7 +260,11 @@ export class ImportAttachmentService {
}
// wait for all uploads & DB inserts
await Promise.all(attachmentTasks);
try {
await Promise.all(attachmentTasks);
} catch (err) {
this.logger.log('Import attachment upload error', err);
}
return $.root().html() || '';
}

View File

@ -54,6 +54,7 @@ import { BacklinksProcessor } from './processors/backlinks.processor';
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
attempts: 1,
},
}),
],