From a1ada07690b24811b7458ed61bc8540ee361fdfc Mon Sep 17 00:00:00 2001 From: quexeky Date: Wed, 13 Nov 2024 20:38:00 +1100 Subject: [PATCH] feat(downloads): Added Download Manager Signed-off-by: quexeky --- src-tauri/src/downloads/download_agent.rs | 12 +- src-tauri/src/downloads/download_commands.rs | 10 +- src-tauri/src/downloads/download_logic.rs | 2 + src-tauri/src/downloads/download_manager.rs | 196 ++++++++++++++++--- src-tauri/src/downloads/mod.rs | 2 +- src-tauri/src/downloads/progress_object.rs | 25 ++- src-tauri/src/lib.rs | 6 +- 7 files changed, 206 insertions(+), 47 deletions(-) diff --git a/src-tauri/src/downloads/download_agent.rs b/src-tauri/src/downloads/download_agent.rs index 8bcd2b5..c977ecd 100644 --- a/src-tauri/src/downloads/download_agent.rs +++ b/src-tauri/src/downloads/download_agent.rs @@ -66,7 +66,7 @@ impl GameDownloadAgent { // Blocking // Requires mutable self - pub fn setup_download(&mut self) -> Result<(), GameDownloadError> { + pub fn setup_download(&self) -> Result<(), GameDownloadError> { self.ensure_manifest_exists()?; info!("Ensured manifest exists"); @@ -79,23 +79,22 @@ impl GameDownloadAgent { } // Blocking - pub fn download(&mut self) -> Result<(), GameDownloadError> { + pub fn download(&self) -> Result<(), GameDownloadError> { self.setup_download()?; self.run(); Ok(()) } - pub fn ensure_manifest_exists(&mut self) -> Result<(), GameDownloadError> { + pub fn ensure_manifest_exists(&self) -> Result<(), GameDownloadError> { if self.manifest.lock().unwrap().is_some() { return Ok(()); } - // Explicitly propagate error self.download_manifest() } - fn download_manifest(&mut self) -> Result<(), GameDownloadError> { + fn download_manifest(&self) -> Result<(), GameDownloadError> { let base_url = DB.fetch_base_url(); let manifest_url = base_url .join( @@ -138,7 +137,8 @@ impl GameDownloadAgent { .values() .map(|chunk| chunk.lengths.len()) .sum(); - self.progress = ProgressObject::new(length.try_into().unwrap(), chunk_count); + self.progress.set_max(length.try_into().unwrap()); + self.progress.set_size(chunk_count); if let Ok(mut manifest) = self.manifest.lock() { *manifest = Some(manifest_download); diff --git a/src-tauri/src/downloads/download_commands.rs b/src-tauri/src/downloads/download_commands.rs index 9449ab2..c1ab541 100644 --- a/src-tauri/src/downloads/download_commands.rs +++ b/src-tauri/src/downloads/download_commands.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex}; use log::info; use rayon::spawn; -use crate::{downloads::download_agent::GameDownloadAgent, AppState}; +use crate::{AppState}; #[tauri::command] pub fn download_game( @@ -29,7 +29,7 @@ pub fn download_game( download_agent_ref.clone().run(); }); */ - + state.lock().unwrap().download_manager.queue_game(game_id, game_version, 0).unwrap(); Ok(()) } @@ -45,8 +45,12 @@ pub fn get_game_download_progress( Ok(progress.get_progress()) */ + let progress = match state.lock().unwrap().download_manager.get_current_game_download_progress() { + Some(progress) => progress, + None => 0.0 + }; - Ok(0.0) + Ok(progress) } /* fn use_download_agent( diff --git a/src-tauri/src/downloads/download_logic.rs b/src-tauri/src/downloads/download_logic.rs index 2a0c21f..dc95f2f 100644 --- a/src-tauri/src/downloads/download_logic.rs +++ b/src-tauri/src/downloads/download_logic.rs @@ -3,6 +3,7 @@ use crate::db::DatabaseImpls; use crate::downloads::manifest::DropDownloadContext; use crate::remote::RemoteAccessError; use crate::DB; +use log::info; use md5::{Context, Digest}; use reqwest::blocking::Response; @@ -123,6 +124,7 @@ pub fn download_game_chunk( ) -> Result { // If we're paused if control_flag.get() == DownloadThreadControlFlag::Stop { + info!("Control flag is Stop"); return Ok(false); } diff --git a/src-tauri/src/downloads/download_manager.rs b/src-tauri/src/downloads/download_manager.rs index 3c14fa8..dcd7bf5 100644 --- a/src-tauri/src/downloads/download_manager.rs +++ b/src-tauri/src/downloads/download_manager.rs @@ -1,40 +1,182 @@ use std::{ - collections::HashMap, - sync::{Arc, Mutex}, - thread::JoinHandle, + collections::{HashMap, VecDeque}, sync::{mpsc::{channel, Receiver, SendError, Sender}, Arc, Mutex, MutexGuard}, thread::{spawn, JoinHandle}, }; -use super::{download_agent::GameDownloadAgent, download_thread_control_flag::DownloadThreadControlFlag}; +use log::info; + +use super::{download_agent::GameDownloadAgent, download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag}, progress_object::ProgressObject}; pub struct DownloadManager { - download_agent_registry: HashMap>>, - download_queue: Vec, + download_agent_registry: HashMap>, + download_queue: Arc>>, + receiver: Receiver, + sender: Sender, + progress: Arc>>, - current_thread: Option>, current_game_id: Option, // Should be the only game download agent in the map with the "Go" flag + active_control_flag: Option +} +pub struct DownloadManagerInterface { + terminator: JoinHandle>, + download_queue: Arc>>, + progress: Arc>>, + sender: Sender, +} +pub enum DownloadManagerSignal { + Go, + Stop, + Completed(String), + Queue(String, String, usize) } -impl DownloadManager { - pub fn new() -> Self { - return Self { - download_agent_registry: HashMap::new(), - download_queue: Vec::new(), - current_thread: None, - current_game_id: None, - }; +impl DownloadManagerInterface { + pub fn queue_game(&self, game_id: String, version: String, target_download_dir: usize) -> Result<(), SendError> { + info!("Adding game id {}", game_id); + self.sender.send(DownloadManagerSignal::Queue(game_id, version, target_download_dir))?; + self.sender.send(DownloadManagerSignal::Go) } - - pub fn queue_game(&mut self, game_id: String, version_name: String) { - let existing_da = self.download_agent_registry.get(&game_id); - - if let Some(da_mutex) = existing_da { - let da = da_mutex.lock().unwrap(); - if da.version == version_name { - return; // We're already queued - } - - da.control_flag.set(DownloadThreadControlFlag::Stop); - + pub fn edit(&self) -> MutexGuard<'_, VecDeque> { + self.download_queue.lock().unwrap() + } + pub fn get_current_game_download_progress(&self) -> Option { + let progress_object = (*self.progress.lock().unwrap()).clone()?; + Some(progress_object.get_progress()) + } + pub fn rearrange_string(&self, id: String, new_index: usize) { + let mut queue = self.edit(); + let current_index = get_index_from_id(&mut queue, id).unwrap(); + let to_move = queue.remove(current_index).unwrap(); + queue.insert(new_index, to_move); + } + pub fn rearrange(&self, current_index: usize, new_index: usize) { + let mut queue = self.edit(); + let to_move = queue.remove(current_index).unwrap(); + queue.insert(new_index, to_move); + } + pub fn remove_from_queue(&self, index: usize) { + self.edit().remove(index); + } + pub fn remove_from_queue_string(&self, game_id: String) { + let mut queue = self.edit(); + let current_index = get_index_from_id(&mut queue, game_id).unwrap(); + queue.remove(current_index); + } + pub fn pause_downloads(&self) -> Result<(), SendError> { + self.sender.send(DownloadManagerSignal::Stop) + } + pub fn resume_downloads(&self) -> Result<(), SendError> { + self.sender.send(DownloadManagerSignal::Go) + } + pub fn ensure_terminated(self) -> Result<(), ()> { + match self.terminator.join() { + Ok(o) => o, + Err(_) => Err(()), } } } + +impl DownloadManager { + pub fn generate() -> DownloadManagerInterface { + let queue = Arc::new(Mutex::new(VecDeque::new())); + let (sender, receiver) = channel(); + let active_progress = Arc::new(Mutex::new(None)); + + let manager = Self { + download_agent_registry: HashMap::new(), + download_queue: queue.clone(), + receiver, + current_game_id: None, + active_control_flag: None, + sender: sender.clone(), + progress: active_progress.clone(), + }; + + let terminator = spawn(|| {manager.manage_queue()}); + + let interface = DownloadManagerInterface { + terminator, + download_queue: queue, + sender, + progress: active_progress + }; + return interface; + } + + fn manage_queue(mut self) -> Result<(), ()> { + loop { + let signal = match self.receiver.recv() { + Ok(signal) => signal, + Err(e) => { + return Err(()) + }, + }; + + match signal { + DownloadManagerSignal::Go => { + info!("Got signal 'Go'"); + if self.active_control_flag.is_none() && !self.download_agent_registry.is_empty() { + info!("Starting download agent"); + let download_agent = { + let lock = self.download_queue.lock().unwrap(); + self.download_agent_registry.get(&lock.front().unwrap().clone()).unwrap().clone() + }; + self.current_game_id = Some(download_agent.id.clone()); + + let progress_object = download_agent.progress.clone(); + *self.progress.lock().unwrap() = Some(progress_object); + + let active_control_flag = download_agent.control_flag.clone(); + self.active_control_flag = Some(active_control_flag.clone()); + + let sender = self.sender.clone(); + + info!("Spawning download"); + spawn(move || { + download_agent.download().unwrap(); + sender.send(DownloadManagerSignal::Completed(download_agent.id.clone())).unwrap(); + }); + info!("Finished spawning Download"); + + active_control_flag.set(DownloadThreadControlFlag::Go); + } + else if let Some(active_control_flag) = self.active_control_flag.clone() { + info!("Restarting current download"); + active_control_flag.set(DownloadThreadControlFlag::Go); + } + else { + info!("Nothing was set"); + } + }, + DownloadManagerSignal::Stop => { + info!("Got signal 'Stop'"); + if let Some(active_control_flag) = self.active_control_flag.clone() { + active_control_flag.set(DownloadThreadControlFlag::Stop); + } + }, + DownloadManagerSignal::Completed(game_id) => { + info!("Got signal 'Completed'"); + if self.current_game_id == Some(game_id.clone()) { + info!("Popping consumed data"); + self.download_queue.lock().unwrap().pop_front(); + self.download_agent_registry.remove(&game_id); + self.active_control_flag = None; + *self.progress.lock().unwrap() = None; + } + self.sender.send(DownloadManagerSignal::Go).unwrap(); + } + DownloadManagerSignal::Queue(game_id, version, target_download_dir) => { + info!("Got signal Queue"); + let download_agent = Arc::new(GameDownloadAgent::new(game_id.clone(), version, target_download_dir)); + self.download_agent_registry.insert(game_id.clone(), download_agent); + self.download_queue.lock().unwrap().push_back(game_id); + }, + }; + } + } +} + +pub fn get_index_from_id(queue: &mut MutexGuard<'_, VecDeque>, id: String) -> Option { + queue.iter().position(|download_agent| { + download_agent == &id + }) +} diff --git a/src-tauri/src/downloads/mod.rs b/src-tauri/src/downloads/mod.rs index ea2edfd..e166e3f 100644 --- a/src-tauri/src/downloads/mod.rs +++ b/src-tauri/src/downloads/mod.rs @@ -4,4 +4,4 @@ pub mod download_manager; mod download_logic; mod download_thread_control_flag; mod manifest; -mod progress_object; +mod progress_object; \ No newline at end of file diff --git a/src-tauri/src/downloads/progress_object.rs b/src-tauri/src/downloads/progress_object.rs index f3be4fb..6ed2400 100644 --- a/src-tauri/src/downloads/progress_object.rs +++ b/src-tauri/src/downloads/progress_object.rs @@ -1,33 +1,44 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, - Arc, + Arc, Mutex, }; #[derive(Clone)] pub struct ProgressObject { - max: usize, - progress_instances: Arc>>, + max: Arc>, + progress_instances: Arc>>>, } impl ProgressObject { pub fn new(max: usize, length: usize) -> Self { - let arr = (0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect(); + let arr = Mutex::new((0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect()); Self { - max, + max: Arc::new(Mutex::new(max)), progress_instances: Arc::new(arr), } } pub fn sum(&self) -> usize { self.progress_instances + .lock() + .unwrap() .iter() .map(|instance| instance.load(Ordering::Relaxed)) .sum() } + pub fn get_max(&self) -> usize { + self.max.lock().unwrap().clone() + } + pub fn set_max(&self, new_max: usize) { + *self.max.lock().unwrap() = new_max + } + pub fn set_size(&self, length: usize) { + *self.progress_instances.lock().unwrap() = (0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect(); + } pub fn get_progress(&self) -> f64 { - self.sum() as f64 / self.max as f64 + self.sum() as f64 / self.get_max() as f64 } pub fn get(&self, index: usize) -> Arc { - self.progress_instances[index].clone() + self.progress_instances.lock().unwrap()[index].clone() } } diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 5e880a2..deb7a4c 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -13,7 +13,7 @@ use crate::downloads::download_agent::GameDownloadAgent; use auth::{auth_initiate, generate_authorization_header, recieve_handshake}; use db::{add_new_download_dir, DatabaseInterface, DATA_ROOT_DIR}; use downloads::download_commands::*; -use downloads::download_manager::DownloadManager; +use downloads::download_manager::{DownloadManager, DownloadManagerInterface}; use env_logger::Env; use http::{header::*, response::Builder as ResponseBuilder}; use library::{fetch_game, fetch_library, Game}; @@ -54,7 +54,7 @@ pub struct AppState { games: HashMap, #[serde(skip_serializing)] - download_manager: Arc, + download_manager: Arc, } #[tauri::command] @@ -69,7 +69,7 @@ fn setup() -> AppState { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); let games = HashMap::new(); - let download_manager = Arc::new(DownloadManager::new()); + let download_manager = Arc::new(DownloadManager::generate()); let is_set_up = DB.database_is_set_up(); if !is_set_up {