import type { ObjectMetadata, ObjectReference, Source } from "./objectHandler"; import { ObjectBackend } from "./objectHandler"; import { LRUCache } from "lru-cache"; import fs from "fs"; import path from "path"; import { Readable } from "stream"; import { createHash } from "crypto"; import prisma from "../db/database"; export class FsObjectBackend extends ObjectBackend { private baseObjectPath: string; private baseMetadataPath: string; private hashStore = new FsHashStore(); constructor() { super(); const basePath = process.env.FS_BACKEND_PATH ?? "./.data/objects"; this.baseObjectPath = path.join(basePath, "objects"); this.baseMetadataPath = path.join(basePath, "metadata"); fs.mkdirSync(this.baseObjectPath, { recursive: true }); fs.mkdirSync(this.baseMetadataPath, { recursive: true }); } async fetch(id: ObjectReference) { console.log("ID: " + id); const objectPath = path.join(this.baseObjectPath, id); if (!fs.existsSync(objectPath)) return undefined; return fs.createReadStream(objectPath); } async write(id: ObjectReference, source: Source): Promise { const objectPath = path.join(this.baseObjectPath, id); if (!fs.existsSync(objectPath)) return false; // remove item from cache this.hashStore.delete(id); if (source instanceof Readable) { const outputStream = fs.createWriteStream(objectPath); source.pipe(outputStream, { end: true }); await new Promise((r, _j) => source.on("end", r)); return true; } if (source instanceof Buffer) { fs.writeFileSync(objectPath, source); return true; } return false; } async startWriteStream(id: ObjectReference) { const objectPath = path.join(this.baseObjectPath, id); if (!fs.existsSync(objectPath)) return undefined; // remove item from cache this.hashStore.delete(id); return fs.createWriteStream(objectPath); } async create( id: string, source: Source, metadata: ObjectMetadata, ): Promise { const objectPath = path.join(this.baseObjectPath, id); const metadataPath = path.join(this.baseMetadataPath, `${id}.json`); if (fs.existsSync(objectPath) || fs.existsSync(metadataPath)) return undefined; // Write metadata fs.writeFileSync(metadataPath, JSON.stringify(metadata)); // Create file so write passes fs.writeFileSync(objectPath, ""); // Call write this.write(id, source); return id; } async createWithWriteStream(id: string, metadata: ObjectMetadata) { const objectPath = path.join(this.baseObjectPath, id); const metadataPath = path.join(this.baseMetadataPath, `${id}.json`); if (fs.existsSync(objectPath) || fs.existsSync(metadataPath)) return undefined; // Write metadata fs.writeFileSync(metadataPath, JSON.stringify(metadata)); // Create file so write passes fs.writeFileSync(objectPath, ""); const stream = await this.startWriteStream(id); if (!stream) throw new Error("Could not create write stream"); return stream; } async delete(id: ObjectReference): Promise { const objectPath = path.join(this.baseObjectPath, id); if (!fs.existsSync(objectPath)) return true; fs.rmSync(objectPath); // remove item from cache this.hashStore.delete(id); return true; } async fetchMetadata( id: ObjectReference, ): Promise { const metadataPath = path.join(this.baseMetadataPath, `${id}.json`); if (!fs.existsSync(metadataPath)) return undefined; const metadata = JSON.parse(fs.readFileSync(metadataPath, "utf-8")); return metadata as ObjectMetadata; } async writeMetadata( id: ObjectReference, metadata: ObjectMetadata, ): Promise { const metadataPath = path.join(this.baseMetadataPath, `${id}.json`); if (!fs.existsSync(metadataPath)) return false; fs.writeFileSync(metadataPath, JSON.stringify(metadata)); return true; } async fetchHash(id: ObjectReference): Promise { const cacheResult = await this.hashStore.get(id); if (cacheResult !== undefined) return cacheResult; const obj = await this.fetch(id); if (obj === undefined) return; // local variable to point to object const cache = this.hashStore; // hash object const hash = createHash("md5"); hash.setEncoding("hex"); // read obj into hash obj.pipe(hash); await new Promise((r) => { obj.on("end", function () { hash.end(); cache.save(id, hash.read()); r(); }); }); return await this.hashStore.get(id); } } class FsHashStore { private cache = new LRUCache({ max: 1000, // number of items }); /** * Gets hash of object * @param id * @returns */ async get(id: ObjectReference) { const cacheRes = this.cache.get(id); if (cacheRes !== undefined) return cacheRes; const objectHash = await prisma.objectHash.findUnique({ where: { id, }, select: { hash: true, }, }); if (objectHash === null) return undefined; this.cache.set(id, objectHash.hash); return objectHash.hash; } /** * Saves hash of object * @param id */ async save(id: ObjectReference, hash: string) { await prisma.objectHash.upsert({ where: { id, }, create: { id, hash, }, update: { hash, }, }); this.cache.set(id, hash); } /** * Hash is no longer valid for whatever reason * @param id */ async delete(id: ObjectReference) { this.cache.delete(id); try { // need to catch in case the object doesn't exist await prisma.objectHash.delete({ where: { id, }, }); } catch { /* empty */ } } }