From 64d7f649c6b9eb16afe68e07ea08bbcb8bb1af8b Mon Sep 17 00:00:00 2001 From: DecDuck Date: Thu, 28 Nov 2024 12:39:21 +1100 Subject: [PATCH] fix(download manager): use of completed signal, and pause/resuming --- pages/store/index.vue | 28 ++++- src-tauri/src/downloads/download_agent.rs | 102 ++++++++++++------ src-tauri/src/downloads/download_commands.rs | 13 ++- src-tauri/src/downloads/download_logic.rs | 1 - src-tauri/src/downloads/download_manager.rs | 25 ++--- .../src/downloads/download_manager_builder.rs | 43 ++++---- src-tauri/src/downloads/progress_object.rs | 13 ++- src-tauri/src/downloads/queue.rs | 21 ++-- src-tauri/src/lib.rs | 6 +- 9 files changed, 162 insertions(+), 90 deletions(-) diff --git a/pages/store/index.vue b/pages/store/index.vue index 45170b5..85d220b 100644 --- a/pages/store/index.vue +++ b/pages/store/index.vue @@ -7,14 +7,30 @@ @click="startGameDownload" > Download game - ({{ Math.floor(progress * 1000) / 10 }}%) + + ({{ Math.floor(progress * 1000) / 10 }}%) + + + + + + diff --git a/src-tauri/src/downloads/download_agent.rs b/src-tauri/src/downloads/download_agent.rs index 9987436..2e960e3 100644 --- a/src-tauri/src/downloads/download_agent.rs +++ b/src-tauri/src/downloads/download_agent.rs @@ -29,7 +29,7 @@ pub struct GameDownloadAgent { contexts: Mutex>, pub manifest: Mutex>, pub progress: ProgressObject, - sender: Sender + sender: Sender, } #[derive(Debug)] @@ -39,12 +39,12 @@ pub enum GameDownloadError { Setup(SetupError), Lock, IoError(io::Error), - DownloadError + DownloadError, } #[derive(Debug)] pub enum SetupError { - Context + Context, } impl Display for GameDownloadError { @@ -61,7 +61,12 @@ impl Display for GameDownloadError { } impl GameDownloadAgent { - pub fn new(id: String, version: String, target_download_dir: usize, sender: Sender) -> Self { + pub fn new( + id: String, + version: String, + target_download_dir: usize, + sender: Sender, + ) -> Self { // Don't run by default let control_flag = DownloadThreadControl::new(DownloadThreadControlFlag::Stop); Self { @@ -72,7 +77,7 @@ impl GameDownloadAgent { target_download_dir, contexts: Mutex::new(Vec::new()), progress: ProgressObject::new(0, 0), - sender + sender, } } @@ -81,8 +86,8 @@ impl GameDownloadAgent { self.ensure_manifest_exists()?; info!("Ensured manifest exists"); - self.generate_contexts()?; - info!("Generated contexts"); + self.ensure_contexts()?; + info!("Ensured contexts exists"); self.control_flag.set(DownloadThreadControlFlag::Go); @@ -129,7 +134,10 @@ impl GameDownloadAgent { if response.status() != 200 { return Err(GameDownloadError::Communication( - RemoteAccessError::ManifestDownloadFailed(response.status(), response.text().unwrap()) + RemoteAccessError::ManifestDownloadFailed( + response.status(), + response.text().unwrap(), + ), )); } @@ -144,12 +152,15 @@ impl GameDownloadAgent { } fn set_progress_object_params(&self) { + // Avoid re-setting it + if self.progress.get_max() != 0 { + return; + } + let lock = self.contexts.lock().unwrap(); let length = lock.len(); - let chunk_count = lock.iter() - .map(|chunk| chunk.length) - .sum(); + let chunk_count = lock.iter().map(|chunk| chunk.length).sum(); debug!("Setting ProgressObject max to {}", chunk_count); self.progress.set_max(chunk_count); @@ -159,6 +170,18 @@ impl GameDownloadAgent { self.progress.set_time_now(); } + pub fn ensure_contexts(&self) -> Result<(), GameDownloadError> { + let context_lock = self.contexts.lock().unwrap(); + info!("{:?} {}", context_lock, context_lock.is_empty()); + if !context_lock.is_empty() { + return Ok(()); + } + drop(context_lock); + + self.generate_contexts()?; + return Ok(()); + } + pub fn generate_contexts(&self) -> Result<(), GameDownloadError> { let db_lock = DB.borrow_data().unwrap(); let data_base_dir = db_lock.games.install_dirs[self.target_download_dir].clone(); @@ -192,14 +215,14 @@ impl GameDownloadAgent { game_id: game_id.to_string(), path: path.clone(), checksum: chunk.checksums[i].clone(), - length: *length + length: *length, }); running_offset += *length as u64; } #[cfg(target_os = "linux")] if running_offset > 0 { - fallocate(file, FallocateFlags::empty(), 0, running_offset).unwrap(); + let _ = fallocate(file, FallocateFlags::empty(), 0, running_offset); } } @@ -212,54 +235,65 @@ impl GameDownloadAgent { } pub fn run(&self) -> Result<(), ()> { - const DOWNLOAD_MAX_THREADS: usize = 1; + info!("downloading game: {}", self.id); + const DOWNLOAD_MAX_THREADS: usize = 4; let pool = ThreadPoolBuilder::new() .num_threads(DOWNLOAD_MAX_THREADS) .build() .unwrap(); - let new_contexts = Arc::new(Mutex::new(Vec::new())); - let new_contexts_ref = new_contexts.clone(); + let completed_indexes = Arc::new(Mutex::new(Vec::new())); + let completed_indexes_loop_arc = completed_indexes.clone(); pool.scope(move |scope| { let contexts = self.contexts.lock().unwrap(); - for (index, context) in contexts.iter().enumerate() { let context = context.clone(); let control_flag = self.control_flag.clone(); // Clone arcs let progress = self.progress.get(index); // Clone arcs - let new_contexts_ref = new_contexts_ref.clone(); + let completed_indexes_ref = completed_indexes_loop_arc.clone(); scope.spawn(move |_| { - info!( - "starting download for file {} {}", - context.file_name, context.index - ); match download_game_chunk(context.clone(), control_flag, progress) { - Ok(res) => { - match res { - true => {}, - false => new_contexts_ref.lock().unwrap().push(context), + Ok(res) => match res { + true => { + let mut lock = completed_indexes_ref.lock().unwrap(); + lock.push(index); } + false => {} }, Err(e) => { error!("GameDownloadError: {}", e); self.sender.send(DownloadManagerSignal::Error(e)).unwrap(); - new_contexts_ref.lock().unwrap().push(context); - }, + } } }); } }); - if !new_contexts.lock().unwrap().is_empty() { - debug!("New contexts not empty"); - *self.contexts.lock().unwrap() = Arc::into_inner(new_contexts).unwrap().into_inner().unwrap(); - debug!("Contexts: {:?}", *self.contexts.lock().unwrap()); - return Err(()) + + let mut context_lock = self.contexts.lock().unwrap(); + let mut completed_lock = completed_indexes.lock().unwrap(); + + // Sort desc so we don't have to modify indexes + completed_lock.sort_by(|a, b| b.cmp(a)); + + for index in completed_lock.iter() { + context_lock.remove(*index); } - info!("Contexts: {:?}", *self.contexts.lock().unwrap()); + + // If we're not out of contexts, we're not done, so we don't fire completed + if !context_lock.is_empty() { + info!("Download agent didn't finish, not sending completed signal"); + return Ok(()); + } + + // We've completed + self.sender + .send(DownloadManagerSignal::Completed(self.id.clone())) + .unwrap(); + Ok(()) } } diff --git a/src-tauri/src/downloads/download_commands.rs b/src-tauri/src/downloads/download_commands.rs index 5644106..c6f3c42 100644 --- a/src-tauri/src/downloads/download_commands.rs +++ b/src-tauri/src/downloads/download_commands.rs @@ -34,7 +34,7 @@ pub fn get_current_game_download_progress( } #[tauri::command] -pub fn stop_game_download(state: tauri::State<'_, Mutex>, game_id: String) { +pub fn cancel_game_download(state: tauri::State<'_, Mutex>, game_id: String) { info!("Cancelling game download {}", game_id); state .lock() @@ -42,6 +42,17 @@ pub fn stop_game_download(state: tauri::State<'_, Mutex>, game_id: Str .download_manager .cancel_download(game_id); } + +#[tauri::command] +pub fn pause_game_downloads(state: tauri::State<'_, Mutex>) { + state.lock().unwrap().download_manager.pause_downloads() +} + +#[tauri::command] +pub fn resume_game_downloads(state: tauri::State<'_, Mutex>) { + state.lock().unwrap().download_manager.resume_downloads() +} + #[tauri::command] pub fn get_current_write_speed(state: tauri::State<'_, Mutex>) {} diff --git a/src-tauri/src/downloads/download_logic.rs b/src-tauri/src/downloads/download_logic.rs index 942423a..ed2645f 100644 --- a/src-tauri/src/downloads/download_logic.rs +++ b/src-tauri/src/downloads/download_logic.rs @@ -124,7 +124,6 @@ pub fn download_game_chunk( ) -> Result { // If we're paused if control_flag.get() == DownloadThreadControlFlag::Stop { - info!("Control flag is Stop"); progress.store(0, Ordering::Relaxed); return Ok(false); } diff --git a/src-tauri/src/downloads/download_manager.rs b/src-tauri/src/downloads/download_manager.rs index f3406aa..f57c87c 100644 --- a/src-tauri/src/downloads/download_manager.rs +++ b/src-tauri/src/downloads/download_manager.rs @@ -12,7 +12,8 @@ use log::info; use super::{ download_agent::{GameDownloadAgent, GameDownloadError}, - progress_object::ProgressObject, queue::Queue, + progress_object::ProgressObject, + queue::Queue, }; pub enum DownloadManagerSignal { @@ -20,8 +21,7 @@ pub enum DownloadManagerSignal { Go, /// Pauses the DownloadManager Stop, - /// Called when a GameDownloadAgent has finished. - /// Triggers the next download cycle to begin + /// Called when a GameDownloadAgent has fully completed a download. Completed(String), /// Generates and appends a GameDownloadAgent /// to the registry and queue @@ -104,11 +104,10 @@ impl DownloadManager { ))?; self.command_sender.send(DownloadManagerSignal::Go) } - pub fn cancel_download( - &self, - game_id: String - ) { - self.command_sender.send(DownloadManagerSignal::Cancel(game_id)).unwrap(); + pub fn cancel_download(&self, game_id: String) { + self.command_sender + .send(DownloadManagerSignal::Cancel(game_id)) + .unwrap(); } pub fn edit(&self) -> MutexGuard<'_, VecDeque>> { self.download_queue.edit() @@ -139,11 +138,13 @@ impl DownloadManager { let current_index = get_index_from_id(&mut queue, id).unwrap(); queue.remove(current_index); } - pub fn pause_downloads(&self) -> Result<(), SendError> { - self.command_sender.send(DownloadManagerSignal::Stop) + pub fn pause_downloads(&self) { + self.command_sender + .send(DownloadManagerSignal::Stop) + .unwrap(); } - pub fn resume_downloads(&self) -> Result<(), SendError> { - self.command_sender.send(DownloadManagerSignal::Go) + pub fn resume_downloads(&self) { + self.command_sender.send(DownloadManagerSignal::Go).unwrap(); } pub fn ensure_terminated(self) -> Result, Box> { self.command_sender diff --git a/src-tauri/src/downloads/download_manager_builder.rs b/src-tauri/src/downloads/download_manager_builder.rs index e696abf..656d299 100644 --- a/src-tauri/src/downloads/download_manager_builder.rs +++ b/src-tauri/src/downloads/download_manager_builder.rs @@ -170,15 +170,15 @@ impl DownloadManagerBuilder { fn manage_go_signal(&mut self) { info!("Got signal 'Go'"); - if self.active_control_flag.is_none() && !self.download_agent_registry.is_empty() { + if !self.download_agent_registry.is_empty() && !self.download_queue.empty() { info!("Starting download agent"); - let download_agent = { - let front = self.download_queue.read().front().unwrap().clone(); - self.download_agent_registry.get(&front.id).unwrap().clone() - }; - let download_agent_interface = - Arc::new(AgentInterfaceData::from(download_agent.clone())); - self.current_game_interface = Some(download_agent_interface); + let agent_data = self.download_queue.read().front().unwrap().clone(); + let download_agent = self + .download_agent_registry + .get(&agent_data.id) + .unwrap() + .clone(); + self.current_game_interface = Some(agent_data); let progress_object = download_agent.progress.clone(); *self.progress.lock().unwrap() = Some(progress_object); @@ -191,29 +191,20 @@ impl DownloadManagerBuilder { info!("Spawning download"); spawn(move || { match download_agent.download() { - Ok(_) => { - // TODO wrap this pattern in a macro - let result = sender - .send(DownloadManagerSignal::Completed(download_agent.id.clone())); - if let Err(err) = result { - error!("{}", err); - } - } + // Returns once we've exited the download + // (not necessarily completed) + // The download agent will fire the completed event for us + Ok(_) => {} + // If an error occurred while *starting* the download Err(err) => { - let result = sender.send(DownloadManagerSignal::Error(err)); - if let Err(err) = result { - error!("{}", err); - } + error!("error while managing download: {}", err); + sender.send(DownloadManagerSignal::Error(err)).unwrap(); } }; }); - info!("Finished spawning Download"); active_control_flag.set(DownloadThreadControlFlag::Go); self.set_status(DownloadManagerStatus::Downloading); - } else if let Some(active_control_flag) = self.active_control_flag.clone() { - info!("Restarting current download"); - active_control_flag.set(DownloadThreadControlFlag::Go); } else { info!("Nothing was set"); } @@ -230,6 +221,8 @@ impl DownloadManagerBuilder { self.active_control_flag = None; *self.progress.lock().unwrap() = None; } + // TODO wait until current download exits + self.download_agent_registry.remove(&game_id); let mut lock = self.download_queue.edit(); let index = match lock.iter().position(|interface| interface.id == game_id) { @@ -237,6 +230,8 @@ impl DownloadManagerBuilder { None => return, }; lock.remove(index); + + // Start next download self.sender.send(DownloadManagerSignal::Go).unwrap(); info!( "{:?}", diff --git a/src-tauri/src/downloads/progress_object.rs b/src-tauri/src/downloads/progress_object.rs index 0848caf..9114003 100644 --- a/src-tauri/src/downloads/progress_object.rs +++ b/src-tauri/src/downloads/progress_object.rs @@ -1,13 +1,16 @@ -use std::{sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, Mutex, -}, time::Instant}; +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Instant, +}; #[derive(Clone)] pub struct ProgressObject { max: Arc>, progress_instances: Arc>>>, - start: Arc> + start: Arc>, } impl ProgressObject { diff --git a/src-tauri/src/downloads/queue.rs b/src-tauri/src/downloads/queue.rs index 1ce695f..80e3dc0 100644 --- a/src-tauri/src/downloads/queue.rs +++ b/src-tauri/src/downloads/queue.rs @@ -1,19 +1,22 @@ -use std::{collections::VecDeque, sync::{Arc, Mutex, MutexGuard}}; +use std::{ + collections::VecDeque, + sync::{Arc, Mutex, MutexGuard}, +}; use super::download_manager::AgentInterfaceData; #[derive(Clone)] pub struct Queue { - inner: Arc>>> + inner: Arc>>>, } impl Queue { pub fn new() -> Self { Self { - inner: Arc::new(Mutex::new(VecDeque::new())) + inner: Arc::new(Mutex::new(VecDeque::new())), } } - pub fn read(&self) -> VecDeque> { + pub fn read(&self) -> VecDeque> { self.inner.lock().unwrap().clone() } pub fn edit(&self) -> MutexGuard<'_, VecDeque>> { @@ -22,13 +25,15 @@ impl Queue { pub fn pop_front(&self) -> Option> { self.edit().pop_front() } + pub fn empty(&self) -> bool { + self.inner.lock().unwrap().len() == 0 + } /// Either inserts `interface` at the specified index, or appends to /// the back of the deque if index is greater than the length of the deque pub fn insert(&self, interface: AgentInterfaceData, index: usize) { if self.read().len() > index { self.append(interface); - } - else { + } else { self.edit().insert(index, Arc::new(interface)); } } @@ -44,7 +49,7 @@ impl Queue { if front.id == game_id { return queue.pop_front(); } - return None + return None; } pub fn get_by_id(&self, game_id: String) -> Option { self.read().iter().position(|data| data.id == game_id) @@ -61,4 +66,4 @@ impl Queue { self.edit().insert(new_index, existing); Ok(()) } -} \ No newline at end of file +} diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 840b803..8d1dbd2 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -15,8 +15,8 @@ use db::{ DATA_ROOT_DIR, }; use downloads::download_commands::*; -use downloads::download_manager_builder::DownloadManagerBuilder; use downloads::download_manager::DownloadManager; +use downloads::download_manager_builder::DownloadManagerBuilder; use env_logger::Env; use http::{header::*, response::Builder as ResponseBuilder}; use library::{fetch_game, fetch_game_status, fetch_library, Game}; @@ -137,7 +137,9 @@ pub fn run() { // Downloads download_game, get_current_game_download_progress, - stop_game_download + cancel_game_download, + pause_game_downloads, + resume_game_downloads, ]) .plugin(tauri_plugin_shell::init()) .plugin(tauri_plugin_dialog::init())