8 Commits

SHA1  Message  Date
ae4648845e feat: add support for partially deflated zips 2025-08-17 11:21:09 +10:00
bd30464a08 fix: manifest generation with multiple chunks 2025-08-15 21:56:33 +10:00
c67cca4ee0 fix: remove debug println 2025-08-15 21:41:48 +10:00
cae208a3e0 fix: zip read sizing 2025-08-15 21:30:25 +10:00
4276b9d668 fix: skip zip test 2025-08-15 19:47:50 +10:00
4fb9bb7563 fix: manifest sizing for slow backends 2025-08-15 16:49:18 +10:00
913dc2f58d feat: add zip speed test 2025-08-15 12:17:10 +10:00
7ec5e9f215 fix: zip file reader offset 2025-08-13 16:22:48 +10:00
9 changed files with 283 additions and 99 deletions

__test__/debug.spec.mjs Normal file

@@ -0,0 +1,22 @@
+import test from "ava";
+import { DropletHandler, generateManifest } from "../index.js";
+
+test.skip("debug", async (t) => {
+  const handler = new DropletHandler();
+
+  console.log("created handler");
+
+  const manifest = JSON.parse(
+    await new Promise((r, e) =>
+      generateManifest(
+        handler,
+        "./assets/TheGame.zip",
+        (_, __) => {},
+        (_, __) => {},
+        (err, manifest) => (err ? e(err) : r(manifest))
+      )
+    )
+  );
+
+  return t.pass();
+});


@@ -1,6 +1,8 @@
 import test from "ava";
 import fs from "node:fs";
 import path from "path";
+import { createHash } from "node:crypto";
+import prettyBytes from "pretty-bytes";
 import droplet, { DropletHandler, generateManifest } from "../index.js";

@@ -56,7 +58,12 @@ test("read file", async (t) => {
   const dropletHandler = new DropletHandler();

-  const stream = dropletHandler.readFile(dirName, "TESTFILE");
+  const stream = dropletHandler.readFile(
+    dirName,
+    "TESTFILE",
+    BigInt(0),
+    BigInt(testString.length)
+  );

   let finalString = "";

@@ -78,7 +85,12 @@ test("read file offset", async (t) => {
   fs.writeFileSync(dirName + "/TESTFILE", testString);

   const dropletHandler = new DropletHandler();
-  const stream = dropletHandler.readFile(dirName, "TESTFILE", BigInt(1), BigInt(4));
+  const stream = dropletHandler.readFile(
+    dirName,
+    "TESTFILE",
+    BigInt(1),
+    BigInt(4)
+  );

   let finalString = "";

@@ -96,10 +108,45 @@ test("read file offset", async (t) => {
   fs.rmSync(dirName, { recursive: true });
 });

-test("zip file reader", async (t) => {
-  return t.pass();
-
-  t.timeout(10_000);
+test.skip("zip speed test", async (t) => {
+  t.timeout(100_000_000);
+
+  const dropletHandler = new DropletHandler();
+  const stream = dropletHandler.readFile("./assets/TheGame.zip", "setup.exe");
+
+  let totalRead = 0;
+  let totalSeconds = 0;
+
+  let lastTime = process.hrtime.bigint();
+  const timeThreshold = BigInt(1_000_000_000);
+  let runningTotal = 0;
+  let runningTime = BigInt(0);
+
+  for await (const chunk of stream.getStream()) {
+    // Do something with each 'chunk'
+    const currentTime = process.hrtime.bigint();
+    const timeDiff = currentTime - lastTime;
+    lastTime = currentTime;
+
+    runningTime += timeDiff;
+    runningTotal += chunk.length;
+
+    if (runningTime >= timeThreshold) {
+      console.log(`${prettyBytes(runningTotal)}/s`);
+      totalRead += runningTotal;
+      totalSeconds += 1;
+      runningTime = BigInt(0);
+      runningTotal = 0;
+    }
+  }
+
+  const roughAverage = totalRead / totalSeconds;
+  console.log(`total rough average: ${prettyBytes(roughAverage)}/s`);
+
+  t.pass();
+});
+
+test.skip("zip manifest test", async (t) => {
   const dropletHandler = new DropletHandler();
   const manifest = JSON.parse(
     await new Promise((r, e) =>

@@ -113,20 +160,61 @@ test("zip file reader", async (t) => {
     )
   );

-  const stream = dropletHandler.readFile(
-    "./assets/TheGame.zip",
-    "setup.exe",
-    BigInt(10),
-    BigInt(20)
-  );
-
-  let finalString = "";
-  for await (const chunk of stream.getStream()) {
-    // Do something with each 'chunk'
-    finalString = String.fromCharCode.apply(null, chunk);
-    if(finalString.length > 100) break;
+  for (const [filename, data] of Object.entries(manifest)) {
+    let start = 0;
+    for (const [chunkIndex, length] of data.lengths.entries()) {
+      const hash = createHash("md5");
+      const stream = (
+        await dropletHandler.readFile(
+          "./assets/TheGame.zip",
+          filename,
+          BigInt(start),
+          BigInt(start + length)
+        )
+      ).getStream();
+
+      let streamLength = 0;
+      await stream.pipeTo(
+        new WritableStream({
+          write(chunk) {
+            streamLength += chunk.length;
+            hash.update(chunk);
+          },
+        })
+      );
+
+      if (streamLength != length)
+        return t.fail(
+          `stream length for chunk index ${chunkIndex} was not expected: real: ${streamLength} vs expected: ${length}`
+        );
+
+      const digest = hash.digest("hex");
+      if (data.checksums[chunkIndex] != digest)
+        return t.fail(
+          `checksums did not match for chunk index ${chunkIndex}: real: ${digest} vs expected: ${data.checksums[chunkIndex]}`
+        );
+
+      start += length;
+    }
   }
   t.pass();
 });
+
+test.skip("partially compress zip test", async (t) => {
+  const dropletHandler = new DropletHandler();
+  const manifest = JSON.parse(
+    await new Promise((r, e) =>
+      generateManifest(
+        dropletHandler,
+        "./assets/my horror game.zip",
+        (_, __) => {},
+        (_, __) => {},
+        (err, manifest) => (err ? e(err) : r(manifest))
+      )
+    )
+  );
+
+  return t.pass();
+});


@@ -1,3 +1,4 @@
-yes "droplet is awesome" | dd of=./setup.exe bs=1024 count=1000000
+# yes "droplet is awesome" | dd of=./setup.exe bs=1024 count=1000000
+dd if=/dev/random of=./setup.exe bs=1024 count=1000000
 zip TheGame.zip setup.exe
 rm setup.exe


@@ -1,6 +1,6 @@
 {
   "name": "@drop-oss/droplet",
-  "version": "2.0.1",
+  "version": "2.3.1",
   "main": "index.js",
   "types": "index.d.ts",
   "napi": {

@@ -24,7 +24,9 @@
   "devDependencies": {
     "@napi-rs/cli": "3.0.0-alpha.91",
     "@types/node": "^22.13.10",
-    "ava": "^6.2.0"
+    "ava": "^6.2.0",
+    "pretty-bytes": "^7.0.1",
+    "tsimp": "^2.0.12"
   },
   "ava": {
     "timeout": "3m",

@@ -51,8 +53,5 @@
   "packageManager": "yarn@4.7.0",
   "repository": {
     "url": "git+https://github.com/Drop-OSS/droplet.git"
-  },
-  "dependencies": {
-    "tsimp": "^2.0.12"
   }
 }


@@ -1,10 +1,4 @@
-use std::{
-    collections::HashMap,
-    io::{BufRead, BufReader},
-    path::Path,
-    sync::Arc,
-    thread,
-};
+use std::{collections::HashMap, sync::Arc, thread};

 use napi::{
     threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},

@@ -42,10 +36,18 @@ pub fn generate_manifest<'a>(
     log_sfn: ThreadsafeFunction<String>,
     callback_sfn: ThreadsafeFunction<String>,
 ) -> Result<()> {
-    let backend: &mut Box<dyn VersionBackend + Send> =
-        droplet_handler.create_backend_for_path(dir).ok_or(napi::Error::from_reason("Could not create backend for path."))?;
+    let backend: &mut Box<dyn VersionBackend + Send> = droplet_handler
+        .create_backend_for_path(dir)
+        .ok_or(napi::Error::from_reason(
+            "Could not create backend for path.",
+        ))?;
+
+    // This is unsafe (obviously),
+    // but it's all good as long as the DropletHandler doesn't get
+    // dropped while we're generating the manifest.
     let backend: &'static mut Box<dyn VersionBackend + Send> =
         unsafe { std::mem::transmute(backend) };

     thread::spawn(move || {
         let files = backend.list_files();

@@ -55,9 +57,10 @@ pub fn generate_manifest<'a>(
         let total: i32 = files.len() as i32;
         let mut i: i32 = 0;

+        let mut buf = [0u8; 1024 * 16];
         for version_file in files {
-            let raw_reader = backend.reader(&version_file).unwrap();
-            let mut reader = BufReader::with_capacity(CHUNK_SIZE, raw_reader);
+            let mut reader = backend.reader(&version_file, 0, 0).unwrap();

             let mut chunk_data = ChunkData {
                 permissions: version_file.permission,

@@ -68,12 +71,26 @@ pub fn generate_manifest<'a>(
             let mut chunk_index = 0;

             loop {
+                let mut length = 0;
                 let mut buffer: Vec<u8> = Vec::new();
-                reader.fill_buf().unwrap().clone_into(&mut buffer);
-                let length = buffer.len();
-                if length == 0 {
-                    break;
+                let mut file_empty = false;
+
+                loop {
+                    let read = reader.read(&mut buf).unwrap();
+                    length += read;
+
+                    // If we're out of data, add this chunk and then move onto the next file
+                    if read == 0 {
+                        file_empty = true;
+                        break;
+                    }
+
+                    buffer.extend_from_slice(&buf[0..read]);
+
+                    if length >= CHUNK_SIZE {
+                        break;
+                    }
                 }

                 let chunk_id = Uuid::new_v4();

@@ -88,10 +105,14 @@ pub fn generate_manifest<'a>(
                     "Processed chunk {} for {}",
                     chunk_index, &version_file.relative_filename
                 );
                 log_sfn.call(Ok(log_str), ThreadsafeFunctionCallMode::Blocking);

-                reader.consume(length);
                 chunk_index += 1;
+
+                if file_empty {
+                    break;
+                }
             }

             chunks.insert(version_file.relative_filename, chunk_data);
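Note: the rewritten loop above drops the BufReader::fill_buf/consume approach, which could emit undersized chunks whenever the underlying reader returned short reads (the likely cause of the "manifest sizing for slow backends" fix), in favour of an inner loop that accumulates 16 KiB reads until a full CHUNK_SIZE chunk is built or the file ends. A minimal standalone sketch of the same pattern, with CHUNK_SIZE assumed and the hashing/UUID bookkeeping omitted:

use std::io::Read;

// Assumed for the sketch; the real constant lives elsewhere in the crate.
const CHUNK_SIZE: usize = 1024 * 1024 * 64;

/// Read `reader` to EOF, handing out chunks of up to CHUNK_SIZE bytes.
fn for_each_chunk<R: Read>(mut reader: R, mut on_chunk: impl FnMut(&[u8])) -> std::io::Result<()> {
    let mut buf = [0u8; 1024 * 16];
    loop {
        let mut buffer: Vec<u8> = Vec::new();
        let mut file_empty = false;
        // Accumulate short reads until the chunk is full or the file runs out.
        loop {
            let read = reader.read(&mut buf)?;
            if read == 0 {
                file_empty = true;
                break;
            }
            buffer.extend_from_slice(&buf[..read]);
            if buffer.len() >= CHUNK_SIZE {
                break;
            }
        }
        // Simplification over the real loop: skip a trailing empty chunk.
        if !buffer.is_empty() {
            on_chunk(&buffer);
        }
        if file_empty {
            return Ok(());
        }
    }
}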


@@ -2,17 +2,18 @@
 use std::os::unix::fs::PermissionsExt;
 use std::{
     fs::{self, metadata, File},
-    io::{self, Read, Sink},
+    io::{self, Read, Seek, SeekFrom, Sink},
     path::{Path, PathBuf},
     sync::Arc,
 };

 use flate2::read::DeflateDecoder;
 use rawzip::{
-    FileReader, ZipArchive, ZipArchiveEntryWayfinder, ZipEntry, ZipReader, RECOMMENDED_BUFFER_SIZE,
+    CompressionMethod, FileReader, ZipArchive, ZipArchiveEntryWayfinder, ZipEntry,
+    ZipVerifier, RECOMMENDED_BUFFER_SIZE,
 };

-use crate::version::types::{MinimumFileObject, Skippable, VersionBackend, VersionFile};
+use crate::version::types::{MinimumFileObject, VersionBackend, VersionFile};

 pub fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) {
     if metadata(path).unwrap().is_dir() {

@@ -52,8 +53,21 @@ impl VersionBackend for PathVersionBackend {
         results
     }

-    fn reader(&mut self, file: &VersionFile) -> Option<Box<dyn MinimumFileObject + 'static>> {
-        let file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?;
+    fn reader(
+        &mut self,
+        file: &VersionFile,
+        start: u64,
+        end: u64,
+    ) -> Option<Box<dyn MinimumFileObject + 'static>> {
+        let mut file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?;
+
+        if start != 0 {
+            file.seek(SeekFrom::Start(start)).ok()?;
+        }
+
+        if end != 0 {
+            return Some(Box::new(file.take(end - start)));
+        }

         return Some(Box::new(file));
     }

@@ -103,31 +117,72 @@ impl ZipVersionBackend {
     pub fn new_entry<'archive>(
         &self,
         entry: ZipEntry<'archive, FileReader>,
+        compression_method: CompressionMethod,
+        start: u64,
+        end: u64,
     ) -> ZipFileWrapper<'archive> {
-        let deflater = DeflateDecoder::new(entry.reader());
-        ZipFileWrapper { reader: deflater }
+        let deflater: Box<dyn Read + Send + 'archive> = match compression_method {
+            CompressionMethod::Store => Box::new(entry.reader()),
+            CompressionMethod::Deflate => Box::new(DeflateDecoder::new(entry.reader())),
+            CompressionMethod::Deflate64 => Box::new(DeflateDecoder::new(entry.reader())),
+            _ => panic!(
+                "unsupported decompression algorithm: {:?}",
+                compression_method
+            ),
+        };
+        let mut verifier = entry.verifying_reader(deflater);
+
+        if start != 0 {
+            io::copy(&mut (&mut verifier).take(start), &mut Sink::default()).unwrap();
+        }
+
+        ZipFileWrapper {
+            reader: verifier,
+            limit: (end - start) as usize,
+            current: 0,
+        }
     }
 }

 pub struct ZipFileWrapper<'archive> {
-    reader: DeflateDecoder<ZipReader<'archive, FileReader>>,
+    reader: ZipVerifier<'archive, Box<dyn Read + Send + 'archive>, FileReader>,
+    limit: usize,
+    current: usize,
 }

+/**
+ * This read implementation is a result of debugging hell.
+ * It should probably be replaced with a .take() call.
+ */
 impl<'a> Read for ZipFileWrapper<'a> {
     fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        let has_limit = self.limit != 0;
+        // End this stream if the read is the right size
+        if has_limit {
+            if self.current >= self.limit {
+                return Ok(0);
+            }
+        }
         let read = self.reader.read(buf)?;
-        Ok(read)
+        if self.limit != 0 {
+            self.current += read;
+            if self.current > self.limit {
+                let over = self.current - self.limit;
+                return Ok(read - over);
+            }
+        }
+        return Ok(read);
     }
 }

-impl<'a> Skippable for ZipFileWrapper<'a> {
-    fn skip(&mut self, amount: u64) {
-        io::copy(&mut self.reader.by_ref().take(amount), &mut Sink::default()).unwrap();
-    }
-}
-
-impl<'a> MinimumFileObject for ZipFileWrapper<'a> {}
+//impl<'a> MinimumFileObject for ZipFileWrapper<'a> {}

 impl ZipVersionBackend {
-    fn find_wayfinder(&mut self, filename: &str) -> Option<ZipArchiveEntryWayfinder> {
+    fn find_wayfinder(
+        &mut self,
+        filename: &str,
+    ) -> Option<(ZipArchiveEntryWayfinder, CompressionMethod)> {
         let read_buffer = &mut [0u8; RECOMMENDED_BUFFER_SIZE];
         let mut entries = self.archive.entries(read_buffer);
         let entry = loop {

@@ -142,7 +197,7 @@ impl ZipVersionBackend {
         let wayfinder = entry.wayfinder();

-        Some(wayfinder)
+        Some((wayfinder, entry.compression_method()))
     }
 }

 impl VersionBackend for ZipVersionBackend {

@@ -163,17 +218,22 @@ impl VersionBackend for ZipVersionBackend {
         results
     }

-    fn reader(&mut self, file: &VersionFile) -> Option<Box<dyn MinimumFileObject + '_>> {
-        let wayfinder = self.find_wayfinder(&file.relative_filename)?;
+    fn reader(
+        &mut self,
+        file: &VersionFile,
+        start: u64,
+        end: u64,
+    ) -> Option<Box<dyn MinimumFileObject + '_>> {
+        let (wayfinder, compression_method) = self.find_wayfinder(&file.relative_filename)?;

         let local_entry = self.archive.get_entry(wayfinder).unwrap();

-        let wrapper = self.new_entry(local_entry);
+        let wrapper = self.new_entry(local_entry, compression_method, start, end);

-        Some(Box::new(wrapper))
+        Some(Box::new(wrapper) as Box<dyn MinimumFileObject>)
     }

     fn peek_file(&mut self, sub_path: String) -> Option<VersionFile> {
-        let entry = self.find_wayfinder(&sub_path)?;
+        let (entry, _) = self.find_wayfinder(&sub_path)?;

         Some(VersionFile {
             relative_filename: sub_path,
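As the new doc comment on ZipFileWrapper concedes, the manual limit/current bookkeeping in its read implementation re-implements what Read::take provides. A hedged sketch of the suggested alternative, assuming the same conventions as the code above (the decompressed stream is already advanced to start, and end == 0 means "no limit"):

use std::io::{self, Read, Sink};

// Sketch only: cap an already-opened entry reader at `end - start` bytes.
fn capped<'a, R: Read + Send + 'a>(
    mut inner: R,
    start: u64,
    end: u64,
) -> io::Result<Box<dyn Read + Send + 'a>> {
    if start != 0 {
        // Decompressed zip entries aren't seekable; skipping means
        // reading and discarding the first `start` bytes.
        io::copy(&mut (&mut inner).take(start), &mut Sink::default())?;
    }
    Ok(if end != 0 {
        // Replaces the hand-rolled limit/current counting.
        Box::new(inner.take(end - start))
    } else {
        Box::new(inner)
    })
}

One behavioural difference worth noting: take never pulls more than the limit from the inner reader, whereas the manual version may read past the limit into the caller's buffer and silently discard the overshoot.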


@@ -1,6 +1,5 @@
 use std::{
-    fmt::Debug,
-    io::{Read, Seek, SeekFrom},
+    fmt::Debug, io::Read
 };

 use dyn_clone::DynClone;

@@ -13,36 +12,26 @@ pub struct VersionFile {
     pub size: u64,
 }

-pub trait Skippable {
-    fn skip(&mut self, amount: u64);
-}
-
-impl<T> Skippable for T
-where
-    T: Seek,
-{
-    fn skip(&mut self, amount: u64) {
-        self.seek(SeekFrom::Start(amount)).unwrap();
-    }
-}
-
-pub trait MinimumFileObject: Read + Send + Skippable {}
-impl<T: Read + Send + Seek> MinimumFileObject for T {}
+pub trait MinimumFileObject: Read + Send {}
+impl<T: Read + Send> MinimumFileObject for T {}

 // Intentionally not a generic, because of types in read_file
 pub struct ReadToAsyncRead<'a> {
     pub inner: Box<dyn Read + Send + 'a>,
 }

+const ASYNC_READ_BUFFER_SIZE: usize = 8128;
+
 impl<'a> AsyncRead for ReadToAsyncRead<'a> {
     fn poll_read(
         mut self: std::pin::Pin<&mut Self>,
         _cx: &mut std::task::Context<'_>,
         buf: &mut tokio::io::ReadBuf<'_>,
     ) -> std::task::Poll<io::Result<()>> {
-        let mut read_buf = [0u8; 8192];
-        let var_name = self.inner.read(&mut read_buf).unwrap();
-        let amount = var_name.min(buf.remaining());
-        buf.put_slice(&read_buf[0..amount]);
+        let mut read_buf = [0u8; ASYNC_READ_BUFFER_SIZE];
+        let read_size = ASYNC_READ_BUFFER_SIZE.min(buf.remaining());
+        let read = self.inner.read(&mut read_buf[0..read_size]).unwrap();
+        buf.put_slice(&read_buf[0..read]);
         std::task::Poll::Ready(Ok(()))
     }
 }

@@ -50,7 +39,7 @@ impl<'a> AsyncRead for ReadToAsyncRead<'a> {
 pub trait VersionBackend: DynClone {
     fn list_files(&mut self) -> Vec<VersionFile>;
     fn peek_file(&mut self, sub_path: String) -> Option<VersionFile>;
-    fn reader(&mut self, file: &VersionFile) -> Option<Box<dyn MinimumFileObject + '_>>;
+    fn reader(&mut self, file: &VersionFile, start: u64, end: u64) -> Option<Box<dyn MinimumFileObject + '_>>;
 }

 dyn_clone::clone_trait_object!(VersionBackend);
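The widened reader(file, start, end) signature fixes a convention both backends follow: start is a byte offset into the (decompressed) file, and end is an exclusive upper bound, with end == 0 meaning "read to EOF", so a bounded reader yields end - start bytes. A small sketch of that contract against an in-memory source (Cursor standing in for the File used by PathVersionBackend):

use std::io::{Cursor, Read, Seek, SeekFrom};

// Sketch: the (start, end) convention from VersionBackend::reader,
// where end == 0 means "no upper bound".
fn range_reader(data: Vec<u8>, start: u64, end: u64) -> Box<dyn Read + Send> {
    let mut cursor = Cursor::new(data);
    if start != 0 {
        cursor.seek(SeekFrom::Start(start)).unwrap();
    }
    if end != 0 {
        return Box::new(cursor.take(end - start));
    }
    Box::new(cursor)
}

fn main() {
    let mut out = String::new();
    // Bytes [1, 5) of "testing" -> "esti".
    range_reader(b"testing".to_vec(), 1, 5)
        .read_to_string(&mut out)
        .unwrap();
    assert_eq!(out, "esti");
}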


@@ -1,4 +1,6 @@
-use std::{collections::HashMap, fs::File, io::Read, path::Path};
+use std::{
+    collections::HashMap, fs::File, path::Path
+};

 use napi::{bindgen_prelude::*, sys::napi_value__, tokio_stream::StreamExt};
 use tokio_util::codec::{BytesCodec, FramedRead};

@@ -115,20 +117,16 @@ impl<'a> DropletHandler<'a> {
             size: 0, // Shouldn't matter
         };

         // Use `?` operator for cleaner error propagation from `Option`
-        let mut reader = backend.reader(&version_file).ok_or(napi::Error::from_reason("Failed to create reader."))?;
+        let reader = backend
+            .reader(
+                &version_file,
+                start.map(|e| e.get_u64().1).unwrap_or(0),
+                end.map(|e| e.get_u64().1).unwrap_or(0),
+            )
+            .ok_or(napi::Error::from_reason("Failed to create reader."))?;

-        if let Some(skip) = start.clone() {
-            reader.skip(skip.get_u64().1.into());
-            // io::copy(&mut reader.by_ref().take(skip.into()), &mut io::sink()).unwrap();
-        }
-
-        let async_reader = if let Some(limit) = end {
-            let amount = limit.get_u64().1 - start.map_or(Some(0), |v| Some(v.get_u64().1)).unwrap();
-            ReadToAsyncRead {
-                inner: Box::new(reader.take(amount.into())),
-            }
-        } else {
-            ReadToAsyncRead { inner: reader }
-        };
+        let async_reader = ReadToAsyncRead {
+            inner: reader,
+        };

         // Create a FramedRead stream with BytesCodec for chunking

@@ -147,9 +145,7 @@ impl<'a> DropletHandler<'a> {
             Ok(ReadableStream::create_with_stream_bytes(&env, stream).unwrap())
         })?;

-        Ok(JsDropStreamable {
-            inner: stream,
-        })
+        Ok(JsDropStreamable { inner: stream })
     }
 }

@@ -164,4 +160,4 @@ impl JsDropStreamable {
     pub fn get_stream(&self) -> *mut napi_value__ {
         self.inner.raw()
     }
 }


@@ -12,6 +12,7 @@ __metadata:
     "@napi-rs/cli": "npm:3.0.0-alpha.91"
     "@types/node": "npm:^22.13.10"
     ava: "npm:^6.2.0"
+    pretty-bytes: "npm:^7.0.1"
     tsimp: "npm:^2.0.12"
   languageName: unknown
   linkType: soft

@@ -2432,6 +2433,13 @@
   languageName: node
   linkType: hard

+"pretty-bytes@npm:^7.0.1":
+  version: 7.0.1
+  resolution: "pretty-bytes@npm:7.0.1"
+  checksum: 10c0/14ffb503d2de3588042c722848062a4897e6faece1694e0c83ba5669ec003d73311d946d50d2b3c6099a6a306760011b8446ee3cf9cf86eca13a454a8f1c47cb
+  languageName: node
+  linkType: hard
+
 "pretty-ms@npm:^9.1.0":
   version: 9.2.0
   resolution: "pretty-ms@npm:9.2.0"