Functioning download progress updates

Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
quexeky
2024-11-04 17:11:37 +11:00
parent bd39f1fd72
commit 0528c78092
9 changed files with 89 additions and 27 deletions

View File

@ -19,6 +19,12 @@
> >
Cancel game download Cancel game download
</button> </button>
<button
class="w-full rounded-md p-4 bg-blue-600 text-white"
@click="getGameDownloadProgressWrapper"
>
Get game download progress
</button>
</template> </template>
<script setup lang="ts"> <script setup lang="ts">
import { invoke } from "@tauri-apps/api/core"; import { invoke } from "@tauri-apps/api/core";
@ -66,4 +72,16 @@ function cancelGameDownloadWrapper() {
console.log(e) console.log(e)
}) })
} }
async function getGameDownloadProgress() {
console.log("Getting game download status");
await invoke("get_game_download_progress", { gameId: gameId.value })
}
function getGameDownloadProgressWrapper() {
getGameDownloadProgress()
.then(() => {})
.catch((e) => {
console.log(e)
})
}
</script> </script>

7
src-tauri/Cargo.lock generated
View File

@ -298,6 +298,12 @@ dependencies = [
"system-deps", "system-deps",
] ]
[[package]]
name = "atomic-counter"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62f447d68cfa5a9ab0c1c862a703da2a65b5ed1b7ce1153c9eb0169506d56019"
[[package]] [[package]]
name = "atomic-waker" name = "atomic-waker"
version = "1.1.2" version = "1.1.2"
@ -1018,6 +1024,7 @@ dependencies = [
name = "drop-app" name = "drop-app"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"atomic-counter",
"ciborium", "ciborium",
"directories", "directories",
"env_logger", "env_logger",

View File

@ -46,6 +46,7 @@ versions = { version = "6.3.2", features = ["serde"] }
urlencoding = "2.1.3" urlencoding = "2.1.3"
rustix = "0.38.37" rustix = "0.38.37"
md5 = "0.7.0" md5 = "0.7.0"
atomic-counter = "1.0.1"
[dependencies.uuid] [dependencies.uuid]
version = "1.10.0" version = "1.10.0"

View File

@ -4,6 +4,7 @@ use crate::downloads::download_logic;
use crate::downloads::manifest::{DropDownloadContext, DropManifest}; use crate::downloads::manifest::{DropDownloadContext, DropManifest};
use crate::downloads::progress::ProgressChecker; use crate::downloads::progress::ProgressChecker;
use crate::DB; use crate::DB;
use atomic_counter::RelaxedCounter;
use log::info; use log::info;
use rustix::fs::{fallocate, FallocateFlags}; use rustix::fs::{fallocate, FallocateFlags};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -18,7 +19,7 @@ pub struct GameDownloadAgent {
pub version: String, pub version: String,
state: Mutex<GameDownloadState>, state: Mutex<GameDownloadState>,
contexts: Mutex<Vec<DropDownloadContext>>, contexts: Mutex<Vec<DropDownloadContext>>,
progress: ProgressChecker<DropDownloadContext>, pub progress: ProgressChecker<DropDownloadContext>,
pub manifest: Mutex<Option<DropManifest>>, pub manifest: Mutex<Option<DropManifest>>,
pub callback: Arc<AtomicBool>, pub callback: Arc<AtomicBool>,
} }
@ -57,8 +58,9 @@ impl GameDownloadAgent {
callback: callback.clone(), callback: callback.clone(),
progress: ProgressChecker::new( progress: ProgressChecker::new(
Box::new(download_logic::download_game_chunk), Box::new(download_logic::download_game_chunk),
Arc::new(AtomicUsize::new(0)), Arc::new(RelaxedCounter::new(0)),
callback, callback,
0
), ),
contexts: Mutex::new(Vec::new()), contexts: Mutex::new(Vec::new()),
} }
@ -119,12 +121,17 @@ impl GameDownloadAgent {
} }
let manifest_download = response.json::<DropManifest>().unwrap(); let manifest_download = response.json::<DropManifest>().unwrap();
let length = manifest_download.iter().map(|(_, chunk)| {
return chunk.lengths.iter().sum::<usize>();
}).sum::<usize>();
self.progress.set_capacity(length);
if let Ok(mut manifest) = self.manifest.lock() { if let Ok(mut manifest) = self.manifest.lock() {
*manifest = Some(manifest_download) *manifest = Some(manifest_download)
} else { } else {
return Err(GameDownloadError::System(SystemError::MutexLockFailed)); return Err(GameDownloadError::System(SystemError::MutexLockFailed));
} }
Ok(()) Ok(())
} }

View File

@ -106,3 +106,15 @@ pub async fn stop_specific_game_download(
Ok(()) Ok(())
} }
#[tauri::command]
pub async fn get_game_download_progress(
state: tauri::State<'_, Mutex<AppState>>,
game_id: String
) -> Result<f64, String> {
let lock = state.lock().unwrap();
let download_agent = lock.game_downloads.get(&game_id).unwrap();
let progress = download_agent.progress.get_progress_percentage();
info!("{}", progress);
return Ok(progress)
}

View File

@ -2,6 +2,7 @@ use crate::auth::generate_authorization_header;
use crate::db::DatabaseImpls; use crate::db::DatabaseImpls;
use crate::downloads::manifest::DropDownloadContext; use crate::downloads::manifest::DropDownloadContext;
use crate::DB; use crate::DB;
use atomic_counter::{AtomicCounter, RelaxedCounter};
use log::info; use log::info;
use md5::{Context, Digest}; use md5::{Context, Digest};
use std::{ use std::{
@ -9,7 +10,7 @@ use std::{
io::{self, Error, ErrorKind, Seek, SeekFrom, Write}, io::{self, Error, ErrorKind, Seek, SeekFrom, Write},
path::PathBuf, path::PathBuf,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Arc,
}, },
}; };
@ -19,13 +20,15 @@ pub struct DropFileWriter {
file: File, file: File,
hasher: Context, hasher: Context,
callback: Arc<AtomicBool>, callback: Arc<AtomicBool>,
progress: Arc<RelaxedCounter>
} }
impl DropFileWriter { impl DropFileWriter {
fn new(path: PathBuf, callback: Arc<AtomicBool>) -> Self { fn new(path: PathBuf, callback: Arc<AtomicBool>, progress: Arc<RelaxedCounter>) -> Self {
Self { Self {
file: OpenOptions::new().write(true).open(path).unwrap(), file: OpenOptions::new().write(true).open(path).unwrap(),
hasher: Context::new(), hasher: Context::new(),
callback, callback,
progress
} }
} }
fn finish(mut self) -> io::Result<Digest> { fn finish(mut self) -> io::Result<Digest> {
@ -42,6 +45,8 @@ impl Write for DropFileWriter {
"Interrupt command recieved", "Interrupt command recieved",
)); ));
} }
let len = buf.len();
self.progress.add(len);
//info!("Writing data to writer"); //info!("Writing data to writer");
self.hasher.write_all(buf).unwrap(); self.hasher.write_all(buf).unwrap();
@ -58,7 +63,7 @@ impl Seek for DropFileWriter {
self.file.seek(pos) self.file.seek(pos)
} }
} }
pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>) { pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>, progress: Arc<RelaxedCounter>) {
if callback.load(Ordering::Acquire) { if callback.load(Ordering::Acquire) {
info!("Callback stopped download at start"); info!("Callback stopped download at start");
return; return;
@ -85,7 +90,7 @@ pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>)
.send() .send()
.unwrap(); .unwrap();
let mut file: DropFileWriter = DropFileWriter::new(ctx.path, callback); let mut file: DropFileWriter = DropFileWriter::new(ctx.path, callback, progress);
if ctx.offset != 0 { if ctx.offset != 0 {
file.seek(SeekFrom::Start(ctx.offset)) file.seek(SeekFrom::Start(ctx.offset))

View File

@ -1,15 +1,17 @@
use atomic_counter::{AtomicCounter, RelaxedCounter};
use log::info; use log::info;
use rayon::ThreadPoolBuilder; use rayon::ThreadPoolBuilder;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::{Arc, Mutex};
pub struct ProgressChecker<T> pub struct ProgressChecker<T>
where where
T: 'static + Send + Sync, T: 'static + Send + Sync,
{ {
counter: Arc<AtomicUsize>, counter: Arc<RelaxedCounter>,
f: Arc<Box<dyn Fn(T, Arc<AtomicBool>) + Send + Sync + 'static>>, f: Arc<Box<dyn Fn(T, Arc<AtomicBool>, Arc<RelaxedCounter>) + Send + Sync + 'static>>,
callback: Arc<AtomicBool>, callback: Arc<AtomicBool>,
capacity: Mutex<usize>
} }
impl<T> ProgressChecker<T> impl<T> ProgressChecker<T>
@ -17,20 +19,21 @@ where
T: Send + Sync, T: Send + Sync,
{ {
pub fn new( pub fn new(
f: Box<dyn Fn(T, Arc<AtomicBool>) + Send + Sync + 'static>, f: Box<dyn Fn(T, Arc<AtomicBool>, Arc<RelaxedCounter>) + Send + Sync + 'static>,
counter_reference: Arc<AtomicUsize>, counter: Arc<RelaxedCounter>,
callback: Arc<AtomicBool>, callback: Arc<AtomicBool>,
capacity: usize
) -> Self { ) -> Self {
Self { Self {
f: f.into(), f: f.into(),
counter: counter_reference, counter,
callback, callback,
capacity: capacity.into()
} }
} }
pub fn run_contexts_sequentially(&self, contexts: Vec<T>) { pub fn run_contexts_sequentially(&self, contexts: Vec<T>) {
for context in contexts { for context in contexts {
(self.f)(context, self.callback.clone()); (self.f)(context, self.callback.clone(), self.counter.clone());
self.counter.fetch_add(1, Ordering::Release);
} }
} }
pub fn run_contexts_parallel_background(&self, contexts: Vec<T>, max_threads: usize) { pub fn run_contexts_parallel_background(&self, contexts: Vec<T>, max_threads: usize) {
@ -43,8 +46,9 @@ where
for context in contexts { for context in contexts {
let callback = self.callback.clone(); let callback = self.callback.clone();
let counter = self.counter.clone();
let f = self.f.clone(); let f = self.f.clone();
threads.spawn(move || f(context, callback)); threads.spawn(move || f(context, callback, counter));
} }
} }
pub fn run_context_parallel(&self, contexts: Vec<T>, max_threads: usize) { pub fn run_context_parallel(&self, contexts: Vec<T>, max_threads: usize) {
@ -56,20 +60,25 @@ where
threads.scope(|s| { threads.scope(|s| {
for context in contexts { for context in contexts {
let callback = self.callback.clone(); let callback = self.callback.clone();
let counter = self.counter.clone();
let f = self.f.clone(); let f = self.f.clone();
s.spawn(move |_| { s.spawn(move |_| {
info!("Running thread"); info!("Running thread");
f(context, callback) f(context, callback, counter)
}); });
} }
}); });
info!("Concluded scope"); info!("Concluded scope");
} }
pub fn set_capacity(&self, capacity: usize) {
let mut lock = self.capacity.lock().unwrap();
*lock = capacity;
}
pub fn get_progress(&self) -> usize { pub fn get_progress(&self) -> usize {
self.counter.load(Ordering::Relaxed) self.counter.get()
} }
// I strongly dislike type casting in my own code, so I've shovelled it into here // I strongly dislike type casting in my own code, so I've shovelled it into here
pub fn get_progress_percentage<C: Into<f64>>(&self, capacity: C) -> f64 { pub fn get_progress_percentage(&self) -> f64 {
(self.get_progress() as f64) / (capacity.into()) (self.get_progress() as f64) / (*self.capacity.lock().unwrap() as f64)
} }
} }

View File

@ -11,7 +11,7 @@ use crate::downloads::download_agent::GameDownloadAgent;
use auth::{auth_initiate, generate_authorization_header, recieve_handshake}; use auth::{auth_initiate, generate_authorization_header, recieve_handshake};
use db::{DatabaseInterface, DATA_ROOT_DIR}; use db::{DatabaseInterface, DATA_ROOT_DIR};
use downloads::download_commands::{ use downloads::download_commands::{
queue_game_download, start_game_downloads, stop_specific_game_download, get_game_download_progress, queue_game_download, start_game_downloads, stop_specific_game_download
}; };
use env_logger::Env; use env_logger::Env;
use http::{header::*, response::Builder as ResponseBuilder}; use http::{header::*, response::Builder as ResponseBuilder};
@ -117,7 +117,8 @@ pub fn run() {
// Downloads // Downloads
queue_game_download, queue_game_download,
start_game_downloads, start_game_downloads,
stop_specific_game_download stop_specific_game_download,
get_game_download_progress
]) ])
.plugin(tauri_plugin_shell::init()) .plugin(tauri_plugin_shell::init())
.setup(|app| { .setup(|app| {

View File

@ -1,23 +1,25 @@
use atomic_counter::RelaxedCounter;
use crate::downloads::progress::ProgressChecker; use crate::downloads::progress::ProgressChecker;
use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc; use std::sync::Arc;
#[test] #[test]
fn test_progress_sequentially() { fn test_progress_sequentially() {
let counter = Arc::new(AtomicUsize::new(0)); let counter = Arc::new(RelaxedCounter::new(0));
let callback = Arc::new(AtomicBool::new(false)); let callback = Arc::new(AtomicBool::new(false));
let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback); let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback, 100);
p.run_contexts_sequentially((1..100).collect()); p.run_contexts_sequentially((1..100).collect());
println!("Progress: {}", p.get_progress_percentage(100)); println!("Progress: {}", p.get_progress_percentage());
} }
#[test] #[test]
fn test_progress_parallel() { fn test_progress_parallel() {
let counter = Arc::new(AtomicUsize::new(0)); let counter = Arc::new(RelaxedCounter::new(0));
let callback = Arc::new(AtomicBool::new(false)); let callback = Arc::new(AtomicBool::new(false));
let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback); let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback, 100);
p.run_contexts_parallel_background((1..100).collect(), 10); p.run_contexts_parallel_background((1..100).collect(), 10);
} }
fn test_fn(int: usize, callback: Arc<AtomicBool>) { fn test_fn(int: usize, callback: Arc<AtomicBool>, counter: Arc<RelaxedCounter>) {
println!("{}", int); println!("{}", int);
} }