mirror of
https://github.com/Drop-OSS/droplet.git
synced 2025-11-13 00:02:46 +10:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c67cca4ee0 | |||
| cae208a3e0 | |||
| 4276b9d668 | |||
| 4fb9bb7563 | |||
| 913dc2f58d | |||
| 7ec5e9f215 |
22
__test__/debug.spec.mjs
Normal file
22
__test__/debug.spec.mjs
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
import test from "ava";
|
||||||
|
import { DropletHandler, generateManifest } from "../index.js";
|
||||||
|
|
||||||
|
test.skip("debug", async (t) => {
|
||||||
|
const handler = new DropletHandler();
|
||||||
|
|
||||||
|
console.log("created handler");
|
||||||
|
|
||||||
|
const manifest = JSON.parse(
|
||||||
|
await new Promise((r, e) =>
|
||||||
|
generateManifest(
|
||||||
|
handler,
|
||||||
|
"./assets/TheGame.zip",
|
||||||
|
(_, __) => {},
|
||||||
|
(_, __) => {},
|
||||||
|
(err, manifest) => (err ? e(err) : r(manifest))
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
return t.pass();
|
||||||
|
});
|
||||||
@ -1,6 +1,7 @@
|
|||||||
import test from "ava";
|
import test from "ava";
|
||||||
import fs from "node:fs";
|
import fs from "node:fs";
|
||||||
import path from "path";
|
import path from "path";
|
||||||
|
import prettyBytes from "pretty-bytes";
|
||||||
|
|
||||||
import droplet, { DropletHandler, generateManifest } from "../index.js";
|
import droplet, { DropletHandler, generateManifest } from "../index.js";
|
||||||
|
|
||||||
@ -56,7 +57,7 @@ test("read file", async (t) => {
|
|||||||
|
|
||||||
const dropletHandler = new DropletHandler();
|
const dropletHandler = new DropletHandler();
|
||||||
|
|
||||||
const stream = dropletHandler.readFile(dirName, "TESTFILE");
|
const stream = dropletHandler.readFile(dirName, "TESTFILE", BigInt(0), BigInt(testString.length));
|
||||||
|
|
||||||
let finalString = "";
|
let finalString = "";
|
||||||
|
|
||||||
@ -78,7 +79,12 @@ test("read file offset", async (t) => {
|
|||||||
fs.writeFileSync(dirName + "/TESTFILE", testString);
|
fs.writeFileSync(dirName + "/TESTFILE", testString);
|
||||||
|
|
||||||
const dropletHandler = new DropletHandler();
|
const dropletHandler = new DropletHandler();
|
||||||
const stream = dropletHandler.readFile(dirName, "TESTFILE", BigInt(1), BigInt(4));
|
const stream = dropletHandler.readFile(
|
||||||
|
dirName,
|
||||||
|
"TESTFILE",
|
||||||
|
BigInt(1),
|
||||||
|
BigInt(4)
|
||||||
|
);
|
||||||
|
|
||||||
let finalString = "";
|
let finalString = "";
|
||||||
|
|
||||||
@ -96,10 +102,45 @@ test("read file offset", async (t) => {
|
|||||||
fs.rmSync(dirName, { recursive: true });
|
fs.rmSync(dirName, { recursive: true });
|
||||||
});
|
});
|
||||||
|
|
||||||
test("zip file reader", async (t) => {
|
test.skip("zip speed test", async (t) => {
|
||||||
return t.pass();
|
t.timeout(100_000_000);
|
||||||
|
const dropletHandler = new DropletHandler();
|
||||||
|
|
||||||
t.timeout(10_000);
|
const stream = dropletHandler.readFile("./assets/TheGame.zip", "setup.exe");
|
||||||
|
|
||||||
|
let totalRead = 0;
|
||||||
|
let totalSeconds = 0;
|
||||||
|
|
||||||
|
let lastTime = process.hrtime.bigint();
|
||||||
|
const timeThreshold = BigInt(1_000_000_000);
|
||||||
|
let runningTotal = 0;
|
||||||
|
let runningTime = BigInt(0);
|
||||||
|
for await (const chunk of stream.getStream()) {
|
||||||
|
// Do something with each 'chunk'
|
||||||
|
const currentTime = process.hrtime.bigint();
|
||||||
|
const timeDiff = currentTime - lastTime;
|
||||||
|
lastTime = currentTime;
|
||||||
|
runningTime += timeDiff;
|
||||||
|
|
||||||
|
runningTotal += chunk.length;
|
||||||
|
|
||||||
|
if (runningTime >= timeThreshold) {
|
||||||
|
console.log(`${prettyBytes(runningTotal)}/s`);
|
||||||
|
totalRead += runningTotal;
|
||||||
|
totalSeconds += 1;
|
||||||
|
runningTime = BigInt(0);
|
||||||
|
runningTotal = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const roughAverage = totalRead / totalSeconds;
|
||||||
|
|
||||||
|
console.log(`total rough average: ${prettyBytes(roughAverage)}/s`);
|
||||||
|
|
||||||
|
t.pass();
|
||||||
|
});
|
||||||
|
|
||||||
|
test.skip("zip manifest test", async (t) => {
|
||||||
const dropletHandler = new DropletHandler();
|
const dropletHandler = new DropletHandler();
|
||||||
const manifest = JSON.parse(
|
const manifest = JSON.parse(
|
||||||
await new Promise((r, e) =>
|
await new Promise((r, e) =>
|
||||||
@ -113,19 +154,34 @@ test("zip file reader", async (t) => {
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
const stream = dropletHandler.readFile(
|
for (const [filename, data] of Object.entries(manifest)) {
|
||||||
"./assets/TheGame.zip",
|
let start = 0;
|
||||||
"setup.exe",
|
for (const [chunkIndex, length] of data.lengths.entries()) {
|
||||||
BigInt(10),
|
const stream = (
|
||||||
BigInt(20)
|
await dropletHandler.readFile(
|
||||||
);
|
"./assets/TheGame.zip",
|
||||||
|
filename,
|
||||||
|
BigInt(start),
|
||||||
|
BigInt(start + length)
|
||||||
|
)
|
||||||
|
).getStream();
|
||||||
|
|
||||||
|
let streamLength = 0;
|
||||||
|
await stream.pipeTo(
|
||||||
|
new WritableStream({
|
||||||
|
write(chunk) {
|
||||||
|
streamLength += chunk.length;
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
let finalString = "";
|
if (streamLength != length)
|
||||||
for await (const chunk of stream.getStream()) {
|
return t.fail(
|
||||||
// Do something with each 'chunk'
|
`stream length for chunk index ${chunkIndex} was not expected: real: ${streamLength} vs expected: ${length}`
|
||||||
finalString = String.fromCharCode.apply(null, chunk);
|
);
|
||||||
if(finalString.length > 100) break;
|
|
||||||
|
start += length;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t.pass();
|
t.pass();
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
yes "droplet is awesome" | dd of=./setup.exe bs=1024 count=1000000
|
# yes "droplet is awesome" | dd of=./setup.exe bs=1024 count=1000000
|
||||||
|
dd if=/dev/random of=./setup.exe bs=1024 count=1000000
|
||||||
zip TheGame.zip setup.exe
|
zip TheGame.zip setup.exe
|
||||||
rm setup.exe
|
rm setup.exe
|
||||||
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@drop-oss/droplet",
|
"name": "@drop-oss/droplet",
|
||||||
"version": "2.0.1",
|
"version": "2.2.1",
|
||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
"types": "index.d.ts",
|
"types": "index.d.ts",
|
||||||
"napi": {
|
"napi": {
|
||||||
@ -24,7 +24,9 @@
|
|||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@napi-rs/cli": "3.0.0-alpha.91",
|
"@napi-rs/cli": "3.0.0-alpha.91",
|
||||||
"@types/node": "^22.13.10",
|
"@types/node": "^22.13.10",
|
||||||
"ava": "^6.2.0"
|
"ava": "^6.2.0",
|
||||||
|
"pretty-bytes": "^7.0.1",
|
||||||
|
"tsimp": "^2.0.12"
|
||||||
},
|
},
|
||||||
"ava": {
|
"ava": {
|
||||||
"timeout": "3m",
|
"timeout": "3m",
|
||||||
@ -51,8 +53,5 @@
|
|||||||
"packageManager": "yarn@4.7.0",
|
"packageManager": "yarn@4.7.0",
|
||||||
"repository": {
|
"repository": {
|
||||||
"url": "git+https://github.com/Drop-OSS/droplet.git"
|
"url": "git+https://github.com/Drop-OSS/droplet.git"
|
||||||
},
|
|
||||||
"dependencies": {
|
|
||||||
"tsimp": "^2.0.12"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,10 +1,4 @@
|
|||||||
use std::{
|
use std::{collections::HashMap, sync::Arc, thread};
|
||||||
collections::HashMap,
|
|
||||||
io::{BufRead, BufReader},
|
|
||||||
path::Path,
|
|
||||||
sync::Arc,
|
|
||||||
thread,
|
|
||||||
};
|
|
||||||
|
|
||||||
use napi::{
|
use napi::{
|
||||||
threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
|
threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
|
||||||
@ -42,10 +36,18 @@ pub fn generate_manifest<'a>(
|
|||||||
log_sfn: ThreadsafeFunction<String>,
|
log_sfn: ThreadsafeFunction<String>,
|
||||||
callback_sfn: ThreadsafeFunction<String>,
|
callback_sfn: ThreadsafeFunction<String>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let backend: &mut Box<dyn VersionBackend + Send> =
|
let backend: &mut Box<dyn VersionBackend + Send> = droplet_handler
|
||||||
droplet_handler.create_backend_for_path(dir).ok_or(napi::Error::from_reason("Could not create backend for path."))?;
|
.create_backend_for_path(dir)
|
||||||
|
.ok_or(napi::Error::from_reason(
|
||||||
|
"Could not create backend for path.",
|
||||||
|
))?;
|
||||||
|
|
||||||
|
// This is unsafe (obviously)
|
||||||
|
// But it's allg as long the DropletHandler doesn't get
|
||||||
|
// dropped while we're generating the manifest.
|
||||||
let backend: &'static mut Box<dyn VersionBackend + Send> =
|
let backend: &'static mut Box<dyn VersionBackend + Send> =
|
||||||
unsafe { std::mem::transmute(backend) };
|
unsafe { std::mem::transmute(backend) };
|
||||||
|
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let files = backend.list_files();
|
let files = backend.list_files();
|
||||||
|
|
||||||
@ -55,9 +57,10 @@ pub fn generate_manifest<'a>(
|
|||||||
let total: i32 = files.len() as i32;
|
let total: i32 = files.len() as i32;
|
||||||
let mut i: i32 = 0;
|
let mut i: i32 = 0;
|
||||||
|
|
||||||
|
let mut buf = [0u8; 1024 * 16];
|
||||||
|
|
||||||
for version_file in files {
|
for version_file in files {
|
||||||
let raw_reader = backend.reader(&version_file).unwrap();
|
let mut reader = backend.reader(&version_file, 0, 0).unwrap();
|
||||||
let mut reader = BufReader::with_capacity(CHUNK_SIZE, raw_reader);
|
|
||||||
|
|
||||||
let mut chunk_data = ChunkData {
|
let mut chunk_data = ChunkData {
|
||||||
permissions: version_file.permission,
|
permissions: version_file.permission,
|
||||||
@ -68,12 +71,26 @@ pub fn generate_manifest<'a>(
|
|||||||
|
|
||||||
let mut chunk_index = 0;
|
let mut chunk_index = 0;
|
||||||
loop {
|
loop {
|
||||||
|
let mut length = 0;
|
||||||
let mut buffer: Vec<u8> = Vec::new();
|
let mut buffer: Vec<u8> = Vec::new();
|
||||||
reader.fill_buf().unwrap().clone_into(&mut buffer);
|
let mut file_empty = false;
|
||||||
let length = buffer.len();
|
|
||||||
|
|
||||||
if length == 0 {
|
loop {
|
||||||
break;
|
let read = reader.read(&mut buf).unwrap();
|
||||||
|
|
||||||
|
length += read;
|
||||||
|
|
||||||
|
if length >= CHUNK_SIZE {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we're out of data, add this chunk and then move onto the next file
|
||||||
|
if read == 0 {
|
||||||
|
file_empty = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer.extend_from_slice(&buf[..read]);
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk_id = Uuid::new_v4();
|
let chunk_id = Uuid::new_v4();
|
||||||
@ -88,10 +105,14 @@ pub fn generate_manifest<'a>(
|
|||||||
"Processed chunk {} for {}",
|
"Processed chunk {} for {}",
|
||||||
chunk_index, &version_file.relative_filename
|
chunk_index, &version_file.relative_filename
|
||||||
);
|
);
|
||||||
|
|
||||||
log_sfn.call(Ok(log_str), ThreadsafeFunctionCallMode::Blocking);
|
log_sfn.call(Ok(log_str), ThreadsafeFunctionCallMode::Blocking);
|
||||||
|
|
||||||
reader.consume(length);
|
|
||||||
chunk_index += 1;
|
chunk_index += 1;
|
||||||
|
|
||||||
|
if file_empty {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
chunks.insert(version_file.relative_filename, chunk_data);
|
chunks.insert(version_file.relative_filename, chunk_data);
|
||||||
|
|||||||
@ -2,7 +2,7 @@
|
|||||||
use std::os::unix::fs::PermissionsExt;
|
use std::os::unix::fs::PermissionsExt;
|
||||||
use std::{
|
use std::{
|
||||||
fs::{self, metadata, File},
|
fs::{self, metadata, File},
|
||||||
io::{self, Read, Sink},
|
io::{self, Read, Seek, SeekFrom, Sink},
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
@ -12,7 +12,7 @@ use rawzip::{
|
|||||||
FileReader, ZipArchive, ZipArchiveEntryWayfinder, ZipEntry, ZipReader, RECOMMENDED_BUFFER_SIZE,
|
FileReader, ZipArchive, ZipArchiveEntryWayfinder, ZipEntry, ZipReader, RECOMMENDED_BUFFER_SIZE,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::version::types::{MinimumFileObject, Skippable, VersionBackend, VersionFile};
|
use crate::version::types::{MinimumFileObject, VersionBackend, VersionFile};
|
||||||
|
|
||||||
pub fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) {
|
pub fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) {
|
||||||
if metadata(path).unwrap().is_dir() {
|
if metadata(path).unwrap().is_dir() {
|
||||||
@ -52,8 +52,21 @@ impl VersionBackend for PathVersionBackend {
|
|||||||
results
|
results
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reader(&mut self, file: &VersionFile) -> Option<Box<dyn MinimumFileObject + 'static>> {
|
fn reader(
|
||||||
let file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?;
|
&mut self,
|
||||||
|
file: &VersionFile,
|
||||||
|
start: u64,
|
||||||
|
end: u64,
|
||||||
|
) -> Option<Box<dyn MinimumFileObject + 'static>> {
|
||||||
|
let mut file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?;
|
||||||
|
|
||||||
|
if start != 0 {
|
||||||
|
file.seek(SeekFrom::Start(start)).ok()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if end != 0 {
|
||||||
|
return Some(Box::new(file.take(end - start)));
|
||||||
|
}
|
||||||
|
|
||||||
return Some(Box::new(file));
|
return Some(Box::new(file));
|
||||||
}
|
}
|
||||||
@ -103,28 +116,53 @@ impl ZipVersionBackend {
|
|||||||
pub fn new_entry<'archive>(
|
pub fn new_entry<'archive>(
|
||||||
&self,
|
&self,
|
||||||
entry: ZipEntry<'archive, FileReader>,
|
entry: ZipEntry<'archive, FileReader>,
|
||||||
|
start: u64,
|
||||||
|
end: u64,
|
||||||
) -> ZipFileWrapper<'archive> {
|
) -> ZipFileWrapper<'archive> {
|
||||||
let deflater = DeflateDecoder::new(entry.reader());
|
let mut deflater = DeflateDecoder::new(entry.reader());
|
||||||
ZipFileWrapper { reader: deflater }
|
if start != 0 {
|
||||||
|
io::copy(&mut (&mut deflater).take(start), &mut Sink::default()).unwrap();
|
||||||
|
}
|
||||||
|
ZipFileWrapper {
|
||||||
|
reader: deflater,
|
||||||
|
limit: (end - start) as usize,
|
||||||
|
current: 0,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ZipFileWrapper<'archive> {
|
pub struct ZipFileWrapper<'archive> {
|
||||||
reader: DeflateDecoder<ZipReader<'archive, FileReader>>,
|
reader: DeflateDecoder<ZipReader<'archive, FileReader>>,
|
||||||
|
limit: usize,
|
||||||
|
current: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This read implemention is a result of debugging hell
|
||||||
|
* It should probably be replaced with a .take() call.
|
||||||
|
*/
|
||||||
impl<'a> Read for ZipFileWrapper<'a> {
|
impl<'a> Read for ZipFileWrapper<'a> {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
let has_limit = self.limit != 0;
|
||||||
|
|
||||||
|
// End this stream if the read is the right size
|
||||||
|
if has_limit {
|
||||||
|
if self.current >= self.limit {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let read = self.reader.read(buf)?;
|
let read = self.reader.read(buf)?;
|
||||||
Ok(read)
|
if self.limit != 0 {
|
||||||
|
self.current += read;
|
||||||
|
if self.current > self.limit {
|
||||||
|
let over = self.current - self.limit;
|
||||||
|
return Ok(read - over);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Ok(read);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl<'a> Skippable for ZipFileWrapper<'a> {
|
|
||||||
fn skip(&mut self, amount: u64) {
|
|
||||||
io::copy(&mut self.reader.by_ref().take(amount), &mut Sink::default()).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<'a> MinimumFileObject for ZipFileWrapper<'a> {}
|
|
||||||
|
|
||||||
impl ZipVersionBackend {
|
impl ZipVersionBackend {
|
||||||
fn find_wayfinder(&mut self, filename: &str) -> Option<ZipArchiveEntryWayfinder> {
|
fn find_wayfinder(&mut self, filename: &str) -> Option<ZipArchiveEntryWayfinder> {
|
||||||
@ -163,11 +201,16 @@ impl VersionBackend for ZipVersionBackend {
|
|||||||
results
|
results
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reader(&mut self, file: &VersionFile) -> Option<Box<dyn MinimumFileObject + '_>> {
|
fn reader(
|
||||||
|
&mut self,
|
||||||
|
file: &VersionFile,
|
||||||
|
start: u64,
|
||||||
|
end: u64,
|
||||||
|
) -> Option<Box<dyn MinimumFileObject + '_>> {
|
||||||
let wayfinder = self.find_wayfinder(&file.relative_filename)?;
|
let wayfinder = self.find_wayfinder(&file.relative_filename)?;
|
||||||
let local_entry = self.archive.get_entry(wayfinder).unwrap();
|
let local_entry = self.archive.get_entry(wayfinder).unwrap();
|
||||||
|
|
||||||
let wrapper = self.new_entry(local_entry);
|
let wrapper = self.new_entry(local_entry, start, end);
|
||||||
|
|
||||||
Some(Box::new(wrapper))
|
Some(Box::new(wrapper))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,5 @@
|
|||||||
use std::{
|
use std::{
|
||||||
fmt::Debug,
|
fmt::Debug, io::Read
|
||||||
io::{Read, Seek, SeekFrom},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use dyn_clone::DynClone;
|
use dyn_clone::DynClone;
|
||||||
@ -13,36 +12,26 @@ pub struct VersionFile {
|
|||||||
pub size: u64,
|
pub size: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Skippable {
|
pub trait MinimumFileObject: Read + Send {}
|
||||||
fn skip(&mut self, amount: u64);
|
impl<T: Read + Send> MinimumFileObject for T {}
|
||||||
}
|
|
||||||
impl<T> Skippable for T
|
|
||||||
where
|
|
||||||
T: Seek,
|
|
||||||
{
|
|
||||||
fn skip(&mut self, amount: u64) {
|
|
||||||
self.seek(SeekFrom::Start(amount)).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait MinimumFileObject: Read + Send + Skippable {}
|
|
||||||
impl<T: Read + Send + Seek> MinimumFileObject for T {}
|
|
||||||
|
|
||||||
// Intentionally not a generic, because of types in read_file
|
// Intentionally not a generic, because of types in read_file
|
||||||
pub struct ReadToAsyncRead<'a> {
|
pub struct ReadToAsyncRead<'a> {
|
||||||
pub inner: Box<dyn Read + Send + 'a>,
|
pub inner: Box<dyn Read + Send + 'a>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const ASYNC_READ_BUFFER_SIZE: usize = 8128;
|
||||||
|
|
||||||
impl<'a> AsyncRead for ReadToAsyncRead<'a> {
|
impl<'a> AsyncRead for ReadToAsyncRead<'a> {
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: std::pin::Pin<&mut Self>,
|
mut self: std::pin::Pin<&mut Self>,
|
||||||
_cx: &mut std::task::Context<'_>,
|
_cx: &mut std::task::Context<'_>,
|
||||||
buf: &mut tokio::io::ReadBuf<'_>,
|
buf: &mut tokio::io::ReadBuf<'_>,
|
||||||
) -> std::task::Poll<io::Result<()>> {
|
) -> std::task::Poll<io::Result<()>> {
|
||||||
let mut read_buf = [0u8; 8192];
|
let mut read_buf = [0u8; ASYNC_READ_BUFFER_SIZE];
|
||||||
let var_name = self.inner.read(&mut read_buf).unwrap();
|
let read_size = ASYNC_READ_BUFFER_SIZE.min(buf.remaining());
|
||||||
let amount = var_name.min(buf.remaining());
|
let read = self.inner.read(&mut read_buf[0..read_size]).unwrap();
|
||||||
buf.put_slice(&read_buf[0..amount]);
|
buf.put_slice(&read_buf[0..read]);
|
||||||
std::task::Poll::Ready(Ok(()))
|
std::task::Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -50,7 +39,7 @@ impl<'a> AsyncRead for ReadToAsyncRead<'a> {
|
|||||||
pub trait VersionBackend: DynClone {
|
pub trait VersionBackend: DynClone {
|
||||||
fn list_files(&mut self) -> Vec<VersionFile>;
|
fn list_files(&mut self) -> Vec<VersionFile>;
|
||||||
fn peek_file(&mut self, sub_path: String) -> Option<VersionFile>;
|
fn peek_file(&mut self, sub_path: String) -> Option<VersionFile>;
|
||||||
fn reader(&mut self, file: &VersionFile) -> Option<Box<dyn MinimumFileObject + '_>>;
|
fn reader(&mut self, file: &VersionFile, start: u64, end: u64) -> Option<Box<dyn MinimumFileObject + '_>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
dyn_clone::clone_trait_object!(VersionBackend);
|
dyn_clone::clone_trait_object!(VersionBackend);
|
||||||
@ -1,4 +1,6 @@
|
|||||||
use std::{collections::HashMap, fs::File, io::Read, path::Path};
|
use std::{
|
||||||
|
collections::HashMap, fs::File, path::Path
|
||||||
|
};
|
||||||
|
|
||||||
use napi::{bindgen_prelude::*, sys::napi_value__, tokio_stream::StreamExt};
|
use napi::{bindgen_prelude::*, sys::napi_value__, tokio_stream::StreamExt};
|
||||||
use tokio_util::codec::{BytesCodec, FramedRead};
|
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
@ -115,20 +117,16 @@ impl<'a> DropletHandler<'a> {
|
|||||||
size: 0, // Shouldn't matter
|
size: 0, // Shouldn't matter
|
||||||
};
|
};
|
||||||
// Use `?` operator for cleaner error propagation from `Option`
|
// Use `?` operator for cleaner error propagation from `Option`
|
||||||
let mut reader = backend.reader(&version_file).ok_or(napi::Error::from_reason("Failed to create reader."))?;
|
let reader = backend
|
||||||
|
.reader(
|
||||||
|
&version_file,
|
||||||
|
start.map(|e| e.get_u64().1).unwrap_or(0),
|
||||||
|
end.map(|e| e.get_u64().1).unwrap_or(0),
|
||||||
|
)
|
||||||
|
.ok_or(napi::Error::from_reason("Failed to create reader."))?;
|
||||||
|
|
||||||
if let Some(skip) = start.clone() {
|
let async_reader = ReadToAsyncRead {
|
||||||
reader.skip(skip.get_u64().1.into());
|
inner: reader,
|
||||||
// io::copy(&mut reader.by_ref().take(skip.into()), &mut io::sink()).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
let async_reader = if let Some(limit) = end {
|
|
||||||
let amount = limit.get_u64().1 - start.map_or(Some(0), |v| Some(v.get_u64().1)).unwrap();
|
|
||||||
ReadToAsyncRead {
|
|
||||||
inner: Box::new(reader.take(amount.into())),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ReadToAsyncRead { inner: reader }
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Create a FramedRead stream with BytesCodec for chunking
|
// Create a FramedRead stream with BytesCodec for chunking
|
||||||
@ -147,9 +145,7 @@ impl<'a> DropletHandler<'a> {
|
|||||||
Ok(ReadableStream::create_with_stream_bytes(&env, stream).unwrap())
|
Ok(ReadableStream::create_with_stream_bytes(&env, stream).unwrap())
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
Ok(JsDropStreamable {
|
Ok(JsDropStreamable { inner: stream })
|
||||||
inner: stream,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,4 +160,4 @@ impl JsDropStreamable {
|
|||||||
pub fn get_stream(&self) -> *mut napi_value__ {
|
pub fn get_stream(&self) -> *mut napi_value__ {
|
||||||
self.inner.raw()
|
self.inner.raw()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,6 +12,7 @@ __metadata:
|
|||||||
"@napi-rs/cli": "npm:3.0.0-alpha.91"
|
"@napi-rs/cli": "npm:3.0.0-alpha.91"
|
||||||
"@types/node": "npm:^22.13.10"
|
"@types/node": "npm:^22.13.10"
|
||||||
ava: "npm:^6.2.0"
|
ava: "npm:^6.2.0"
|
||||||
|
pretty-bytes: "npm:^7.0.1"
|
||||||
tsimp: "npm:^2.0.12"
|
tsimp: "npm:^2.0.12"
|
||||||
languageName: unknown
|
languageName: unknown
|
||||||
linkType: soft
|
linkType: soft
|
||||||
@ -2432,6 +2433,13 @@ __metadata:
|
|||||||
languageName: node
|
languageName: node
|
||||||
linkType: hard
|
linkType: hard
|
||||||
|
|
||||||
|
"pretty-bytes@npm:^7.0.1":
|
||||||
|
version: 7.0.1
|
||||||
|
resolution: "pretty-bytes@npm:7.0.1"
|
||||||
|
checksum: 10c0/14ffb503d2de3588042c722848062a4897e6faece1694e0c83ba5669ec003d73311d946d50d2b3c6099a6a306760011b8446ee3cf9cf86eca13a454a8f1c47cb
|
||||||
|
languageName: node
|
||||||
|
linkType: hard
|
||||||
|
|
||||||
"pretty-ms@npm:^9.1.0":
|
"pretty-ms@npm:^9.1.0":
|
||||||
version: 9.2.0
|
version: 9.2.0
|
||||||
resolution: "pretty-ms@npm:9.2.0"
|
resolution: "pretty-ms@npm:9.2.0"
|
||||||
|
|||||||
Reference in New Issue
Block a user