fix(download manager): use of completed signal, and pause/resuming

This commit is contained in:
DecDuck
2024-11-28 12:39:21 +11:00
parent 2dedfbbd5c
commit 64d7f649c6
9 changed files with 162 additions and 90 deletions

View File

@ -7,14 +7,30 @@
@click="startGameDownload"
>
Download game
<span v-if="progress != 0"> ({{ Math.floor(progress * 1000) / 10 }}%) </span>
<span v-if="progress != 0">
({{ Math.floor(progress * 1000) / 10 }}%)
</span>
</button>
<button
class="w-full rounded-md p-4 bg-blue-600 text-white"
@click="stopGameDownload"
>
Cancel game download
</button></template>
</button>
<button
class="w-full rounded-md p-4 bg-blue-600 text-white"
@click="pause"
>
Pause game download
</button>
<button
class="w-full rounded-md p-4 bg-blue-600 text-white"
@click="resume"
>
Resume game download
</button>
</template>
<script setup lang="ts">
import { invoke } from "@tauri-apps/api/core";
@ -42,6 +58,12 @@ async function startGameDownload() {
}, 100);
}
async function stopGameDownload() {
await invoke("stop_game_download", { "gameId": gameId.value })
await invoke("cancel_game_download", { gameId: gameId.value });
}
async function pause() {
await invoke("pause_game_downloads");
}
async function resume() {
await invoke("resume_game_downloads");
}
</script>

View File

@ -29,7 +29,7 @@ pub struct GameDownloadAgent {
contexts: Mutex<Vec<DropDownloadContext>>,
pub manifest: Mutex<Option<DropManifest>>,
pub progress: ProgressObject,
sender: Sender<DownloadManagerSignal>
sender: Sender<DownloadManagerSignal>,
}
#[derive(Debug)]
@ -39,12 +39,12 @@ pub enum GameDownloadError {
Setup(SetupError),
Lock,
IoError(io::Error),
DownloadError
DownloadError,
}
#[derive(Debug)]
pub enum SetupError {
Context
Context,
}
impl Display for GameDownloadError {
@ -61,7 +61,12 @@ impl Display for GameDownloadError {
}
impl GameDownloadAgent {
pub fn new(id: String, version: String, target_download_dir: usize, sender: Sender<DownloadManagerSignal>) -> Self {
pub fn new(
id: String,
version: String,
target_download_dir: usize,
sender: Sender<DownloadManagerSignal>,
) -> Self {
// Don't run by default
let control_flag = DownloadThreadControl::new(DownloadThreadControlFlag::Stop);
Self {
@ -72,7 +77,7 @@ impl GameDownloadAgent {
target_download_dir,
contexts: Mutex::new(Vec::new()),
progress: ProgressObject::new(0, 0),
sender
sender,
}
}
@ -81,8 +86,8 @@ impl GameDownloadAgent {
self.ensure_manifest_exists()?;
info!("Ensured manifest exists");
self.generate_contexts()?;
info!("Generated contexts");
self.ensure_contexts()?;
info!("Ensured contexts exists");
self.control_flag.set(DownloadThreadControlFlag::Go);
@ -129,7 +134,10 @@ impl GameDownloadAgent {
if response.status() != 200 {
return Err(GameDownloadError::Communication(
RemoteAccessError::ManifestDownloadFailed(response.status(), response.text().unwrap())
RemoteAccessError::ManifestDownloadFailed(
response.status(),
response.text().unwrap(),
),
));
}
@ -144,12 +152,15 @@ impl GameDownloadAgent {
}
fn set_progress_object_params(&self) {
// Avoid re-setting it
if self.progress.get_max() != 0 {
return;
}
let lock = self.contexts.lock().unwrap();
let length = lock.len();
let chunk_count = lock.iter()
.map(|chunk| chunk.length)
.sum();
let chunk_count = lock.iter().map(|chunk| chunk.length).sum();
debug!("Setting ProgressObject max to {}", chunk_count);
self.progress.set_max(chunk_count);
@ -159,6 +170,18 @@ impl GameDownloadAgent {
self.progress.set_time_now();
}
pub fn ensure_contexts(&self) -> Result<(), GameDownloadError> {
let context_lock = self.contexts.lock().unwrap();
info!("{:?} {}", context_lock, context_lock.is_empty());
if !context_lock.is_empty() {
return Ok(());
}
drop(context_lock);
self.generate_contexts()?;
return Ok(());
}
pub fn generate_contexts(&self) -> Result<(), GameDownloadError> {
let db_lock = DB.borrow_data().unwrap();
let data_base_dir = db_lock.games.install_dirs[self.target_download_dir].clone();
@ -192,14 +215,14 @@ impl GameDownloadAgent {
game_id: game_id.to_string(),
path: path.clone(),
checksum: chunk.checksums[i].clone(),
length: *length
length: *length,
});
running_offset += *length as u64;
}
#[cfg(target_os = "linux")]
if running_offset > 0 {
fallocate(file, FallocateFlags::empty(), 0, running_offset).unwrap();
let _ = fallocate(file, FallocateFlags::empty(), 0, running_offset);
}
}
@ -212,54 +235,65 @@ impl GameDownloadAgent {
}
pub fn run(&self) -> Result<(), ()> {
const DOWNLOAD_MAX_THREADS: usize = 1;
info!("downloading game: {}", self.id);
const DOWNLOAD_MAX_THREADS: usize = 4;
let pool = ThreadPoolBuilder::new()
.num_threads(DOWNLOAD_MAX_THREADS)
.build()
.unwrap();
let new_contexts = Arc::new(Mutex::new(Vec::new()));
let new_contexts_ref = new_contexts.clone();
let completed_indexes = Arc::new(Mutex::new(Vec::new()));
let completed_indexes_loop_arc = completed_indexes.clone();
pool.scope(move |scope| {
let contexts = self.contexts.lock().unwrap();
for (index, context) in contexts.iter().enumerate() {
let context = context.clone();
let control_flag = self.control_flag.clone(); // Clone arcs
let progress = self.progress.get(index); // Clone arcs
let new_contexts_ref = new_contexts_ref.clone();
let completed_indexes_ref = completed_indexes_loop_arc.clone();
scope.spawn(move |_| {
info!(
"starting download for file {} {}",
context.file_name, context.index
);
match download_game_chunk(context.clone(), control_flag, progress) {
Ok(res) => {
match res {
true => {},
false => new_contexts_ref.lock().unwrap().push(context),
Ok(res) => match res {
true => {
let mut lock = completed_indexes_ref.lock().unwrap();
lock.push(index);
}
false => {}
},
Err(e) => {
error!("GameDownloadError: {}", e);
self.sender.send(DownloadManagerSignal::Error(e)).unwrap();
new_contexts_ref.lock().unwrap().push(context);
},
}
}
});
}
});
if !new_contexts.lock().unwrap().is_empty() {
debug!("New contexts not empty");
*self.contexts.lock().unwrap() = Arc::into_inner(new_contexts).unwrap().into_inner().unwrap();
debug!("Contexts: {:?}", *self.contexts.lock().unwrap());
return Err(())
let mut context_lock = self.contexts.lock().unwrap();
let mut completed_lock = completed_indexes.lock().unwrap();
// Sort desc so we don't have to modify indexes
completed_lock.sort_by(|a, b| b.cmp(a));
for index in completed_lock.iter() {
context_lock.remove(*index);
}
info!("Contexts: {:?}", *self.contexts.lock().unwrap());
// If we're not out of contexts, we're not done, so we don't fire completed
if !context_lock.is_empty() {
info!("Download agent didn't finish, not sending completed signal");
return Ok(());
}
// We've completed
self.sender
.send(DownloadManagerSignal::Completed(self.id.clone()))
.unwrap();
Ok(())
}
}

View File

@ -34,7 +34,7 @@ pub fn get_current_game_download_progress(
}
#[tauri::command]
pub fn stop_game_download(state: tauri::State<'_, Mutex<AppState>>, game_id: String) {
pub fn cancel_game_download(state: tauri::State<'_, Mutex<AppState>>, game_id: String) {
info!("Cancelling game download {}", game_id);
state
.lock()
@ -42,6 +42,17 @@ pub fn stop_game_download(state: tauri::State<'_, Mutex<AppState>>, game_id: Str
.download_manager
.cancel_download(game_id);
}
#[tauri::command]
pub fn pause_game_downloads(state: tauri::State<'_, Mutex<AppState>>) {
state.lock().unwrap().download_manager.pause_downloads()
}
#[tauri::command]
pub fn resume_game_downloads(state: tauri::State<'_, Mutex<AppState>>) {
state.lock().unwrap().download_manager.resume_downloads()
}
#[tauri::command]
pub fn get_current_write_speed(state: tauri::State<'_, Mutex<AppState>>) {}

View File

@ -124,7 +124,6 @@ pub fn download_game_chunk(
) -> Result<bool, GameDownloadError> {
// If we're paused
if control_flag.get() == DownloadThreadControlFlag::Stop {
info!("Control flag is Stop");
progress.store(0, Ordering::Relaxed);
return Ok(false);
}

View File

@ -12,7 +12,8 @@ use log::info;
use super::{
download_agent::{GameDownloadAgent, GameDownloadError},
progress_object::ProgressObject, queue::Queue,
progress_object::ProgressObject,
queue::Queue,
};
pub enum DownloadManagerSignal {
@ -20,8 +21,7 @@ pub enum DownloadManagerSignal {
Go,
/// Pauses the DownloadManager
Stop,
/// Called when a GameDownloadAgent has finished.
/// Triggers the next download cycle to begin
/// Called when a GameDownloadAgent has fully completed a download.
Completed(String),
/// Generates and appends a GameDownloadAgent
/// to the registry and queue
@ -104,11 +104,10 @@ impl DownloadManager {
))?;
self.command_sender.send(DownloadManagerSignal::Go)
}
pub fn cancel_download(
&self,
game_id: String
) {
self.command_sender.send(DownloadManagerSignal::Cancel(game_id)).unwrap();
pub fn cancel_download(&self, game_id: String) {
self.command_sender
.send(DownloadManagerSignal::Cancel(game_id))
.unwrap();
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<AgentInterfaceData>>> {
self.download_queue.edit()
@ -139,11 +138,13 @@ impl DownloadManager {
let current_index = get_index_from_id(&mut queue, id).unwrap();
queue.remove(current_index);
}
pub fn pause_downloads(&self) -> Result<(), SendError<DownloadManagerSignal>> {
self.command_sender.send(DownloadManagerSignal::Stop)
pub fn pause_downloads(&self) {
self.command_sender
.send(DownloadManagerSignal::Stop)
.unwrap();
}
pub fn resume_downloads(&self) -> Result<(), SendError<DownloadManagerSignal>> {
self.command_sender.send(DownloadManagerSignal::Go)
pub fn resume_downloads(&self) {
self.command_sender.send(DownloadManagerSignal::Go).unwrap();
}
pub fn ensure_terminated(self) -> Result<Result<(), ()>, Box<dyn Any + Send>> {
self.command_sender

View File

@ -170,15 +170,15 @@ impl DownloadManagerBuilder {
fn manage_go_signal(&mut self) {
info!("Got signal 'Go'");
if self.active_control_flag.is_none() && !self.download_agent_registry.is_empty() {
if !self.download_agent_registry.is_empty() && !self.download_queue.empty() {
info!("Starting download agent");
let download_agent = {
let front = self.download_queue.read().front().unwrap().clone();
self.download_agent_registry.get(&front.id).unwrap().clone()
};
let download_agent_interface =
Arc::new(AgentInterfaceData::from(download_agent.clone()));
self.current_game_interface = Some(download_agent_interface);
let agent_data = self.download_queue.read().front().unwrap().clone();
let download_agent = self
.download_agent_registry
.get(&agent_data.id)
.unwrap()
.clone();
self.current_game_interface = Some(agent_data);
let progress_object = download_agent.progress.clone();
*self.progress.lock().unwrap() = Some(progress_object);
@ -191,29 +191,20 @@ impl DownloadManagerBuilder {
info!("Spawning download");
spawn(move || {
match download_agent.download() {
Ok(_) => {
// TODO wrap this pattern in a macro
let result = sender
.send(DownloadManagerSignal::Completed(download_agent.id.clone()));
if let Err(err) = result {
error!("{}", err);
}
}
// Returns once we've exited the download
// (not necessarily completed)
// The download agent will fire the completed event for us
Ok(_) => {}
// If an error occurred while *starting* the download
Err(err) => {
let result = sender.send(DownloadManagerSignal::Error(err));
if let Err(err) = result {
error!("{}", err);
}
error!("error while managing download: {}", err);
sender.send(DownloadManagerSignal::Error(err)).unwrap();
}
};
});
info!("Finished spawning Download");
active_control_flag.set(DownloadThreadControlFlag::Go);
self.set_status(DownloadManagerStatus::Downloading);
} else if let Some(active_control_flag) = self.active_control_flag.clone() {
info!("Restarting current download");
active_control_flag.set(DownloadThreadControlFlag::Go);
} else {
info!("Nothing was set");
}
@ -230,6 +221,8 @@ impl DownloadManagerBuilder {
self.active_control_flag = None;
*self.progress.lock().unwrap() = None;
}
// TODO wait until current download exits
self.download_agent_registry.remove(&game_id);
let mut lock = self.download_queue.edit();
let index = match lock.iter().position(|interface| interface.id == game_id) {
@ -237,6 +230,8 @@ impl DownloadManagerBuilder {
None => return,
};
lock.remove(index);
// Start next download
self.sender.send(DownloadManagerSignal::Go).unwrap();
info!(
"{:?}",

View File

@ -1,13 +1,16 @@
use std::{sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
}, time::Instant};
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::Instant,
};
#[derive(Clone)]
pub struct ProgressObject {
max: Arc<Mutex<usize>>,
progress_instances: Arc<Mutex<Vec<Arc<AtomicUsize>>>>,
start: Arc<Mutex<Instant>>
start: Arc<Mutex<Instant>>,
}
impl ProgressObject {

View File

@ -1,19 +1,22 @@
use std::{collections::VecDeque, sync::{Arc, Mutex, MutexGuard}};
use std::{
collections::VecDeque,
sync::{Arc, Mutex, MutexGuard},
};
use super::download_manager::AgentInterfaceData;
#[derive(Clone)]
pub struct Queue {
inner: Arc<Mutex<VecDeque<Arc<AgentInterfaceData>>>>
inner: Arc<Mutex<VecDeque<Arc<AgentInterfaceData>>>>,
}
impl Queue {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(VecDeque::new()))
inner: Arc::new(Mutex::new(VecDeque::new())),
}
}
pub fn read(&self) -> VecDeque<Arc<AgentInterfaceData>> {
pub fn read(&self) -> VecDeque<Arc<AgentInterfaceData>> {
self.inner.lock().unwrap().clone()
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<AgentInterfaceData>>> {
@ -22,13 +25,15 @@ impl Queue {
pub fn pop_front(&self) -> Option<Arc<AgentInterfaceData>> {
self.edit().pop_front()
}
pub fn empty(&self) -> bool {
self.inner.lock().unwrap().len() == 0
}
/// Either inserts `interface` at the specified index, or appends to
/// the back of the deque if index is greater than the length of the deque
pub fn insert(&self, interface: AgentInterfaceData, index: usize) {
if self.read().len() > index {
self.append(interface);
}
else {
} else {
self.edit().insert(index, Arc::new(interface));
}
}
@ -44,7 +49,7 @@ impl Queue {
if front.id == game_id {
return queue.pop_front();
}
return None
return None;
}
pub fn get_by_id(&self, game_id: String) -> Option<usize> {
self.read().iter().position(|data| data.id == game_id)

View File

@ -15,8 +15,8 @@ use db::{
DATA_ROOT_DIR,
};
use downloads::download_commands::*;
use downloads::download_manager_builder::DownloadManagerBuilder;
use downloads::download_manager::DownloadManager;
use downloads::download_manager_builder::DownloadManagerBuilder;
use env_logger::Env;
use http::{header::*, response::Builder as ResponseBuilder};
use library::{fetch_game, fetch_game_status, fetch_library, Game};
@ -137,7 +137,9 @@ pub fn run() {
// Downloads
download_game,
get_current_game_download_progress,
stop_game_download
cancel_game_download,
pause_game_downloads,
resume_game_downloads,
])
.plugin(tauri_plugin_shell::init())
.plugin(tauri_plugin_dialog::init())