fix: zip read sizing

DecDuck
2025-08-15 21:30:25 +10:00
parent 4276b9d668
commit cae208a3e0
6 changed files with 130 additions and 74 deletions


@@ -57,7 +57,7 @@ test("read file", async (t) => {
const dropletHandler = new DropletHandler();
- const stream = dropletHandler.readFile(dirName, "TESTFILE");
+ const stream = dropletHandler.readFile(dirName, "TESTFILE", BigInt(0), BigInt(testString.length));
let finalString = "";
@@ -154,11 +154,34 @@ test.skip("zip manifest test", async (t) => {
)
);
- const file = manifest[Object.keys(manifest).at(0)];
- const amount = file.ids.length;
+ for (const [filename, data] of Object.entries(manifest)) {
+ let start = 0;
+ for (const [chunkIndex, length] of data.lengths.entries()) {
+ const stream = (
+ await dropletHandler.readFile(
+ "./assets/TheGame.zip",
+ filename,
+ BigInt(start),
+ BigInt(start + length)
+ )
+ ).getStream();
- if(amount > 20) {
- return t.fail(`Zip manifest has ${amount} chunks, more than 20`);
+ let streamLength = 0;
+ await stream.pipeTo(
+ new WritableStream({
+ write(chunk) {
+ streamLength += chunk.length;
+ },
+ })
+ );
+ if (streamLength != length)
+ return t.fail(
+ `stream length for chunk index ${chunkIndex} was not expected: real: ${streamLength} vs expected: ${length}`
+ );
+ start += length;
+ }
+ }
t.pass();


@@ -1,6 +1,6 @@
{
"name": "@drop-oss/droplet",
- "version": "2.1.1",
+ "version": "2.2.0",
"main": "index.js",
"types": "index.d.ts",
"napi": {


@@ -1,9 +1,4 @@
- use std::{
- collections::HashMap,
- io::{BufRead, BufReader},
- sync::Arc,
- thread,
- };
+ use std::{collections::HashMap, sync::Arc, thread};
use napi::{
threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
@@ -46,6 +41,14 @@ pub fn generate_manifest<'a>(
.ok_or(napi::Error::from_reason(
"Could not create backend for path.",
))?;
+ // This is unsafe (obviously)
+ // But it's all good as long as the DropletHandler doesn't get
+ // dropped while we're generating the manifest.
+ let backend: &'static mut Box<dyn VersionBackend + Send> =
+ unsafe { std::mem::transmute(backend) };
thread::spawn(move || {
let files = backend.list_files();
// Filepath to chunk data
@@ -54,9 +57,10 @@ pub fn generate_manifest<'a>(
let total: i32 = files.len() as i32;
let mut i: i32 = 0;
+ let mut buf = [0u8; 1024 * 16];
for version_file in files {
- let reader = backend.reader(&version_file).unwrap();
- let mut reader = BufReader::with_capacity(8128, reader);
+ let mut reader = backend.reader(&version_file, 0, 0).unwrap();
let mut chunk_data = ChunkData {
permissions: version_file.permission,
@@ -72,25 +76,25 @@ pub fn generate_manifest<'a>(
let mut file_empty = false;
loop {
- let read_buf = reader.fill_buf().unwrap();
- let buf_length = read_buf.len();
+ let read = reader.read(&mut buf).unwrap();
- length += buf_length;
+ length += read;
if length >= CHUNK_SIZE {
break;
}
// If we're out of data, add this chunk and then move onto the next file
- if buf_length == 0 {
+ if read == 0 {
file_empty = true;
break;
}
- buffer.extend_from_slice(read_buf);
- reader.consume(length);
+ buffer.extend_from_slice(&buf[..read]);
}
println!("created chunk of size {}", length);
let chunk_id = Uuid::new_v4();
let checksum = md5::compute(buffer).0;
let checksum_string = hex::encode(checksum);
@@ -124,6 +128,7 @@ pub fn generate_manifest<'a>(
Ok(json!(chunks).to_string()),
ThreadsafeFunctionCallMode::Blocking,
);
+ });
Ok(())
}
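The manifest loop above replaces the BufReader fill_buf/consume pattern with plain Read::read calls into a reusable 16 KiB buffer, cutting a chunk once at least CHUNK_SIZE bytes have accumulated. Below is a minimal, self-contained sketch of that chunking pattern; the helper name and the shrunken sizes are illustrative, the bookkeeping order is simplified, and the per-chunk MD5/UUID work from the real code is omitted.

use std::io::{Cursor, Read};

// Demo sizes only; the real code uses a 16 KiB read buffer and a much larger CHUNK_SIZE.
const READ_BUF_SIZE: usize = 16;
const CHUNK_SIZE: usize = 64;

// Split anything readable into chunks of roughly CHUNK_SIZE bytes.
fn chunk_reader(mut reader: impl Read) -> std::io::Result<Vec<Vec<u8>>> {
    let mut chunks = Vec::new();
    let mut buf = [0u8; READ_BUF_SIZE];
    let mut file_empty = false;
    while !file_empty {
        let mut buffer = Vec::new();
        let mut length = 0;
        loop {
            let read = reader.read(&mut buf)?;
            if read == 0 {
                // Out of data: emit what we have and stop.
                file_empty = true;
                break;
            }
            buffer.extend_from_slice(&buf[..read]);
            length += read;
            if length >= CHUNK_SIZE {
                break;
            }
        }
        if !buffer.is_empty() {
            chunks.push(buffer);
        }
    }
    Ok(chunks)
}

fn main() -> std::io::Result<()> {
    let chunks = chunk_reader(Cursor::new(vec![7u8; 200]))?;
    // 200 bytes with a 64-byte threshold -> 64, 64, 64, 8.
    assert_eq!(chunks.iter().map(|c| c.len()).collect::<Vec<_>>(), vec![64, 64, 64, 8]);
    Ok(())
}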


@@ -2,7 +2,7 @@
use std::os::unix::fs::PermissionsExt;
use std::{
fs::{self, metadata, File},
- io::{self, Read, Sink},
+ io::{self, Read, Seek, SeekFrom, Sink},
path::{Path, PathBuf},
sync::Arc,
};
@@ -12,7 +12,7 @@ use rawzip::{
FileReader, ZipArchive, ZipArchiveEntryWayfinder, ZipEntry, ZipReader, RECOMMENDED_BUFFER_SIZE,
};
- use crate::version::types::{MinimumFileObject, Skippable, VersionBackend, VersionFile};
+ use crate::version::types::{MinimumFileObject, VersionBackend, VersionFile};
pub fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) {
if metadata(path).unwrap().is_dir() {
@@ -52,8 +52,21 @@ impl VersionBackend for PathVersionBackend {
results
}
- fn reader(&mut self, file: &VersionFile) -> Option<Box<dyn MinimumFileObject + 'static>> {
- let file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?;
+ fn reader(
+ &mut self,
+ file: &VersionFile,
+ start: u64,
+ end: u64,
+ ) -> Option<Box<dyn MinimumFileObject + 'static>> {
+ let mut file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?;
+ if start != 0 {
+ file.seek(SeekFrom::Start(start)).ok()?;
+ }
+ if end != 0 {
+ return Some(Box::new(file.take(end - start)));
+ }
+ return Some(Box::new(file));
}
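The PathVersionBackend change above is the seekable half of the new start/end contract: seek to start, then cap the stream at end - start bytes, with end == 0 meaning no limit. A stand-alone sketch of that convention follows, with an illustrative helper name and a scratch file in place of the real backend.

use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};

// end == 0 means "no limit"; otherwise the window is [start, end) in bytes.
fn ranged_reader(path: &std::path::Path, start: u64, end: u64) -> std::io::Result<Box<dyn Read>> {
    let mut file = File::open(path)?;
    if start != 0 {
        file.seek(SeekFrom::Start(start))?;
    }
    if end != 0 {
        return Ok(Box::new(file.take(end - start)));
    }
    Ok(Box::new(file))
}

fn main() -> std::io::Result<()> {
    // Scratch file so the example is self-contained (name is illustrative).
    let path = std::env::temp_dir().join("range_reader_demo.bin");
    File::create(&path)?.write_all(&(0u8..100).collect::<Vec<u8>>())?;

    let mut out = Vec::new();
    ranged_reader(&path, 10, 20)?.read_to_end(&mut out)?;
    // Ten bytes, starting at offset 10: end is exclusive.
    assert_eq!(out, (10u8..20).collect::<Vec<u8>>());
    Ok(())
}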
@@ -103,28 +116,53 @@ impl ZipVersionBackend {
pub fn new_entry<'archive>(
&self,
entry: ZipEntry<'archive, FileReader>,
+ start: u64,
+ end: u64,
) -> ZipFileWrapper<'archive> {
- let deflater = DeflateDecoder::new(entry.reader());
- ZipFileWrapper { reader: deflater }
+ let mut deflater = DeflateDecoder::new(entry.reader());
+ if start != 0 {
+ io::copy(&mut (&mut deflater).take(start), &mut Sink::default()).unwrap();
+ }
+ ZipFileWrapper {
+ reader: deflater,
+ limit: (end - start) as usize,
+ current: 0,
+ }
}
}
pub struct ZipFileWrapper<'archive> {
reader: DeflateDecoder<ZipReader<'archive, FileReader>>,
+ limit: usize,
+ current: usize,
}
/**
* This read implementation is a result of debugging hell
* It should probably be replaced with a .take() call.
*/
impl<'a> Read for ZipFileWrapper<'a> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let has_limit = self.limit != 0;
+ // End this stream if the read is the right size
+ if has_limit {
+ if self.current >= self.limit {
+ return Ok(0);
+ }
+ }
let read = self.reader.read(buf)?;
- Ok(read)
+ if self.limit != 0 {
+ self.current += read;
+ if self.current > self.limit {
+ let over = self.current - self.limit;
+ return Ok(read - over);
+ }
+ }
- impl<'a> Skippable for ZipFileWrapper<'a> {
- fn skip(&mut self, amount: u64) {
- io::copy(&mut self.take(amount), &mut Sink::default()).unwrap();
+ return Ok(read);
}
}
impl<'a> MinimumFileObject for ZipFileWrapper<'a> {}
impl ZipVersionBackend {
fn find_wayfinder(&mut self, filename: &str) -> Option<ZipArchiveEntryWayfinder> {
@@ -163,11 +201,16 @@ impl VersionBackend for ZipVersionBackend {
results
}
- fn reader(&mut self, file: &VersionFile) -> Option<Box<dyn MinimumFileObject + '_>> {
+ fn reader(
+ &mut self,
+ file: &VersionFile,
+ start: u64,
+ end: u64,
+ ) -> Option<Box<dyn MinimumFileObject + '_>> {
let wayfinder = self.find_wayfinder(&file.relative_filename)?;
let local_entry = self.archive.get_entry(wayfinder).unwrap();
- let wrapper = self.new_entry(local_entry);
+ let wrapper = self.new_entry(local_entry, start, end);
Some(Box::new(wrapper))
}
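The zip backend cannot seek inside a DEFLATE stream, so the same [start, end) window is built by decompressing and discarding the first start bytes and then counting bytes out by hand via the limit and current fields. The .take() route that the doc comment on ZipFileWrapper mentions would look roughly like the sketch below, written over plain std readers instead of the real DeflateDecoder; the window helper is illustrative and assumes a non-zero end.

use std::io::{self, Cursor, Read, Sink};

// Window a non-seekable reader to [start, end): drop `start` bytes by copying
// them into a sink, then cap the rest with Read::take. Assumes end >= start
// and end != 0 (the backend treats end == 0 as "no limit").
fn window<R: Read>(mut reader: R, start: u64, end: u64) -> io::Result<impl Read> {
    if start != 0 {
        io::copy(&mut (&mut reader).take(start), &mut Sink::default())?;
    }
    Ok(reader.take(end - start))
}

fn main() -> io::Result<()> {
    let data: Vec<u8> = (0u8..100).collect();
    let mut out = Vec::new();
    window(Cursor::new(data), 10, 20)?.read_to_end(&mut out)?;
    assert_eq!(out, (10u8..20).collect::<Vec<u8>>());
    Ok(())
}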


@@ -1,6 +1,5 @@
use std::{
- fmt::Debug,
- io::{Read, Seek, SeekFrom},
+ fmt::Debug, io::Read
};
use dyn_clone::DynClone;
@@ -13,36 +12,26 @@ pub struct VersionFile {
pub size: u64,
}
- pub trait Skippable {
- fn skip(&mut self, amount: u64);
- }
- impl<T> Skippable for T
- where
- T: Seek,
- {
- fn skip(&mut self, amount: u64) {
- self.seek(SeekFrom::Start(amount)).unwrap();
- }
- }
- pub trait MinimumFileObject: Read + Send + Skippable {}
- impl<T: Read + Send + Seek> MinimumFileObject for T {}
+ pub trait MinimumFileObject: Read + Send {}
+ impl<T: Read + Send> MinimumFileObject for T {}
// Intentionally not a generic, because of types in read_file
pub struct ReadToAsyncRead<'a> {
pub inner: Box<dyn Read + Send + 'a>,
}
+ const ASYNC_READ_BUFFER_SIZE: usize = 8128;
impl<'a> AsyncRead for ReadToAsyncRead<'a> {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<io::Result<()>> {
- let mut read_buf = [0u8; 8192];
- let var_name = self.inner.read(&mut read_buf).unwrap();
- let amount = var_name.min(buf.remaining());
- buf.put_slice(&read_buf[0..amount]);
+ let mut read_buf = [0u8; ASYNC_READ_BUFFER_SIZE];
+ let read_size = ASYNC_READ_BUFFER_SIZE.min(buf.remaining());
+ let read = self.inner.read(&mut read_buf[0..read_size]).unwrap();
+ buf.put_slice(&read_buf[0..read]);
std::task::Poll::Ready(Ok(()))
}
}
@@ -50,7 +39,7 @@ impl<'a> AsyncRead for ReadToAsyncRead<'a> {
pub trait VersionBackend: DynClone {
fn list_files(&mut self) -> Vec<VersionFile>;
fn peek_file(&mut self, sub_path: String) -> Option<VersionFile>;
- fn reader(&mut self, file: &VersionFile) -> Option<Box<dyn MinimumFileObject + '_>>;
+ fn reader(&mut self, file: &VersionFile, start: u64, end: u64) -> Option<Box<dyn MinimumFileObject + '_>>;
}
dyn_clone::clone_trait_object!(VersionBackend);
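The poll_read change is the sizing fix the commit title refers to: the old body always read up to 8192 bytes from the inner reader and then copied only buf.remaining() of them into the destination, so any excess was silently dropped; the new body clamps the read itself. A sketch of that clamping rule with a plain synchronous Read, where read_clamped is an illustrative helper and only the constant comes from the diff:

use std::io::Read;

// Constant name and value taken from the diff above.
const ASYNC_READ_BUFFER_SIZE: usize = 8128;

// Never ask the inner reader for more than both the scratch buffer and the
// destination can hold, so every byte read can be handed over.
fn read_clamped(inner: &mut impl Read, remaining: usize) -> std::io::Result<Vec<u8>> {
    let mut read_buf = [0u8; ASYNC_READ_BUFFER_SIZE];
    let read_size = ASYNC_READ_BUFFER_SIZE.min(remaining);
    let read = inner.read(&mut read_buf[0..read_size])?;
    Ok(read_buf[0..read].to_vec())
}

fn main() -> std::io::Result<()> {
    let mut input: &[u8] = &[1u8; 10_000];
    // The destination only has room for 100 bytes: exactly 100 are read,
    // and the other 9,900 stay in the source for the next call.
    let out = read_clamped(&mut input, 100)?;
    assert_eq!((out.len(), input.len()), (100, 9_900));
    Ok(())
}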


@@ -1,4 +1,6 @@
- use std::{collections::HashMap, fs::File, io::Read, path::Path};
+ use std::{
+ collections::HashMap, fs::File, path::Path
+ };
use napi::{bindgen_prelude::*, sys::napi_value__, tokio_stream::StreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
@@ -115,20 +117,16 @@ impl<'a> DropletHandler<'a> {
size: 0, // Shouldn't matter
};
// Use `?` operator for cleaner error propagation from `Option`
- let mut reader = backend.reader(&version_file).ok_or(napi::Error::from_reason("Failed to create reader."))?;
+ let reader = backend
+ .reader(
+ &version_file,
+ start.map(|e| e.get_u64().1).unwrap_or(0),
+ end.map(|e| e.get_u64().1).unwrap_or(0),
+ )
+ .ok_or(napi::Error::from_reason("Failed to create reader."))?;
- if let Some(skip) = start.clone() {
- reader.skip(skip.get_u64().1.into());
- // io::copy(&mut reader.by_ref().take(skip.into()), &mut io::sink()).unwrap();
- }
- let async_reader = if let Some(limit) = end {
- let amount = limit.get_u64().1 - start.map_or(Some(0), |v| Some(v.get_u64().1)).unwrap();
- ReadToAsyncRead {
- inner: Box::new(reader.take(amount.into())),
- }
- } else {
- ReadToAsyncRead { inner: reader }
+ let async_reader = ReadToAsyncRead {
+ inner: reader,
};
// Create a FramedRead stream with BytesCodec for chunking
@@ -147,9 +145,7 @@ impl<'a> DropletHandler<'a> {
Ok(ReadableStream::create_with_stream_bytes(&env, stream).unwrap())
})?;
- Ok(JsDropStreamable {
- inner: stream,
- })
+ Ok(JsDropStreamable { inner: stream })
}
}