feat: add stream upload support and improve file handling

- Add stream upload functionality to storage drivers
- Improve ZIP file extraction with better encoding handling
- Fix attachment ID rendering issues
- Add AWS S3 upload stream support
- Update dependencies for better compatibility
Philipinho
2025-05-23 22:31:37 -07:00
parent ec533934de
commit 065f888c32
12 changed files with 697 additions and 69 deletions
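For context, a minimal sketch (not part of the commit) of how the new streaming upload path is consumed: the import flow opens a read stream for each extracted attachment and hands it to StorageService.uploadStream, which delegates to the configured driver. The helper function and paths below are hypothetical; uploadStream and the driver implementations are the ones added in the diffs that follow.

import { createReadStream } from 'node:fs';
import { StorageService } from '../storage/storage.service';

// Hypothetical helper: stream a file from disk into storage without buffering it in memory.
// Per the diffs below, the local driver pipes the stream into createWriteStream, while the
// S3 driver wraps it in @aws-sdk/lib-storage's Upload for multipart streaming.
async function uploadAttachment(
  storage: StorageService,
  absPath: string,
  storageFilePath: string,
): Promise<void> {
  await storage.uploadStream(storageFilePath, createReadStream(absPath));
}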

View File

@@ -1,5 +1,6 @@
import * as path from 'path';
import * as bcrypt from 'bcrypt';
import { sanitize } from 'sanitize-filename-ts';
export const envPath = path.resolve(process.cwd(), '..', '..', '.env');
@@ -62,3 +63,8 @@ export function extractDateFromUuid7(uuid7: string) {
return new Date(timestamp);
}
export function sanitizeFileName(fileName: string): string {
const sanitizedFilename = sanitize(fileName).replace(/ /g, '_');
return sanitizedFilename.slice(0, 255);
}

View File

@@ -7,13 +7,27 @@ import { extractZip, FileTaskStatus } from './file.utils';
import { StorageService } from '../storage/storage.service';
import * as tmp from 'tmp-promise';
import { pipeline } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';
import { createReadStream, createWriteStream } from 'node:fs';
import { ImportService } from './import.service';
import { promises as fs } from 'fs';
import { generateSlugId } from '../../common/helpers';
import {
generateSlugId,
getMimeType,
sanitizeFileName,
} from '../../common/helpers';
import { v7 } from 'uuid';
import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
import { FileTask, InsertablePage } from '@docmost/db/types/entity.types';
import {
DOMParser,
Node as HDNode,
Element as HDElement,
Window,
} from 'happy-dom';
import { markdownToHtml } from '@docmost/editor-ext';
import { getAttachmentFolderPath } from '../../core/attachment/attachment.utils';
import { AttachmentType } from '../../core/attachment/attachment.constants';
import { getProsemirrorContent } from '../../common/helpers/prosemirror/utils';
@Injectable()
export class FileTaskService {
@@ -52,15 +66,17 @@ export class FileTaskService {
await pipeline(fileStream, createWriteStream(tmpZipPath));
await extractZip(tmpZipPath, tmpExtractDir);
console.log('extract here');
// TODO: internal link mentions, backlinks, attachments
try {
await this.updateTaskStatus(fileTaskId, FileTaskStatus.Processing);
// if type == generic
await this.processGenericImport({ extractDir: tmpExtractDir, fileTask });
await this.updateTaskStatus(fileTaskId, FileTaskStatus.Success);
} catch (error) {
await this.updateTaskStatus(fileTaskId, FileTaskStatus.Failed);
console.error(error);
} finally {
await cleanupTmpFile();
await cleanupTmpDir();
@@ -74,6 +90,10 @@ export class FileTaskService {
const { extractDir, fileTask } = opts;
const allFiles = await this.collectMarkdownAndHtmlFiles(extractDir);
const attachmentCandidates =
await this.buildAttachmentCandidates(extractDir);
console.log('attachment count: ', attachmentCandidates.size);
const pagesMap = new Map<
string,
@@ -95,7 +115,22 @@ export class FileTaskService {
.split(path.sep)
.join('/'); // normalize to forward-slashes
const ext = path.extname(relPath).toLowerCase();
const content = await fs.readFile(absPath, 'utf-8');
let content = await fs.readFile(absPath, 'utf-8');
if (ext.toLowerCase() === '.html' || ext.toLowerCase() === '.md') {
// rewrite local asset references
if (ext === '.md') {
content = await markdownToHtml(content);
}
content = await this.rewriteLocalFilesInHtml({
html: content,
extractDir,
pageId: v7(),
fileTask,
attachmentCandidates,
});
}
pagesMap.set(relPath, {
id: v7(),
@@ -159,28 +194,27 @@ export class FileTaskService {
const insertablePages: InsertablePage[] = await Promise.all(
Array.from(pagesMap.values()).map(async (page) => {
const pmState = await this.importService.markdownOrHtmlToProsemirror(
const htmlContent = await this.rewriteInternalLinksToMentionHtml(
page.content,
page.fileExtension,
page.filePath,
filePathToPageMetaMap,
fileTask.creatorId,
);
const pmState = getProsemirrorContent(
await this.importService.processHTML(htmlContent),
);
const { title, prosemirrorJson } =
this.importService.extractTitleAndRemoveHeading(pmState);
/*const rewDoc =
await this.importService.convertInternalLinksToMentionsPM(
jsonToNode(prosemirrorJson),
page.filePath,
filePathToPageMetaMap,
);*/
const proseJson = prosemirrorJson; //rewDoc.toJSON();
return {
id: page.id,
slugId: page.slugId,
title: title || page.name,
content: proseJson,
textContent: jsonToText(proseJson),
ydoc: await this.importService.createYdoc(proseJson),
content: prosemirrorJson,
textContent: jsonToText(prosemirrorJson),
ydoc: await this.importService.createYdoc(prosemirrorJson),
position: page.position!,
spaceId: fileTask.spaceId,
workspaceId: fileTask.workspaceId,
@@ -191,7 +225,241 @@ export class FileTaskService {
}),
);
await this.db.insertInto('pages').values(insertablePages).execute();
try {
await this.db.insertInto('pages').values(insertablePages).execute();
} catch (e) {
console.error(e);
}
}
async rewriteLocalFilesInHtml(opts: {
html: string;
extractDir: string;
pageId: string;
fileTask: FileTask;
attachmentCandidates: Map<string, string>;
}): Promise<string> {
const { html, extractDir, pageId, fileTask, attachmentCandidates } = opts;
const window = new Window();
const doc = window.document;
doc.body.innerHTML = html;
const tasks: Promise<void>[] = [];
const processFile = (relPath: string) => {
const abs = attachmentCandidates.get(relPath)!;
const attachmentId = v7();
const ext = path.extname(abs);
const fileName = sanitizeFileName(path.basename(abs, ext));
const fileNameWithExt =
sanitizeFileName(path.basename(abs, ext)) + ext.toLowerCase();
const storageFilePath = `${getAttachmentFolderPath(AttachmentType.File, fileTask.workspaceId)}/${attachmentId}/${fileNameWithExt}`;
const apiFilePath = `/api/files/${attachmentId}/${fileNameWithExt}`;
// console.log('file Path:', apiFilePath, ' and ', storageFilePath);
console.log(storageFilePath);
tasks.push(
(async () => {
const fileStream = createReadStream(abs);
await this.storageService.uploadStream(storageFilePath, fileStream);
const stat = await fs.stat(abs);
const uploaded = await this.db
.insertInto('attachments')
.values({
id: attachmentId,
filePath: storageFilePath,
fileName: fileNameWithExt,
fileSize: stat.size,
mimeType: getMimeType(ext),
fileExt: ext,
creatorId: fileTask.creatorId,
workspaceId: fileTask.workspaceId,
pageId,
spaceId: fileTask.spaceId,
})
.returningAll()
.execute();
console.log(uploaded);
})(),
);
console.log('upload file');
return {
attachmentId,
storageFilePath,
apiFilePath,
fileNameWithExt,
abs,
};
};
// rewrite <img>
for (const img of Array.from(doc.getElementsByTagName('img'))) {
const src = img.getAttribute('src') ?? '';
if (!src || src.startsWith('http') || src.startsWith('/api/files/'))
continue;
const rel = src.replace(/^\.\/?/, '');
if (!attachmentCandidates.has(rel)) continue;
const {
attachmentId,
storageFilePath,
apiFilePath,
fileNameWithExt,
abs,
} = processFile(rel);
const stat = await fs.stat(abs);
img.setAttribute('src', apiFilePath);
img.setAttribute('data-attachment-id', attachmentId);
img.setAttribute('data-size', stat.size.toString());
img.setAttribute('width', '100%');
img.setAttribute('data-align', 'center');
this.unwrapFromParagraph(img);
}
// rewrite <video>
for (const vid of Array.from(doc.getElementsByTagName('video'))) {
const src = vid.getAttribute('src') ?? '';
if (!src || src.startsWith('http') || src.startsWith('/api/files/'))
continue;
const rel = src.replace(/^\.\/?/, '');
if (!attachmentCandidates.has(rel)) continue;
const { attachmentId, apiFilePath, abs } = processFile(rel);
const stat = await fs.stat(abs);
vid.setAttribute('src', apiFilePath);
vid.setAttribute('data-attachment-id', attachmentId);
vid.setAttribute('data-size', stat.size.toString());
vid.setAttribute('width', '100%');
vid.setAttribute('data-align', 'center');
// @ts-ignore
this.unwrapFromParagraph(vid);
}
// rewrite other attachments via <a>
for (const a of Array.from(doc.getElementsByTagName('a'))) {
const href = a.getAttribute('href') ?? '';
if (!href || href.startsWith('http') || href.startsWith('/api/files/'))
continue;
const rel = href.replace(/^\.\/?/, '');
if (!attachmentCandidates.has(rel)) continue;
const { attachmentId, apiFilePath, abs } = processFile(rel);
const stat = await fs.stat(abs);
const div = doc.createElement('div') as HDElement;
div.setAttribute('data-type', 'attachment');
div.setAttribute('data-attachment-url', apiFilePath);
div.setAttribute('data-attachment-name', path.basename(abs));
div.setAttribute('data-attachment-mime', getMimeType(abs));
div.setAttribute('data-attachment-size', stat.size.toString());
div.setAttribute('data-attachment-id', attachmentId);
a.replaceWith(div);
this.unwrapFromParagraph(div);
}
// wait for all uploads & DB inserts
await Promise.all(tasks);
// serialize back to HTML
return doc.documentElement.outerHTML;
}
async rewriteInternalLinksToMentionHtml(
html: string,
currentFilePath: string,
filePathToPageMetaMap: Map<
string,
{ id: string; title: string; slugId: string }
>,
creatorId: string,
): Promise<string> {
const window = new Window();
const doc = window.document;
doc.body.innerHTML = html;
// normalize helper
const normalize = (p: string) => p.replace(/\\/g, '/');
for (const a of Array.from(doc.getElementsByTagName('a'))) {
const rawHref = a.getAttribute('href');
if (!rawHref) continue;
// 1) skip absolute/external URLs
if (rawHref.startsWith('http') || rawHref.startsWith('/api/')) {
continue;
}
const decoded = decodeURIComponent(rawHref);
// 3) resolve "../Main Page.md" relative to the current file
// path.dirname might be "" for root-level pages
const parentDir = path.dirname(currentFilePath);
const joined = path.join(parentDir, decoded);
const resolved = normalize(joined);
// 4) look up in your map
const pageMeta = filePathToPageMetaMap.get(resolved);
if (!pageMeta) {
// not an internal link we know about
continue;
}
const mentionEl = doc.createElement('span') as HDElement;
mentionEl.setAttribute('data-type', 'mention');
mentionEl.setAttribute('data-id', v7());
mentionEl.setAttribute('data-entity-type', 'page');
mentionEl.setAttribute('data-entity-id', pageMeta.id);
mentionEl.setAttribute('data-label', pageMeta.title);
mentionEl.setAttribute('data-slug-id', pageMeta.slugId);
mentionEl.setAttribute('data-creator-id', creatorId);
mentionEl.textContent = pageMeta.title;
a.replaceWith(mentionEl);
}
return doc.body.innerHTML;
}
unwrapFromParagraph(node: HDElement) {
const parent = node.parentNode as HDElement;
if (parent && parent.tagName === 'P') {
if (parent.childNodes.length === 1) {
// <p><node></node></p> → <node>
parent.replaceWith(node);
} else {
// mixed content: move node out, leave the rest of the <p> intact
parent.parentNode!.insertBefore(node, parent);
}
}
}
async buildAttachmentCandidates(
extractDir: string,
): Promise<Map<string, string>> {
const map = new Map<string, string>();
async function walk(dir: string) {
for (const ent of await fs.readdir(dir, { withFileTypes: true })) {
const abs = path.join(dir, ent.name);
if (ent.isDirectory()) {
await walk(abs);
} else {
const rel = path.relative(extractDir, abs).split(path.sep).join('/');
console.log(abs)
map.set(rel, abs);
}
}
}
await walk(extractDir);
return map;
}
async collectMarkdownAndHtmlFiles(dir: string): Promise<string[]> {

View File

@@ -35,43 +35,55 @@ export function getFileTaskFolderPath(
export function extractZip(source: string, target: string) {
//https://github.com/Surfer-Org
return new Promise((resolve, reject) => {
yauzl.open(source, { lazyEntries: true }, (err, zipfile) => {
if (err) return reject(err);
yauzl.open(
source,
{ lazyEntries: true, decodeStrings: false, autoClose: true },
(err, zipfile) => {
if (err) return reject(err);
zipfile.readEntry();
zipfile.on('entry', (entry) => {
const fullPath = path.join(target, entry.fileName);
const directory = path.dirname(fullPath);
zipfile.readEntry();
zipfile.on('entry', (entry) => {
const name = entry.fileName.toString('utf8'); // or 'cp437' if you need the original DOS charset
const safeName = name.replace(/^\/+/, ''); // strip any leading slashes
if (/\/$/.test(entry.fileName)) {
// Directory entry
try {
fs.mkdirSync(fullPath, { recursive: true });
zipfile.readEntry();
} catch (err) {
reject(err);
const fullPath = path.join(target, safeName);
const directory = path.dirname(fullPath);
// <-- skip all macOS metadata
if (safeName.startsWith('__MACOSX/')) {
return zipfile.readEntry();
}
} else {
// File entry
try {
fs.mkdirSync(directory, { recursive: true });
zipfile.openReadStream(entry, (err, readStream) => {
if (err) return reject(err);
const writeStream = fs.createWriteStream(fullPath);
readStream.on('end', () => {
writeStream.end();
zipfile.readEntry();
if (/\/$/.test(entry.fileName)) {
// Directory entry
try {
fs.mkdirSync(fullPath, { recursive: true });
zipfile.readEntry();
} catch (err) {
reject(err);
}
} else {
// File entry
try {
fs.mkdirSync(directory, { recursive: true });
zipfile.openReadStream(entry, (err, readStream) => {
if (err) return reject(err);
const writeStream = fs.createWriteStream(fullPath);
readStream.on('end', () => {
writeStream.end();
zipfile.readEntry();
});
readStream.pipe(writeStream);
});
readStream.pipe(writeStream);
});
} catch (err) {
reject(err);
} catch (err) {
reject(err);
}
}
}
});
});
zipfile.on('end', resolve);
zipfile.on('error', reject);
});
zipfile.on('end', resolve);
zipfile.on('error', reject);
},
);
});
}

View File

@@ -130,7 +130,7 @@ export class ImportService {
async createYdoc(prosemirrorJson: any): Promise<Buffer | null> {
if (prosemirrorJson) {
// this.logger.debug(`Converting prosemirror json state to ydoc`);
// this.logger.debug(`Converting prosemirror json state to ydoc`);
const ydoc = TiptapTransformer.toYdoc(
prosemirrorJson,
@@ -146,20 +146,34 @@
}
extractTitleAndRemoveHeading(prosemirrorState: any) {
let title = null;
let title: string | null = null;
const content = prosemirrorState.content ?? [];
if (
prosemirrorState?.content?.length > 0 &&
prosemirrorState.content[0].type === 'heading' &&
prosemirrorState.content[0].attrs?.level === 1
content.length > 0 &&
content[0].type === 'heading' &&
content[0].attrs?.level === 1
) {
title = prosemirrorState.content[0].content[0].text;
// remove h1 header node from state
prosemirrorState.content.shift();
title = content[0].content?.[0]?.text ?? null;
content.shift();
}
return { title, prosemirrorJson: prosemirrorState };
// ensure at least one paragraph
if (content.length === 0) {
content.push({
type: 'paragraph',
content: [],
});
}
return {
title,
prosemirrorJson: {
...prosemirrorState,
content,
},
};
}
async getNewPagePosition(spaceId: string): Promise<string> {

View File

@@ -22,6 +22,7 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
console.log('export task', job.data.fileTaskId);
}
} catch (err) {
console.error(err);
throw err;
}
}

View File

@@ -3,10 +3,11 @@ import {
LocalStorageConfig,
StorageOption,
} from '../interfaces';
import { join } from 'path';
import { join, dirname } from 'path';
import * as fs from 'fs-extra';
import { Readable } from 'stream';
import { createReadStream } from 'node:fs';
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
export class LocalDriver implements StorageDriver {
private readonly config: LocalStorageConfig;
@@ -27,6 +28,16 @@ export class LocalDriver implements StorageDriver {
}
}
async uploadStream(filePath: string, file: Readable): Promise<void> {
try {
const fullPath = this._fullPath(filePath);
await fs.mkdir(dirname(fullPath), { recursive: true });
await pipeline(file, createWriteStream(fullPath));
} catch (err) {
throw new Error(`Failed to upload file: ${(err as Error).message}`);
}
}
async copy(fromFilePath: string, toFilePath: string): Promise<void> {
try {
if (await this.exists(fromFilePath)) {

View File

@@ -12,6 +12,7 @@ import { streamToBuffer } from '../storage.utils';
import { Readable } from 'stream';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { getMimeType } from '../../../common/helpers';
import { Upload } from '@aws-sdk/lib-storage';
export class S3Driver implements StorageDriver {
private readonly s3Client: S3Client;
@@ -40,6 +41,26 @@ export class S3Driver implements StorageDriver {
}
}
async uploadStream(filePath: string, file: Readable): Promise<void> {
try {
const contentType = getMimeType(filePath);
const upload = new Upload({
client: this.s3Client,
params: {
Bucket: this.config.bucket,
Key: filePath,
Body: file,
ContentType: contentType,
},
});
await upload.done();
} catch (err) {
throw new Error(`Failed to upload file: ${(err as Error).message}`);
}
}
async copy(fromFilePath: string, toFilePath: string): Promise<void> {
try {
if (await this.exists(fromFilePath)) {

View File

@@ -3,13 +3,14 @@ import { Readable } from 'stream';
export interface StorageDriver {
upload(filePath: string, file: Buffer): Promise<void>;
uploadStream(filePath: string, file: Readable): Promise<void>;
copy(fromFilePath: string, toFilePath: string): Promise<void>;
read(filePath: string): Promise<Buffer>;
readStream(filePath: string): Promise<Readable>;
exists(filePath: string): Promise<boolean>;
getUrl(filePath: string): string;

View File

@@ -15,6 +15,11 @@ export class StorageService {
this.logger.debug(`File uploaded successfully. Path: ${filePath}`);
}
async uploadStream(filePath: string, fileContent: Readable) {
await this.storageDriver.uploadStream(filePath, fileContent);
this.logger.debug(`File uploaded successfully. Path: ${filePath}`);
}
async copy(fromFilePath: string, toFilePath: string) {
await this.storageDriver.copy(fromFilePath, toFilePath);
this.logger.debug(`File copied successfully. Path: ${toFilePath}`);