feat(downloads): Pausing and resuming game downloads

Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
quexeky
2024-11-09 19:55:36 +11:00
parent 97bb1fac68
commit 55b7921ee6
9 changed files with 159 additions and 83 deletions

View File

@ -25,6 +25,18 @@
> >
Get game download progress Get game download progress
</button> </button>
<button
class="w-full rounded-md p-4 bg-blue-600 text-white"
@click="pauseGameDownloadWrapper"
>
Pause game download
</button>
<button
class="w-full rounded-md p-4 bg-blue-600 text-white"
@click="resumeGameDownloadWrapper"
>
Resume game download
</button>
</template> </template>
<script setup lang="ts"> <script setup lang="ts">
import { invoke } from "@tauri-apps/api/core"; import { invoke } from "@tauri-apps/api/core";
@ -62,7 +74,7 @@ function startGameDownloadsWrapper() {
} }
async function cancelGameDownload() { async function cancelGameDownload() {
console.log("Cancelling game download"); console.log("Cancelling game download");
await invoke("stop_specific_game_download", { gameId: gameId.value }) await invoke("cancel_specific_game_download", { gameId: gameId.value })
} }
function cancelGameDownloadWrapper() { function cancelGameDownloadWrapper() {
console.log("Triggered game cancel wrapper"); console.log("Triggered game cancel wrapper");
@ -83,5 +95,29 @@ function getGameDownloadProgressWrapper() {
console.log(e) console.log(e)
}) })
}
async function pauseGameDownload() {
console.log("Getting game download status");
await invoke("pause_game_download", { gameId: gameId.value })
}
function pauseGameDownloadWrapper() {
pauseGameDownload()
.then(() => {})
.catch((e) => {
console.log(e)
})
}
async function resumeGameDownload() {
console.log("Getting game download status");
await invoke("resume_game_download", { gameId: gameId.value })
}
function resumeGameDownloadWrapper() {
resumeGameDownload()
.then(() => {})
.catch((e) => {
console.log(e)
})
} }
</script> </script>

View File

@ -39,7 +39,7 @@ hex = "0.4.3"
tauri-plugin-dialog = "2" tauri-plugin-dialog = "2"
env_logger = "0.11.5" env_logger = "0.11.5"
http = "1.1.0" http = "1.1.0"
tokio = { version = "1.40.0", features = ["rt", "tokio-macros"] } tokio = { version = "1.40.0", features = ["rt", "tokio-macros", "signal"] }
urlencoding = "2.1.3" urlencoding = "2.1.3"
md5 = "0.7.0" md5 = "0.7.0"
atomic-counter = "1.0.1" atomic-counter = "1.0.1"

View File

@ -5,28 +5,28 @@ use crate::downloads::manifest::{DropDownloadContext, DropManifest};
use crate::downloads::progress::ProgressChecker; use crate::downloads::progress::ProgressChecker;
use crate::DB; use crate::DB;
use atomic_counter::RelaxedCounter; use atomic_counter::RelaxedCounter;
use http::status;
use log::info; use log::info;
use rustix::fs::{fallocate, FallocateFlags}; use rustix::fs::{fallocate, FallocateFlags};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::{create_dir_all, File}; use std::fs::{create_dir_all, File};
use std::path::Path; use std::path::Path;
use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, Mutex};
use urlencoding::encode; use urlencoding::encode;
pub struct GameDownloadAgent { pub struct GameDownloadAgent {
pub id: String, pub id: String,
pub version: String, pub version: String,
state: Mutex<GameDownloadState>, pub status: Arc<RwLock<GameDownloadState>>,
contexts: Mutex<Vec<DropDownloadContext>>, contexts: Mutex<Vec<DropDownloadContext>>,
pub progress: ProgressChecker<DropDownloadContext>, pub progress: ProgressChecker<DropDownloadContext>,
pub manifest: Mutex<Option<DropManifest>>, pub manifest: Mutex<Option<DropManifest>>,
pub callback: Arc<AtomicBool>,
} }
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq)] #[derive(Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum GameDownloadState { pub enum GameDownloadState {
Uninitialised, Uninitialised,
Queued, Queued,
Paused,
Manifest, Manifest,
Downloading, Downloading,
Finished, Finished,
@ -49,17 +49,16 @@ pub enum SystemError {
impl GameDownloadAgent { impl GameDownloadAgent {
pub fn new(id: String, version: String) -> Self { pub fn new(id: String, version: String) -> Self {
let callback = Arc::new(AtomicBool::new(false)); let status = Arc::new(RwLock::new(GameDownloadState::Uninitialised));
Self { Self {
id, id,
version, version,
state: Mutex::from(GameDownloadState::Uninitialised), status: status.clone(),
manifest: Mutex::new(None), manifest: Mutex::new(None),
callback: callback.clone(),
progress: ProgressChecker::new( progress: ProgressChecker::new(
Box::new(download_logic::download_game_chunk), Box::new(download_logic::download_game_chunk),
Arc::new(RelaxedCounter::new(0)), Arc::new(RelaxedCounter::new(0)),
callback, status,
0, 0,
), ),
contexts: Mutex::new(Vec::new()), contexts: Mutex::new(Vec::new()),
@ -138,11 +137,11 @@ impl GameDownloadAgent {
} }
pub fn change_state(&self, state: GameDownloadState) { pub fn change_state(&self, state: GameDownloadState) {
let mut lock = self.state.lock().unwrap(); let mut lock = self.status.write().unwrap();
*lock = state; *lock = state;
} }
pub fn get_state(&self) -> GameDownloadState { pub fn get_state(&self) -> GameDownloadState {
let lock = self.state.lock().unwrap(); let lock = self.status.read().unwrap();
lock.clone() lock.clone()
} }

View File

@ -90,20 +90,16 @@ pub fn start_game_download(
} }
#[tauri::command] #[tauri::command]
pub async fn stop_specific_game_download( pub async fn cancel_specific_game_download(
state: tauri::State<'_, Mutex<AppState>>, state: tauri::State<'_, Mutex<AppState>>,
game_id: String, game_id: String,
) -> Result<(), String> { ) -> Result<(), String> {
info!("called stop_specific_game_download"); info!("called stop_specific_game_download");
let callback = { let status = get_game_download(state, game_id).change_state(GameDownloadState::Cancelled);
let lock = state.lock().unwrap();
let download_agent = lock.game_downloads.get(&game_id).unwrap();
download_agent.callback.clone()
};
//TODO: Drop the game download instance
info!("Stopping callback"); info!("Stopping callback");
callback.store(true, Ordering::Release);
Ok(()) Ok(())
} }
@ -113,25 +109,36 @@ pub async fn get_game_download_progress(
state: tauri::State<'_, Mutex<AppState>>, state: tauri::State<'_, Mutex<AppState>>,
game_id: String, game_id: String,
) -> Result<f64, String> { ) -> Result<f64, String> {
let lock = state.lock().unwrap(); let progress = get_game_download(state, game_id).progress.get_progress_percentage();
let download_agent = lock.game_downloads.get(&game_id).unwrap();
let progress = download_agent.progress.get_progress_percentage();
info!("{}", progress); info!("{}", progress);
Ok(progress) Ok(progress)
} }
/*
#[tauri::command] #[tauri::command]
async fn resume_game_download( pub async fn pause_game_download(
state: tauri::State<'_, Mutex<AppState>>, state: tauri::State<'_, Mutex<AppState>>,
game_id: String, game_id: String,
) -> Result<(), String> { ) -> Result<(), String> {
get_game_download(state, game_id).change_state(GameDownloadState::Paused);
let download = {
let lock = state.lock().unwrap();
lock.game_downloads.get(&game_id).unwrap().clone()
};
Ok(()) Ok(())
} }
*/
#[tauri::command]
pub async fn resume_game_download(
state: tauri::State<'_, Mutex<AppState>>,
game_id: String,
) -> Result<(), String> {
get_game_download(state, game_id).change_state(GameDownloadState::Downloading);
Ok(())
}
fn get_game_download(
state: tauri::State<'_, Mutex<AppState>>,
game_id: String,
) -> Arc<GameDownloadAgent> {
let lock = state.lock().unwrap();
let download_agent = lock.game_downloads.get(&game_id).unwrap();
download_agent.clone()
}

View File

@ -5,45 +5,94 @@ use crate::DB;
use atomic_counter::{AtomicCounter, RelaxedCounter}; use atomic_counter::{AtomicCounter, RelaxedCounter};
use log::{error, info}; use log::{error, info};
use md5::{Context, Digest}; use md5::{Context, Digest};
#[cfg(windows)]
use tokio::signal::windows::Signal;
use tokio::sync::{broadcast::Receiver, mpsc};
use std::{ use std::{
fs::{File, OpenOptions}, fs::{File, OpenOptions},
io::{self, BufWriter, Error, ErrorKind, Seek, SeekFrom, Write}, io::{self, BufWriter, Error, ErrorKind, Seek, SeekFrom, Write},
path::PathBuf, path::PathBuf,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc, RwLock,
}, }, thread::sleep, time::Duration,
}; };
use urlencoding::encode; use urlencoding::encode;
use super::download_agent::GameDownloadState;
pub struct DropFileWriter { pub struct DropFileWriter {
file: File, file: File,
hasher: Context, hasher: Context,
callback: Arc<AtomicBool>,
progress: Arc<RelaxedCounter>, progress: Arc<RelaxedCounter>,
status: Arc<RwLock<GameDownloadState>>,
} }
impl DropFileWriter { impl DropFileWriter {
fn new(path: PathBuf, callback: Arc<AtomicBool>, progress: Arc<RelaxedCounter>) -> Self { fn new(path: PathBuf, status: Arc<RwLock<GameDownloadState>>, progress: Arc<RelaxedCounter>) -> Self {
Self { Self {
file: OpenOptions::new().write(true).open(path).unwrap(), file: OpenOptions::new().write(true).open(path).unwrap(),
hasher: Context::new(), hasher: Context::new(),
callback,
progress, progress,
status
} }
} }
fn finish(mut self) -> io::Result<Digest> { fn finish(mut self) -> io::Result<Digest> {
self.flush().unwrap(); self.flush().unwrap();
Ok(self.hasher.compute()) Ok(self.hasher.compute())
} }
fn manage_state(&mut self) -> Option<Result<usize, Error>> {
match {self.status.read().unwrap().clone()} {
GameDownloadState::Uninitialised => todo!(),
GameDownloadState::Queued => {
return Some(Err(Error::new(
ErrorKind::NotConnected,
"Download has not yet been started"
)))
},
GameDownloadState::Manifest => {
return Some(Err(Error::new(
ErrorKind::NotFound,
"Manifest still not finished downloading"
)))
},
GameDownloadState::Downloading => {},
GameDownloadState::Finished => {
return Some(Err(Error::new(
ErrorKind::AlreadyExists, "Download already finished")))
},
GameDownloadState::Stalled => {
return Some(Err(Error::new(
ErrorKind::Interrupted, "Download Stalled"
)))
},
GameDownloadState::Failed => {
return Some(Err(Error::new(
ErrorKind::BrokenPipe,
"Download Failed"
)))
},
GameDownloadState::Cancelled => {
return Some(Err(Error::new(
ErrorKind::ConnectionAborted,
"Interrupt command recieved",
)));
},
GameDownloadState::Paused => {
info!("Game download paused");
sleep(Duration::from_secs(1));
},
};
None
}
} }
// TODO: Implement error handling // TODO: Implement error handling
impl Write for DropFileWriter { impl Write for DropFileWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if self.callback.load(Ordering::Acquire) { // TODO: Tidy up these error messages / types because these ones don't really seem to fit
return Err(Error::new( if let Some(value) = self.manage_state() {
ErrorKind::ConnectionAborted, return value;
"Interrupt command recieved",
));
} }
let len = buf.len(); let len = buf.len();
self.progress.add(len); self.progress.add(len);
@ -65,10 +114,10 @@ impl Seek for DropFileWriter {
} }
pub fn download_game_chunk( pub fn download_game_chunk(
ctx: DropDownloadContext, ctx: DropDownloadContext,
callback: Arc<AtomicBool>, status: Arc<RwLock<GameDownloadState>>,
progress: Arc<RelaxedCounter>, progress: Arc<RelaxedCounter>,
) { ) {
if callback.load(Ordering::Acquire) { if *status.read().unwrap() == GameDownloadState::Cancelled {
info!("Callback stopped download at start"); info!("Callback stopped download at start");
return; return;
} }
@ -88,13 +137,16 @@ pub fn download_game_chunk(
let header = generate_authorization_header(); let header = generate_authorization_header();
let mut response = client let mut response = match client
.get(chunk_url) .get(chunk_url)
.header("Authorization", header) .header("Authorization", header)
.send() .send() {
.unwrap(); Ok(response) => response,
Err(e) => { info!("{}", e); return; },
};
let mut file: DropFileWriter = DropFileWriter::new(ctx.path, callback, progress);
let mut file: DropFileWriter = DropFileWriter::new(ctx.path, status, progress);
if ctx.offset != 0 { if ctx.offset != 0 {
file.seek(SeekFrom::Start(ctx.offset)) file.seek(SeekFrom::Start(ctx.offset))

View File

@ -2,15 +2,17 @@ use atomic_counter::{AtomicCounter, RelaxedCounter};
use log::info; use log::info;
use rayon::ThreadPoolBuilder; use rayon::ThreadPoolBuilder;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex, RwLock};
use super::download_agent::GameDownloadState;
pub struct ProgressChecker<T> pub struct ProgressChecker<T>
where where
T: 'static + Send + Sync, T: 'static + Send + Sync,
{ {
counter: Arc<RelaxedCounter>, counter: Arc<RelaxedCounter>,
f: Arc<Box<dyn Fn(T, Arc<AtomicBool>, Arc<RelaxedCounter>) + Send + Sync + 'static>>, f: Arc<Box<dyn Fn(T, Arc<RwLock<GameDownloadState>>, Arc<RelaxedCounter>) + Send + Sync + 'static>>,
callback: Arc<AtomicBool>, status: Arc<RwLock<GameDownloadState>>,
capacity: Mutex<usize>, capacity: Mutex<usize>,
} }
@ -19,40 +21,18 @@ where
T: Send + Sync, T: Send + Sync,
{ {
pub fn new( pub fn new(
f: Box<dyn Fn(T, Arc<AtomicBool>, Arc<RelaxedCounter>) + Send + Sync + 'static>, f: Box<dyn Fn(T, Arc<RwLock<GameDownloadState>>, Arc<RelaxedCounter>) + Send + Sync + 'static>,
counter: Arc<RelaxedCounter>, counter: Arc<RelaxedCounter>,
callback: Arc<AtomicBool>, status: Arc<RwLock<GameDownloadState>>,
capacity: usize, capacity: usize,
) -> Self { ) -> Self {
Self { Self {
f: f.into(), f: f.into(),
counter, counter,
callback, status,
capacity: capacity.into(), capacity: capacity.into(),
} }
} }
#[allow(dead_code)]
pub fn run_contexts_sequentially(&self, contexts: Vec<T>) {
for context in contexts {
(self.f)(context, self.callback.clone(), self.counter.clone());
}
}
#[allow(dead_code)]
pub fn run_contexts_parallel_background(&self, contexts: Vec<T>, max_threads: usize) {
let threads = ThreadPoolBuilder::new()
// If max_threads == 0, then the limit will be determined
// by Rayon's internal RAYON_NUM_THREADS
.num_threads(max_threads)
.build()
.unwrap();
for context in contexts {
let callback = self.callback.clone();
let counter = self.counter.clone();
let f = self.f.clone();
threads.spawn(move || f(context, callback, counter));
}
}
pub fn run_context_parallel(&self, contexts: Vec<T>, max_threads: usize) { pub fn run_context_parallel(&self, contexts: Vec<T>, max_threads: usize) {
let threads = ThreadPoolBuilder::new() let threads = ThreadPoolBuilder::new()
.num_threads(max_threads) .num_threads(max_threads)
@ -61,12 +41,12 @@ where
threads.scope(|s| { threads.scope(|s| {
for context in contexts { for context in contexts {
let callback = self.callback.clone(); let status = self.status.clone();
let counter = self.counter.clone(); let counter = self.counter.clone();
let f = self.f.clone(); let f = self.f.clone();
s.spawn(move |_| { s.spawn(move |_| {
info!("Running thread"); info!("Running thread");
f(context, callback, counter) f(context, status, counter)
}); });
} }
}); });

View File

@ -11,10 +11,7 @@ use crate::db::DatabaseImpls;
use crate::downloads::download_agent::GameDownloadAgent; use crate::downloads::download_agent::GameDownloadAgent;
use auth::{auth_initiate, generate_authorization_header, recieve_handshake}; use auth::{auth_initiate, generate_authorization_header, recieve_handshake};
use db::{DatabaseInterface, DATA_ROOT_DIR}; use db::{DatabaseInterface, DATA_ROOT_DIR};
use downloads::download_commands::{ use downloads::download_commands::*;
get_game_download_progress, queue_game_download, start_game_downloads,
stop_specific_game_download,
};
use env_logger::Env; use env_logger::Env;
use http::{header::*, response::Builder as ResponseBuilder}; use http::{header::*, response::Builder as ResponseBuilder};
use library::{fetch_game, fetch_library, Game}; use library::{fetch_game, fetch_library, Game};
@ -122,8 +119,10 @@ pub fn run() {
// Downloads // Downloads
queue_game_download, queue_game_download,
start_game_downloads, start_game_downloads,
stop_specific_game_download, cancel_specific_game_download,
get_game_download_progress get_game_download_progress,
resume_game_download,
pause_game_download
]) ])
.plugin(tauri_plugin_shell::init()) .plugin(tauri_plugin_shell::init())
.setup(|app| { .setup(|app| {

View File

@ -23,5 +23,4 @@ impl Peer {
pub fn disconnect(&mut self) { pub fn disconnect(&mut self) {
todo!() todo!()
} }
} }

View File

@ -1,9 +1,11 @@
/*
use atomic_counter::RelaxedCounter; use atomic_counter::RelaxedCounter;
use crate::downloads::progress::ProgressChecker; use crate::downloads::progress::ProgressChecker;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
#[test] #[test]
fn test_progress_sequentially() { fn test_progress_sequentially() {
let counter = Arc::new(RelaxedCounter::new(0)); let counter = Arc::new(RelaxedCounter::new(0));
@ -23,3 +25,5 @@ fn test_progress_parallel() {
fn test_fn(int: usize, _callback: Arc<AtomicBool>, _counter: Arc<RelaxedCounter>) { fn test_fn(int: usize, _callback: Arc<AtomicBool>, _counter: Arc<RelaxedCounter>) {
println!("{}", int); println!("{}", int);
} }
*/