fix(download manager): fixed queue manipulation and waiting for downloads

This commit is contained in:
DecDuck
2024-12-09 18:07:41 +11:00
parent 671d45fbe4
commit 01260f0732
5 changed files with 65 additions and 65 deletions

View File

@ -285,7 +285,7 @@ impl GameDownloadAgent {
// If we're not out of contexts, we're not done, so we don't fire completed // If we're not out of contexts, we're not done, so we don't fire completed
if !context_lock.is_empty() { if !context_lock.is_empty() {
info!("Download agent didn't finish, not sending completed signal"); info!("da for {} exited without completing", self.id.clone());
return Ok(()); return Ok(());
} }

View File

@ -19,16 +19,6 @@ pub fn download_game(
.map_err(|_| "An error occurred while communicating with the download manager.".to_string()) .map_err(|_| "An error occurred while communicating with the download manager.".to_string())
} }
#[tauri::command]
pub fn cancel_game_download(state: tauri::State<'_, Mutex<AppState>>, game_id: String) {
info!("Cancelling game download {}", game_id);
state
.lock()
.unwrap()
.download_manager
.cancel_download(game_id);
}
#[tauri::command] #[tauri::command]
pub fn pause_game_downloads(state: tauri::State<'_, Mutex<AppState>>) { pub fn pause_game_downloads(state: tauri::State<'_, Mutex<AppState>>) {
state.lock().unwrap().download_manager.pause_downloads() state.lock().unwrap().download_manager.pause_downloads()

View File

@ -1,6 +1,7 @@
use std::{ use std::{
any::Any, any::Any,
collections::VecDeque, collections::VecDeque,
fmt::Debug,
sync::{ sync::{
mpsc::{SendError, Sender}, mpsc::{SendError, Sender},
Arc, Mutex, MutexGuard, Arc, Mutex, MutexGuard,
@ -31,7 +32,7 @@ pub enum DownloadManagerSignal {
/// Tells the Manager to stop the current /// Tells the Manager to stop the current
/// download and return /// download and return
Finish, Finish,
Cancel(String), Cancel,
/// Any error which occurs in the agent /// Any error which occurs in the agent
Error(GameDownloadError), Error(GameDownloadError),
/// Pushes UI update /// Pushes UI update
@ -82,6 +83,13 @@ impl From<Arc<GameDownloadAgent>> for GameDownloadAgentQueueStandin {
} }
} }
} }
impl Debug for GameDownloadAgentQueueStandin {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GameDownloadAgentQueueStandin")
.field("id", &self.id)
.finish()
}
}
impl DownloadManager { impl DownloadManager {
pub fn new( pub fn new(
@ -112,11 +120,6 @@ impl DownloadManager {
))?; ))?;
self.command_sender.send(DownloadManagerSignal::Go) self.command_sender.send(DownloadManagerSignal::Go)
} }
pub fn cancel_download(&self, game_id: String) {
self.command_sender
.send(DownloadManagerSignal::Cancel(game_id))
.unwrap();
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<GameDownloadAgentQueueStandin>>> { pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<GameDownloadAgentQueueStandin>>> {
self.download_queue.edit() self.download_queue.edit()
} }
@ -132,23 +135,32 @@ impl DownloadManager {
let current_index = get_index_from_id(&mut queue, id).unwrap(); let current_index = get_index_from_id(&mut queue, id).unwrap();
let to_move = queue.remove(current_index).unwrap(); let to_move = queue.remove(current_index).unwrap();
queue.insert(new_index, to_move); queue.insert(new_index, to_move);
self.command_sender.send(DownloadManagerSignal::Update); self.command_sender
.send(DownloadManagerSignal::Update)
.unwrap();
} }
pub fn rearrange(&self, current_index: usize, new_index: usize) { pub fn rearrange(&self, current_index: usize, new_index: usize) {
let needs_pause = current_index == 0 || new_index == 0;
if needs_pause {
self.command_sender
.send(DownloadManagerSignal::Cancel)
.unwrap();
}
info!("moving {} to {}", current_index, new_index);
let mut queue = self.edit(); let mut queue = self.edit();
let to_move = queue.remove(current_index).unwrap(); let to_move = queue.remove(current_index).unwrap();
queue.insert(new_index, to_move); queue.insert(new_index, to_move);
self.command_sender.send(DownloadManagerSignal::Update);
} info!("new queue: {:?}", queue);
pub fn remove_from_queue(&self, index: usize) {
self.edit().remove(index); if needs_pause {
self.command_sender.send(DownloadManagerSignal::Update); self.command_sender.send(DownloadManagerSignal::Go).unwrap();
} }
pub fn remove_from_queue_string(&self, id: String) { self.command_sender
let mut queue = self.edit(); .send(DownloadManagerSignal::Update)
let current_index = get_index_from_id(&mut queue, id).unwrap(); .unwrap();
queue.remove(current_index);
self.command_sender.send(DownloadManagerSignal::Update);
} }
pub fn pause_downloads(&self) { pub fn pause_downloads(&self) {
self.command_sender self.command_sender

View File

@ -4,7 +4,7 @@ use std::{
mpsc::{channel, Receiver, Sender}, mpsc::{channel, Receiver, Sender},
Arc, Mutex, Arc, Mutex,
}, },
thread::spawn, thread::{spawn, JoinHandle},
}; };
use log::{error, info}; use log::{error, info};
@ -77,6 +77,7 @@ pub struct DownloadManagerBuilder {
app_handle: AppHandle, app_handle: AppHandle,
current_download_agent: Option<Arc<GameDownloadAgentQueueStandin>>, // Should be the only game download agent in the map with the "Go" flag current_download_agent: Option<Arc<GameDownloadAgentQueueStandin>>, // Should be the only game download agent in the map with the "Go" flag
current_download_thread: Mutex<Option<JoinHandle<()>>>,
active_control_flag: Option<DownloadThreadControl>, active_control_flag: Option<DownloadThreadControl>,
} }
@ -91,12 +92,14 @@ impl DownloadManagerBuilder {
download_agent_registry: HashMap::new(), download_agent_registry: HashMap::new(),
download_queue: queue.clone(), download_queue: queue.clone(),
command_receiver, command_receiver,
current_download_agent: None,
active_control_flag: None,
status: status.clone(), status: status.clone(),
sender: command_sender.clone(), sender: command_sender.clone(),
progress: active_progress.clone(), progress: active_progress.clone(),
app_handle, app_handle,
current_download_agent: None,
current_download_thread: Mutex::new(None),
active_control_flag: None,
}; };
let terminator = spawn(|| manager.manage_queue()); let terminator = spawn(|| manager.manage_queue());
@ -138,14 +141,23 @@ impl DownloadManagerBuilder {
self.app_handle.emit("update_queue", event_data).unwrap(); self.app_handle.emit("update_queue", event_data).unwrap();
} }
fn cleanup_current_game(&mut self, game_id: &String) -> Arc<GameDownloadAgent> { fn remove_and_cleanup_game(&mut self, game_id: &String) -> Arc<GameDownloadAgent> {
self.download_queue.pop_front(); self.download_queue.pop_front();
let download_agent = self.download_agent_registry.remove(game_id).unwrap(); let download_agent = self.download_agent_registry.remove(game_id).unwrap();
self.cleanup_current_download();
return download_agent;
}
// CAREFUL WITH THIS FUNCTION
// Make sure the download thread is terminated
fn cleanup_current_download(&mut self) {
self.active_control_flag = None; self.active_control_flag = None;
*self.progress.lock().unwrap() = None; *self.progress.lock().unwrap() = None;
self.current_download_agent = None; self.current_download_agent = None;
return download_agent; let mut download_thread_lock = self.current_download_thread.lock().unwrap();
*download_thread_lock = None;
drop(download_thread_lock);
} }
fn manage_queue(mut self) -> Result<(), ()> { fn manage_queue(mut self) -> Result<(), ()> {
@ -177,8 +189,8 @@ impl DownloadManagerBuilder {
DownloadManagerSignal::Error(e) => { DownloadManagerSignal::Error(e) => {
self.manage_error_signal(e); self.manage_error_signal(e);
} }
DownloadManagerSignal::Cancel(id) => { DownloadManagerSignal::Cancel => {
self.manage_cancel_signal(id); self.manage_cancel_signal();
} }
DownloadManagerSignal::Update => { DownloadManagerSignal::Update => {
self.push_manager_update(); self.push_manager_update();
@ -200,7 +212,7 @@ impl DownloadManagerBuilder {
// When if let chains are stabilised, combine these two statements // When if let chains are stabilised, combine these two statements
if interface.id == game_id { if interface.id == game_id {
info!("Popping consumed data"); info!("Popping consumed data");
let download_agent = self.cleanup_current_game(&game_id); let download_agent = self.remove_and_cleanup_game(&game_id);
if let Err(error) = if let Err(error) =
on_game_complete(game_id, download_agent.version.clone(), &self.app_handle) on_game_complete(game_id, download_agent.version.clone(), &self.app_handle)
@ -241,8 +253,6 @@ impl DownloadManagerBuilder {
} }
fn manage_go_signal(&mut self) { fn manage_go_signal(&mut self) {
info!("Got signal 'Go'");
if !(!self.download_agent_registry.is_empty() && !self.download_queue.empty()) { if !(!self.download_agent_registry.is_empty() && !self.download_queue.empty()) {
return; return;
} }
@ -252,8 +262,9 @@ impl DownloadManagerBuilder {
return; return;
} }
info!("Starting download agent"); info!("current download queue: {:?}", self.download_queue.read());
let agent_data = self.download_queue.read().front().unwrap().clone(); let agent_data = self.download_queue.read().front().unwrap().clone();
info!("starting download for {}", agent_data.id.clone());
let download_agent = self let download_agent = self
.download_agent_registry .download_agent_registry
.get(&agent_data.id) .get(&agent_data.id)
@ -274,7 +285,8 @@ impl DownloadManagerBuilder {
let sender = self.sender.clone(); let sender = self.sender.clone();
info!("Spawning download"); info!("Spawning download");
spawn(move || { let mut download_thread_lock = self.current_download_thread.lock().unwrap();
*download_thread_lock = Some(spawn(move || {
match download_agent.download() { match download_agent.download() {
// Returns once we've exited the download // Returns once we've exited the download
// (not necessarily completed) // (not necessarily completed)
@ -286,7 +298,7 @@ impl DownloadManagerBuilder {
sender.send(DownloadManagerSignal::Error(err)).unwrap(); sender.send(DownloadManagerSignal::Error(err)).unwrap();
} }
}; };
}); }));
// Set status for game // Set status for game
let mut status_handle = agent_data.status.lock().unwrap(); let mut status_handle = agent_data.status.lock().unwrap();
@ -303,7 +315,7 @@ impl DownloadManagerBuilder {
fn manage_error_signal(&mut self, error: GameDownloadError) { fn manage_error_signal(&mut self, error: GameDownloadError) {
let current_status = self.current_download_agent.clone().unwrap(); let current_status = self.current_download_agent.clone().unwrap();
self.cleanup_current_game(&current_status.id); // Remove all the locks and shit self.remove_and_cleanup_game(&current_status.id); // Remove all the locks and shit
let mut lock = current_status.status.lock().unwrap(); let mut lock = current_status.status.lock().unwrap();
*lock = GameDownloadStatus::Error; *lock = GameDownloadStatus::Error;
@ -314,33 +326,20 @@ impl DownloadManagerBuilder {
self.sender.send(DownloadManagerSignal::Update).unwrap(); self.sender.send(DownloadManagerSignal::Update).unwrap();
} }
fn manage_cancel_signal(&mut self, game_id: String) { fn manage_cancel_signal(&mut self) {
if let Some(current_flag) = &self.active_control_flag { if let Some(current_flag) = &self.active_control_flag {
current_flag.set(DownloadThreadControlFlag::Stop); current_flag.set(DownloadThreadControlFlag::Stop);
} }
// TODO wait until current download exits
// This cleanup function might break things because it let mut download_thread_lock = self.current_download_thread.lock().unwrap();
// unsets the control flag if let Some(current_download_thread) = download_thread_lock.take() {
self.cleanup_current_game(&game_id); current_download_thread.join().unwrap();
}
drop(download_thread_lock);
self.download_agent_registry.remove(&game_id); info!("cancel waited for download to finish");
let mut lock = self.download_queue.edit();
let index = match lock.iter().position(|interface| interface.id == game_id) {
Some(index) => index,
None => return,
};
lock.remove(index);
// Start next download self.cleanup_current_download();
self.sender.send(DownloadManagerSignal::Go).unwrap();
info!(
"{:?}",
self.download_agent_registry
.iter()
.map(|x| x.0.clone())
.collect::<String>()
);
} }
fn set_status(&self, status: DownloadManagerStatus) { fn set_status(&self, status: DownloadManagerStatus) {
*self.status.lock().unwrap() = status; *self.status.lock().unwrap() = status;

View File

@ -135,7 +135,6 @@ pub fn run() {
// Downloads // Downloads
download_game, download_game,
move_game_in_queue, move_game_in_queue,
cancel_game_download,
pause_game_downloads, pause_game_downloads,
resume_game_downloads, resume_game_downloads,
]) ])