mirror of
https://github.com/Drop-OSS/drop-app.git
synced 2025-11-24 05:31:41 +10:00
refactor: into rust workspaces
This commit is contained in:
16
src-tauri/drop-downloads/Cargo.toml
Normal file
16
src-tauri/drop-downloads/Cargo.toml
Normal file
@ -0,0 +1,16 @@
|
||||
[package]
|
||||
name = "drop-downloads"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
atomic-instant-full = "0.1.0"
|
||||
drop-database = { path = "../drop-database" }
|
||||
drop-errors = { path = "../drop-errors" }
|
||||
# can't depend, cycle
|
||||
# drop-native-library = { path = "../drop-native-library" }
|
||||
log = "0.4.22"
|
||||
parking_lot = "0.12.4"
|
||||
serde = "1.0.219"
|
||||
tauri = { version = "2.7.0" }
|
||||
throttle_my_fn = "0.2.6"
|
||||
362
src-tauri/drop-downloads/src/download_manager_builder.rs
Normal file
362
src-tauri/drop-downloads/src/download_manager_builder.rs
Normal file
@ -0,0 +1,362 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
Arc, Mutex,
|
||||
mpsc::{Receiver, Sender, channel},
|
||||
},
|
||||
thread::{JoinHandle, spawn},
|
||||
};
|
||||
|
||||
use drop_database::models::data::DownloadableMetadata;
|
||||
use drop_errors::application_download_error::ApplicationDownloadError;
|
||||
use log::{debug, error, info, warn};
|
||||
use tauri::{AppHandle, Emitter};
|
||||
|
||||
use crate::{
|
||||
download_manager_frontend::DownloadStatus, events::{QueueUpdateEvent, QueueUpdateEventQueueData, StatsUpdateEvent}
|
||||
};
|
||||
|
||||
use super::{
|
||||
download_manager_frontend::{DownloadManager, DownloadManagerSignal, DownloadManagerStatus},
|
||||
downloadable::Downloadable,
|
||||
util::{
|
||||
download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag},
|
||||
progress_object::ProgressObject,
|
||||
queue::Queue,
|
||||
},
|
||||
};
|
||||
|
||||
pub type DownloadAgent = Arc<Box<dyn Downloadable + Send + Sync>>;
|
||||
pub type CurrentProgressObject = Arc<Mutex<Option<Arc<ProgressObject>>>>;
|
||||
|
||||
pub struct DownloadManagerBuilder {
|
||||
download_agent_registry: HashMap<DownloadableMetadata, DownloadAgent>,
|
||||
download_queue: Queue,
|
||||
command_receiver: Receiver<DownloadManagerSignal>,
|
||||
sender: Sender<DownloadManagerSignal>,
|
||||
progress: CurrentProgressObject,
|
||||
status: Arc<Mutex<DownloadManagerStatus>>,
|
||||
app_handle: AppHandle,
|
||||
|
||||
current_download_thread: Mutex<Option<JoinHandle<()>>>,
|
||||
active_control_flag: Option<DownloadThreadControl>,
|
||||
}
|
||||
impl DownloadManagerBuilder {
|
||||
pub fn build(app_handle: AppHandle) -> DownloadManager {
|
||||
let queue = Queue::new();
|
||||
let (command_sender, command_receiver) = channel();
|
||||
let active_progress = Arc::new(Mutex::new(None));
|
||||
let status = Arc::new(Mutex::new(DownloadManagerStatus::Empty));
|
||||
|
||||
let manager = Self {
|
||||
download_agent_registry: HashMap::new(),
|
||||
download_queue: queue.clone(),
|
||||
command_receiver,
|
||||
status: status.clone(),
|
||||
sender: command_sender.clone(),
|
||||
progress: active_progress.clone(),
|
||||
app_handle,
|
||||
|
||||
current_download_thread: Mutex::new(None),
|
||||
active_control_flag: None,
|
||||
};
|
||||
|
||||
let terminator = spawn(|| manager.manage_queue());
|
||||
|
||||
DownloadManager::new(terminator, queue, active_progress, command_sender)
|
||||
}
|
||||
|
||||
fn set_status(&self, status: DownloadManagerStatus) {
|
||||
*self.status.lock().unwrap() = status;
|
||||
}
|
||||
|
||||
fn remove_and_cleanup_front_download(&mut self, meta: &DownloadableMetadata) -> DownloadAgent {
|
||||
self.download_queue.pop_front();
|
||||
let download_agent = self.download_agent_registry.remove(meta).unwrap();
|
||||
self.cleanup_current_download();
|
||||
download_agent
|
||||
}
|
||||
|
||||
// CAREFUL WITH THIS FUNCTION
|
||||
// Make sure the download thread is terminated
|
||||
fn cleanup_current_download(&mut self) {
|
||||
self.active_control_flag = None;
|
||||
*self.progress.lock().unwrap() = None;
|
||||
|
||||
let mut download_thread_lock = self.current_download_thread.lock().unwrap();
|
||||
|
||||
if let Some(unfinished_thread) = download_thread_lock.take()
|
||||
&& !unfinished_thread.is_finished()
|
||||
{
|
||||
unfinished_thread.join().unwrap();
|
||||
}
|
||||
drop(download_thread_lock);
|
||||
}
|
||||
|
||||
fn stop_and_wait_current_download(&self) -> bool {
|
||||
self.set_status(DownloadManagerStatus::Paused);
|
||||
if let Some(current_flag) = &self.active_control_flag {
|
||||
current_flag.set(DownloadThreadControlFlag::Stop);
|
||||
}
|
||||
|
||||
let mut download_thread_lock = self.current_download_thread.lock().unwrap();
|
||||
if let Some(current_download_thread) = download_thread_lock.take() {
|
||||
return current_download_thread.join().is_ok();
|
||||
};
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
fn manage_queue(mut self) -> Result<(), ()> {
|
||||
loop {
|
||||
let signal = match self.command_receiver.recv() {
|
||||
Ok(signal) => signal,
|
||||
Err(_) => return Err(()),
|
||||
};
|
||||
|
||||
match signal {
|
||||
DownloadManagerSignal::Go => {
|
||||
self.manage_go_signal();
|
||||
}
|
||||
DownloadManagerSignal::Stop => {
|
||||
self.manage_stop_signal();
|
||||
}
|
||||
DownloadManagerSignal::Completed(meta) => {
|
||||
self.manage_completed_signal(meta);
|
||||
}
|
||||
DownloadManagerSignal::Queue(download_agent) => {
|
||||
self.manage_queue_signal(download_agent);
|
||||
}
|
||||
DownloadManagerSignal::Error(e) => {
|
||||
self.manage_error_signal(e);
|
||||
}
|
||||
DownloadManagerSignal::UpdateUIQueue => {
|
||||
self.push_ui_queue_update();
|
||||
}
|
||||
DownloadManagerSignal::UpdateUIStats(kbs, time) => {
|
||||
self.push_ui_stats_update(kbs, time);
|
||||
}
|
||||
DownloadManagerSignal::Finish => {
|
||||
self.stop_and_wait_current_download();
|
||||
return Ok(());
|
||||
}
|
||||
DownloadManagerSignal::Cancel(meta) => {
|
||||
self.manage_cancel_signal(&meta);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fn manage_queue_signal(&mut self, download_agent: DownloadAgent) {
|
||||
debug!("got signal Queue");
|
||||
let meta = download_agent.metadata();
|
||||
|
||||
debug!("queue metadata: {meta:?}");
|
||||
|
||||
if self.download_queue.exists(meta.clone()) {
|
||||
warn!("download with same ID already exists");
|
||||
return;
|
||||
}
|
||||
|
||||
download_agent.on_queued(&self.app_handle);
|
||||
self.download_queue.append(meta.clone());
|
||||
self.download_agent_registry.insert(meta, download_agent);
|
||||
|
||||
self.sender
|
||||
.send(DownloadManagerSignal::UpdateUIQueue)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn manage_go_signal(&mut self) {
|
||||
debug!("got signal Go");
|
||||
if self.download_agent_registry.is_empty() {
|
||||
debug!(
|
||||
"Download agent registry: {:?}",
|
||||
self.download_agent_registry.len()
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
debug!("current download queue: {:?}", self.download_queue.read());
|
||||
|
||||
let agent_data = if let Some(agent_data) = self.download_queue.read().front() {
|
||||
agent_data.clone()
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
|
||||
let download_agent = self
|
||||
.download_agent_registry
|
||||
.get(&agent_data)
|
||||
.unwrap()
|
||||
.clone();
|
||||
|
||||
let status = download_agent.status();
|
||||
|
||||
// This download is already going
|
||||
if status != DownloadStatus::Queued {
|
||||
return;
|
||||
}
|
||||
|
||||
// Ensure all others are marked as queued
|
||||
for agent in self.download_agent_registry.values() {
|
||||
if agent.metadata() != agent_data && agent.status() != DownloadStatus::Queued {
|
||||
agent.on_queued(&self.app_handle);
|
||||
}
|
||||
}
|
||||
|
||||
info!("starting download for {agent_data:?}");
|
||||
self.active_control_flag = Some(download_agent.control_flag());
|
||||
|
||||
let sender = self.sender.clone();
|
||||
|
||||
let mut download_thread_lock = self.current_download_thread.lock().unwrap();
|
||||
let app_handle = self.app_handle.clone();
|
||||
|
||||
*download_thread_lock = Some(spawn(move || {
|
||||
loop {
|
||||
let download_result = match download_agent.download(&app_handle) {
|
||||
// Ok(true) is for completed and exited properly
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("download {:?} has error {}", download_agent.metadata(), &e);
|
||||
download_agent.on_error(&app_handle, &e);
|
||||
sender.send(DownloadManagerSignal::Error(e)).unwrap();
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// If the download gets canceled
|
||||
// immediately return, on_cancelled gets called for us earlier
|
||||
if !download_result {
|
||||
return;
|
||||
}
|
||||
|
||||
if download_agent.control_flag().get() == DownloadThreadControlFlag::Stop {
|
||||
return;
|
||||
}
|
||||
|
||||
let validate_result = match download_agent.validate(&app_handle) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!(
|
||||
"download {:?} has validation error {}",
|
||||
download_agent.metadata(),
|
||||
&e
|
||||
);
|
||||
download_agent.on_error(&app_handle, &e);
|
||||
sender.send(DownloadManagerSignal::Error(e)).unwrap();
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if download_agent.control_flag().get() == DownloadThreadControlFlag::Stop {
|
||||
return;
|
||||
}
|
||||
|
||||
if validate_result {
|
||||
download_agent.on_complete(&app_handle);
|
||||
sender
|
||||
.send(DownloadManagerSignal::Completed(download_agent.metadata()))
|
||||
.unwrap();
|
||||
sender.send(DownloadManagerSignal::UpdateUIQueue).unwrap();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
self.set_status(DownloadManagerStatus::Downloading);
|
||||
let active_control_flag = self.active_control_flag.clone().unwrap();
|
||||
active_control_flag.set(DownloadThreadControlFlag::Go);
|
||||
}
|
||||
fn manage_stop_signal(&mut self) {
|
||||
debug!("got signal Stop");
|
||||
|
||||
if let Some(active_control_flag) = self.active_control_flag.clone() {
|
||||
self.set_status(DownloadManagerStatus::Paused);
|
||||
active_control_flag.set(DownloadThreadControlFlag::Stop);
|
||||
}
|
||||
}
|
||||
fn manage_completed_signal(&mut self, meta: DownloadableMetadata) {
|
||||
debug!("got signal Completed");
|
||||
if let Some(interface) = self.download_queue.read().front()
|
||||
&& interface == &meta
|
||||
{
|
||||
self.remove_and_cleanup_front_download(&meta);
|
||||
}
|
||||
|
||||
self.push_ui_queue_update();
|
||||
self.sender.send(DownloadManagerSignal::Go).unwrap();
|
||||
}
|
||||
fn manage_error_signal(&mut self, error: ApplicationDownloadError) {
|
||||
debug!("got signal Error");
|
||||
if let Some(metadata) = self.download_queue.read().front()
|
||||
&& let Some(current_agent) = self.download_agent_registry.get(metadata)
|
||||
{
|
||||
current_agent.on_error(&self.app_handle, &error);
|
||||
|
||||
self.stop_and_wait_current_download();
|
||||
self.remove_and_cleanup_front_download(metadata);
|
||||
}
|
||||
self.push_ui_queue_update();
|
||||
self.set_status(DownloadManagerStatus::Error);
|
||||
}
|
||||
fn manage_cancel_signal(&mut self, meta: &DownloadableMetadata) {
|
||||
debug!("got signal Cancel");
|
||||
|
||||
// If the current download is the one we're tryna cancel
|
||||
if let Some(current_metadata) = self.download_queue.read().front()
|
||||
&& current_metadata == meta
|
||||
&& let Some(current_download) = self.download_agent_registry.get(current_metadata)
|
||||
{
|
||||
self.set_status(DownloadManagerStatus::Paused);
|
||||
current_download.on_cancelled(&self.app_handle);
|
||||
self.stop_and_wait_current_download();
|
||||
|
||||
self.download_queue.pop_front();
|
||||
|
||||
self.cleanup_current_download();
|
||||
self.download_agent_registry.remove(meta);
|
||||
debug!("current download queue: {:?}", self.download_queue.read());
|
||||
}
|
||||
// else just cancel it
|
||||
else if let Some(download_agent) = self.download_agent_registry.get(meta) {
|
||||
let index = self.download_queue.get_by_meta(meta);
|
||||
if let Some(index) = index {
|
||||
download_agent.on_cancelled(&self.app_handle);
|
||||
let _ = self.download_queue.edit().remove(index).unwrap();
|
||||
let removed = self.download_agent_registry.remove(meta);
|
||||
debug!(
|
||||
"removed {:?} from queue {:?}",
|
||||
removed.map(|x| x.metadata()),
|
||||
self.download_queue.read()
|
||||
);
|
||||
}
|
||||
}
|
||||
self.sender.send(DownloadManagerSignal::Go).unwrap();
|
||||
self.push_ui_queue_update();
|
||||
}
|
||||
fn push_ui_stats_update(&self, kbs: usize, time: usize) {
|
||||
let event_data = StatsUpdateEvent { speed: kbs, time };
|
||||
|
||||
self.app_handle.emit("update_stats", event_data).unwrap();
|
||||
}
|
||||
fn push_ui_queue_update(&self) {
|
||||
let queue = &self.download_queue.read();
|
||||
let queue_objs = queue
|
||||
.iter()
|
||||
.map(|key| {
|
||||
let val = self.download_agent_registry.get(key).unwrap();
|
||||
QueueUpdateEventQueueData {
|
||||
meta: DownloadableMetadata::clone(key),
|
||||
status: val.status(),
|
||||
progress: val.progress().get_progress(),
|
||||
current: val.progress().sum(),
|
||||
max: val.progress().get_max(),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let event_data = QueueUpdateEvent { queue: queue_objs };
|
||||
self.app_handle.emit("update_queue", event_data).unwrap();
|
||||
}
|
||||
}
|
||||
192
src-tauri/drop-downloads/src/download_manager_frontend.rs
Normal file
192
src-tauri/drop-downloads/src/download_manager_frontend.rs
Normal file
@ -0,0 +1,192 @@
|
||||
use std::{
|
||||
any::Any,
|
||||
collections::VecDeque,
|
||||
fmt::Debug,
|
||||
sync::{
|
||||
mpsc::{SendError, Sender},
|
||||
Mutex, MutexGuard,
|
||||
},
|
||||
thread::JoinHandle,
|
||||
};
|
||||
|
||||
use drop_database::models::data::DownloadableMetadata;
|
||||
use drop_errors::application_download_error::ApplicationDownloadError;
|
||||
use log::{debug, info};
|
||||
use serde::Serialize;
|
||||
|
||||
use super::{
|
||||
download_manager_builder::{CurrentProgressObject, DownloadAgent},
|
||||
util::queue::Queue,
|
||||
};
|
||||
|
||||
pub enum DownloadManagerSignal {
|
||||
/// Resumes (or starts) the `DownloadManager`
|
||||
Go,
|
||||
/// Pauses the `DownloadManager`
|
||||
Stop,
|
||||
/// Called when a `DownloadAgent` has fully completed a download.
|
||||
Completed(DownloadableMetadata),
|
||||
/// Generates and appends a `DownloadAgent`
|
||||
/// to the registry and queue
|
||||
Queue(DownloadAgent),
|
||||
/// Tells the Manager to stop the current
|
||||
/// download, sync everything to disk, and
|
||||
/// then exit
|
||||
Finish,
|
||||
/// Stops, removes, and tells a download to cleanup
|
||||
Cancel(DownloadableMetadata),
|
||||
/// Any error which occurs in the agent
|
||||
Error(ApplicationDownloadError),
|
||||
/// Pushes UI update
|
||||
UpdateUIQueue,
|
||||
UpdateUIStats(usize, usize), //kb/s and seconds
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum DownloadManagerStatus {
|
||||
Downloading,
|
||||
Paused,
|
||||
Empty,
|
||||
Error,
|
||||
}
|
||||
|
||||
impl Serialize for DownloadManagerStatus {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
serializer.serialize_str(&format!["{self:?}"])
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone, Debug, PartialEq)]
|
||||
pub enum DownloadStatus {
|
||||
Queued,
|
||||
Downloading,
|
||||
Validating,
|
||||
Error,
|
||||
}
|
||||
|
||||
/// Accessible front-end for the `DownloadManager`
|
||||
///
|
||||
/// The system works entirely through signals, both internally and externally,
|
||||
/// all of which are accessible through the `DownloadManagerSignal` type, but
|
||||
/// should not be used directly. Rather, signals are abstracted through this
|
||||
/// interface.
|
||||
///
|
||||
/// The actual download queue may be accessed through the .`edit()` function,
|
||||
/// which provides raw access to the underlying queue.
|
||||
/// THIS EDITING IS BLOCKING!!!
|
||||
pub struct DownloadManager {
|
||||
terminator: Mutex<Option<JoinHandle<Result<(), ()>>>>,
|
||||
download_queue: Queue,
|
||||
progress: CurrentProgressObject,
|
||||
command_sender: Sender<DownloadManagerSignal>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl DownloadManager {
|
||||
pub fn new(
|
||||
terminator: JoinHandle<Result<(), ()>>,
|
||||
download_queue: Queue,
|
||||
progress: CurrentProgressObject,
|
||||
command_sender: Sender<DownloadManagerSignal>,
|
||||
) -> Self {
|
||||
Self {
|
||||
terminator: Mutex::new(Some(terminator)),
|
||||
download_queue,
|
||||
progress,
|
||||
command_sender,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn queue_download(
|
||||
&self,
|
||||
download: DownloadAgent,
|
||||
) -> Result<(), SendError<DownloadManagerSignal>> {
|
||||
info!("creating download with meta {:?}", download.metadata());
|
||||
self.command_sender
|
||||
.send(DownloadManagerSignal::Queue(download))?;
|
||||
self.command_sender.send(DownloadManagerSignal::Go)
|
||||
}
|
||||
pub fn edit(&self) -> MutexGuard<'_, VecDeque<DownloadableMetadata>> {
|
||||
self.download_queue.edit()
|
||||
}
|
||||
pub fn read_queue(&self) -> VecDeque<DownloadableMetadata> {
|
||||
self.download_queue.read()
|
||||
}
|
||||
pub fn get_current_download_progress(&self) -> Option<f64> {
|
||||
let progress_object = (*self.progress.lock().unwrap()).clone()?;
|
||||
Some(progress_object.get_progress())
|
||||
}
|
||||
pub fn rearrange_string(&self, meta: &DownloadableMetadata, new_index: usize) {
|
||||
let mut queue = self.edit();
|
||||
let current_index = get_index_from_id(&mut queue, meta).unwrap();
|
||||
let to_move = queue.remove(current_index).unwrap();
|
||||
queue.insert(new_index, to_move);
|
||||
self.command_sender
|
||||
.send(DownloadManagerSignal::UpdateUIQueue)
|
||||
.unwrap();
|
||||
}
|
||||
pub fn cancel(&self, meta: DownloadableMetadata) {
|
||||
self.command_sender
|
||||
.send(DownloadManagerSignal::Cancel(meta))
|
||||
.unwrap();
|
||||
}
|
||||
pub fn rearrange(&self, current_index: usize, new_index: usize) {
|
||||
if current_index == new_index {
|
||||
return;
|
||||
}
|
||||
|
||||
let needs_pause = current_index == 0 || new_index == 0;
|
||||
if needs_pause {
|
||||
self.command_sender
|
||||
.send(DownloadManagerSignal::Stop)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
debug!("moving download at index {current_index} to index {new_index}");
|
||||
|
||||
let mut queue = self.edit();
|
||||
let to_move = queue.remove(current_index).unwrap();
|
||||
queue.insert(new_index, to_move);
|
||||
drop(queue);
|
||||
|
||||
if needs_pause {
|
||||
self.command_sender.send(DownloadManagerSignal::Go).unwrap();
|
||||
}
|
||||
self.command_sender
|
||||
.send(DownloadManagerSignal::UpdateUIQueue)
|
||||
.unwrap();
|
||||
self.command_sender.send(DownloadManagerSignal::Go).unwrap();
|
||||
}
|
||||
pub fn pause_downloads(&self) {
|
||||
self.command_sender
|
||||
.send(DownloadManagerSignal::Stop)
|
||||
.unwrap();
|
||||
}
|
||||
pub fn resume_downloads(&self) {
|
||||
self.command_sender.send(DownloadManagerSignal::Go).unwrap();
|
||||
}
|
||||
pub fn ensure_terminated(&self) -> Result<Result<(), ()>, Box<dyn Any + Send>> {
|
||||
self.command_sender
|
||||
.send(DownloadManagerSignal::Finish)
|
||||
.unwrap();
|
||||
let terminator = self.terminator.lock().unwrap().take();
|
||||
terminator.unwrap().join()
|
||||
}
|
||||
pub fn get_sender(&self) -> Sender<DownloadManagerSignal> {
|
||||
self.command_sender.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Takes in the locked value from .`edit()` and attempts to
|
||||
/// get the index of whatever id is passed in
|
||||
fn get_index_from_id(
|
||||
queue: &mut MutexGuard<'_, VecDeque<DownloadableMetadata>>,
|
||||
meta: &DownloadableMetadata,
|
||||
) -> Option<usize> {
|
||||
queue
|
||||
.iter()
|
||||
.position(|download_agent| download_agent == meta)
|
||||
}
|
||||
30
src-tauri/drop-downloads/src/downloadable.rs
Normal file
30
src-tauri/drop-downloads/src/downloadable.rs
Normal file
@ -0,0 +1,30 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use drop_database::models::data::DownloadableMetadata;
|
||||
use drop_errors::application_download_error::ApplicationDownloadError;
|
||||
use tauri::AppHandle;
|
||||
|
||||
use super::{
|
||||
download_manager_frontend::DownloadStatus,
|
||||
util::{download_thread_control_flag::DownloadThreadControl, progress_object::ProgressObject},
|
||||
};
|
||||
|
||||
/**
|
||||
* Downloadables are responsible for managing their specific object's download state
|
||||
* e.g, the GameDownloadAgent is responsible for pushing game updates
|
||||
*
|
||||
* But the download manager manages the queue state
|
||||
*/
|
||||
pub trait Downloadable: Send + Sync {
|
||||
fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError>;
|
||||
fn validate(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError>;
|
||||
|
||||
fn progress(&self) -> Arc<ProgressObject>;
|
||||
fn control_flag(&self) -> DownloadThreadControl;
|
||||
fn status(&self) -> DownloadStatus;
|
||||
fn metadata(&self) -> DownloadableMetadata;
|
||||
fn on_queued(&self, app_handle: &AppHandle);
|
||||
fn on_error(&self, app_handle: &AppHandle, error: &ApplicationDownloadError);
|
||||
fn on_complete(&self, app_handle: &AppHandle);
|
||||
fn on_cancelled(&self, app_handle: &AppHandle);
|
||||
}
|
||||
24
src-tauri/drop-downloads/src/events.rs
Normal file
24
src-tauri/drop-downloads/src/events.rs
Normal file
@ -0,0 +1,24 @@
|
||||
use drop_database::models::data::DownloadableMetadata;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::download_manager_frontend::DownloadStatus;
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
pub struct QueueUpdateEventQueueData {
|
||||
pub meta: DownloadableMetadata,
|
||||
pub status: DownloadStatus,
|
||||
pub progress: f64,
|
||||
pub current: usize,
|
||||
pub max: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
pub struct QueueUpdateEvent {
|
||||
pub queue: Vec<QueueUpdateEventQueueData>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
pub struct StatsUpdateEvent {
|
||||
pub speed: usize,
|
||||
pub time: usize,
|
||||
}
|
||||
7
src-tauri/drop-downloads/src/lib.rs
Normal file
7
src-tauri/drop-downloads/src/lib.rs
Normal file
@ -0,0 +1,7 @@
|
||||
#![feature(duration_millis_float)]
|
||||
|
||||
pub mod download_manager_builder;
|
||||
pub mod download_manager_frontend;
|
||||
pub mod downloadable;
|
||||
pub mod events;
|
||||
pub mod util;
|
||||
@ -0,0 +1,46 @@
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum DownloadThreadControlFlag {
|
||||
Stop,
|
||||
Go,
|
||||
}
|
||||
/// Go => true
|
||||
/// Stop => false
|
||||
impl From<DownloadThreadControlFlag> for bool {
|
||||
fn from(value: DownloadThreadControlFlag) -> Self {
|
||||
match value {
|
||||
DownloadThreadControlFlag::Go => true,
|
||||
DownloadThreadControlFlag::Stop => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
/// true => Go
|
||||
/// false => Stop
|
||||
impl From<bool> for DownloadThreadControlFlag {
|
||||
fn from(value: bool) -> Self {
|
||||
if value { DownloadThreadControlFlag::Go } else { DownloadThreadControlFlag::Stop }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DownloadThreadControl {
|
||||
inner: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl DownloadThreadControl {
|
||||
pub fn new(flag: DownloadThreadControlFlag) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(AtomicBool::new(flag.into())),
|
||||
}
|
||||
}
|
||||
pub fn get(&self) -> DownloadThreadControlFlag {
|
||||
self.inner.load(Ordering::Acquire).into()
|
||||
}
|
||||
pub fn set(&self, flag: DownloadThreadControlFlag) {
|
||||
self.inner.store(flag.into(), Ordering::Release);
|
||||
}
|
||||
}
|
||||
4
src-tauri/drop-downloads/src/util/mod.rs
Normal file
4
src-tauri/drop-downloads/src/util/mod.rs
Normal file
@ -0,0 +1,4 @@
|
||||
pub mod download_thread_control_flag;
|
||||
pub mod progress_object;
|
||||
pub mod queue;
|
||||
pub mod rolling_progress_updates;
|
||||
165
src-tauri/drop-downloads/src/util/progress_object.rs
Normal file
165
src-tauri/drop-downloads/src/util/progress_object.rs
Normal file
@ -0,0 +1,165 @@
|
||||
use std::{
|
||||
sync::{
|
||||
Arc, Mutex,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
mpsc::Sender,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use atomic_instant_full::AtomicInstant;
|
||||
use throttle_my_fn::throttle;
|
||||
|
||||
use crate::download_manager_frontend::DownloadManagerSignal;
|
||||
|
||||
use super::rolling_progress_updates::RollingProgressWindow;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ProgressObject {
|
||||
max: Arc<Mutex<usize>>,
|
||||
progress_instances: Arc<Mutex<Vec<Arc<AtomicUsize>>>>,
|
||||
start: Arc<Mutex<Instant>>,
|
||||
sender: Sender<DownloadManagerSignal>,
|
||||
//last_update: Arc<RwLock<Instant>>,
|
||||
last_update_time: Arc<AtomicInstant>,
|
||||
bytes_last_update: Arc<AtomicUsize>,
|
||||
rolling: RollingProgressWindow<1000>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ProgressHandle {
|
||||
progress: Arc<AtomicUsize>,
|
||||
progress_object: Arc<ProgressObject>,
|
||||
}
|
||||
|
||||
impl ProgressHandle {
|
||||
pub fn new(progress: Arc<AtomicUsize>, progress_object: Arc<ProgressObject>) -> Self {
|
||||
Self {
|
||||
progress,
|
||||
progress_object,
|
||||
}
|
||||
}
|
||||
pub fn set(&self, amount: usize) {
|
||||
self.progress.store(amount, Ordering::Release);
|
||||
}
|
||||
pub fn add(&self, amount: usize) {
|
||||
self.progress
|
||||
.fetch_add(amount, std::sync::atomic::Ordering::AcqRel);
|
||||
calculate_update(&self.progress_object);
|
||||
}
|
||||
pub fn skip(&self, amount: usize) {
|
||||
self.progress
|
||||
.fetch_add(amount, std::sync::atomic::Ordering::Acquire);
|
||||
// Offset the bytes at last offset by this amount
|
||||
self.progress_object
|
||||
.bytes_last_update
|
||||
.fetch_add(amount, Ordering::Acquire);
|
||||
// Dont' fire update
|
||||
}
|
||||
}
|
||||
|
||||
impl ProgressObject {
|
||||
pub fn new(max: usize, length: usize, sender: Sender<DownloadManagerSignal>) -> Self {
|
||||
let arr = Mutex::new((0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect());
|
||||
Self {
|
||||
max: Arc::new(Mutex::new(max)),
|
||||
progress_instances: Arc::new(arr),
|
||||
start: Arc::new(Mutex::new(Instant::now())),
|
||||
sender,
|
||||
|
||||
last_update_time: Arc::new(AtomicInstant::now()),
|
||||
bytes_last_update: Arc::new(AtomicUsize::new(0)),
|
||||
rolling: RollingProgressWindow::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_time_now(&self) {
|
||||
*self.start.lock().unwrap() = Instant::now();
|
||||
}
|
||||
pub fn sum(&self) -> usize {
|
||||
self.progress_instances
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|instance| instance.load(Ordering::Acquire))
|
||||
.sum()
|
||||
}
|
||||
pub fn reset(&self) {
|
||||
self.set_time_now();
|
||||
self.bytes_last_update.store(0, Ordering::Release);
|
||||
self.rolling.reset();
|
||||
self.progress_instances
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.for_each(|x| x.store(0, Ordering::SeqCst));
|
||||
}
|
||||
pub fn get_max(&self) -> usize {
|
||||
*self.max.lock().unwrap()
|
||||
}
|
||||
pub fn set_max(&self, new_max: usize) {
|
||||
*self.max.lock().unwrap() = new_max;
|
||||
}
|
||||
pub fn set_size(&self, length: usize) {
|
||||
*self.progress_instances.lock().unwrap() =
|
||||
(0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect();
|
||||
}
|
||||
pub fn get_progress(&self) -> f64 {
|
||||
self.sum() as f64 / self.get_max() as f64
|
||||
}
|
||||
pub fn get(&self, index: usize) -> Arc<AtomicUsize> {
|
||||
self.progress_instances.lock().unwrap()[index].clone()
|
||||
}
|
||||
fn update_window(&self, kilobytes_per_second: usize) {
|
||||
self.rolling.update(kilobytes_per_second);
|
||||
}
|
||||
}
|
||||
|
||||
#[throttle(1, Duration::from_millis(20))]
|
||||
pub fn calculate_update(progress: &ProgressObject) {
|
||||
let last_update_time = progress
|
||||
.last_update_time
|
||||
.swap(Instant::now(), Ordering::SeqCst);
|
||||
let time_since_last_update = Instant::now().duration_since(last_update_time).as_millis_f64();
|
||||
|
||||
let current_bytes_downloaded = progress.sum();
|
||||
let max = progress.get_max();
|
||||
let bytes_at_last_update = progress
|
||||
.bytes_last_update
|
||||
.swap(current_bytes_downloaded, Ordering::Acquire);
|
||||
|
||||
let bytes_since_last_update = current_bytes_downloaded.saturating_sub(bytes_at_last_update) as f64;
|
||||
|
||||
let kilobytes_per_second = bytes_since_last_update / time_since_last_update;
|
||||
|
||||
let bytes_remaining = max.saturating_sub(current_bytes_downloaded); // bytes
|
||||
|
||||
progress.update_window(kilobytes_per_second as usize);
|
||||
push_update(progress, bytes_remaining);
|
||||
}
|
||||
|
||||
#[throttle(1, Duration::from_millis(250))]
|
||||
pub fn push_update(progress: &ProgressObject, bytes_remaining: usize) {
|
||||
let average_speed = progress.rolling.get_average();
|
||||
let time_remaining = (bytes_remaining / 1000) / average_speed.max(1);
|
||||
|
||||
update_ui(progress, average_speed, time_remaining);
|
||||
update_queue(progress);
|
||||
}
|
||||
|
||||
fn update_ui(progress_object: &ProgressObject, kilobytes_per_second: usize, time_remaining: usize) {
|
||||
progress_object
|
||||
.sender
|
||||
.send(DownloadManagerSignal::UpdateUIStats(
|
||||
kilobytes_per_second,
|
||||
time_remaining,
|
||||
))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn update_queue(progress: &ProgressObject) {
|
||||
progress
|
||||
.sender
|
||||
.send(DownloadManagerSignal::UpdateUIQueue)
|
||||
.unwrap();
|
||||
}
|
||||
44
src-tauri/drop-downloads/src/util/queue.rs
Normal file
44
src-tauri/drop-downloads/src/util/queue.rs
Normal file
@ -0,0 +1,44 @@
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{Arc, Mutex, MutexGuard},
|
||||
};
|
||||
|
||||
use drop_database::models::data::DownloadableMetadata;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Queue {
|
||||
inner: Arc<Mutex<VecDeque<DownloadableMetadata>>>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl Default for Queue {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Queue {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(VecDeque::new())),
|
||||
}
|
||||
}
|
||||
pub fn read(&self) -> VecDeque<DownloadableMetadata> {
|
||||
self.inner.lock().unwrap().clone()
|
||||
}
|
||||
pub fn edit(&self) -> MutexGuard<'_, VecDeque<DownloadableMetadata>> {
|
||||
self.inner.lock().unwrap()
|
||||
}
|
||||
pub fn pop_front(&self) -> Option<DownloadableMetadata> {
|
||||
self.edit().pop_front()
|
||||
}
|
||||
pub fn exists(&self, meta: DownloadableMetadata) -> bool {
|
||||
self.read().contains(&meta)
|
||||
}
|
||||
pub fn append(&self, interface: DownloadableMetadata) {
|
||||
self.edit().push_back(interface);
|
||||
}
|
||||
pub fn get_by_meta(&self, meta: &DownloadableMetadata) -> Option<usize> {
|
||||
self.read().iter().position(|data| data == meta)
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,43 @@
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RollingProgressWindow<const S: usize> {
|
||||
window: Arc<[AtomicUsize; S]>,
|
||||
current: Arc<AtomicUsize>,
|
||||
}
|
||||
impl<const S: usize> RollingProgressWindow<S> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
window: Arc::new([(); S].map(|()| AtomicUsize::new(0))),
|
||||
current: Arc::new(AtomicUsize::new(0)),
|
||||
}
|
||||
}
|
||||
pub fn update(&self, kilobytes_per_second: usize) {
|
||||
let index = self.current.fetch_add(1, Ordering::SeqCst);
|
||||
let current = &self.window[index % S];
|
||||
current.store(kilobytes_per_second, Ordering::SeqCst);
|
||||
}
|
||||
pub fn get_average(&self) -> usize {
|
||||
let current = self.current.load(Ordering::SeqCst);
|
||||
let valid = self
|
||||
.window
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(i, _)| i < ¤t)
|
||||
.map(|(_, x)| x.load(Ordering::Acquire))
|
||||
.collect::<Vec<usize>>();
|
||||
let amount = valid.len();
|
||||
let sum = valid.into_iter().sum::<usize>();
|
||||
|
||||
sum / amount
|
||||
}
|
||||
pub fn reset(&self) {
|
||||
self.window
|
||||
.iter()
|
||||
.for_each(|x| x.store(0, Ordering::Release));
|
||||
self.current.store(0, Ordering::Release);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user