diff --git a/__test__/utils.spec.mjs b/__test__/utils.spec.mjs index 4fb07b6..838df6c 100644 --- a/__test__/utils.spec.mjs +++ b/__test__/utils.spec.mjs @@ -57,7 +57,7 @@ test("read file", async (t) => { const dropletHandler = new DropletHandler(); - const stream = dropletHandler.readFile(dirName, "TESTFILE"); + const stream = dropletHandler.readFile(dirName, "TESTFILE", BigInt(0), BigInt(testString.length)); let finalString = ""; @@ -154,11 +154,34 @@ test.skip("zip manifest test", async (t) => { ) ); - const file = manifest[Object.keys(manifest).at(0)]; - const amount = file.ids.length; + for (const [filename, data] of Object.entries(manifest)) { + let start = 0; + for (const [chunkIndex, length] of data.lengths.entries()) { + const stream = ( + await dropletHandler.readFile( + "./assets/TheGame.zip", + filename, + BigInt(start), + BigInt(start + length) + ) + ).getStream(); - if(amount > 20) { - return t.fail(`Zip manifest has ${amount} chunks, more than 20`); + let streamLength = 0; + await stream.pipeTo( + new WritableStream({ + write(chunk) { + streamLength += chunk.length; + }, + }) + ); + + if (streamLength != length) + return t.fail( + `stream length for chunk index ${chunkIndex} was not expected: real: ${streamLength} vs expected: ${length}` + ); + + start += length; + } } t.pass(); diff --git a/package.json b/package.json index 0b0ff58..bfba769 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@drop-oss/droplet", - "version": "2.1.1", + "version": "2.2.0", "main": "index.js", "types": "index.d.ts", "napi": { diff --git a/src/manifest.rs b/src/manifest.rs index 9b3b68a..9cec34f 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -1,9 +1,4 @@ -use std::{ - collections::HashMap, - io::{BufRead, BufReader}, - sync::Arc, - thread, -}; +use std::{collections::HashMap, sync::Arc, thread}; use napi::{ threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode}, @@ -46,6 +41,14 @@ pub fn generate_manifest<'a>( .ok_or(napi::Error::from_reason( "Could not create backend for path.", ))?; + + // This is unsafe (obviously) + // But it's allg as long the DropletHandler doesn't get + // dropped while we're generating the manifest. + let backend: &'static mut Box = + unsafe { std::mem::transmute(backend) }; + + thread::spawn(move || { let files = backend.list_files(); // Filepath to chunk data @@ -54,9 +57,10 @@ pub fn generate_manifest<'a>( let total: i32 = files.len() as i32; let mut i: i32 = 0; + let mut buf = [0u8; 1024 * 16]; + for version_file in files { - let reader = backend.reader(&version_file).unwrap(); - let mut reader = BufReader::with_capacity(8128, reader); + let mut reader = backend.reader(&version_file, 0, 0).unwrap(); let mut chunk_data = ChunkData { permissions: version_file.permission, @@ -72,25 +76,25 @@ pub fn generate_manifest<'a>( let mut file_empty = false; loop { - let read_buf = reader.fill_buf().unwrap(); - let buf_length = read_buf.len(); + let read = reader.read(&mut buf).unwrap(); - length += buf_length; + length += read; if length >= CHUNK_SIZE { break; } // If we're out of data, add this chunk and then move onto the next file - if buf_length == 0 { + if read == 0 { file_empty = true; break; } - buffer.extend_from_slice(read_buf); - reader.consume(length); + buffer.extend_from_slice(&buf[..read]); } + println!("created chunk of size {}", length); + let chunk_id = Uuid::new_v4(); let checksum = md5::compute(buffer).0; let checksum_string = hex::encode(checksum); @@ -124,6 +128,7 @@ pub fn generate_manifest<'a>( Ok(json!(chunks).to_string()), ThreadsafeFunctionCallMode::Blocking, ); + }); Ok(()) } diff --git a/src/version/backends.rs b/src/version/backends.rs index 5311844..ef11ea0 100644 --- a/src/version/backends.rs +++ b/src/version/backends.rs @@ -2,7 +2,7 @@ use std::os::unix::fs::PermissionsExt; use std::{ fs::{self, metadata, File}, - io::{self, Read, Sink}, + io::{self, Read, Seek, SeekFrom, Sink}, path::{Path, PathBuf}, sync::Arc, }; @@ -12,7 +12,7 @@ use rawzip::{ FileReader, ZipArchive, ZipArchiveEntryWayfinder, ZipEntry, ZipReader, RECOMMENDED_BUFFER_SIZE, }; -use crate::version::types::{MinimumFileObject, Skippable, VersionBackend, VersionFile}; +use crate::version::types::{MinimumFileObject, VersionBackend, VersionFile}; pub fn _list_files(vec: &mut Vec, path: &Path) { if metadata(path).unwrap().is_dir() { @@ -52,8 +52,21 @@ impl VersionBackend for PathVersionBackend { results } - fn reader(&mut self, file: &VersionFile) -> Option> { - let file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?; + fn reader( + &mut self, + file: &VersionFile, + start: u64, + end: u64, + ) -> Option> { + let mut file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?; + + if start != 0 { + file.seek(SeekFrom::Start(start)).ok()?; + } + + if end != 0 { + return Some(Box::new(file.take(end - start))); + } return Some(Box::new(file)); } @@ -103,28 +116,53 @@ impl ZipVersionBackend { pub fn new_entry<'archive>( &self, entry: ZipEntry<'archive, FileReader>, + start: u64, + end: u64, ) -> ZipFileWrapper<'archive> { - let deflater = DeflateDecoder::new(entry.reader()); - ZipFileWrapper { reader: deflater } + let mut deflater = DeflateDecoder::new(entry.reader()); + if start != 0 { + io::copy(&mut (&mut deflater).take(start), &mut Sink::default()).unwrap(); + } + ZipFileWrapper { + reader: deflater, + limit: (end - start) as usize, + current: 0, + } } } pub struct ZipFileWrapper<'archive> { reader: DeflateDecoder>, + limit: usize, + current: usize, } +/** + * This read implemention is a result of debugging hell + * It should probably be replaced with a .take() call. + */ impl<'a> Read for ZipFileWrapper<'a> { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let has_limit = self.limit != 0; + + // End this stream if the read is the right size + if has_limit { + if self.current >= self.limit { + return Ok(0); + } + } + let read = self.reader.read(buf)?; - Ok(read) + if self.limit != 0 { + self.current += read; + if self.current > self.limit { + let over = self.current - self.limit; + return Ok(read - over); + } + } + return Ok(read); } } -impl<'a> Skippable for ZipFileWrapper<'a> { - fn skip(&mut self, amount: u64) { - io::copy(&mut self.take(amount), &mut Sink::default()).unwrap(); - } -} -impl<'a> MinimumFileObject for ZipFileWrapper<'a> {} impl ZipVersionBackend { fn find_wayfinder(&mut self, filename: &str) -> Option { @@ -163,11 +201,16 @@ impl VersionBackend for ZipVersionBackend { results } - fn reader(&mut self, file: &VersionFile) -> Option> { + fn reader( + &mut self, + file: &VersionFile, + start: u64, + end: u64, + ) -> Option> { let wayfinder = self.find_wayfinder(&file.relative_filename)?; let local_entry = self.archive.get_entry(wayfinder).unwrap(); - let wrapper = self.new_entry(local_entry); + let wrapper = self.new_entry(local_entry, start, end); Some(Box::new(wrapper)) } diff --git a/src/version/types.rs b/src/version/types.rs index f3e7342..37fe899 100644 --- a/src/version/types.rs +++ b/src/version/types.rs @@ -1,6 +1,5 @@ use std::{ - fmt::Debug, - io::{Read, Seek, SeekFrom}, + fmt::Debug, io::Read }; use dyn_clone::DynClone; @@ -13,36 +12,26 @@ pub struct VersionFile { pub size: u64, } -pub trait Skippable { - fn skip(&mut self, amount: u64); -} -impl Skippable for T -where - T: Seek, -{ - fn skip(&mut self, amount: u64) { - self.seek(SeekFrom::Start(amount)).unwrap(); - } -} - -pub trait MinimumFileObject: Read + Send + Skippable {} -impl MinimumFileObject for T {} +pub trait MinimumFileObject: Read + Send {} +impl MinimumFileObject for T {} // Intentionally not a generic, because of types in read_file pub struct ReadToAsyncRead<'a> { pub inner: Box, } +const ASYNC_READ_BUFFER_SIZE: usize = 8128; + impl<'a> AsyncRead for ReadToAsyncRead<'a> { fn poll_read( mut self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { - let mut read_buf = [0u8; 8192]; - let var_name = self.inner.read(&mut read_buf).unwrap(); - let amount = var_name.min(buf.remaining()); - buf.put_slice(&read_buf[0..amount]); + let mut read_buf = [0u8; ASYNC_READ_BUFFER_SIZE]; + let read_size = ASYNC_READ_BUFFER_SIZE.min(buf.remaining()); + let read = self.inner.read(&mut read_buf[0..read_size]).unwrap(); + buf.put_slice(&read_buf[0..read]); std::task::Poll::Ready(Ok(())) } } @@ -50,7 +39,7 @@ impl<'a> AsyncRead for ReadToAsyncRead<'a> { pub trait VersionBackend: DynClone { fn list_files(&mut self) -> Vec; fn peek_file(&mut self, sub_path: String) -> Option; - fn reader(&mut self, file: &VersionFile) -> Option>; + fn reader(&mut self, file: &VersionFile, start: u64, end: u64) -> Option>; } dyn_clone::clone_trait_object!(VersionBackend); \ No newline at end of file diff --git a/src/version/utils.rs b/src/version/utils.rs index 515106a..36d5b72 100644 --- a/src/version/utils.rs +++ b/src/version/utils.rs @@ -1,4 +1,6 @@ -use std::{collections::HashMap, fs::File, io::Read, path::Path}; +use std::{ + collections::HashMap, fs::File, path::Path +}; use napi::{bindgen_prelude::*, sys::napi_value__, tokio_stream::StreamExt}; use tokio_util::codec::{BytesCodec, FramedRead}; @@ -115,20 +117,16 @@ impl<'a> DropletHandler<'a> { size: 0, // Shouldn't matter }; // Use `?` operator for cleaner error propagation from `Option` - let mut reader = backend.reader(&version_file).ok_or(napi::Error::from_reason("Failed to create reader."))?; + let reader = backend + .reader( + &version_file, + start.map(|e| e.get_u64().1).unwrap_or(0), + end.map(|e| e.get_u64().1).unwrap_or(0), + ) + .ok_or(napi::Error::from_reason("Failed to create reader."))?; - if let Some(skip) = start.clone() { - reader.skip(skip.get_u64().1.into()); - // io::copy(&mut reader.by_ref().take(skip.into()), &mut io::sink()).unwrap(); - } - - let async_reader = if let Some(limit) = end { - let amount = limit.get_u64().1 - start.map_or(Some(0), |v| Some(v.get_u64().1)).unwrap(); - ReadToAsyncRead { - inner: Box::new(reader.take(amount.into())), - } - } else { - ReadToAsyncRead { inner: reader } + let async_reader = ReadToAsyncRead { + inner: reader, }; // Create a FramedRead stream with BytesCodec for chunking @@ -147,9 +145,7 @@ impl<'a> DropletHandler<'a> { Ok(ReadableStream::create_with_stream_bytes(&env, stream).unwrap()) })?; - Ok(JsDropStreamable { - inner: stream, - }) + Ok(JsDropStreamable { inner: stream }) } } @@ -164,4 +160,4 @@ impl JsDropStreamable { pub fn get_stream(&self) -> *mut napi_value__ { self.inner.raw() } -} \ No newline at end of file +}