From 6a38ea306bca409990276fdf58adfedc003b012e Mon Sep 17 00:00:00 2001 From: DecDuck Date: Sun, 10 Nov 2024 22:25:54 +1100 Subject: [PATCH] feat(downloads): reduce scope of download agent due to a miscommunication, the scope of the download agent has grown too much. this commit reduces that scopes, and intends for a lot of the heavy lifting to be done by the soon-to-be-implemented download manager. --- pages/settings.vue | 2 +- pages/store/index.vue | 136 ++--------- src-tauri/src/auth.rs | 1 - src-tauri/src/db.rs | 42 +++- src-tauri/src/downloads/download_agent.rs | 219 ++++++++++------- src-tauri/src/downloads/download_commands.rs | 141 +++-------- src-tauri/src/downloads/download_logic.rs | 241 ++++++++++--------- src-tauri/src/downloads/mod.rs | 3 +- src-tauri/src/downloads/progress.rs | 69 ------ src-tauri/src/lib.rs | 9 +- 10 files changed, 352 insertions(+), 511 deletions(-) delete mode 100644 src-tauri/src/downloads/progress.rs diff --git a/pages/settings.vue b/pages/settings.vue index b897920..35b9783 100644 --- a/pages/settings.vue +++ b/pages/settings.vue @@ -47,7 +47,7 @@ import { RectangleGroupIcon, } from "@heroicons/vue/16/solid"; import type { Component } from "vue"; -import type { NavigationItem } from "~/components/types"; +import type { NavigationItem } from "~/types"; const navigation: Array = [ { diff --git a/pages/store/index.vue b/pages/store/index.vue index 7b7665f..ee5f7b2 100644 --- a/pages/store/index.vue +++ b/pages/store/index.vue @@ -1,139 +1,39 @@ diff --git a/src-tauri/src/auth.rs b/src-tauri/src/auth.rs index 0c57c49..7619e8a 100644 --- a/src-tauri/src/auth.rs +++ b/src-tauri/src/auth.rs @@ -113,7 +113,6 @@ fn recieve_handshake_logic(app: &AppHandle, path: String) -> Result<(), RemoteAc let endpoint = base_url.join("/api/v1/client/auth/handshake")?; let client = reqwest::blocking::Client::new(); let response = client.post(endpoint).json(&body).send()?; - info!("server responded with {}", response.status()); let response_struct = response.json::()?; { diff --git a/src-tauri/src/db.rs b/src-tauri/src/db.rs index afa6a74..0858e1b 100644 --- a/src-tauri/src/db.rs +++ b/src-tauri/src/db.rs @@ -1,7 +1,9 @@ use std::{ + borrow::BorrowMut, collections::HashMap, + fmt::format, fs::{self, create_dir_all}, - path::PathBuf, + path::{Path, PathBuf}, sync::{LazyLock, Mutex}, }; @@ -9,8 +11,11 @@ use directories::BaseDirs; use log::info; use rustbreak::{deser::Bincode, PathDatabase}; use serde::{Deserialize, Serialize}; +use tokio::fs::metadata; use url::Url; +use crate::DB; + #[derive(serde::Serialize, Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub struct DatabaseAuth { @@ -32,7 +37,7 @@ pub enum DatabaseGameStatus { #[derive(Serialize, Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub struct DatabaseGames { - pub games_base_dir: String, + pub install_dirs: Vec, pub games_statuses: HashMap, } @@ -67,7 +72,7 @@ impl DatabaseImpls for DatabaseInterface { auth: None, base_url: "".to_string(), games: DatabaseGames { - games_base_dir: games_base_dir.to_str().unwrap().to_string(), + install_dirs: vec![games_base_dir.to_str().unwrap().to_string()], games_statuses: HashMap::new(), }, }; @@ -91,8 +96,31 @@ impl DatabaseImpls for DatabaseInterface { } #[tauri::command] -pub fn change_root_directory(new_dir: String) { - info!("Changed root directory to {}", new_dir); - let mut lock = DATA_ROOT_DIR.lock().unwrap(); - *lock = new_dir.into(); +pub fn add_new_download_dir(new_dir: String) -> Result<(), String> { + // Check the new directory is all good + let new_dir_path = Path::new(&new_dir); + if new_dir_path.exists() { + let metadata = new_dir_path + .metadata() + .map_err(|e| format!("Unable to access file or directory: {}", e.to_string()))?; + if metadata.is_dir() { + return Err("Invalid path: not a directory".to_string()); + } + let dir_contents = new_dir_path + .read_dir() + .map_err(|e| format!("Unable to check directory contents: {}", e.to_string()))?; + if dir_contents.count() == 0 { + return Err("Path is not empty".to_string()); + } + } else { + create_dir_all(new_dir_path) + .map_err(|e| format!("Unable to create directories to path: {}", e.to_string()))?; + } + + // Add it to the dictionary + let mut lock = DB.borrow_data_mut().unwrap(); + lock.games.install_dirs.push(new_dir); + drop(lock); + + Ok(()) } diff --git a/src-tauri/src/downloads/download_agent.rs b/src-tauri/src/downloads/download_agent.rs index e61a37a..80ede6b 100644 --- a/src-tauri/src/downloads/download_agent.rs +++ b/src-tauri/src/downloads/download_agent.rs @@ -1,96 +1,121 @@ use crate::auth::generate_authorization_header; -use crate::db::{DatabaseImpls, DATA_ROOT_DIR}; -use crate::downloads::download_logic; +use crate::db::DatabaseImpls; use crate::downloads::manifest::{DropDownloadContext, DropManifest}; -use crate::downloads::progress::ProgressChecker; +use crate::remote::RemoteAccessError; use crate::DB; -use atomic_counter::RelaxedCounter; use log::info; -use rustix::fs::{fallocate, FallocateFlags}; +use rayon::ThreadPoolBuilder; use serde::{Deserialize, Serialize}; +use std::fmt::{Display, Formatter}; use std::fs::{create_dir_all, File}; use std::path::Path; +use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex, RwLock}; use urlencoding::encode; +#[cfg(target_os = "linux")] +use rustix::fs::{fallocate, FallocateFlags}; + +use super::download_logic::download_game_chunk; + pub struct GameDownloadAgent { pub id: String, pub version: String, - pub status: Arc>, + pub control_flag: Arc>, + pub target_download_dir: usize, contexts: Mutex>, - pub progress: ProgressChecker, + // pub progress: ProgressChecker, pub manifest: Mutex>, + pub progress: ProgressObject, } #[derive(Serialize, Deserialize, Clone, Eq, PartialEq)] -pub enum GameDownloadState { - Uninitialised, - Queued, - Paused, - Manifest, - Downloading, - Finished, - Stalled, - Failed, - Cancelled, +pub enum DownloadThreadControlFlag { + Go, + Stop, } -#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] +#[derive(Debug)] pub enum GameDownloadError { - ManifestDownload, - FailedContextGeneration, - Status(u16), - System(SystemError), + CommunicationError(RemoteAccessError), + ChecksumError, + SetupError(String), + LockError, } -#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)] -pub enum SystemError { - MutexLockFailed, + +impl Display for GameDownloadError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + GameDownloadError::CommunicationError(error) => write!(f, "{}", error), + GameDownloadError::SetupError(error) => write!(f, "{}", error), + GameDownloadError::LockError => write!(f, "Failed to acquire lock. Something has gone very wrong internally. Please restart the application"), + GameDownloadError::ChecksumError => write!(f, "Checksum failed to validate for download"), + } + } +} + +static DOWNLOAD_MAX_THREADS: usize = 4; + +pub struct ProgressObject { + pub max: u64, + pub current: Arc, } impl GameDownloadAgent { - pub fn new(id: String, version: String) -> Self { - let status = Arc::new(RwLock::new(GameDownloadState::Uninitialised)); + pub fn new(id: String, version: String, target_download_dir: usize) -> Self { + // Don't run by default + let status = Arc::new(RwLock::new(DownloadThreadControlFlag::Stop)); Self { id, version, - status: status.clone(), + control_flag: status.clone(), manifest: Mutex::new(None), - progress: ProgressChecker::new( - Box::new(download_logic::download_game_chunk), - Arc::new(RelaxedCounter::new(0)), - status, - 0, - ), + target_download_dir, contexts: Mutex::new(Vec::new()), + progress: ProgressObject { + max: 0, + current: Arc::new(AtomicU64::new(0)), + }, } } - pub async fn queue(&self) -> Result<(), GameDownloadError> { - self.change_state(GameDownloadState::Queued); - if self.manifest.lock().unwrap().is_none() { - return Ok(()); - } - self.ensure_manifest_exists() + pub fn set_control_flag(&self, flag: DownloadThreadControlFlag) { + let mut lock = self.control_flag.write().unwrap(); + *lock = flag; + } + pub fn get_control_flag(&self) -> DownloadThreadControlFlag { + let lock = self.control_flag.read().unwrap(); + lock.clone() } - pub fn begin_download(&self, max_threads: usize) -> Result<(), GameDownloadError> { - self.change_state(GameDownloadState::Downloading); - // TODO we're coping the whole context thing - // It's not necessary, I just can't figure out to make the borrow checker happy - { - let lock = self.contexts.lock().unwrap().to_vec(); - self.progress.run_context_parallel(lock, max_threads); - } + // Blocking + // Requires mutable self + pub fn setup_download(&mut self) -> Result<(), GameDownloadError> { + self.ensure_manifest_exists()?; + + self.generate_contexts()?; + + self.set_control_flag(DownloadThreadControlFlag::Go); + Ok(()) } - pub fn ensure_manifest_exists(&self) -> Result<(), GameDownloadError> { + // Blocking + pub fn download(&mut self) -> Result<(), GameDownloadError> { + self.setup_download()?; + self.run(); + + Ok(()) + } + + pub fn ensure_manifest_exists(&mut self) -> Result<(), GameDownloadError> { if self.manifest.lock().unwrap().is_some() { return Ok(()); } - self.download_manifest() + // Explicitly propagate error + Ok(self.download_manifest()?) } - fn download_manifest(&self) -> Result<(), GameDownloadError> { + fn download_manifest(&mut self) -> Result<(), GameDownloadError> { let base_url = DB.fetch_base_url(); let manifest_url = base_url .join( @@ -104,8 +129,6 @@ impl GameDownloadAgent { .unwrap(); let header = generate_authorization_header(); - - info!("Generating & sending client"); let client = reqwest::blocking::Client::new(); let response = client .get(manifest_url.to_string()) @@ -114,8 +137,14 @@ impl GameDownloadAgent { .unwrap(); if response.status() != 200 { - info!("Error status: {}", response.status()); - return Err(GameDownloadError::Status(response.status().as_u16())); + return Err(GameDownloadError::CommunicationError( + format!( + "Failed to download game manifest: {} {}", + response.status(), + response.text().unwrap() + ) + .into(), + )); } let manifest_download = response.json::().unwrap(); @@ -125,42 +154,33 @@ impl GameDownloadAgent { return chunk.lengths.iter().sum::(); }) .sum::(); - self.progress.set_capacity(length); + self.progress.max = length.try_into().unwrap(); + if let Ok(mut manifest) = self.manifest.lock() { - *manifest = Some(manifest_download) - } else { - return Err(GameDownloadError::System(SystemError::MutexLockFailed)); + *manifest = Some(manifest_download); + return Ok(()); } - Ok(()) + return Err(GameDownloadError::LockError); } - pub fn change_state(&self, state: GameDownloadState) { - let mut lock = self.status.write().unwrap(); - *lock = state; - } - pub fn get_state(&self) -> GameDownloadState { - let lock = self.status.read().unwrap(); - lock.clone() - } + pub fn generate_contexts(&self) -> Result<(), GameDownloadError> { + let db_lock = DB.borrow_data().unwrap(); + let data_base_dir = db_lock.games.install_dirs[self.target_download_dir].clone(); + drop(db_lock); + + let manifest = self.manifest.lock().unwrap().clone().unwrap(); + let version = self.version.clone(); + let game_id = self.id.clone(); + + let data_base_dir_path = Path::new(&data_base_dir); - pub fn generate_job_contexts( - &self, - manifest: &DropManifest, - version: String, - game_id: String, - ) -> Result<(), GameDownloadError> { let mut contexts = Vec::new(); - let base_path = DATA_ROOT_DIR - .lock() - .unwrap() - .join("games") - .join(game_id.clone()) - .clone(); + let base_path = data_base_dir_path.join(game_id.clone()).clone(); create_dir_all(base_path.clone()).unwrap(); - info!("Generating contexts"); + for (raw_path, chunk) in manifest { - let path = base_path.join(Path::new(raw_path)); + let path = base_path.join(Path::new(&raw_path)); let container = path.parent().unwrap(); create_dir_all(container).unwrap(); @@ -181,17 +201,44 @@ impl GameDownloadAgent { running_offset += *length as u64; } + #[cfg(target_os = "linux")] if running_offset > 0 { fallocate(file, FallocateFlags::empty(), 0, running_offset).unwrap(); } } - info!("Finished generating"); + if let Ok(mut context_lock) = self.contexts.lock() { *context_lock = contexts; - } else { - return Err(GameDownloadError::FailedContextGeneration); + return Ok(()); } - Ok(()) + return Err(GameDownloadError::SetupError( + "Failed to generate download contexts".to_owned(), + )); + } + + pub fn run(&self) { + let pool = ThreadPoolBuilder::new() + .num_threads(DOWNLOAD_MAX_THREADS) + .build() + .unwrap(); + + pool.scope(move |scope| { + let contexts = self.contexts.lock().unwrap(); + + for context in contexts.iter() { + let context = context.clone(); + let control_flag = self.control_flag.clone(); // Clone arcs + let progress = self.progress.current.clone(); // Clone arcs + info!( + "starting download for file {} {}", + context.file_name, context.index + ); + + scope.spawn(move |_| { + download_game_chunk(context, control_flag, progress).unwrap(); + }); + } + }) } } diff --git a/src-tauri/src/downloads/download_commands.rs b/src-tauri/src/downloads/download_commands.rs index 7d63e9a..86f1c71 100644 --- a/src-tauri/src/downloads/download_commands.rs +++ b/src-tauri/src/downloads/download_commands.rs @@ -1,137 +1,62 @@ use std::{ + borrow::Borrow, sync::{Arc, Mutex}, - thread, }; use log::info; +use rayon::spawn; use crate::{downloads::download_agent::GameDownloadAgent, AppState}; -use super::download_agent::{GameDownloadError, GameDownloadState}; - #[tauri::command] -pub async fn queue_game_download( +pub fn download_game( game_id: String, game_version: String, state: tauri::State<'_, Mutex>, -) -> Result<(), GameDownloadError> { - info!("Queuing Game Download"); - let download_agent = Arc::new(GameDownloadAgent::new( - game_id.clone(), - game_version.clone(), - )); - download_agent.queue().await?; - - let mut queue = state.lock().unwrap(); - queue.game_downloads.insert(game_id, download_agent); - Ok(()) -} - -#[tauri::command] -pub async fn start_game_downloads( - max_threads: usize, - state: tauri::State<'_, Mutex>, -) -> Result<(), GameDownloadError> { - info!("Downloading Games"); - let lock = state.lock().unwrap(); - let mut game_downloads = lock.game_downloads.clone(); - drop(lock); - thread::spawn(move || loop { - let mut current_id = String::new(); - let mut download_agent = None; - { - for (id, agent) in &game_downloads { - if agent.get_state() == GameDownloadState::Queued { - download_agent = Some(agent.clone()); - current_id = id.clone(); - info!("Got queued game to download"); - break; - } - } - if download_agent.is_none() { - info!("No more games left to download"); - return; - } - }; - info!("Downloading game"); - { - start_game_download(max_threads, download_agent.unwrap()).unwrap(); - game_downloads.remove_entry(¤t_id); - } - }); - info!("Spawned download"); - Ok(()) -} - -pub fn start_game_download( - max_threads: usize, - download_agent: Arc, -) -> Result<(), GameDownloadError> { - info!("Triggered Game Download"); - - download_agent.ensure_manifest_exists()?; - - let local_manifest = { - let manifest = download_agent.manifest.lock().unwrap(); - (*manifest).clone().unwrap() - }; - - download_agent - .generate_job_contexts( - &local_manifest, - download_agent.version.clone(), - download_agent.id.clone(), - ) - .unwrap(); - - download_agent.begin_download(max_threads).unwrap(); - - Ok(()) -} - -#[tauri::command] -pub async fn cancel_specific_game_download( - state: tauri::State<'_, Mutex>, - game_id: String, ) -> Result<(), String> { - info!("called stop_specific_game_download"); - get_game_download(state, game_id).change_state(GameDownloadState::Cancelled); + info!("beginning game download..."); - //TODO: Drop the game download instance + let mut download_agent = GameDownloadAgent::new(game_id.clone(), game_version.clone(), 0); + // Setup download requires mutable + download_agent.setup_download().unwrap(); - info!("Stopping callback"); + let mut lock: std::sync::MutexGuard<'_, AppState> = state.lock().unwrap(); + let download_agent_ref = Arc::new(download_agent); + lock.game_downloads + .insert(game_id, download_agent_ref.clone()); + + // Run it in another thread + spawn(move || { + // Run doesn't require mutable + download_agent_ref.clone().run(); + }); Ok(()) } #[tauri::command] -pub async fn get_game_download_progress( +pub fn get_game_download_progress( state: tauri::State<'_, Mutex>, game_id: String, ) -> Result { - let progress = get_game_download(state, game_id) - .progress - .get_progress_percentage(); - info!("{}", progress); - Ok(progress) + let da = use_download_agent(state, game_id)?; + + let progress = &da.progress; + let current: f64 = progress + .current + .fetch_add(0, std::sync::atomic::Ordering::Relaxed) as f64; + let max = progress.max as f64; + + let current_progress = current / max; + + Ok(current_progress) } -#[tauri::command] -pub async fn set_download_state( +fn use_download_agent( state: tauri::State<'_, Mutex>, game_id: String, - status: GameDownloadState, -) -> Result<(), String> { - info!("Setting game state"); - get_game_download(state, game_id).change_state(status); - Ok(()) -} - -fn get_game_download( - state: tauri::State<'_, Mutex>, - game_id: String, -) -> Arc { +) -> Result, String> { let lock = state.lock().unwrap(); - let download_agent = lock.game_downloads.get(&game_id).unwrap(); - download_agent.clone() + let download_agent = lock.game_downloads.get(&game_id).ok_or("Invalid game ID")?; + Ok(download_agent.clone()) // Clones the Arc, not the underlying data structure } diff --git a/src-tauri/src/downloads/download_logic.rs b/src-tauri/src/downloads/download_logic.rs index 4dbf632..69d1f8f 100644 --- a/src-tauri/src/downloads/download_logic.rs +++ b/src-tauri/src/downloads/download_logic.rs @@ -1,122 +1,136 @@ +use crate::auth::generate_authorization_header; use crate::db::DatabaseImpls; use crate::downloads::manifest::DropDownloadContext; +use crate::remote::RemoteAccessError; use crate::DB; -use crate::{auth::generate_authorization_header, GAME_PAUSE_CHECK_INTERVAL}; use atomic_counter::{AtomicCounter, RelaxedCounter}; use log::{error, info}; use md5::{Context, Digest}; +use reqwest::blocking::Response; +use serde::de::Error; +use std::io::Read; +use std::sync::atomic::AtomicU64; +use std::sync::RwLock; use std::{ fs::{File, OpenOptions}, - io::{self, BufWriter, Error, ErrorKind, Seek, SeekFrom, Write}, + io::{self, BufWriter, ErrorKind, Seek, SeekFrom, Write}, path::PathBuf, - sync::{Arc, RwLock}, - thread::sleep, + sync::Arc, }; use urlencoding::encode; -use super::download_agent::GameDownloadState; +use super::download_agent::{DownloadThreadControlFlag, GameDownloadError}; -pub struct DropFileWriter { - file: File, +pub struct DropWriter { hasher: Context, - progress: Arc, - status: Arc>, + destination: W, } -impl DropFileWriter { - fn new( - path: PathBuf, - status: Arc>, - progress: Arc, - ) -> Self { +impl DropWriter { + fn new(path: PathBuf) -> Self { Self { - file: OpenOptions::new().write(true).open(path).unwrap(), + destination: OpenOptions::new().write(true).open(path).unwrap(), hasher: Context::new(), - progress, - status, } } + fn finish(mut self) -> io::Result { self.flush().unwrap(); Ok(self.hasher.compute()) } - - fn manage_state(&mut self) -> Option> { - match self.status.read().unwrap().clone() { - GameDownloadState::Uninitialised => todo!(), - GameDownloadState::Queued => { - return Some(Err(Error::new( - ErrorKind::NotConnected, - "Download has not yet been started", - ))) - } - GameDownloadState::Manifest => { - return Some(Err(Error::new( - ErrorKind::NotFound, - "Manifest still not finished downloading", - ))) - } - GameDownloadState::Downloading => {} - GameDownloadState::Finished => { - return Some(Err(Error::new( - ErrorKind::AlreadyExists, - "Download already finished", - ))) - } - GameDownloadState::Stalled => { - return Some(Err(Error::new(ErrorKind::Interrupted, "Download Stalled"))) - } - GameDownloadState::Failed => { - return Some(Err(Error::new(ErrorKind::BrokenPipe, "Download Failed"))) - } - GameDownloadState::Cancelled => { - return Some(Err(Error::new( - ErrorKind::ConnectionAborted, - "Interrupt command recieved", - ))); - } - GameDownloadState::Paused => { - info!("Game download paused"); - sleep(GAME_PAUSE_CHECK_INTERVAL); - } - }; - None - } } -// TODO: Implement error handling -impl Write for DropFileWriter { +// Write automatically pushes to file and hasher +impl Write for DropWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - // TODO: Tidy up these error messages / types because these ones don't really seem to fit - if let Some(value) = self.manage_state() { - return value; - } - let len = buf.len(); - self.progress.add(len); - - //info!("Writing data to writer"); - self.hasher.write_all(buf).unwrap(); - self.file.write(buf) + self.hasher.write_all(buf).map_err(|e| { + io::Error::new( + ErrorKind::Other, + format!("Unable to write to hasher: {}", e), + ) + })?; + self.destination.write(buf) } fn flush(&mut self) -> io::Result<()> { self.hasher.flush()?; - self.file.flush() + self.destination.flush() } } -impl Seek for DropFileWriter { +// Seek moves around destination output +impl Seek for DropWriter { fn seek(&mut self, pos: SeekFrom) -> io::Result { - self.file.seek(pos) + self.destination.seek(pos) } } + +pub struct DropDownloadPipeline { + pub source: R, + pub destination: DropWriter, + pub control_flag: Arc>, + pub progress: Arc, + pub size: usize, +} +impl DropDownloadPipeline { + fn new( + source: Response, + destination: DropWriter, + control_flag: Arc>, + progress: Arc, + size: usize, + ) -> Self { + return Self { + source, + destination, + control_flag, + progress, + size, + }; + } + + fn copy(&mut self) -> Result { + let copy_buf_size = 512; + let mut copy_buf = vec![0; copy_buf_size]; + let mut buf_writer = BufWriter::with_capacity(1024 * 1024, &mut self.destination); + + let mut current_size = 0; + loop { + if *self.control_flag.read().unwrap() == DownloadThreadControlFlag::Stop { + return Ok(false); + } + + let bytes_read = self.source.read(&mut copy_buf)?; + current_size += bytes_read; + + buf_writer.write(©_buf[0..bytes_read])?; + self.progress.fetch_add( + bytes_read.try_into().unwrap(), + std::sync::atomic::Ordering::Relaxed, + ); + + if current_size == self.size { + break; + } + } + + Ok(true) + } + + fn finish(self) -> Result { + let checksum = self.destination.finish()?; + return Ok(checksum); + } +} + pub fn download_game_chunk( ctx: DropDownloadContext, - status: Arc>, - progress: Arc, -) { - if *status.read().unwrap() == GameDownloadState::Cancelled { - info!("Callback stopped download at start"); - return; + control_flag: Arc>, + progress: Arc, +) -> Result { + // If we're paused + if *control_flag.read().unwrap() == DownloadThreadControlFlag::Stop { + return Ok(false); } + let base_url = DB.fetch_base_url(); let client = reqwest::blocking::Client::new(); @@ -133,47 +147,48 @@ pub fn download_game_chunk( let header = generate_authorization_header(); - let mut response = match client.get(chunk_url).header("Authorization", header).send() { - Ok(response) => response, - Err(e) => { - info!("{}", e); - return; - } - }; + let response = client + .get(chunk_url) + .header("Authorization", header) + .send() + .map_err(|e| GameDownloadError::CommunicationError(RemoteAccessError::FetchError(e)))?; - let mut file: DropFileWriter = DropFileWriter::new(ctx.path, status, progress); + let mut destination = DropWriter::new(ctx.path); if ctx.offset != 0 { - file.seek(SeekFrom::Start(ctx.offset)) + destination + .seek(SeekFrom::Start(ctx.offset)) .expect("Failed to seek to file offset"); } - // Writing everything to disk directly is probably slightly faster in terms of disk - // speed because it balances out the writes, but this is better than the performance - // loss from constantly reading the callbacks - - let mut writer = BufWriter::with_capacity(1024 * 1024, file); - - match io::copy(&mut response, &mut writer) { - Ok(_) => {} - Err(e) => { - info!("Copy errored with error {}", e) - } + let content_length = response.content_length(); + if content_length.is_none() { + return Err(GameDownloadError::CommunicationError( + RemoteAccessError::GenericErrror( + "Invalid download endpoint, missing Content-Length header.".to_owned(), + ), + )); } - writer.flush().unwrap(); - let file = match writer.into_inner() { - Ok(file) => file, - Err(_) => { - error!("Failed to acquire writer from BufWriter"); - return; - } + + let mut pipeline = DropDownloadPipeline::new( + response, + destination, + control_flag, + progress, + content_length.unwrap().try_into().unwrap(), + ); + + let completed = pipeline.copy().unwrap(); + if !completed { + return Ok(false); }; - let res = hex::encode(file.finish().unwrap().0); + let checksum = pipeline.finish().unwrap(); + + let res = hex::encode(checksum.0); if res != ctx.checksum { - info!( - "Checksum failed. Original: {}, Calculated: {} for {}", - ctx.checksum, res, ctx.file_name - ); + return Err(GameDownloadError::ChecksumError); } + + return Ok(true); } diff --git a/src-tauri/src/downloads/mod.rs b/src-tauri/src/downloads/mod.rs index 146ed81..4e59d68 100644 --- a/src-tauri/src/downloads/mod.rs +++ b/src-tauri/src/downloads/mod.rs @@ -1,5 +1,4 @@ pub mod download_agent; pub mod download_commands; mod download_logic; -mod manifest; -pub mod progress; +mod manifest; \ No newline at end of file diff --git a/src-tauri/src/downloads/progress.rs b/src-tauri/src/downloads/progress.rs deleted file mode 100644 index 850dba8..0000000 --- a/src-tauri/src/downloads/progress.rs +++ /dev/null @@ -1,69 +0,0 @@ -use atomic_counter::{AtomicCounter, RelaxedCounter}; -use log::info; -use rayon::ThreadPoolBuilder; -use std::sync::{Arc, Mutex, RwLock}; - -use super::download_agent::GameDownloadState; - -pub struct ProgressChecker -where - T: 'static + Send + Sync, -{ - counter: Arc, - f: Arc< - Box>, Arc) + Send + Sync + 'static>, - >, - status: Arc>, - capacity: Mutex, -} - -impl ProgressChecker -where - T: Send + Sync, -{ - pub fn new( - f: Box< - dyn Fn(T, Arc>, Arc) + Send + Sync + 'static, - >, - counter: Arc, - status: Arc>, - capacity: usize, - ) -> Self { - Self { - f: f.into(), - counter, - status, - capacity: capacity.into(), - } - } - pub fn run_context_parallel(&self, contexts: Vec, max_threads: usize) { - let threads = ThreadPoolBuilder::new() - .num_threads(max_threads) - .build() - .unwrap(); - - threads.scope(|s| { - for context in contexts { - let status = self.status.clone(); - let counter = self.counter.clone(); - let f = self.f.clone(); - s.spawn(move |_| { - info!("Running thread"); - f(context, status, counter) - }); - } - }); - info!("Concluded scope"); - } - pub fn set_capacity(&self, capacity: usize) { - let mut lock = self.capacity.lock().unwrap(); - *lock = capacity; - } - pub fn get_progress(&self) -> usize { - self.counter.get() - } - // I strongly dislike type casting in my own code, so I've shovelled it into here - pub fn get_progress_percentage(&self) -> f64 { - (self.get_progress() as f64) / (*self.capacity.lock().unwrap() as f64) - } -} diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 0b868b7..91061ef 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -10,7 +10,7 @@ mod tests; use crate::db::DatabaseImpls; use crate::downloads::download_agent::GameDownloadAgent; use auth::{auth_initiate, generate_authorization_header, recieve_handshake}; -use db::{change_root_directory, DatabaseInterface, DATA_ROOT_DIR}; +use db::{add_new_download_dir, DatabaseInterface, DATA_ROOT_DIR}; use downloads::download_commands::*; use env_logger::Env; use http::{header::*, response::Builder as ResponseBuilder}; @@ -119,13 +119,10 @@ pub fn run() { // Library fetch_library, fetch_game, - change_root_directory, + add_new_download_dir, // Downloads - queue_game_download, - start_game_downloads, - cancel_specific_game_download, + download_game, get_game_download_progress, - set_download_state ]) .plugin(tauri_plugin_shell::init()) .setup(|app| {