refactor(download manager): Moved download manager to separate directory

Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
quexeky
2024-12-30 17:25:17 +11:00
parent 3299c71b3d
commit b6c64e56e5
10 changed files with 19 additions and 34 deletions

View File

@ -0,0 +1,221 @@
use std::{
any::Any,
collections::VecDeque,
fmt::Debug,
sync::{
mpsc::{SendError, Sender},
Arc, Mutex, MutexGuard,
},
thread::JoinHandle,
};
use log::info;
use serde::Serialize;
use crate::downloads::download_agent::{GameDownloadAgent, GameDownloadError};
use super::{download_manager_builder::CurrentProgressObject, progress_object::ProgressObject, queue::Queue};
pub enum DownloadManagerSignal {
/// Resumes (or starts) the DownloadManager
Go,
/// Pauses the DownloadManager
Stop,
/// Called when a GameDownloadAgent has fully completed a download.
Completed(String),
/// Generates and appends a GameDownloadAgent
/// to the registry and queue
Queue(String, String, usize),
/// Tells the Manager to stop the current
/// download, sync everything to disk, and
/// then exit
Finish,
/// Stops (but doesn't remove) current download
Cancel,
/// Removes a given game
Remove(String),
/// Any error which occurs in the agent
Error(GameDownloadError),
/// Pushes UI update
UpdateUIQueue,
UpdateUIStats(usize, usize), //kb/s and seconds
/// Uninstall game
/// Takes game ID
Uninstall(String),
}
#[derive(Debug, Clone)]
pub enum DownloadManagerStatus {
Downloading,
Paused,
Empty,
Error(GameDownloadError),
Finished,
}
impl Serialize for DownloadManagerStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&format!["{:?}", self])
}
}
#[derive(Serialize, Clone)]
pub enum GameDownloadStatus {
Queued,
Downloading,
Error,
}
/// Accessible front-end for the DownloadManager
///
/// The system works entirely through signals, both internally and externally,
/// all of which are accessible through the DownloadManagerSignal type, but
/// should not be used directly. Rather, signals are abstracted through this
/// interface.
///
/// The actual download queue may be accessed through the .edit() function,
/// which provides raw access to the underlying queue.
/// THIS EDITING IS BLOCKING!!!
pub struct DownloadManager {
terminator: JoinHandle<Result<(), ()>>,
download_queue: Queue,
progress: CurrentProgressObject,
command_sender: Sender<DownloadManagerSignal>,
}
pub struct GameDownloadAgentQueueStandin {
pub id: String,
pub status: Mutex<GameDownloadStatus>,
pub progress: Arc<ProgressObject>,
}
impl From<Arc<GameDownloadAgent>> for GameDownloadAgentQueueStandin {
fn from(value: Arc<GameDownloadAgent>) -> Self {
Self {
id: value.id.clone(),
status: Mutex::from(GameDownloadStatus::Queued),
progress: value.progress.clone(),
}
}
}
impl Debug for GameDownloadAgentQueueStandin {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GameDownloadAgentQueueStandin")
.field("id", &self.id)
.finish()
}
}
#[allow(dead_code)]
impl DownloadManager {
pub fn new(
terminator: JoinHandle<Result<(), ()>>,
download_queue: Queue,
progress: CurrentProgressObject,
command_sender: Sender<DownloadManagerSignal>,
) -> Self {
Self {
terminator,
download_queue,
progress,
command_sender,
}
}
pub fn queue_game(
&self,
id: String,
version: String,
target_download_dir: usize,
) -> Result<(), SendError<DownloadManagerSignal>> {
info!("Adding game id {}", id);
self.command_sender.send(DownloadManagerSignal::Queue(
id,
version,
target_download_dir,
))?;
self.command_sender.send(DownloadManagerSignal::Go)
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<GameDownloadAgentQueueStandin>>> {
self.download_queue.edit()
}
pub fn read_queue(&self) -> VecDeque<Arc<GameDownloadAgentQueueStandin>> {
self.download_queue.read()
}
pub fn get_current_game_download_progress(&self) -> Option<f64> {
let progress_object = (*self.progress.lock().unwrap()).clone()?;
Some(progress_object.get_progress())
}
pub fn rearrange_string(&self, id: String, new_index: usize) {
let mut queue = self.edit();
let current_index = get_index_from_id(&mut queue, id).unwrap();
let to_move = queue.remove(current_index).unwrap();
queue.insert(new_index, to_move);
self.command_sender
.send(DownloadManagerSignal::UpdateUIQueue)
.unwrap();
}
pub fn cancel(&self, game_id: String) {
self.command_sender
.send(DownloadManagerSignal::Remove(game_id))
.unwrap();
}
pub fn rearrange(&self, current_index: usize, new_index: usize) {
if current_index == new_index {
return;
};
let needs_pause = current_index == 0 || new_index == 0;
if needs_pause {
self.command_sender
.send(DownloadManagerSignal::Cancel)
.unwrap();
}
info!("moving {} to {}", current_index, new_index);
let mut queue = self.edit();
let to_move = queue.remove(current_index).unwrap();
queue.insert(new_index, to_move);
info!("new queue: {:?}", queue);
drop(queue);
if needs_pause {
self.command_sender.send(DownloadManagerSignal::Go).unwrap();
}
self.command_sender
.send(DownloadManagerSignal::UpdateUIQueue)
.unwrap();
}
pub fn pause_downloads(&self) {
self.command_sender
.send(DownloadManagerSignal::Stop)
.unwrap();
}
pub fn resume_downloads(&self) {
self.command_sender.send(DownloadManagerSignal::Go).unwrap();
}
pub fn ensure_terminated(self) -> Result<Result<(), ()>, Box<dyn Any + Send>> {
self.command_sender
.send(DownloadManagerSignal::Finish)
.unwrap();
self.terminator.join()
}
pub fn uninstall_game(&self, game_id: String) {
self.command_sender
.send(DownloadManagerSignal::Uninstall(game_id))
.unwrap();
}
}
/// Takes in the locked value from .edit() and attempts to
/// get the index of whatever game_id is passed in
fn get_index_from_id(
queue: &mut MutexGuard<'_, VecDeque<Arc<GameDownloadAgentQueueStandin>>>,
id: String,
) -> Option<usize> {
queue
.iter()
.position(|download_agent| download_agent.id == id)
}

View File

@ -0,0 +1,521 @@
use std::{
collections::HashMap,
fs::remove_dir_all,
sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex, RwLockWriteGuard,
},
thread::{spawn, JoinHandle},
};
use log::{error, info};
use tauri::{AppHandle, Emitter};
use crate::{
db::{Database, GameStatus, GameTransientStatus}, download_manager::download_manager::GameDownloadStatus, downloads::download_agent::{GameDownloadAgent, GameDownloadError}, library::{
on_game_complete, push_game_update, QueueUpdateEvent,
QueueUpdateEventQueueData, StatsUpdateEvent,
}, state::GameStatusManager, DB
};
use super::{download_manager::{DownloadManager, DownloadManagerSignal, DownloadManagerStatus, GameDownloadAgentQueueStandin}, download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag}, progress_object::ProgressObject, queue::Queue};
/*
Welcome to the download manager, the most overengineered, glorious piece of bullshit.
The download manager takes a queue of game_ids and their associated
GameDownloadAgents, and then, one-by-one, executes them. It provides an interface
to interact with the currently downloading agent, and manage the queue.
When the DownloadManager is initialised, it is designed to provide a reference
which can be used to provide some instructions (the DownloadManagerInterface),
but other than that, it runs without any sort of interruptions.
It does this by opening up two data structures. Primarily is the command_receiver,
and mpsc (multi-channel-single-producer) which allows commands to be sent from
the Interface, and queued up for the Manager to process.
These have been mapped in the DownloadManagerSignal docs.
The other way to interact with the DownloadManager is via the donwload_queue,
which is just a collection of ids which may be rearranged to suit
whichever download queue order is required.
+----------------------------------------------------------------------------+
| DO NOT ATTEMPT TO ADD OR REMOVE FROM THE QUEUE WITHOUT USING SIGNALS!! |
| THIS WILL CAUSE A DESYNC BETWEEN THE DOWNLOAD AGENT REGISTRY AND THE QUEUE |
| WHICH HAS NOT BEEN ACCOUNTED FOR |
+----------------------------------------------------------------------------+
This download queue does not actually own any of the GameDownloadAgents. It is
simply a id-based reference system. The actual Agents are stored in the
download_agent_registry HashMap, as ordering is no issue here. This is why
appending or removing from the download_queue must be done via signals.
Behold, my madness - quexeky
*/
// Refactored to consolidate this type. It's a monster.
pub type CurrentProgressObject = Arc<Mutex<Option<Arc<ProgressObject>>>>;
pub struct DownloadManagerBuilder {
download_agent_registry: HashMap<String, Arc<Mutex<GameDownloadAgent>>>,
download_queue: Queue,
command_receiver: Receiver<DownloadManagerSignal>,
sender: Sender<DownloadManagerSignal>,
progress: CurrentProgressObject,
status: Arc<Mutex<DownloadManagerStatus>>,
app_handle: AppHandle,
current_download_agent: Option<Arc<GameDownloadAgentQueueStandin>>, // Should be the only game download agent in the map with the "Go" flag
current_download_thread: Mutex<Option<JoinHandle<()>>>,
active_control_flag: Option<DownloadThreadControl>,
}
impl DownloadManagerBuilder {
pub fn build(app_handle: AppHandle) -> DownloadManager {
let queue = Queue::new();
let (command_sender, command_receiver) = channel();
let active_progress = Arc::new(Mutex::new(None));
let status = Arc::new(Mutex::new(DownloadManagerStatus::Empty));
let manager = Self {
download_agent_registry: HashMap::new(),
download_queue: queue.clone(),
command_receiver,
status: status.clone(),
sender: command_sender.clone(),
progress: active_progress.clone(),
app_handle,
current_download_agent: None,
current_download_thread: Mutex::new(None),
active_control_flag: None,
};
let terminator = spawn(|| manager.manage_queue());
DownloadManager::new(terminator, queue, active_progress, command_sender)
}
fn set_game_status<F: FnOnce(&mut RwLockWriteGuard<'_, Database>, &String)>(
&self,
id: String,
setter: F,
) {
let mut db_handle = DB.borrow_data_mut().unwrap();
setter(&mut db_handle, &id);
drop(db_handle);
DB.save().unwrap();
let status = GameStatusManager::fetch_state(&id);
push_game_update(&self.app_handle, id, status);
}
fn push_ui_stats_update(&self, kbs: usize, time: usize) {
let event_data = StatsUpdateEvent { speed: kbs, time };
self.app_handle.emit("update_stats", event_data).unwrap();
}
fn push_ui_queue_update(&self) {
let queue = self.download_queue.read();
let queue_objs: Vec<QueueUpdateEventQueueData> = queue
.iter()
.map(|interface| QueueUpdateEventQueueData {
id: interface.id.clone(),
status: interface.status.lock().unwrap().clone(),
progress: interface.progress.get_progress(),
})
.collect();
let status_handle = self.status.lock().unwrap();
let status = status_handle.clone();
drop(status_handle);
let event_data = QueueUpdateEvent {
queue: queue_objs,
status,
};
self.app_handle.emit("update_queue", event_data).unwrap();
}
fn stop_and_wait_current_download(&self) {
self.set_status(DownloadManagerStatus::Paused);
if let Some(current_flag) = &self.active_control_flag {
current_flag.set(DownloadThreadControlFlag::Stop);
}
let mut download_thread_lock = self.current_download_thread.lock().unwrap();
if let Some(current_download_thread) = download_thread_lock.take() {
current_download_thread.join().unwrap();
}
drop(download_thread_lock);
}
fn remove_and_cleanup_front_game(&mut self, game_id: &String) -> Arc<Mutex<GameDownloadAgent>> {
self.download_queue.pop_front();
let download_agent = self.download_agent_registry.remove(game_id).unwrap();
self.cleanup_current_download();
download_agent
}
// CAREFUL WITH THIS FUNCTION
// Make sure the download thread is terminated
fn cleanup_current_download(&mut self) {
self.active_control_flag = None;
*self.progress.lock().unwrap() = None;
self.current_download_agent = None;
let mut download_thread_lock = self.current_download_thread.lock().unwrap();
*download_thread_lock = None;
drop(download_thread_lock);
}
fn manage_queue(mut self) -> Result<(), ()> {
loop {
let signal = match self.command_receiver.recv() {
Ok(signal) => signal,
Err(_) => return Err(()),
};
match signal {
DownloadManagerSignal::Go => {
self.manage_go_signal();
}
DownloadManagerSignal::Stop => {
self.manage_stop_signal();
}
DownloadManagerSignal::Completed(game_id) => {
self.manage_completed_signal(game_id);
}
DownloadManagerSignal::Queue(game_id, version, target_download_dir) => {
self.manage_queue_signal(game_id, version, target_download_dir);
}
DownloadManagerSignal::Error(e) => {
self.manage_error_signal(e);
}
DownloadManagerSignal::Cancel => {
self.manage_cancel_signal();
}
DownloadManagerSignal::UpdateUIQueue => {
self.push_ui_queue_update();
}
DownloadManagerSignal::UpdateUIStats(kbs, time) => {
self.push_ui_stats_update(kbs, time);
}
DownloadManagerSignal::Finish => {
self.stop_and_wait_current_download();
return Ok(());
}
DownloadManagerSignal::Remove(game_id) => {
self.manage_remove_game_queue(game_id);
}
DownloadManagerSignal::Uninstall(game_id) => {
self.uninstall_game(game_id);
}
};
}
}
fn uninstall_game(&mut self, game_id: String) {
// Removes the game if it's in the queue
self.manage_remove_game_queue(game_id.clone());
let mut db_handle = DB.borrow_data_mut().unwrap();
db_handle
.games
.transient_statuses
.entry(game_id.clone())
.and_modify(|v| *v = GameTransientStatus::Uninstalling {});
push_game_update(
&self.app_handle,
game_id.clone(),
(None, Some(GameTransientStatus::Uninstalling {})),
);
let previous_state = db_handle.games.statuses.get(&game_id).cloned();
if previous_state.is_none() {
info!("uninstall job doesn't have previous state, failing silently");
return;
}
let previous_state = previous_state.unwrap();
if let Some((version_name, install_dir)) = match previous_state {
GameStatus::Installed {
version_name,
install_dir,
} => Some((version_name, install_dir)),
GameStatus::SetupRequired {
version_name,
install_dir,
} => Some((version_name, install_dir)),
_ => None,
} {
db_handle
.games
.transient_statuses
.entry(game_id.clone())
.and_modify(|v| *v = GameTransientStatus::Uninstalling {});
drop(db_handle);
let sender = self.sender.clone();
let app_handle = self.app_handle.clone();
spawn(move || match remove_dir_all(install_dir) {
Err(e) => {
sender
.send(DownloadManagerSignal::Error(GameDownloadError::IoError(
e.kind(),
)))
.unwrap();
}
Ok(_) => {
let mut db_handle = DB.borrow_data_mut().unwrap();
db_handle.games.transient_statuses.remove(&game_id);
db_handle
.games
.statuses
.entry(game_id.clone())
.and_modify(|e| *e = GameStatus::Remote {});
drop(db_handle);
DB.save().unwrap();
info!("uninstalled {}", game_id);
push_game_update(&app_handle, game_id, (Some(GameStatus::Remote {}), None));
}
});
}
}
fn manage_remove_game_queue(&mut self, game_id: String) {
if let Some(current_download) = &self.current_download_agent {
if current_download.id == game_id {
self.manage_cancel_signal();
}
}
let index = self.download_queue.get_by_id(game_id.clone());
if let Some(index) = index {
let mut queue_handle = self.download_queue.edit();
queue_handle.remove(index);
self.set_game_status(game_id, |db_handle, id| {
db_handle.games.transient_statuses.remove(id);
});
drop(queue_handle);
}
if self.current_download_agent.is_none() {
self.manage_go_signal();
}
self.push_ui_queue_update();
}
fn manage_stop_signal(&mut self) {
info!("Got signal 'Stop'");
self.set_status(DownloadManagerStatus::Paused);
if let Some(active_control_flag) = self.active_control_flag.clone() {
active_control_flag.set(DownloadThreadControlFlag::Stop);
}
}
fn manage_completed_signal(&mut self, game_id: String) {
info!("Got signal 'Completed'");
if let Some(interface) = &self.current_download_agent {
// When if let chains are stabilised, combine these two statements
if interface.id == game_id {
info!("Popping consumed data");
let download_agent = self.remove_and_cleanup_front_game(&game_id);
let download_agent_lock = download_agent.lock().unwrap();
let version = download_agent_lock.version.clone();
let install_dir = download_agent_lock
.stored_manifest
.base_path
.clone()
.to_string_lossy()
.to_string();
drop(download_agent_lock);
if let Err(error) =
on_game_complete(game_id, version, install_dir, &self.app_handle)
{
error!("failed to mark game as completed: {}", error);
// TODO mark game as remote so user can retry
}
}
}
self.sender
.send(DownloadManagerSignal::UpdateUIQueue)
.unwrap();
self.sender.send(DownloadManagerSignal::Go).unwrap();
}
fn manage_queue_signal(&mut self, id: String, version: String, target_download_dir: usize) {
info!("Got signal Queue");
if let Some(index) = self.download_queue.get_by_id(id.clone()) {
// Should always give us a value
if let Some(download_agent) = self.download_agent_registry.get(&id) {
let download_agent_handle = download_agent.lock().unwrap();
if download_agent_handle.version == version {
info!("game with same version already queued, skipping");
return;
}
// If it's not the same, we want to cancel the current one, and then add the new one
drop(download_agent_handle);
self.manage_remove_game_queue(id.clone());
}
}
let download_agent = Arc::new(Mutex::new(GameDownloadAgent::new(
id.clone(),
version,
target_download_dir,
self.sender.clone(),
)));
let download_agent_lock = download_agent.lock().unwrap();
let agent_status = GameDownloadStatus::Queued;
let interface_data = GameDownloadAgentQueueStandin {
id: id.clone(),
status: Mutex::new(agent_status),
progress: download_agent_lock.progress.clone(),
};
let version_name = download_agent_lock.version.clone();
drop(download_agent_lock);
self.download_agent_registry
.insert(interface_data.id.clone(), download_agent);
self.download_queue.append(interface_data);
self.set_game_status(id, |db, id| {
db.games.transient_statuses.insert(
id.to_string(),
GameTransientStatus::Downloading { version_name },
);
});
self.sender
.send(DownloadManagerSignal::UpdateUIQueue)
.unwrap();
}
fn manage_go_signal(&mut self) {
if !(!self.download_agent_registry.is_empty() && !self.download_queue.empty()) {
return;
}
if self.current_download_agent.is_some() {
info!("skipping go signal due to existing download job");
return;
}
info!("current download queue: {:?}", self.download_queue.read());
let agent_data = self.download_queue.read().front().unwrap().clone();
info!("starting download for {}", agent_data.id.clone());
let download_agent = self
.download_agent_registry
.get(&agent_data.id)
.unwrap()
.clone();
let download_agent_lock = download_agent.lock().unwrap();
self.current_download_agent = Some(agent_data);
// Cloning option should be okay because it only clones the Arc inside, not the AgentInterfaceData
let agent_data = self.current_download_agent.clone().unwrap();
let version_name = download_agent_lock.version.clone();
let progress_object = download_agent_lock.progress.clone();
*self.progress.lock().unwrap() = Some(progress_object);
let active_control_flag = download_agent_lock.control_flag.clone();
self.active_control_flag = Some(active_control_flag.clone());
let sender = self.sender.clone();
drop(download_agent_lock);
info!("Spawning download");
let mut download_thread_lock = self.current_download_thread.lock().unwrap();
*download_thread_lock = Some(spawn(move || {
let mut download_agent_lock = download_agent.lock().unwrap();
match download_agent_lock.download() {
// Returns once we've exited the download
// (not necessarily completed)
// The download agent will fire the completed event for us
Ok(_) => {}
// If an error occurred while *starting* the download
Err(err) => {
error!("error while managing download: {}", err);
sender.send(DownloadManagerSignal::Error(err)).unwrap();
}
};
drop(download_agent_lock);
}));
// Set status for games
for queue_game in self.download_queue.read() {
let mut status_handle = queue_game.status.lock().unwrap();
if queue_game.id == agent_data.id {
*status_handle = GameDownloadStatus::Downloading;
} else {
*status_handle = GameDownloadStatus::Queued;
}
drop(status_handle);
}
// Set flags for download manager
active_control_flag.set(DownloadThreadControlFlag::Go);
self.set_status(DownloadManagerStatus::Downloading);
self.set_game_status(agent_data.id.clone(), |db, id| {
db.games.transient_statuses.insert(
id.to_string(),
GameTransientStatus::Downloading { version_name },
);
});
self.sender
.send(DownloadManagerSignal::UpdateUIQueue)
.unwrap();
}
fn manage_error_signal(&mut self, error: GameDownloadError) {
error!("{}", error);
let current_status = self.current_download_agent.clone().unwrap();
self.stop_and_wait_current_download();
self.remove_and_cleanup_front_game(&current_status.id); // Remove all the locks and shit, and remove from queue
self.app_handle
.emit("download_error", error.to_string())
.unwrap();
let mut lock = current_status.status.lock().unwrap();
*lock = GameDownloadStatus::Error;
self.set_status(DownloadManagerStatus::Error(error));
let game_id = current_status.id.clone();
self.set_game_status(game_id, |db_handle, id| {
db_handle.games.transient_statuses.remove(id);
});
self.sender
.send(DownloadManagerSignal::UpdateUIQueue)
.unwrap();
}
fn manage_cancel_signal(&mut self) {
self.stop_and_wait_current_download();
info!("cancel waited for download to finish");
self.cleanup_current_download();
}
fn set_status(&self, status: DownloadManagerStatus) {
*self.status.lock().unwrap() = status;
}
}

View File

@ -0,0 +1,49 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub enum DownloadThreadControlFlag {
Stop,
Go,
}
/// Go => true
/// Stop => false
impl From<DownloadThreadControlFlag> for bool {
fn from(value: DownloadThreadControlFlag) -> Self {
match value {
DownloadThreadControlFlag::Go => true,
DownloadThreadControlFlag::Stop => false,
}
}
}
/// true => Go
/// false => Stop
impl From<bool> for DownloadThreadControlFlag {
fn from(value: bool) -> Self {
match value {
true => DownloadThreadControlFlag::Go,
false => DownloadThreadControlFlag::Stop,
}
}
}
#[derive(Clone)]
pub struct DownloadThreadControl {
inner: Arc<AtomicBool>,
}
impl DownloadThreadControl {
pub fn new(flag: DownloadThreadControlFlag) -> Self {
Self {
inner: Arc::new(AtomicBool::new(flag.into())),
}
}
pub fn get(&self) -> DownloadThreadControlFlag {
self.inner.load(Ordering::Relaxed).into()
}
pub fn set(&self, flag: DownloadThreadControlFlag) {
self.inner.store(flag.into(), Ordering::Relaxed);
}
}

View File

@ -0,0 +1,5 @@
pub mod download_manager;
pub mod download_manager_builder;
pub mod progress_object;
pub mod queue;
pub mod download_thread_control_flag;

View File

@ -0,0 +1,144 @@
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
mpsc::Sender,
Arc, Mutex, RwLock,
},
time::{Duration, Instant},
};
use log::info;
use super::download_manager::DownloadManagerSignal;
#[derive(Clone)]
pub struct ProgressObject {
max: Arc<Mutex<usize>>,
progress_instances: Arc<Mutex<Vec<Arc<AtomicUsize>>>>,
start: Arc<Mutex<Instant>>,
sender: Sender<DownloadManagerSignal>,
points_towards_update: Arc<AtomicUsize>,
points_to_push_update: Arc<AtomicUsize>,
last_update: Arc<RwLock<Instant>>,
amount_last_update: Arc<AtomicUsize>,
}
pub struct ProgressHandle {
progress: Arc<AtomicUsize>,
progress_object: Arc<ProgressObject>,
}
impl ProgressHandle {
pub fn new(progress: Arc<AtomicUsize>, progress_object: Arc<ProgressObject>) -> Self {
Self {
progress,
progress_object,
}
}
pub fn set(&self, amount: usize) {
self.progress.store(amount, Ordering::Relaxed);
}
pub fn add(&self, amount: usize) {
self.progress
.fetch_add(amount, std::sync::atomic::Ordering::Relaxed);
self.progress_object.check_push_update(amount);
}
}
static PROGRESS_UPDATES: usize = 100;
impl ProgressObject {
pub fn new(max: usize, length: usize, sender: Sender<DownloadManagerSignal>) -> Self {
let arr = Mutex::new((0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect());
// TODO: consolidate this calculation with the set_max function below
let points_to_push_update = max / PROGRESS_UPDATES;
Self {
max: Arc::new(Mutex::new(max)),
progress_instances: Arc::new(arr),
start: Arc::new(Mutex::new(Instant::now())),
sender,
points_towards_update: Arc::new(AtomicUsize::new(0)),
points_to_push_update: Arc::new(AtomicUsize::new(points_to_push_update)),
last_update: Arc::new(RwLock::new(Instant::now())),
amount_last_update: Arc::new(AtomicUsize::new(0)),
}
}
pub fn check_push_update(&self, amount_added: usize) {
let current_amount = self
.points_towards_update
.fetch_add(amount_added, Ordering::Relaxed);
let to_update = self.points_to_push_update.fetch_add(0, Ordering::Relaxed);
if current_amount >= to_update {
self.points_towards_update
.fetch_sub(to_update, Ordering::Relaxed);
self.sender
.send(DownloadManagerSignal::UpdateUIQueue)
.unwrap();
}
let last_update = self.last_update.read().unwrap();
let last_update_difference = Instant::now().duration_since(*last_update).as_millis();
if last_update_difference > 1000 {
// push update
drop(last_update);
let mut last_update = self.last_update.write().unwrap();
*last_update = Instant::now();
drop(last_update);
let current_amount = self.sum();
let max = self.get_max();
let amount_at_last_update = self.amount_last_update.fetch_add(0, Ordering::Relaxed);
self.amount_last_update
.store(current_amount, Ordering::Relaxed);
let amount_since_last_update = current_amount - amount_at_last_update;
let kilobytes_per_second = amount_since_last_update / (last_update_difference as usize).max(1);
let remaining = max - current_amount; // bytes
let time_remaining = (remaining / 1000) / kilobytes_per_second.max(1);
self.sender
.send(DownloadManagerSignal::UpdateUIStats(
kilobytes_per_second,
time_remaining,
))
.unwrap();
}
}
pub fn set_time_now(&self) {
*self.start.lock().unwrap() = Instant::now();
}
pub fn sum(&self) -> usize {
self.progress_instances
.lock()
.unwrap()
.iter()
.map(|instance| instance.load(Ordering::Relaxed))
.sum()
}
pub fn get_max(&self) -> usize {
*self.max.lock().unwrap()
}
pub fn set_max(&self, new_max: usize) {
*self.max.lock().unwrap() = new_max;
self.points_to_push_update
.store(new_max / PROGRESS_UPDATES, Ordering::Relaxed);
info!("points to push update: {}", new_max / PROGRESS_UPDATES);
}
pub fn set_size(&self, length: usize) {
*self.progress_instances.lock().unwrap() =
(0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect();
}
pub fn get_progress(&self) -> f64 {
self.sum() as f64 / self.get_max() as f64
}
pub fn get(&self, index: usize) -> Arc<AtomicUsize> {
self.progress_instances.lock().unwrap()[index].clone()
}
}

View File

@ -0,0 +1,73 @@
use std::{
collections::VecDeque,
sync::{Arc, Mutex, MutexGuard},
};
use super::download_manager::GameDownloadAgentQueueStandin;
#[derive(Clone)]
pub struct Queue {
inner: Arc<Mutex<VecDeque<Arc<GameDownloadAgentQueueStandin>>>>,
}
#[allow(dead_code)]
impl Queue {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(VecDeque::new())),
}
}
pub fn read(&self) -> VecDeque<Arc<GameDownloadAgentQueueStandin>> {
self.inner.lock().unwrap().clone()
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<GameDownloadAgentQueueStandin>>> {
self.inner.lock().unwrap()
}
pub fn pop_front(&self) -> Option<Arc<GameDownloadAgentQueueStandin>> {
self.edit().pop_front()
}
pub fn empty(&self) -> bool {
self.inner.lock().unwrap().len() == 0
}
/// Either inserts `interface` at the specified index, or appends to
/// the back of the deque if index is greater than the length of the deque
pub fn insert(&self, interface: GameDownloadAgentQueueStandin, index: usize) {
if self.read().len() > index {
self.append(interface);
} else {
self.edit().insert(index, Arc::new(interface));
}
}
pub fn append(&self, interface: GameDownloadAgentQueueStandin) {
self.edit().push_back(Arc::new(interface));
}
pub fn pop_front_if_equal(
&self,
game_id: String,
) -> Option<Arc<GameDownloadAgentQueueStandin>> {
let mut queue = self.edit();
let front = match queue.front() {
Some(front) => front,
None => return None,
};
if front.id == game_id {
return queue.pop_front();
}
None
}
pub fn get_by_id(&self, game_id: String) -> Option<usize> {
self.read().iter().position(|data| data.id == game_id)
}
pub fn move_to_index_by_id(&self, game_id: String, new_index: usize) -> Result<(), ()> {
let index = match self.get_by_id(game_id) {
Some(index) => index,
None => return Err(()),
};
let existing = match self.edit().remove(index) {
Some(existing) => existing,
None => return Err(()),
};
self.edit().insert(new_index, existing);
Ok(())
}
}