Ran cargo fmt

Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
quexeky
2024-11-04 18:50:25 +11:00
parent 813f0c09fa
commit 7213416421
10 changed files with 113 additions and 81 deletions

View File

@ -86,4 +86,4 @@ impl DatabaseImpls for DatabaseInterface {
let handle = self.borrow_data().unwrap();
Url::parse(&handle.base_url).unwrap()
}
}
}

View File

@ -7,11 +7,11 @@ use crate::{AppState, DB};
use log::info;
use rustix::fs::{fallocate, FallocateFlags};
use serde::{Deserialize, Serialize};
use urlencoding::encode;
use std::fs::{create_dir_all, File};
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
use urlencoding::encode;
pub struct GameDownloadAgent {
pub id: String,
@ -20,7 +20,7 @@ pub struct GameDownloadAgent {
contexts: Mutex<Vec<DropDownloadContext>>,
progress: ProgressChecker<DropDownloadContext>,
pub manifest: Mutex<Option<DropManifest>>,
pub callback: Arc<AtomicBool>
pub callback: Arc<AtomicBool>,
}
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum GameDownloadState {
@ -58,7 +58,7 @@ impl GameDownloadAgent {
progress: ProgressChecker::new(
Box::new(download_logic::download_game_chunk),
Arc::new(AtomicUsize::new(0)),
callback
callback,
),
contexts: Mutex::new(Vec::new()),
}
@ -77,8 +77,7 @@ impl GameDownloadAgent {
// It's not necessary, I just can't figure out to make the borrow checker happy
{
let lock = self.contexts.lock().unwrap().to_vec();
self.progress
.run_context_parallel(lock, max_threads);
self.progress.run_context_parallel(lock, max_threads);
}
Ok(())
}
@ -97,7 +96,8 @@ impl GameDownloadAgent {
.join(
format!(
"/api/v1/client/metadata/manifest?id={}&version={}",
self.id, encode(&self.version)
self.id,
encode(&self.version)
)
.as_str(),
)
@ -164,7 +164,7 @@ impl GameDownloadAgent {
index: i,
game_id: game_id.to_string(),
path: path.clone(),
checksum: chunk.checksums[i].clone()
checksum: chunk.checksums[i].clone(),
});
running_offset += *length as u64;
}
@ -183,4 +183,3 @@ impl GameDownloadAgent {
Ok(())
}
}

View File

@ -1,4 +1,7 @@
use std::{sync::{atomic::Ordering, Arc, Mutex}, thread};
use std::{
sync::{atomic::Ordering, Arc, Mutex},
thread,
};
use log::info;
@ -13,7 +16,10 @@ pub async fn queue_game_download(
state: tauri::State<'_, Mutex<AppState>>,
) -> Result<(), GameDownloadError> {
info!("Queuing Game Download");
let download_agent = Arc::new(GameDownloadAgent::new(game_id.clone(), game_version.clone()));
let download_agent = Arc::new(GameDownloadAgent::new(
game_id.clone(),
game_version.clone(),
));
download_agent.queue().await?;
let mut queue = state.lock().unwrap();
@ -30,42 +36,39 @@ pub async fn start_game_downloads(
let lock = state.lock().unwrap();
let mut game_downloads = lock.game_downloads.clone();
drop(lock);
thread::spawn(move || {
loop {
let mut current_id = String::new();
let mut download_agent = None;
{
for (id, agent) in &game_downloads {
if agent.get_state() == GameDownloadState::Queued {
download_agent = Some(agent.clone());
current_id = id.clone();
info!("Got queued game to download");
break;
}
thread::spawn(move || loop {
let mut current_id = String::new();
let mut download_agent = None;
{
for (id, agent) in &game_downloads {
if agent.get_state() == GameDownloadState::Queued {
download_agent = Some(agent.clone());
current_id = id.clone();
info!("Got queued game to download");
break;
}
if download_agent.is_none() {
info!("No more games left to download");
return;
}
};
info!("Downloading game");
{
start_game_download(max_threads, download_agent.unwrap()).unwrap();
game_downloads.remove_entry(&current_id);
}
}
if download_agent.is_none() {
info!("No more games left to download");
return;
}
};
info!("Downloading game");
{
start_game_download(max_threads, download_agent.unwrap()).unwrap();
game_downloads.remove_entry(&current_id);
}
});
info!("Spawned download");
return Ok(())
return Ok(());
}
pub fn start_game_download(
max_threads: usize,
download_agent: Arc<GameDownloadAgent>
download_agent: Arc<GameDownloadAgent>,
) -> Result<(), GameDownloadError> {
info!("Triggered Game Download");
download_agent.ensure_manifest_exists()?;
let local_manifest = {
@ -73,7 +76,13 @@ pub fn start_game_download(
(*manifest).clone().unwrap()
};
download_agent.generate_job_contexts(&local_manifest, download_agent.version.clone(), download_agent.id.clone()).unwrap();
download_agent
.generate_job_contexts(
&local_manifest,
download_agent.version.clone(),
download_agent.id.clone(),
)
.unwrap();
download_agent.begin_download(max_threads).unwrap();
@ -81,7 +90,10 @@ pub fn start_game_download(
}
#[tauri::command]
pub async fn stop_specific_game_download(state: tauri::State<'_, Mutex<AppState>>, game_id: String) -> Result<(), String> {
pub async fn stop_specific_game_download(
state: tauri::State<'_, Mutex<AppState>>,
game_id: String,
) -> Result<(), String> {
info!("called stop_specific_game_download");
let lock = state.lock().unwrap();
let download_agent = lock.game_downloads.get(&game_id).unwrap();
@ -92,5 +104,5 @@ pub async fn stop_specific_game_download(state: tauri::State<'_, Mutex<AppState>
info!("Stopping callback");
callback.store(true, Ordering::Release);
return Ok(())
}
return Ok(());
}

View File

@ -6,20 +6,29 @@ use gxhash::{gxhash128, GxHasher};
use log::info;
use md5::{Context, Digest};
use reqwest::blocking::Response;
use std::{fs::{File, OpenOptions}, hash::Hasher, io::{self, BufReader, BufWriter, Error, ErrorKind, Read, Seek, SeekFrom, Write}, path::PathBuf, sync::{atomic::{AtomicBool, Ordering}, Arc}};
use std::{
fs::{File, OpenOptions},
hash::Hasher,
io::{self, BufReader, BufWriter, Error, ErrorKind, Read, Seek, SeekFrom, Write},
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use urlencoding::encode;
pub struct DropFileWriter {
file: File,
hasher: Context,
callback: Arc<AtomicBool>
callback: Arc<AtomicBool>,
}
impl DropFileWriter {
fn new(path: PathBuf, callback: Arc<AtomicBool>) -> Self {
Self {
file: OpenOptions::new().write(true).open(path).unwrap(),
hasher: Context::new(),
callback
callback,
}
}
fn finish(mut self) -> io::Result<Digest> {
@ -31,9 +40,12 @@ impl DropFileWriter {
impl Write for DropFileWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if self.callback.load(Ordering::Acquire) {
return Err(Error::new(ErrorKind::ConnectionAborted, "Interrupt command recieved"));
return Err(Error::new(
ErrorKind::ConnectionAborted,
"Interrupt command recieved",
));
}
//info!("Writing data to writer");
self.hasher.write_all(buf).unwrap();
self.file.write(buf)
@ -79,25 +91,29 @@ pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>)
let mut file: DropFileWriter = DropFileWriter::new(ctx.path, callback);
if ctx.offset != 0 {
file
.seek(SeekFrom::Start(ctx.offset))
file.seek(SeekFrom::Start(ctx.offset))
.expect("Failed to seek to file offset");
}
// Writing everything to disk directly is probably slightly faster because it balances out the writes,
// Writing everything to disk directly is probably slightly faster because it balances out the writes,
// but this is better than the performance loss from constantly reading the callbacks
//let mut writer = BufWriter::with_capacity(1024 * 1024, file);
//copy_to_drop_file_writer(&mut response, &mut file);
match io::copy(&mut response, &mut file) {
Ok(_) => {},
Err(e) => { info!("Copy errored with error {}", e)},
Ok(_) => {}
Err(e) => {
info!("Copy errored with error {}", e)
}
}
let res = hex::encode(file.finish().unwrap().0);
if res != ctx.checksum {
info!("Checksum failed. Original: {}, Calculated: {} for {}", ctx.checksum, res, ctx.file_name);
if res != ctx.checksum {
info!(
"Checksum failed. Original: {}, Calculated: {} for {}",
ctx.checksum, res, ctx.file_name
);
}
// stream.flush().unwrap();
@ -109,16 +125,17 @@ pub fn copy_to_drop_file_writer(response: &mut Response, writer: &mut DropFileWr
let mut buf = [0u8; 1024];
response.read(&mut buf).unwrap();
match writer.write_all(&buf) {
Ok(_) => {},
Err(e) => {
match e.kind() {
ErrorKind::Interrupted => {
info!("Interrupted");
return;
}
_ => { println!("{}", e); return;}
Ok(_) => {}
Err(e) => match e.kind() {
ErrorKind::Interrupted => {
info!("Interrupted");
return;
}
_ => {
println!("{}", e);
return;
}
},
}
}
}
}

View File

@ -21,5 +21,5 @@ pub struct DropDownloadContext {
pub offset: u64,
pub game_id: String,
pub path: PathBuf,
pub checksum: String
pub checksum: String,
}

View File

@ -1,5 +1,5 @@
pub mod download_agent;
pub mod download_commands;
mod download_logic;
mod manifest;
pub mod progress;
pub mod download_agent;
mod download_logic;
pub mod download_commands;

View File

@ -9,7 +9,7 @@ where
{
counter: Arc<AtomicUsize>,
f: Arc<Box<dyn Fn(T, Arc<AtomicBool>) + Send + Sync + 'static>>,
callback: Arc<AtomicBool>
callback: Arc<AtomicBool>,
}
impl<T> ProgressChecker<T>
@ -19,12 +19,12 @@ where
pub fn new(
f: Box<dyn Fn(T, Arc<AtomicBool>) + Send + Sync + 'static>,
counter_reference: Arc<AtomicUsize>,
callback: Arc<AtomicBool>
callback: Arc<AtomicBool>,
) -> Self {
Self {
f: f.into(),
counter: counter_reference,
callback
callback,
}
}
pub fn run_contexts_sequentially(&self, contexts: Vec<T>) {
@ -57,11 +57,13 @@ where
for context in contexts {
let callback = self.callback.clone();
let f = self.f.clone();
s.spawn(move |_| {info!("Running thread"); f(context, callback)});
s.spawn(move |_| {
info!("Running thread");
f(context, callback)
});
}
});
info!("Concluded scope");
}
pub fn get_progress(&self) -> usize {
self.counter.load(Ordering::Relaxed)

View File

@ -1,28 +1,30 @@
mod auth;
mod db;
mod downloads;
mod library;
mod remote;
mod downloads;
#[cfg(test)]
mod tests;
use crate::db::DatabaseImpls;
use crate::downloads::download_agent::GameDownloadAgent;
use auth::{auth_initiate, generate_authorization_header, recieve_handshake};
use db::{DatabaseInterface, DATA_ROOT_DIR};
use downloads::download_commands::{queue_game_download, start_game_downloads, stop_specific_game_download};
use downloads::download_commands::{
queue_game_download, start_game_downloads, stop_specific_game_download,
};
use env_logger::Env;
use http::{header::*, response::Builder as ResponseBuilder};
use library::{fetch_game, fetch_library, Game};
use log::info;
use remote::{gen_drop_url, use_remote};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::{
collections::HashMap,
sync::{LazyLock, Mutex},
};
use std::sync::Arc;
use tauri_plugin_deep_link::DeepLinkExt;
use crate::downloads::download_agent::{GameDownloadAgent};
#[derive(Clone, Copy, Serialize)]
pub enum AppStatus {
@ -49,9 +51,9 @@ pub struct AppState {
status: AppStatus,
user: Option<User>,
games: HashMap<String, Game>,
#[serde(skip_serializing)]
game_downloads: HashMap<String, Arc<GameDownloadAgent>>
game_downloads: HashMap<String, Arc<GameDownloadAgent>>,
}
#[tauri::command]

View File

@ -1 +1 @@
mod progress_tests;
mod progress_tests;

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use crate::downloads::progress::ProgressChecker;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
#[test]
fn test_progress_sequentially() {
@ -20,4 +20,4 @@ fn test_progress_parallel() {
fn test_fn(int: usize, callback: Arc<AtomicBool>) {
println!("{}", int);
}
}