style(downloads): Abstracted queue system

TODO: Still need to cleanup the rest of the legacy code which used to use the queue system
Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
quexeky
2024-11-23 18:18:03 +11:00
parent 450bca9c5b
commit 76b0975bcc
5 changed files with 82 additions and 19 deletions

View File

@ -8,7 +8,7 @@ use md5::{Context, Digest};
use reqwest::blocking::Response;
use std::io::Read;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{
fs::{File, OpenOptions},
io::{self, BufWriter, ErrorKind, Seek, SeekFrom, Write},
@ -125,6 +125,7 @@ pub fn download_game_chunk(
// If we're paused
if control_flag.get() == DownloadThreadControlFlag::Stop {
info!("Control flag is Stop");
progress.store(0, Ordering::Relaxed);
return Ok(false);
}

View File

@ -13,7 +13,7 @@ use super::{
download_agent::{GameDownloadAgent, GameDownloadError},
download_manager_interface::{AgentInterfaceData, DownloadManager},
download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag},
progress_object::ProgressObject,
progress_object::ProgressObject, queue::Queue,
};
/*
@ -55,7 +55,7 @@ Behold, my madness - quexeky
pub struct DownloadManagerBuilder {
download_agent_registry: HashMap<String, Arc<GameDownloadAgent>>,
download_queue: Arc<Mutex<VecDeque<Arc<AgentInterfaceData>>>>,
download_queue: Queue,
command_receiver: Receiver<DownloadManagerSignal>,
sender: Sender<DownloadManagerSignal>,
progress: Arc<Mutex<Option<ProgressObject>>>,
@ -97,7 +97,7 @@ pub enum GameDownloadStatus {
impl DownloadManagerBuilder {
pub fn build() -> DownloadManager {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let queue = Queue::new();
let (command_sender, command_receiver) = channel();
let active_progress = Arc::new(Mutex::new(None));
let status = Arc::new(Mutex::new(DownloadManagerStatus::Empty));
@ -167,7 +167,7 @@ impl DownloadManagerBuilder {
// When if let chains are stabilised, combine these two statements
if interface.id == game_id {
info!("Popping consumed data");
self.download_queue.lock().unwrap().pop_front();
self.download_queue.pop_front();
self.download_agent_registry.remove(&game_id);
self.active_control_flag = None;
*self.progress.lock().unwrap() = None;
@ -185,16 +185,13 @@ impl DownloadManagerBuilder {
self.sender.clone()
));
let agent_status = GameDownloadStatus::Uninitialised;
let interface_data = Arc::new(AgentInterfaceData {
let interface_data = AgentInterfaceData {
id,
status: Mutex::new(agent_status),
});
};
self.download_agent_registry
.insert(interface_data.id.clone(), download_agent);
self.download_queue
.lock()
.unwrap()
.push_back(interface_data);
self.download_queue.append(interface_data);
}
fn manage_go_signal(&mut self) {
@ -202,9 +199,9 @@ impl DownloadManagerBuilder {
if self.active_control_flag.is_none() && !self.download_agent_registry.is_empty() {
info!("Starting download agent");
let download_agent = {
let lock = self.download_queue.lock().unwrap();
let front = self.download_queue.read().front().unwrap().clone();
self.download_agent_registry
.get(&lock.front().unwrap().id)
.get(&front.id)
.unwrap()
.clone()
};
@ -256,7 +253,7 @@ impl DownloadManagerBuilder {
*self.progress.lock().unwrap() = None;
}
self.download_agent_registry.remove(&game_id);
let mut lock = self.download_queue.lock().unwrap();
let mut lock = self.download_queue.edit();
let index = match lock.iter().position(|interface| interface.id == game_id) {
Some(index) => index,
None => return,

View File

@ -13,7 +13,7 @@ use log::info;
use super::{
download_agent::GameDownloadAgent,
download_manager::{DownloadManagerSignal, GameDownloadStatus},
progress_object::ProgressObject,
progress_object::ProgressObject, queue::Queue,
};
/// Accessible front-end for the DownloadManager
@ -28,7 +28,7 @@ use super::{
/// THIS EDITING IS BLOCKING!!!
pub struct DownloadManager {
terminator: JoinHandle<Result<(), ()>>,
download_queue: Arc<Mutex<VecDeque<Arc<AgentInterfaceData>>>>,
download_queue: Queue,
progress: Arc<Mutex<Option<ProgressObject>>>,
command_sender: Sender<DownloadManagerSignal>,
}
@ -48,7 +48,7 @@ impl From<Arc<GameDownloadAgent>> for AgentInterfaceData {
impl DownloadManager {
pub fn new(
terminator: JoinHandle<Result<(), ()>>,
download_queue: Arc<Mutex<VecDeque<Arc<AgentInterfaceData>>>>,
download_queue: Queue,
progress: Arc<Mutex<Option<ProgressObject>>>,
command_sender: Sender<DownloadManagerSignal>,
) -> Self {
@ -81,10 +81,10 @@ impl DownloadManager {
self.command_sender.send(DownloadManagerSignal::Cancel(game_id)).unwrap();
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<AgentInterfaceData>>> {
self.download_queue.lock().unwrap()
self.download_queue.edit()
}
pub fn read_queue(&self) -> VecDeque<Arc<AgentInterfaceData>> {
self.download_queue.lock().unwrap().clone()
self.download_queue.read()
}
pub fn get_current_game_download_progress(&self) -> Option<f64> {
let progress_object = (*self.progress.lock().unwrap()).clone()?;

View File

@ -6,3 +6,4 @@ pub mod download_manager_interface;
mod download_thread_control_flag;
mod manifest;
mod progress_object;
pub mod queue;

View File

@ -0,0 +1,64 @@
use std::{collections::VecDeque, sync::{Arc, Mutex, MutexGuard}};
use super::download_manager_interface::AgentInterfaceData;
#[derive(Clone)]
pub struct Queue {
inner: Arc<Mutex<VecDeque<Arc<AgentInterfaceData>>>>
}
impl Queue {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(VecDeque::new()))
}
}
pub fn read(&self) -> VecDeque<Arc<AgentInterfaceData>> {
self.inner.lock().unwrap().clone()
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<AgentInterfaceData>>> {
self.inner.lock().unwrap()
}
pub fn pop_front(&self) -> Option<Arc<AgentInterfaceData>> {
self.edit().pop_front()
}
/// Either inserts `interface` at the specified index, or appends to
/// the back of the deque if index is greater than the length of the deque
pub fn insert(&self, interface: AgentInterfaceData, index: usize) {
if self.read().len() > index {
self.append(interface);
}
else {
self.edit().insert(index, Arc::new(interface));
}
}
pub fn append(&self, interface: AgentInterfaceData) {
self.edit().push_back(Arc::new(interface));
}
pub fn pop_front_if_equal(&self, game_id: String) -> Option<Arc<AgentInterfaceData>> {
let mut queue = self.edit();
let front = match queue.front() {
Some(front) => front,
None => return None,
};
if front.id == game_id {
return queue.pop_front();
}
return None
}
pub fn get_by_id(&self, game_id: String) -> Option<usize> {
self.read().iter().position(|data| data.id == game_id)
}
pub fn move_to_index_by_id(&self, game_id: String, new_index: usize) -> Result<(), ()> {
let index = match self.get_by_id(game_id) {
Some(index) => index,
None => return Err(()),
};
let existing = match self.edit().remove(index) {
Some(existing) => existing,
None => return Err(()),
};
self.edit().insert(new_index, existing);
Ok(())
}
}