10 Commits

Author SHA1 Message Date
bb678b4b3a fix: tests 2025-05-28 16:48:07 +10:00
cc94798962 feat: add file reader 2025-05-28 15:03:45 +10:00
7811818a72 Merge branch 'borked-reader' 2025-05-28 14:55:05 +10:00
b6910e717b fix: Changed FramedRead to work with ReadableStream
Signed-off-by: quexeky <git@quexeky.dev>
2025-05-28 14:52:42 +10:00
45a26c7156 inprogress: handoff to quexeky 2025-05-28 13:53:28 +10:00
16b78bca17 fix: chunk size 2025-05-27 10:34:44 +10:00
4ac19b8be0 fix: update index.js & index.d.ts 2025-05-26 17:20:03 +10:00
072a1584a0 feat: add list files command 2025-05-26 15:02:41 +10:00
6b5356627a fix: remove unnecessary size parameter causing windows build issues 2025-05-26 14:03:26 +10:00
7ede73e87c feat: move to backend-based manifest 2025-05-26 13:56:24 +10:00
10 changed files with 240 additions and 78 deletions

2
.gitignore vendored
View File

@ -9,7 +9,7 @@ npm-debug.log*
yarn-debug.log* yarn-debug.log*
yarn-error.log* yarn-error.log*
lerna-debug.log* lerna-debug.log*
.test .test*
.tsimp .tsimp
# Diagnostic reports (https://nodejs.org/api/report.html) # Diagnostic reports (https://nodejs.org/api/report.html)

View File

@ -9,11 +9,12 @@ crate-type = ["cdylib"]
[dependencies] [dependencies]
# Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix # Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
napi = { version = "2.12.2", default-features = false, features = [ napi = { version = "3.0.0-alpha.33", default-features = false, features = [
"napi4", "napi4",
"async", "async",
"web_stream",
] } ] }
napi-derive = "2.12.2" napi-derive = "3.0.0-alpha.33"
hex = "0.4.3" hex = "0.4.3"
serde_json = "1.0.128" serde_json = "1.0.128"
md5 = "0.7.0" md5 = "0.7.0"
@ -21,6 +22,8 @@ time-macros = "0.2.22"
time = "0.3.41" time = "0.3.41"
webpki = "0.22.4" webpki = "0.22.4"
ring = "0.17.14" ring = "0.17.14"
tokio = { version = "1.45.1", features = ["fs"] }
tokio-util = { version = "0.7.15", features = ["codec"] }
[dependencies.x509-parser] [dependencies.x509-parser]
version = "0.17.0" version = "0.17.0"

View File

@ -2,7 +2,7 @@ import test from "ava";
import fs from "node:fs"; import fs from "node:fs";
import path from "path"; import path from "path";
import { generateManifest } from "../index.js"; import { generateManifest, listFiles } from "../index.js";
test("numerous small file", async (t) => { test("numerous small file", async (t) => {
// Setup test dir // Setup test dir

46
__test__/utils.spec.mjs Normal file
View File

@ -0,0 +1,46 @@
import test from "ava";
import fs from "node:fs";
import path from "path";
import droplet from "../index.js";
test("check alt thread util", async (t) => {
let endtime1, endtime2;
droplet.callAltThreadFunc(async () => {
await new Promise((r) => setTimeout(r, 100));
endtime1 = Date.now();
});
await new Promise((r) => setTimeout(r, 500));
endtime2 = Date.now();
const difference = endtime2 - endtime1;
if (difference >= 600) {
t.fail("likely isn't multithreaded, difference: " + difference);
}
t.pass();
});
test("read file", async (t) => {
const dirName = "./.test2";
if (fs.existsSync(dirName)) fs.rmSync(dirName, { recursive: true });
fs.mkdirSync(dirName, { recursive: true });
const testString = "g'day what's up my koala bros\n".repeat(10000);
fs.writeFileSync("./.test2/TESTFILE", testString);
const stream = droplet.readFile("./.test2", "TESTFILE");
let finalString = "";
for await (const chunk of stream) {
// Do something with each 'chunk'
finalString += String.fromCharCode.apply(null, chunk);
}
t.assert(finalString == testString, "file strings don't match");
fs.rmSync(dirName, { recursive: true });
});

18
index.d.ts vendored
View File

@ -3,10 +3,14 @@
/* auto-generated by NAPI-RS */ /* auto-generated by NAPI-RS */
export declare function callAltThreadFunc(callback: (...args: any[]) => any): void function hasBackendForPath(path: string): boolean
export declare function generateManifest(dir: string, progress: (...args: any[]) => any, log: (...args: any[]) => any, callback: (...args: any[]) => any): void function listFiles(path: string): Array<string>
export declare function generateRootCa(): Array<string> function readFile(path: string, subPath: string): ReadableStream<Buffer> | null
export declare function generateClientCertificate(clientId: string, clientName: string, rootCa: string, rootCaPrivate: string): Array<string> function callAltThreadFunc(tsfn: ((err: Error | null, ) => any)): void
export declare function verifyClientCertificate(clientCert: string, rootCa: string): boolean function generateManifest(dir: string, progressSfn: ((err: Error | null, arg: number) => any), logSfn: ((err: Error | null, arg: string) => any), callbackSfn: ((err: Error | null, arg: string) => any)): void
export declare function signNonce(privateKey: string, nonce: string): string function generateRootCa(): Array<string>
export declare function verifyNonce(publicCert: string, nonce: string, signature: string): boolean function generateClientCertificate(clientId: string, clientName: string, rootCa: string, rootCaPrivate: string): Array<string>
function verifyClientCertificate(clientCert: string, rootCa: string): boolean
function signNonce(privateKey: string, nonce: string): string
function verifyNonce(publicCert: string, nonce: string, signature: string): boolean
undefinedundefined

View File

@ -310,8 +310,11 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`) throw new Error(`Failed to load native binding`)
} }
const { callAltThreadFunc, generateManifest, generateRootCa, generateClientCertificate, verifyClientCertificate, signNonce, verifyNonce } = nativeBinding const { hasBackendForPath, listFiles, readFile, callAltThreadFunc, generateManifest, generateRootCa, generateClientCertificate, verifyClientCertificate, signNonce, verifyNonce, } = nativeBinding
module.exports.hasBackendForPath = hasBackendForPath
module.exports.listFiles = listFiles
module.exports.readFile = readFile
module.exports.callAltThreadFunc = callAltThreadFunc module.exports.callAltThreadFunc = callAltThreadFunc
module.exports.generateManifest = generateManifest module.exports.generateManifest = generateManifest
module.exports.generateRootCa = generateRootCa module.exports.generateRootCa = generateRootCa
@ -319,3 +322,4 @@ module.exports.generateClientCertificate = generateClientCertificate
module.exports.verifyClientCertificate = verifyClientCertificate module.exports.verifyClientCertificate = verifyClientCertificate
module.exports.signNonce = signNonce module.exports.signNonce = signNonce
module.exports.verifyNonce = verifyNonce module.exports.verifyNonce = verifyNonce
module.exports.undefined = undefined

View File

@ -1,6 +1,6 @@
{ {
"name": "@drop-oss/droplet", "name": "@drop-oss/droplet",
"version": "0.7.2", "version": "1.2.0",
"main": "index.js", "main": "index.js",
"types": "index.d.ts", "types": "index.d.ts",
"napi": { "napi": {
@ -21,7 +21,7 @@
}, },
"license": "MIT", "license": "MIT",
"devDependencies": { "devDependencies": {
"@napi-rs/cli": "^2.18.4", "@napi-rs/cli": "2.18.4",
"@types/node": "^22.13.10", "@types/node": "^22.13.10",
"ava": "^6.2.0" "ava": "^6.2.0"
}, },
@ -42,6 +42,6 @@
}, },
"packageManager": "yarn@4.7.0", "packageManager": "yarn@4.7.0",
"repository": { "repository": {
"url": "https://github.com/Drop-OSS/droplet" "url": "git+https://github.com/Drop-OSS/droplet.git"
} }
} }

View File

@ -1,6 +1,19 @@
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use std::{ use std::{
fs::{self, metadata}, fs::{self, metadata, File},
io::{self, BufReader, ErrorKind, Read},
path::{Path, PathBuf}, path::{Path, PathBuf},
task::Poll,
};
use napi::{
bindgen_prelude::*,
tokio_stream::{Stream, StreamExt},
};
use tokio_util::{
bytes::BytesMut,
codec::{BytesCodec, FramedRead},
}; };
fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) { fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) {
@ -17,8 +30,136 @@ fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) {
} }
} }
pub fn list_files(path: &Path) -> Vec<PathBuf> { pub struct VersionFile {
let mut vec = Vec::new(); pub relative_filename: String,
_list_files(&mut vec, path); pub permission: u32,
vec }
pub trait VersionBackend: 'static {
fn list_files(&self, path: &Path) -> Vec<VersionFile>;
fn reader(&self, file: &VersionFile) -> Option<File>;
}
pub struct PathVersionBackend {
pub base_dir: PathBuf,
}
impl VersionBackend for PathVersionBackend {
fn list_files(&self, path: &Path) -> Vec<VersionFile> {
let mut vec = Vec::new();
_list_files(&mut vec, path);
let mut results = Vec::new();
for pathbuf in vec.iter() {
let file = File::open(pathbuf.clone()).unwrap();
let relative = pathbuf.strip_prefix(path).unwrap();
let metadata = file.try_clone().unwrap().metadata().unwrap();
let permission_object = metadata.permissions();
let permissions = {
let perm: u32;
#[cfg(target_family = "unix")]
{
perm = permission_object.mode();
}
#[cfg(not(target_family = "unix"))]
{
perm = 0
}
perm
};
results.push(VersionFile {
relative_filename: relative.to_string_lossy().to_string(),
permission: permissions,
});
}
results
}
fn reader(&self, file: &VersionFile) -> Option<File> {
let file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?;
return Some(file);
}
}
// Todo implementation for archives
// Split into a separate impl for each type of archive
pub struct ArchiveVersionBackend {}
impl VersionBackend for ArchiveVersionBackend {
fn list_files(&self, path: &Path) -> Vec<VersionFile> {
todo!()
}
fn reader(&self, file: &VersionFile) -> Option<File> {
todo!()
}
}
pub fn create_backend_for_path(path: &Path) -> Option<Box<(dyn VersionBackend)>> {
let is_directory = path.is_dir();
if is_directory {
return Some(Box::new(PathVersionBackend {
base_dir: path.to_path_buf(),
}));
};
/*
Insert checks for whatever backend you like
*/
None
}
#[napi]
pub fn has_backend_for_path(path: String) -> bool {
let path = Path::new(&path);
let has_backend = create_backend_for_path(path).is_some();
has_backend
}
#[napi]
pub fn list_files(path: String) -> Vec<String> {
let path = Path::new(&path);
let backend = create_backend_for_path(path).unwrap();
let files = backend.list_files(path);
files.into_iter().map(|e| e.relative_filename).collect()
}
#[napi]
pub fn read_file(
path: String,
sub_path: String,
env: &Env,
) -> Option<ReadableStream<'static, BufferSlice<'static>>> {
let path = Path::new(&path);
let backend = create_backend_for_path(path).unwrap();
let version_file = VersionFile {
relative_filename: sub_path,
permission: 0, // Shouldn't matter
};
// Use `?` operator for cleaner error propagation from `Option`
let reader = backend.reader(&version_file)?;
// Convert std::fs::File to tokio::fs::File for async operations
let reader = tokio::fs::File::from_std(reader);
// Create a FramedRead stream with BytesCodec for chunking
let stream = FramedRead::new(reader, BytesCodec::new())
// Use StreamExt::map to transform each Result item
.map(|result_item| {
result_item
// Apply Result::map to transform Ok(BytesMut) to Ok(Vec<u8>)
.map(|bytes| bytes.to_vec())
// Apply Result::map_err to transform Err(std::io::Error) to Err(napi::Error)
.map_err(|e| napi::Error::from(e)) // napi::Error implements From<tokio::io::Error>
});
// Create the napi-rs ReadableStream from the tokio_stream::Stream
// The unwrap() here means if stream creation fails, it will panic.
// For a production system, consider returning Result<Option<...>> and handling this.
Some(ReadableStream::create_with_stream_bytes(env, stream).unwrap())
} }

View File

@ -1,22 +1,19 @@
use std::{ use std::{
collections::HashMap, collections::HashMap, fs::File, io::{BufRead, BufReader}, path::Path, rc::Rc, sync::Arc, thread
fs::File,
io::{BufRead, BufReader},
path::Path,
thread,
}; };
#[cfg(unix)] #[cfg(unix)]
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use napi::{ use napi::{
threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}, bindgen_prelude::Function,
Error, JsFunction, threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
Env, Error, Result,
}; };
use serde_json::json; use serde_json::json;
use uuid::Uuid; use uuid::Uuid;
use crate::file_utils::list_files; use crate::file_utils::create_backend_for_path;
const CHUNK_SIZE: usize = 1024 * 1024 * 64; const CHUNK_SIZE: usize = 1024 * 1024 * 64;
@ -29,14 +26,10 @@ struct ChunkData {
} }
#[napi] #[napi]
pub fn call_alt_thread_func(callback: JsFunction) -> Result<(), Error> { pub fn call_alt_thread_func(tsfn: Arc<ThreadsafeFunction<()>>) -> Result<(), String> {
let tsfn: ThreadsafeFunction<u32, ErrorStrategy::CalleeHandled> = callback let tsfn_cloned = tsfn.clone();
.create_threadsafe_function(0, |ctx| {
ctx.env.create_uint32(ctx.value + 1).map(|v| vec![v])
})?;
let tsfn = tsfn.clone();
thread::spawn(move || { thread::spawn(move || {
tsfn.call(Ok(0), ThreadsafeFunctionCallMode::NonBlocking); tsfn_cloned.call(Ok(()), ThreadsafeFunctionCallMode::Blocking);
}); });
Ok(()) Ok(())
} }
@ -44,27 +37,14 @@ pub fn call_alt_thread_func(callback: JsFunction) -> Result<(), Error> {
#[napi] #[napi]
pub fn generate_manifest( pub fn generate_manifest(
dir: String, dir: String,
progress: JsFunction, progress_sfn: ThreadsafeFunction<i32>,
log: JsFunction, log_sfn: ThreadsafeFunction<String>,
callback: JsFunction, callback_sfn: ThreadsafeFunction<String>,
) -> Result<(), Error> { ) -> Result<(), String> {
let progress_sfn: ThreadsafeFunction<i32, ErrorStrategy::CalleeHandled> = progress
.create_threadsafe_function(0, |ctx| ctx.env.create_int32(ctx.value).map(|v| vec![v]))
.unwrap();
let log_sfn: ThreadsafeFunction<String, ErrorStrategy::CalleeHandled> = log
.create_threadsafe_function(0, |ctx| {
ctx.env.create_string_from_std(ctx.value).map(|v| vec![v])
})
.unwrap();
let callback_sfn: ThreadsafeFunction<String, ErrorStrategy::CalleeHandled> = callback
.create_threadsafe_function(0, |ctx| {
ctx.env.create_string_from_std(ctx.value).map(|v| vec![v])
})
.unwrap();
thread::spawn(move || { thread::spawn(move || {
let base_dir = Path::new(&dir); let base_dir = Path::new(&dir);
let files = list_files(base_dir); let backend = create_backend_for_path(base_dir).unwrap();
let files = backend.list_files(base_dir);
// Filepath to chunk data // Filepath to chunk data
let mut chunks: HashMap<String, ChunkData> = HashMap::new(); let mut chunks: HashMap<String, ChunkData> = HashMap::new();
@ -72,27 +52,12 @@ pub fn generate_manifest(
let total: i32 = files.len() as i32; let total: i32 = files.len() as i32;
let mut i: i32 = 0; let mut i: i32 = 0;
for file_path in files { for version_file in files {
let file = File::open(file_path.clone()).unwrap(); let mut raw_reader= backend.reader(&version_file).unwrap();
let relative = file_path.strip_prefix(base_dir).unwrap(); let mut reader = BufReader::with_capacity(CHUNK_SIZE, raw_reader);
let permission_object = file.try_clone().unwrap().metadata().unwrap().permissions();
let permissions = {
let perm: u32;
#[cfg(target_family = "unix")]
{
perm = permission_object.mode();
}
#[cfg(not(target_family = "unix"))]
{
perm = 0
}
perm
};
let mut reader = BufReader::with_capacity(CHUNK_SIZE, file);
let mut chunk_data = ChunkData { let mut chunk_data = ChunkData {
permissions, permissions: version_file.permission,
ids: Vec::new(), ids: Vec::new(),
checksums: Vec::new(), checksums: Vec::new(),
lengths: Vec::new(), lengths: Vec::new(),
@ -118,8 +83,7 @@ pub fn generate_manifest(
let log_str = format!( let log_str = format!(
"Processed chunk {} for {}", "Processed chunk {} for {}",
chunk_index, chunk_index, &version_file.relative_filename
relative.to_str().unwrap()
); );
log_sfn.call(Ok(log_str), ThreadsafeFunctionCallMode::Blocking); log_sfn.call(Ok(log_str), ThreadsafeFunctionCallMode::Blocking);
@ -127,7 +91,7 @@ pub fn generate_manifest(
chunk_index += 1; chunk_index += 1;
} }
chunks.insert(relative.to_str().unwrap().to_string(), chunk_data); chunks.insert(version_file.relative_filename, chunk_data);
i += 1; i += 1;
let progress = i * 100 / total; let progress = i * 100 / total;

View File

@ -9,7 +9,7 @@ __metadata:
version: 0.0.0-use.local version: 0.0.0-use.local
resolution: "@drop-oss/droplet@workspace:." resolution: "@drop-oss/droplet@workspace:."
dependencies: dependencies:
"@napi-rs/cli": "npm:^2.18.4" "@napi-rs/cli": "npm:2.18.4"
"@types/node": "npm:^22.13.10" "@types/node": "npm:^22.13.10"
ava: "npm:^6.2.0" ava: "npm:^6.2.0"
languageName: unknown languageName: unknown
@ -55,7 +55,7 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"@napi-rs/cli@npm:^2.18.4": "@napi-rs/cli@npm:2.18.4":
version: 2.18.4 version: 2.18.4
resolution: "@napi-rs/cli@npm:2.18.4" resolution: "@napi-rs/cli@npm:2.18.4"
bin: bin: