156 refactor into workspaces (#157)

* chore: Major refactoring

Still needs a massive go-over because there shouldn't be anything referencing tauri in any of the workspaces except the original one. Process manager has been refactored as an example

Signed-off-by: quexeky <git@quexeky.dev>

* fix: Remote tauri dependency from process

Signed-off-by: quexeky <git@quexeky.dev>

* refactor: Improvements to src-tauri

Signed-off-by: quexeky <git@quexeky.dev>

* refactor: Builds, but some logic still left to move back

Signed-off-by: quexeky <git@quexeky.dev>

* refactor: Finish refactor

Signed-off-by: quexeky <git@quexeky.dev>

* chore: Run cargo clippy && cargo fmt

Signed-off-by: quexeky <git@quexeky.dev>

* refactor: Move everything into src-tauri

Signed-off-by: quexeky <git@quexeky.dev>

---------

Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
quexeky
2025-10-14 17:12:51 +11:00
committed by GitHub
parent cc57ca7076
commit 87bbe1da49
107 changed files with 8700 additions and 2738 deletions

View File

@ -0,0 +1,17 @@
[package]
name = "download_manager"
version = "0.1.0"
edition = "2024"
[dependencies]
atomic-instant-full = "0.1.0"
database = { version = "0.1.0", path = "../database" }
humansize = "2.1.3"
log = "0.4.28"
parking_lot = "0.12.5"
remote = { version = "0.1.0", path = "../remote" }
serde = "1.0.228"
serde_with = "3.15.0"
tauri = "2.8.5"
throttle_my_fn = "0.2.6"
utils = { version = "0.1.0", path = "../utils" }

View File

@ -0,0 +1,400 @@
use std::{
collections::HashMap,
sync::{
Arc, Mutex,
mpsc::{Receiver, Sender, channel},
},
thread::{JoinHandle, spawn},
};
use database::DownloadableMetadata;
use log::{debug, error, info, warn};
use tauri::AppHandle;
use utils::{app_emit, lock, send};
use crate::{
download_manager_frontend::DownloadStatus,
error::ApplicationDownloadError,
frontend_updates::{QueueUpdateEvent, QueueUpdateEventQueueData, StatsUpdateEvent},
};
use super::{
download_manager_frontend::{DownloadManager, DownloadManagerSignal, DownloadManagerStatus},
downloadable::Downloadable,
util::{
download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag},
progress_object::ProgressObject,
queue::Queue,
},
};
pub type DownloadAgent = Arc<Box<dyn Downloadable + Send + Sync>>;
pub type CurrentProgressObject = Arc<Mutex<Option<Arc<ProgressObject>>>>;
/*
Welcome to the download manager, the most overengineered, glorious piece of bullshit.
The download manager takes a queue of ids and their associated
DownloadAgents, and then, one-by-one, executes them. It provides an interface
to interact with the currently downloading agent, and manage the queue.
When the DownloadManager is initialised, it is designed to provide a reference
which can be used to provide some instructions (the DownloadManagerInterface),
but other than that, it runs without any sort of interruptions.
It does this by opening up two data structures. Primarily is the command_receiver,
and mpsc (multi-channel-single-producer) which allows commands to be sent from
the Interface, and queued up for the Manager to process.
These have been mapped in the DownloadManagerSignal docs.
The other way to interact with the DownloadManager is via the donwload_queue,
which is just a collection of ids which may be rearranged to suit
whichever download queue order is required.
+----------------------------------------------------------------------------+
| DO NOT ATTEMPT TO ADD OR REMOVE FROM THE QUEUE WITHOUT USING SIGNALS!! |
| THIS WILL CAUSE A DESYNC BETWEEN THE DOWNLOAD AGENT REGISTRY AND THE QUEUE |
| WHICH HAS NOT BEEN ACCOUNTED FOR |
+----------------------------------------------------------------------------+
This download queue does not actually own any of the DownloadAgents. It is
simply an id-based reference system. The actual Agents are stored in the
download_agent_registry HashMap, as ordering is no issue here. This is why
appending or removing from the download_queue must be done via signals.
Behold, my madness - quexeky
*/
pub struct DownloadManagerBuilder {
download_agent_registry: HashMap<DownloadableMetadata, DownloadAgent>,
download_queue: Queue,
command_receiver: Receiver<DownloadManagerSignal>,
sender: Sender<DownloadManagerSignal>,
progress: CurrentProgressObject,
status: Arc<Mutex<DownloadManagerStatus>>,
app_handle: AppHandle,
current_download_thread: Mutex<Option<JoinHandle<()>>>,
active_control_flag: Option<DownloadThreadControl>,
}
impl DownloadManagerBuilder {
pub fn build(app_handle: AppHandle) -> DownloadManager {
let queue = Queue::new();
let (command_sender, command_receiver) = channel();
let active_progress = Arc::new(Mutex::new(None));
let status = Arc::new(Mutex::new(DownloadManagerStatus::Empty));
let manager = Self {
download_agent_registry: HashMap::new(),
download_queue: queue.clone(),
command_receiver,
status: status.clone(),
sender: command_sender.clone(),
progress: active_progress.clone(),
app_handle,
current_download_thread: Mutex::new(None),
active_control_flag: None,
};
let terminator = spawn(|| manager.manage_queue());
DownloadManager::new(terminator, queue, active_progress, command_sender)
}
fn set_status(&self, status: DownloadManagerStatus) {
*lock!(self.status) = status;
}
fn remove_and_cleanup_front_download(&mut self, meta: &DownloadableMetadata) -> DownloadAgent {
self.download_queue.pop_front();
let download_agent = self.download_agent_registry.remove(meta).unwrap();
self.cleanup_current_download();
download_agent
}
// CAREFUL WITH THIS FUNCTION
// Make sure the download thread is terminated
fn cleanup_current_download(&mut self) {
self.active_control_flag = None;
*lock!(self.progress) = None;
let mut download_thread_lock = lock!(self.current_download_thread);
if let Some(unfinished_thread) = download_thread_lock.take()
&& !unfinished_thread.is_finished()
{
unfinished_thread.join().unwrap();
}
drop(download_thread_lock);
}
fn stop_and_wait_current_download(&self) -> bool {
self.set_status(DownloadManagerStatus::Paused);
if let Some(current_flag) = &self.active_control_flag {
current_flag.set(DownloadThreadControlFlag::Stop);
}
let mut download_thread_lock = lock!(self.current_download_thread);
if let Some(current_download_thread) = download_thread_lock.take() {
return current_download_thread.join().is_ok();
};
true
}
fn manage_queue(mut self) -> Result<(), ()> {
loop {
let signal = match self.command_receiver.recv() {
Ok(signal) => signal,
Err(_) => return Err(()),
};
match signal {
DownloadManagerSignal::Go => {
self.manage_go_signal();
}
DownloadManagerSignal::Stop => {
self.manage_stop_signal();
}
DownloadManagerSignal::Completed(meta) => {
self.manage_completed_signal(meta);
}
DownloadManagerSignal::Queue(download_agent) => {
self.manage_queue_signal(download_agent);
}
DownloadManagerSignal::Error(e) => {
self.manage_error_signal(e);
}
DownloadManagerSignal::UpdateUIQueue => {
self.push_ui_queue_update();
}
DownloadManagerSignal::UpdateUIStats(kbs, time) => {
self.push_ui_stats_update(kbs, time);
}
DownloadManagerSignal::Finish => {
self.stop_and_wait_current_download();
return Ok(());
}
DownloadManagerSignal::Cancel(meta) => {
self.manage_cancel_signal(&meta);
}
}
}
}
fn manage_queue_signal(&mut self, download_agent: DownloadAgent) {
debug!("got signal Queue");
let meta = download_agent.metadata();
debug!("queue metadata: {meta:?}");
if self.download_queue.exists(meta.clone()) {
warn!("download with same ID already exists");
return;
}
download_agent.on_queued(&self.app_handle);
self.download_queue.append(meta.clone());
self.download_agent_registry.insert(meta, download_agent);
send!(self.sender, DownloadManagerSignal::UpdateUIQueue);
}
fn manage_go_signal(&mut self) {
debug!("got signal Go");
if self.download_agent_registry.is_empty() {
debug!(
"Download agent registry: {:?}",
self.download_agent_registry.len()
);
return;
}
debug!("current download queue: {:?}", self.download_queue.read());
let agent_data = if let Some(agent_data) = self.download_queue.read().front() {
agent_data.clone()
} else {
return;
};
let download_agent = self
.download_agent_registry
.get(&agent_data)
.unwrap()
.clone();
let status = download_agent.status();
// This download is already going
if status != DownloadStatus::Queued {
return;
}
// Ensure all others are marked as queued
for agent in self.download_agent_registry.values() {
if agent.metadata() != agent_data && agent.status() != DownloadStatus::Queued {
agent.on_queued(&self.app_handle);
}
}
info!("starting download for {agent_data:?}");
self.active_control_flag = Some(download_agent.control_flag());
let sender = self.sender.clone();
let mut download_thread_lock = lock!(self.current_download_thread);
let app_handle = self.app_handle.clone();
*download_thread_lock = Some(spawn(move || {
loop {
let download_result = match download_agent.download(&app_handle) {
// Ok(true) is for completed and exited properly
Ok(v) => v,
Err(e) => {
error!("download {:?} has error {}", download_agent.metadata(), &e);
download_agent.on_error(&app_handle, &e);
send!(sender, DownloadManagerSignal::Error(e));
return;
}
};
// If the download gets canceled
// immediately return, on_cancelled gets called for us earlier
if !download_result {
return;
}
if download_agent.control_flag().get() == DownloadThreadControlFlag::Stop {
return;
}
let validate_result = match download_agent.validate(&app_handle) {
Ok(v) => v,
Err(e) => {
error!(
"download {:?} has validation error {}",
download_agent.metadata(),
&e
);
download_agent.on_error(&app_handle, &e);
send!(sender, DownloadManagerSignal::Error(e));
return;
}
};
if download_agent.control_flag().get() == DownloadThreadControlFlag::Stop {
return;
}
if validate_result {
download_agent.on_complete(&app_handle);
send!(
sender,
DownloadManagerSignal::Completed(download_agent.metadata())
);
send!(sender, DownloadManagerSignal::UpdateUIQueue);
return;
}
}
}));
self.set_status(DownloadManagerStatus::Downloading);
let active_control_flag = self.active_control_flag.clone().unwrap();
active_control_flag.set(DownloadThreadControlFlag::Go);
}
fn manage_stop_signal(&mut self) {
debug!("got signal Stop");
if let Some(active_control_flag) = self.active_control_flag.clone() {
self.set_status(DownloadManagerStatus::Paused);
active_control_flag.set(DownloadThreadControlFlag::Stop);
}
}
fn manage_completed_signal(&mut self, meta: DownloadableMetadata) {
debug!("got signal Completed");
if let Some(interface) = self.download_queue.read().front()
&& interface == &meta
{
self.remove_and_cleanup_front_download(&meta);
}
self.push_ui_queue_update();
send!(self.sender, DownloadManagerSignal::Go);
}
fn manage_error_signal(&mut self, error: ApplicationDownloadError) {
debug!("got signal Error");
if let Some(metadata) = self.download_queue.read().front()
&& let Some(current_agent) = self.download_agent_registry.get(metadata)
{
current_agent.on_error(&self.app_handle, &error);
self.stop_and_wait_current_download();
self.remove_and_cleanup_front_download(metadata);
}
self.push_ui_queue_update();
self.set_status(DownloadManagerStatus::Error);
}
fn manage_cancel_signal(&mut self, meta: &DownloadableMetadata) {
debug!("got signal Cancel");
// If the current download is the one we're tryna cancel
if let Some(current_metadata) = self.download_queue.read().front()
&& current_metadata == meta
&& let Some(current_download) = self.download_agent_registry.get(current_metadata)
{
self.set_status(DownloadManagerStatus::Paused);
current_download.on_cancelled(&self.app_handle);
self.stop_and_wait_current_download();
self.download_queue.pop_front();
self.cleanup_current_download();
self.download_agent_registry.remove(meta);
debug!("current download queue: {:?}", self.download_queue.read());
}
// else just cancel it
else if let Some(download_agent) = self.download_agent_registry.get(meta) {
let index = self.download_queue.get_by_meta(meta);
if let Some(index) = index {
download_agent.on_cancelled(&self.app_handle);
let _ = self.download_queue.edit().remove(index);
let removed = self.download_agent_registry.remove(meta);
debug!(
"removed {:?} from queue {:?}",
removed.map(|x| x.metadata()),
self.download_queue.read()
);
}
}
self.sender.send(DownloadManagerSignal::Go).unwrap();
self.push_ui_queue_update();
}
fn push_ui_stats_update(&self, kbs: usize, time: usize) {
let event_data = StatsUpdateEvent { speed: kbs, time };
app_emit!(&self.app_handle, "update_stats", event_data);
}
fn push_ui_queue_update(&self) {
let queue = &self.download_queue.read();
let queue_objs = queue
.iter()
.map(|key| {
let val = self.download_agent_registry.get(key).unwrap();
QueueUpdateEventQueueData {
meta: DownloadableMetadata::clone(key),
status: val.status(),
progress: val.progress().get_progress(),
current: val.progress().sum(),
max: val.progress().get_max(),
}
})
.collect();
let event_data = QueueUpdateEvent { queue: queue_objs };
app_emit!(&self.app_handle, "update_queue", event_data);
}
}

View File

@ -0,0 +1,186 @@
use std::{
any::Any,
collections::VecDeque,
fmt::Debug,
sync::{
Mutex, MutexGuard,
mpsc::{SendError, Sender},
},
thread::JoinHandle,
};
use database::DownloadableMetadata;
use log::{debug, info};
use serde::Serialize;
use utils::{lock, send};
use crate::error::ApplicationDownloadError;
use super::{
download_manager_builder::{CurrentProgressObject, DownloadAgent},
util::queue::Queue,
};
pub enum DownloadManagerSignal {
/// Resumes (or starts) the `DownloadManager`
Go,
/// Pauses the `DownloadManager`
Stop,
/// Called when a `DownloadAgent` has fully completed a download.
Completed(DownloadableMetadata),
/// Generates and appends a `DownloadAgent`
/// to the registry and queue
Queue(DownloadAgent),
/// Tells the Manager to stop the current
/// download, sync everything to disk, and
/// then exit
Finish,
/// Stops, removes, and tells a download to cleanup
Cancel(DownloadableMetadata),
/// Any error which occurs in the agent
Error(ApplicationDownloadError),
/// Pushes UI update
UpdateUIQueue,
UpdateUIStats(usize, usize), //kb/s and seconds
}
#[derive(Debug)]
pub enum DownloadManagerStatus {
Downloading,
Paused,
Empty,
Error,
}
impl Serialize for DownloadManagerStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&format!["{self:?}"])
}
}
#[derive(Serialize, Clone, Debug, PartialEq)]
pub enum DownloadStatus {
Queued,
Downloading,
Validating,
Error,
}
/// Accessible front-end for the `DownloadManager`
///
/// The system works entirely through signals, both internally and externally,
/// all of which are accessible through the `DownloadManagerSignal` type, but
/// should not be used directly. Rather, signals are abstracted through this
/// interface.
///
/// The actual download queue may be accessed through the .`edit()` function,
/// which provides raw access to the underlying queue.
/// THIS EDITING IS BLOCKING!!!
#[derive(Debug)]
pub struct DownloadManager {
terminator: Mutex<Option<JoinHandle<Result<(), ()>>>>,
download_queue: Queue,
progress: CurrentProgressObject,
command_sender: Sender<DownloadManagerSignal>,
}
#[allow(dead_code)]
impl DownloadManager {
pub fn new(
terminator: JoinHandle<Result<(), ()>>,
download_queue: Queue,
progress: CurrentProgressObject,
command_sender: Sender<DownloadManagerSignal>,
) -> Self {
Self {
terminator: Mutex::new(Some(terminator)),
download_queue,
progress,
command_sender,
}
}
pub fn queue_download(
&self,
download: DownloadAgent,
) -> Result<(), SendError<DownloadManagerSignal>> {
info!("creating download with meta {:?}", download.metadata());
self.command_sender
.send(DownloadManagerSignal::Queue(download))?;
self.command_sender.send(DownloadManagerSignal::Go)
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<DownloadableMetadata>> {
self.download_queue.edit()
}
pub fn read_queue(&self) -> VecDeque<DownloadableMetadata> {
self.download_queue.read()
}
pub fn get_current_download_progress(&self) -> Option<f64> {
let progress_object = (*lock!(self.progress)).clone()?;
Some(progress_object.get_progress())
}
pub fn rearrange_string(&self, meta: &DownloadableMetadata, new_index: usize) {
let mut queue = self.edit();
let current_index =
get_index_from_id(&mut queue, meta).expect("Failed to get meta index from id");
let to_move = queue
.remove(current_index)
.expect("Failed to remove meta at index from queue");
queue.insert(new_index, to_move);
send!(self.command_sender, DownloadManagerSignal::UpdateUIQueue);
}
pub fn cancel(&self, meta: DownloadableMetadata) {
send!(self.command_sender, DownloadManagerSignal::Cancel(meta));
}
pub fn rearrange(&self, current_index: usize, new_index: usize) {
if current_index == new_index {
return;
}
let needs_pause = current_index == 0 || new_index == 0;
if needs_pause {
send!(self.command_sender, DownloadManagerSignal::Stop);
}
debug!("moving download at index {current_index} to index {new_index}");
let mut queue = self.edit();
let to_move = queue.remove(current_index).expect("Failed to get");
queue.insert(new_index, to_move);
drop(queue);
if needs_pause {
send!(self.command_sender, DownloadManagerSignal::Go);
}
send!(self.command_sender, DownloadManagerSignal::UpdateUIQueue);
send!(self.command_sender, DownloadManagerSignal::Go);
}
pub fn pause_downloads(&self) {
send!(self.command_sender, DownloadManagerSignal::Stop);
}
pub fn resume_downloads(&self) {
send!(self.command_sender, DownloadManagerSignal::Go);
}
pub fn ensure_terminated(&self) -> Result<Result<(), ()>, Box<dyn Any + Send>> {
send!(self.command_sender, DownloadManagerSignal::Finish);
let terminator = lock!(self.terminator).take();
terminator.unwrap().join()
}
pub fn get_sender(&self) -> Sender<DownloadManagerSignal> {
self.command_sender.clone()
}
}
/// Takes in the locked value from .`edit()` and attempts to
/// get the index of whatever id is passed in
fn get_index_from_id(
queue: &mut MutexGuard<'_, VecDeque<DownloadableMetadata>>,
meta: &DownloadableMetadata,
) -> Option<usize> {
queue
.iter()
.position(|download_agent| download_agent == meta)
}

View File

@ -0,0 +1,31 @@
use std::sync::Arc;
use database::DownloadableMetadata;
use tauri::AppHandle;
use crate::error::ApplicationDownloadError;
use super::{
download_manager_frontend::DownloadStatus,
util::{download_thread_control_flag::DownloadThreadControl, progress_object::ProgressObject},
};
/**
* Downloadables are responsible for managing their specific object's download state
* e.g, the GameDownloadAgent is responsible for pushing game updates
*
* But the download manager manages the queue state
*/
pub trait Downloadable: Send + Sync {
fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError>;
fn validate(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError>;
fn progress(&self) -> Arc<ProgressObject>;
fn control_flag(&self) -> DownloadThreadControl;
fn status(&self) -> DownloadStatus;
fn metadata(&self) -> DownloadableMetadata;
fn on_queued(&self, app_handle: &AppHandle);
fn on_error(&self, app_handle: &AppHandle, error: &ApplicationDownloadError);
fn on_complete(&self, app_handle: &AppHandle);
fn on_cancelled(&self, app_handle: &AppHandle);
}

View File

@ -0,0 +1,80 @@
use humansize::{BINARY, format_size};
use std::{
fmt::{Display, Formatter},
io,
sync::{Arc, mpsc::SendError},
};
use remote::error::RemoteAccessError;
use serde_with::SerializeDisplay;
#[derive(SerializeDisplay)]
pub enum DownloadManagerError<T> {
IOError(io::Error),
SignalError(SendError<T>),
}
impl<T> Display for DownloadManagerError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DownloadManagerError::IOError(error) => write!(f, "{error}"),
DownloadManagerError::SignalError(send_error) => write!(f, "{send_error}"),
}
}
}
impl<T> From<SendError<T>> for DownloadManagerError<T> {
fn from(value: SendError<T>) -> Self {
DownloadManagerError::SignalError(value)
}
}
impl<T> From<io::Error> for DownloadManagerError<T> {
fn from(value: io::Error) -> Self {
DownloadManagerError::IOError(value)
}
}
// TODO: Rename / separate from downloads
#[derive(Debug, SerializeDisplay)]
pub enum ApplicationDownloadError {
NotInitialized,
Communication(RemoteAccessError),
DiskFull(u64, u64),
#[allow(dead_code)]
Checksum,
Lock,
IoError(Arc<io::Error>),
DownloadError(RemoteAccessError),
}
impl Display for ApplicationDownloadError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ApplicationDownloadError::NotInitialized => {
write!(f, "Download not initalized, did something go wrong?")
}
ApplicationDownloadError::DiskFull(required, available) => write!(
f,
"Game requires {}, {} remaining left on disk.",
format_size(*required, BINARY),
format_size(*available, BINARY),
),
ApplicationDownloadError::Communication(error) => write!(f, "{error}"),
ApplicationDownloadError::Lock => write!(
f,
"failed to acquire lock. Something has gone very wrong internally. Please restart the application"
),
ApplicationDownloadError::Checksum => {
write!(f, "checksum failed to validate for download")
}
ApplicationDownloadError::IoError(error) => write!(f, "io error: {error}"),
ApplicationDownloadError::DownloadError(error) => {
write!(f, "Download failed with error {error:?}")
}
}
}
}
impl From<io::Error> for ApplicationDownloadError {
fn from(value: io::Error) -> Self {
ApplicationDownloadError::IoError(Arc::new(value))
}
}

View File

@ -0,0 +1,24 @@
use database::DownloadableMetadata;
use serde::Serialize;
use crate::download_manager_frontend::DownloadStatus;
#[derive(Serialize, Clone)]
pub struct QueueUpdateEventQueueData {
pub meta: DownloadableMetadata,
pub status: DownloadStatus,
pub progress: f64,
pub current: usize,
pub max: usize,
}
#[derive(Serialize, Clone)]
pub struct QueueUpdateEvent {
pub queue: Vec<QueueUpdateEventQueueData>,
}
#[derive(Serialize, Clone)]
pub struct StatsUpdateEvent {
pub speed: usize,
pub time: usize,
}

View File

@ -0,0 +1,44 @@
#![feature(duration_millis_float)]
#![feature(nonpoison_mutex)]
#![feature(sync_nonpoison)]
use std::{ops::Deref, sync::OnceLock};
use tauri::AppHandle;
use crate::{
download_manager_builder::DownloadManagerBuilder, download_manager_frontend::DownloadManager,
};
pub mod download_manager_builder;
pub mod download_manager_frontend;
pub mod downloadable;
pub mod error;
pub mod frontend_updates;
pub mod util;
pub static DOWNLOAD_MANAGER: DownloadManagerWrapper = DownloadManagerWrapper::new();
pub struct DownloadManagerWrapper(OnceLock<DownloadManager>);
impl DownloadManagerWrapper {
const fn new() -> Self {
DownloadManagerWrapper(OnceLock::new())
}
pub fn init(app_handle: AppHandle) {
DOWNLOAD_MANAGER
.0
.set(DownloadManagerBuilder::build(app_handle))
.expect("Failed to initialise download manager");
}
}
impl Deref for DownloadManagerWrapper {
type Target = DownloadManager;
fn deref(&self) -> &Self::Target {
match self.0.get() {
Some(download_manager) => download_manager,
None => unreachable!("Download manager should always be initialised"),
}
}
}

View File

@ -0,0 +1,50 @@
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub enum DownloadThreadControlFlag {
Stop,
Go,
}
/// Go => true
/// Stop => false
impl From<DownloadThreadControlFlag> for bool {
fn from(value: DownloadThreadControlFlag) -> Self {
match value {
DownloadThreadControlFlag::Go => true,
DownloadThreadControlFlag::Stop => false,
}
}
}
/// true => Go
/// false => Stop
impl From<bool> for DownloadThreadControlFlag {
fn from(value: bool) -> Self {
if value {
DownloadThreadControlFlag::Go
} else {
DownloadThreadControlFlag::Stop
}
}
}
#[derive(Clone)]
pub struct DownloadThreadControl {
inner: Arc<AtomicBool>,
}
impl DownloadThreadControl {
pub fn new(flag: DownloadThreadControlFlag) -> Self {
Self {
inner: Arc::new(AtomicBool::new(flag.into())),
}
}
pub fn get(&self) -> DownloadThreadControlFlag {
self.inner.load(Ordering::Acquire).into()
}
pub fn set(&self, flag: DownloadThreadControlFlag) {
self.inner.store(flag.into(), Ordering::Release);
}
}

View File

@ -0,0 +1,4 @@
pub mod download_thread_control_flag;
pub mod progress_object;
pub mod queue;
pub mod rolling_progress_updates;

View File

@ -0,0 +1,159 @@
use std::{
sync::{
Arc, Mutex,
atomic::{AtomicUsize, Ordering},
mpsc::Sender,
},
time::{Duration, Instant},
};
use atomic_instant_full::AtomicInstant;
use throttle_my_fn::throttle;
use utils::{lock, send};
use crate::download_manager_frontend::DownloadManagerSignal;
use super::rolling_progress_updates::RollingProgressWindow;
#[derive(Clone, Debug)]
pub struct ProgressObject {
max: Arc<Mutex<usize>>,
progress_instances: Arc<Mutex<Vec<Arc<AtomicUsize>>>>,
start: Arc<Mutex<Instant>>,
sender: Sender<DownloadManagerSignal>,
//last_update: Arc<RwLock<Instant>>,
last_update_time: Arc<AtomicInstant>,
bytes_last_update: Arc<AtomicUsize>,
rolling: RollingProgressWindow<1000>,
}
#[derive(Clone)]
pub struct ProgressHandle {
progress: Arc<AtomicUsize>,
progress_object: Arc<ProgressObject>,
}
impl ProgressHandle {
pub fn new(progress: Arc<AtomicUsize>, progress_object: Arc<ProgressObject>) -> Self {
Self {
progress,
progress_object,
}
}
pub fn set(&self, amount: usize) {
self.progress.store(amount, Ordering::Release);
}
pub fn add(&self, amount: usize) {
self.progress
.fetch_add(amount, std::sync::atomic::Ordering::AcqRel);
calculate_update(&self.progress_object);
}
pub fn skip(&self, amount: usize) {
self.progress
.fetch_add(amount, std::sync::atomic::Ordering::Acquire);
// Offset the bytes at last offset by this amount
self.progress_object
.bytes_last_update
.fetch_add(amount, Ordering::Acquire);
// Dont' fire update
}
}
impl ProgressObject {
pub fn new(max: usize, length: usize, sender: Sender<DownloadManagerSignal>) -> Self {
let arr = Mutex::new((0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect());
Self {
max: Arc::new(Mutex::new(max)),
progress_instances: Arc::new(arr),
start: Arc::new(Mutex::new(Instant::now())),
sender,
last_update_time: Arc::new(AtomicInstant::now()),
bytes_last_update: Arc::new(AtomicUsize::new(0)),
rolling: RollingProgressWindow::new(),
}
}
pub fn set_time_now(&self) {
*lock!(self.start) = Instant::now();
}
pub fn sum(&self) -> usize {
lock!(self.progress_instances)
.iter()
.map(|instance| instance.load(Ordering::Acquire))
.sum()
}
pub fn reset(&self) {
self.set_time_now();
self.bytes_last_update.store(0, Ordering::Release);
self.rolling.reset();
lock!(self.progress_instances)
.iter()
.for_each(|x| x.store(0, Ordering::SeqCst));
}
pub fn get_max(&self) -> usize {
*lock!(self.max)
}
pub fn set_max(&self, new_max: usize) {
*lock!(self.max) = new_max;
}
pub fn set_size(&self, length: usize) {
*lock!(self.progress_instances) =
(0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect();
}
pub fn get_progress(&self) -> f64 {
self.sum() as f64 / self.get_max() as f64
}
pub fn get(&self, index: usize) -> Arc<AtomicUsize> {
lock!(self.progress_instances)[index].clone()
}
fn update_window(&self, kilobytes_per_second: usize) {
self.rolling.update(kilobytes_per_second);
}
}
#[throttle(1, Duration::from_millis(20))]
pub fn calculate_update(progress: &ProgressObject) {
let last_update_time = progress
.last_update_time
.swap(Instant::now(), Ordering::SeqCst);
let time_since_last_update = Instant::now()
.duration_since(last_update_time)
.as_millis_f64();
let current_bytes_downloaded = progress.sum();
let max = progress.get_max();
let bytes_at_last_update = progress
.bytes_last_update
.swap(current_bytes_downloaded, Ordering::Acquire);
let bytes_since_last_update =
current_bytes_downloaded.saturating_sub(bytes_at_last_update) as f64;
let kilobytes_per_second = bytes_since_last_update / time_since_last_update;
let bytes_remaining = max.saturating_sub(current_bytes_downloaded); // bytes
progress.update_window(kilobytes_per_second as usize);
push_update(progress, bytes_remaining);
}
#[throttle(1, Duration::from_millis(250))]
pub fn push_update(progress: &ProgressObject, bytes_remaining: usize) {
let average_speed = progress.rolling.get_average();
let time_remaining = (bytes_remaining / 1000) / average_speed.max(1);
update_ui(progress, average_speed, time_remaining);
update_queue(progress);
}
fn update_ui(progress_object: &ProgressObject, kilobytes_per_second: usize, time_remaining: usize) {
send!(
progress_object.sender,
DownloadManagerSignal::UpdateUIStats(kilobytes_per_second, time_remaining)
);
}
fn update_queue(progress: &ProgressObject) {
send!(progress.sender, DownloadManagerSignal::UpdateUIQueue)
}

View File

@ -0,0 +1,45 @@
use std::{
collections::VecDeque,
sync::{Arc, Mutex, MutexGuard},
};
use database::DownloadableMetadata;
use utils::lock;
#[derive(Clone, Debug)]
pub struct Queue {
inner: Arc<Mutex<VecDeque<DownloadableMetadata>>>,
}
#[allow(dead_code)]
impl Default for Queue {
fn default() -> Self {
Self::new()
}
}
impl Queue {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(VecDeque::new())),
}
}
pub fn read(&self) -> VecDeque<DownloadableMetadata> {
lock!(self.inner).clone()
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<DownloadableMetadata>> {
lock!(self.inner)
}
pub fn pop_front(&self) -> Option<DownloadableMetadata> {
self.edit().pop_front()
}
pub fn exists(&self, meta: DownloadableMetadata) -> bool {
self.read().contains(&meta)
}
pub fn append(&self, interface: DownloadableMetadata) {
self.edit().push_back(interface);
}
pub fn get_by_meta(&self, meta: &DownloadableMetadata) -> Option<usize> {
self.read().iter().position(|data| data == meta)
}
}

View File

@ -0,0 +1,49 @@
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
#[derive(Clone, Debug)]
pub struct RollingProgressWindow<const S: usize> {
window: Arc<[AtomicUsize; S]>,
current: Arc<AtomicUsize>,
}
impl<const S: usize> Default for RollingProgressWindow<S> {
fn default() -> Self {
Self::new()
}
}
impl<const S: usize> RollingProgressWindow<S> {
pub fn new() -> Self {
Self {
window: Arc::new([(); S].map(|()| AtomicUsize::new(0))),
current: Arc::new(AtomicUsize::new(0)),
}
}
pub fn update(&self, kilobytes_per_second: usize) {
let index = self.current.fetch_add(1, Ordering::SeqCst);
let current = &self.window[index % S];
current.store(kilobytes_per_second, Ordering::SeqCst);
}
pub fn get_average(&self) -> usize {
let current = self.current.load(Ordering::SeqCst);
let valid = self
.window
.iter()
.enumerate()
.filter(|(i, _)| i < &current)
.map(|(_, x)| x.load(Ordering::Acquire))
.collect::<Vec<usize>>();
let amount = valid.len();
let sum = valid.into_iter().sum::<usize>();
sum / amount
}
pub fn reset(&self) {
self.window
.iter()
.for_each(|x| x.store(0, Ordering::Release));
self.current.store(0, Ordering::Release);
}
}