new manifest generation with checksums for Drop

This commit is contained in:
DecDuck
2024-10-10 14:37:49 +11:00
parent 3ed9fe282f
commit 8af08460b0
5 changed files with 79 additions and 129 deletions

View File

@ -16,6 +16,8 @@ serde = "1.0.210"
ciborium = "0.2.2"
time = "0.3.36"
hex = "0.4.3"
gxhash = "=2.3.0"
serde_json = "1.0.128"
[dependencies.openssl]
version = "0.10.66"

View File

@ -1,109 +0,0 @@
use std::{collections::HashMap, fs::File, io::{self, BufRead, BufReader}, path::Path, sync::{Arc, Mutex}};
use napi::Error;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use uuid::Uuid;
use crate::{file_utils::list_files, manifest::{generate_manifest, Manifest, ManifestChunk, ManifestRecord}};
const CHUNK_SIZE: usize = 1024 * 1024 * 64;
fn compress(mut buffer: &[u8], output_path: &Path, chunk_id: Uuid) {
let chunk_path = output_path.join(chunk_id.to_string() + ".bin");
let mut chunk_file = File::create_new(chunk_path).unwrap();
io::copy(&mut buffer, &mut chunk_file).unwrap();
}
#[napi]
pub async fn chunk(source: String, output: String) -> Result<(), Error> {
let source_path = Path::new(&source);
let output_path = Path::new(&output);
let files = list_files(source_path);
let num_of_threads: u64 = 8;
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_of_threads.try_into().unwrap())
.build()
.unwrap();
let queue_size = Arc::new(Mutex::new(0));
pool.scope(|scope| {
let mut manifest = Manifest {
record: HashMap::new(),
};
for file_path in files {
let file = File::open(file_path.clone()).unwrap();
let permissions = file.try_clone().unwrap().metadata().unwrap().permissions();
let mut reader = BufReader::with_capacity(CHUNK_SIZE, file);
let relative = file_path.strip_prefix(source_path).unwrap();
let mut record = ManifestRecord {
chunks: Vec::new(),
permissions: 0,
};
#[cfg(unix)]
{
record.permissions = permissions.mode();
}
let mut chunk_index = 0;
loop {
let mut buffer: Vec<u8> = Vec::new();
reader.fill_buf().unwrap().clone_into(&mut buffer);
let length = buffer.len();
if length == 0 {
break;
}
{
*queue_size.lock().unwrap() += 1;
}
let chunk_id: Uuid = Uuid::new_v4();
let queue_size_handle = queue_size.clone();
scope.spawn(move |_scope| {
compress(&buffer, output_path, chunk_id);
let mut num = queue_size_handle.lock().unwrap();
*num -= 1;
});
reader.consume(length);
let chunk_record = ManifestChunk {
uuid: chunk_id.to_string(),
index: chunk_index,
};
record.chunks.push(chunk_record);
chunk_index += 1;
loop {
let num = queue_size.lock().unwrap();
if *num < num_of_threads {
break;
}
}
}
manifest
.record
.insert(relative.to_str().unwrap().to_string(), record);
println!("Queued {}", file_path.to_str().unwrap());
}
let manifest_path = output_path.join("manifest.drop");
generate_manifest(manifest, &manifest_path);
});
return Ok(());
}

View File

@ -1,6 +1,6 @@
use std::{fs::{self, metadata}, path::{Path, PathBuf}};
pub fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) {
fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) {
if metadata(&path).unwrap().is_dir() {
let paths = fs::read_dir(&path).unwrap();
for path_result in paths {

View File

@ -2,7 +2,6 @@
pub mod file_utils;
pub mod manifest;
pub mod chunker;
pub mod ssl;
#[macro_use]

View File

@ -1,25 +1,83 @@
use std::{collections::HashMap, fs::File, path::Path};
use std::{
fs::File,
io::{BufRead, BufReader},
path::Path,
};
use ciborium::into_writer;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
#[derive(serde::Serialize)]
pub struct ManifestChunk {
pub uuid: String,
pub index: i64,
use gxhash::gxhash128;
use napi::Error;
use serde::Serialize;
use serde_json::json;
use uuid::Uuid;
use crate::file_utils::list_files;
const CHUNK_SIZE: usize = 1024 * 1024 * 16;
#[derive(Serialize)]
struct Chunk {
id: String,
permissions: u32,
file_name: String,
chunk_index: u32,
checksum: String,
}
#[derive(serde::Serialize)]
pub struct ManifestRecord {
pub chunks: Vec<ManifestChunk>,
pub permissions: u32,
}
#[napi]
pub fn generate_manifest(dir: String) -> Result<String, Error> {
let base_dir = Path::new(&dir);
let files = list_files(base_dir);
#[derive(serde::Serialize)]
pub struct Manifest {
pub record: HashMap<String, ManifestRecord>,
}
let mut chunks: Vec<Chunk> = Vec::new();
pub fn generate_manifest(manifest: Manifest, path: &Path) {
let file = File::create(path).unwrap();
into_writer(&manifest, file).unwrap();
for file_path in files {
let file = File::open(file_path.clone()).unwrap();
let relative = file_path.strip_prefix(base_dir).unwrap();
let permission_object = file.try_clone().unwrap().metadata().unwrap().permissions();
let permissions = {
let mut perm = 0;
#[cfg(unix)]
{
perm = permission_object.mode();
}
perm
};
let mut reader = BufReader::with_capacity(CHUNK_SIZE, file);
let mut chunk_index = 0;
loop {
let mut buffer: Vec<u8> = Vec::new();
reader.fill_buf().unwrap().clone_into(&mut buffer);
let length = buffer.len();
if length == 0 {
break;
}
let chunk_id = Uuid::new_v4();
let checksum = gxhash128(&buffer, 0);
let checksum_string = hex::encode(checksum.to_le_bytes());
let chunk = Chunk {
id: chunk_id.to_string(),
chunk_index: chunk_index,
permissions: permissions,
file_name: relative.to_str().unwrap().to_string(),
checksum: checksum_string,
};
chunks.push(chunk);
println!("Processed chunk {} for {}", chunk_index, relative.to_str().unwrap());
reader.consume(length);
chunk_index += 1;
}
}
Ok(json!(chunks).to_string())
}