diff --git a/Cargo.toml b/Cargo.toml index a868a77..cb316ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,8 @@ serde = "1.0.210" ciborium = "0.2.2" time = "0.3.36" hex = "0.4.3" +gxhash = "=2.3.0" +serde_json = "1.0.128" [dependencies.openssl] version = "0.10.66" diff --git a/src/chunker.rs b/src/chunker.rs deleted file mode 100644 index 58667ce..0000000 --- a/src/chunker.rs +++ /dev/null @@ -1,109 +0,0 @@ -use std::{collections::HashMap, fs::File, io::{self, BufRead, BufReader}, path::Path, sync::{Arc, Mutex}}; -use napi::Error; - -#[cfg(unix)] -use std::os::unix::fs::PermissionsExt; - -use uuid::Uuid; - -use crate::{file_utils::list_files, manifest::{generate_manifest, Manifest, ManifestChunk, ManifestRecord}}; - -const CHUNK_SIZE: usize = 1024 * 1024 * 64; - - -fn compress(mut buffer: &[u8], output_path: &Path, chunk_id: Uuid) { - let chunk_path = output_path.join(chunk_id.to_string() + ".bin"); - let mut chunk_file = File::create_new(chunk_path).unwrap(); - - io::copy(&mut buffer, &mut chunk_file).unwrap(); -} - -#[napi] -pub async fn chunk(source: String, output: String) -> Result<(), Error> { - let source_path = Path::new(&source); - let output_path = Path::new(&output); - - let files = list_files(source_path); - - let num_of_threads: u64 = 8; - - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(num_of_threads.try_into().unwrap()) - .build() - .unwrap(); - - let queue_size = Arc::new(Mutex::new(0)); - - pool.scope(|scope| { - let mut manifest = Manifest { - record: HashMap::new(), - }; - - for file_path in files { - let file = File::open(file_path.clone()).unwrap(); - let permissions = file.try_clone().unwrap().metadata().unwrap().permissions(); - let mut reader = BufReader::with_capacity(CHUNK_SIZE, file); - let relative = file_path.strip_prefix(source_path).unwrap(); - - let mut record = ManifestRecord { - chunks: Vec::new(), - permissions: 0, - }; - #[cfg(unix)] - { - record.permissions = permissions.mode(); - } - - let mut chunk_index = 0; - loop { - let mut buffer: Vec = Vec::new(); - reader.fill_buf().unwrap().clone_into(&mut buffer); - let length = buffer.len(); - - if length == 0 { - break; - } - - { - *queue_size.lock().unwrap() += 1; - } - - let chunk_id: Uuid = Uuid::new_v4(); - - let queue_size_handle = queue_size.clone(); - scope.spawn(move |_scope| { - compress(&buffer, output_path, chunk_id); - let mut num = queue_size_handle.lock().unwrap(); - *num -= 1; - }); - - reader.consume(length); - - let chunk_record = ManifestChunk { - uuid: chunk_id.to_string(), - index: chunk_index, - }; - record.chunks.push(chunk_record); - chunk_index += 1; - - loop { - let num = queue_size.lock().unwrap(); - if *num < num_of_threads { - break; - } - } - } - - manifest - .record - .insert(relative.to_str().unwrap().to_string(), record); - - println!("Queued {}", file_path.to_str().unwrap()); - } - - let manifest_path = output_path.join("manifest.drop"); - generate_manifest(manifest, &manifest_path); - }); - - return Ok(()); -} diff --git a/src/file_utils.rs b/src/file_utils.rs index bc0b4fa..ab06369 100644 --- a/src/file_utils.rs +++ b/src/file_utils.rs @@ -1,6 +1,6 @@ use std::{fs::{self, metadata}, path::{Path, PathBuf}}; -pub fn _list_files(vec: &mut Vec, path: &Path) { +fn _list_files(vec: &mut Vec, path: &Path) { if metadata(&path).unwrap().is_dir() { let paths = fs::read_dir(&path).unwrap(); for path_result in paths { diff --git a/src/lib.rs b/src/lib.rs index b1b559c..6e2bac6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,6 @@ pub mod file_utils; pub mod manifest; -pub mod chunker; pub mod ssl; #[macro_use] diff --git a/src/manifest.rs b/src/manifest.rs index 95e134d..1a46b21 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -1,25 +1,83 @@ -use std::{collections::HashMap, fs::File, path::Path}; +use std::{ + fs::File, + io::{BufRead, BufReader}, + path::Path, +}; -use ciborium::into_writer; +#[cfg(unix)] +use std::os::unix::fs::PermissionsExt; -#[derive(serde::Serialize)] -pub struct ManifestChunk { - pub uuid: String, - pub index: i64, +use gxhash::gxhash128; +use napi::Error; +use serde::Serialize; +use serde_json::json; +use uuid::Uuid; + +use crate::file_utils::list_files; + +const CHUNK_SIZE: usize = 1024 * 1024 * 16; + +#[derive(Serialize)] +struct Chunk { + id: String, + permissions: u32, + file_name: String, + chunk_index: u32, + checksum: String, } -#[derive(serde::Serialize)] -pub struct ManifestRecord { - pub chunks: Vec, - pub permissions: u32, -} +#[napi] +pub fn generate_manifest(dir: String) -> Result { + let base_dir = Path::new(&dir); + let files = list_files(base_dir); -#[derive(serde::Serialize)] -pub struct Manifest { - pub record: HashMap, -} + let mut chunks: Vec = Vec::new(); -pub fn generate_manifest(manifest: Manifest, path: &Path) { - let file = File::create(path).unwrap(); - into_writer(&manifest, file).unwrap(); + for file_path in files { + let file = File::open(file_path.clone()).unwrap(); + let relative = file_path.strip_prefix(base_dir).unwrap(); + let permission_object = file.try_clone().unwrap().metadata().unwrap().permissions(); + let permissions = { + let mut perm = 0; + #[cfg(unix)] + { + perm = permission_object.mode(); + } + perm + }; + + let mut reader = BufReader::with_capacity(CHUNK_SIZE, file); + + let mut chunk_index = 0; + loop { + let mut buffer: Vec = Vec::new(); + reader.fill_buf().unwrap().clone_into(&mut buffer); + let length = buffer.len(); + + if length == 0 { + break; + } + + let chunk_id = Uuid::new_v4(); + let checksum = gxhash128(&buffer, 0); + let checksum_string = hex::encode(checksum.to_le_bytes()); + + let chunk = Chunk { + id: chunk_id.to_string(), + chunk_index: chunk_index, + permissions: permissions, + file_name: relative.to_str().unwrap().to_string(), + checksum: checksum_string, + }; + + chunks.push(chunk); + + println!("Processed chunk {} for {}", chunk_index, relative.to_str().unwrap()); + reader.consume(length); + chunk_index += 1; + + } + } + + Ok(json!(chunks).to_string()) }