Merge branch 'downloads' (again)

Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
quexeky
2024-11-04 18:57:54 +11:00
12 changed files with 115 additions and 296 deletions

View File

@ -1,6 +1,5 @@
use std::{
env,
fmt::{Display, Formatter},
sync::Mutex,
time::{SystemTime, UNIX_EPOCH},
};
@ -13,7 +12,6 @@ use url::{ParseError, Url};
use crate::{
db::{DatabaseAuth, DatabaseImpls},
remote::RemoteAccessError,
AppState, AppStatus, User, DB,
};

View File

@ -3,13 +3,14 @@ use crate::db::{DatabaseImpls, DATA_ROOT_DIR};
use crate::downloads::download_logic;
use crate::downloads::manifest::{DropDownloadContext, DropManifest};
use crate::downloads::progress::ProgressChecker;
use crate::{AppState, DB};
use crate::DB;
use atomic_counter::RelaxedCounter;
use log::info;
use rustix::fs::{fallocate, FallocateFlags};
use serde::{Deserialize, Serialize};
use std::fs::{create_dir_all, File};
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use urlencoding::encode;
@ -18,7 +19,7 @@ pub struct GameDownloadAgent {
pub version: String,
state: Mutex<GameDownloadState>,
contexts: Mutex<Vec<DropDownloadContext>>,
progress: ProgressChecker<DropDownloadContext>,
pub progress: ProgressChecker<DropDownloadContext>,
pub manifest: Mutex<Option<DropManifest>>,
pub callback: Arc<AtomicBool>,
}
@ -57,8 +58,9 @@ impl GameDownloadAgent {
callback: callback.clone(),
progress: ProgressChecker::new(
Box::new(download_logic::download_game_chunk),
Arc::new(AtomicUsize::new(0)),
Arc::new(RelaxedCounter::new(0)),
callback,
0,
),
contexts: Mutex::new(Vec::new()),
}
@ -119,6 +121,13 @@ impl GameDownloadAgent {
}
let manifest_download = response.json::<DropManifest>().unwrap();
let length = manifest_download
.values()
.map(|chunk| {
return chunk.lengths.iter().sum::<usize>();
})
.sum::<usize>();
self.progress.set_capacity(length);
if let Ok(mut manifest) = self.manifest.lock() {
*manifest = Some(manifest_download)
} else {

View File

@ -60,7 +60,7 @@ pub async fn start_game_downloads(
}
});
info!("Spawned download");
return Ok(());
Ok(())
}
pub fn start_game_download(
@ -104,5 +104,17 @@ pub async fn stop_specific_game_download(
info!("Stopping callback");
callback.store(true, Ordering::Release);
return Ok(());
Ok(())
}
#[tauri::command]
pub async fn get_game_download_progress(
state: tauri::State<'_, Mutex<AppState>>,
game_id: String,
) -> Result<f64, String> {
let lock = state.lock().unwrap();
let download_agent = lock.game_downloads.get(&game_id).unwrap();
let progress = download_agent.progress.get_progress_percentage();
info!("{}", progress);
Ok(progress)
}

View File

@ -2,14 +2,12 @@ use crate::auth::generate_authorization_header;
use crate::db::DatabaseImpls;
use crate::downloads::manifest::DropDownloadContext;
use crate::DB;
use gxhash::{gxhash128, GxHasher};
use log::info;
use atomic_counter::{AtomicCounter, RelaxedCounter};
use log::{error, info};
use md5::{Context, Digest};
use reqwest::blocking::Response;
use std::{
fs::{File, OpenOptions},
hash::Hasher,
io::{self, BufReader, BufWriter, Error, ErrorKind, Read, Seek, SeekFrom, Write},
io::{self, BufWriter, Error, ErrorKind, Seek, SeekFrom, Write},
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
@ -22,13 +20,15 @@ pub struct DropFileWriter {
file: File,
hasher: Context,
callback: Arc<AtomicBool>,
progress: Arc<RelaxedCounter>,
}
impl DropFileWriter {
fn new(path: PathBuf, callback: Arc<AtomicBool>) -> Self {
fn new(path: PathBuf, callback: Arc<AtomicBool>, progress: Arc<RelaxedCounter>) -> Self {
Self {
file: OpenOptions::new().write(true).open(path).unwrap(),
hasher: Context::new(),
callback,
progress,
}
}
fn finish(mut self) -> io::Result<Digest> {
@ -45,6 +45,8 @@ impl Write for DropFileWriter {
"Interrupt command recieved",
));
}
let len = buf.len();
self.progress.add(len);
//info!("Writing data to writer");
self.hasher.write_all(buf).unwrap();
@ -61,7 +63,11 @@ impl Seek for DropFileWriter {
self.file.seek(pos)
}
}
pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>) {
pub fn download_game_chunk(
ctx: DropDownloadContext,
callback: Arc<AtomicBool>,
progress: Arc<RelaxedCounter>,
) {
if callback.load(Ordering::Acquire) {
info!("Callback stopped download at start");
return;
@ -88,7 +94,7 @@ pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>)
.send()
.unwrap();
let mut file: DropFileWriter = DropFileWriter::new(ctx.path, callback);
let mut file: DropFileWriter = DropFileWriter::new(ctx.path, callback, progress);
if ctx.offset != 0 {
file.seek(SeekFrom::Start(ctx.offset))
@ -98,15 +104,22 @@ pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>)
// Writing everything to disk directly is probably slightly faster because it balances out the writes,
// but this is better than the performance loss from constantly reading the callbacks
//let mut writer = BufWriter::with_capacity(1024 * 1024, file);
let mut writer = BufWriter::with_capacity(1024 * 1024, file);
//copy_to_drop_file_writer(&mut response, &mut file);
match io::copy(&mut response, &mut file) {
match io::copy(&mut response, &mut writer) {
Ok(_) => {}
Err(e) => {
info!("Copy errored with error {}", e)
}
}
let file = match writer.into_inner() {
Ok(file) => file,
Err(_) => {
error!("Failed to acquire writer from BufWriter");
return;
}
};
let res = hex::encode(file.finish().unwrap().0);
if res != ctx.checksum {
@ -118,24 +131,3 @@ pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>)
// stream.flush().unwrap();
}
pub fn copy_to_drop_file_writer(response: &mut Response, writer: &mut DropFileWriter) {
loop {
info!("Writing to file writer");
let mut buf = [0u8; 1024];
response.read(&mut buf).unwrap();
match writer.write_all(&buf) {
Ok(_) => {}
Err(e) => match e.kind() {
ErrorKind::Interrupted => {
info!("Interrupted");
return;
}
_ => {
println!("{}", e);
return;
}
},
}
}
}

View File

@ -1,8 +1,6 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::File;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
pub type DropManifest = HashMap<String, DropChunk>;
#[derive(Serialize, Deserialize, Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]

View File

@ -1,15 +1,17 @@
use atomic_counter::{AtomicCounter, RelaxedCounter};
use log::info;
use rayon::ThreadPoolBuilder;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
pub struct ProgressChecker<T>
where
T: 'static + Send + Sync,
{
counter: Arc<AtomicUsize>,
f: Arc<Box<dyn Fn(T, Arc<AtomicBool>) + Send + Sync + 'static>>,
counter: Arc<RelaxedCounter>,
f: Arc<Box<dyn Fn(T, Arc<AtomicBool>, Arc<RelaxedCounter>) + Send + Sync + 'static>>,
callback: Arc<AtomicBool>,
capacity: Mutex<usize>,
}
impl<T> ProgressChecker<T>
@ -17,22 +19,25 @@ where
T: Send + Sync,
{
pub fn new(
f: Box<dyn Fn(T, Arc<AtomicBool>) + Send + Sync + 'static>,
counter_reference: Arc<AtomicUsize>,
f: Box<dyn Fn(T, Arc<AtomicBool>, Arc<RelaxedCounter>) + Send + Sync + 'static>,
counter: Arc<RelaxedCounter>,
callback: Arc<AtomicBool>,
capacity: usize,
) -> Self {
Self {
f: f.into(),
counter: counter_reference,
counter,
callback,
capacity: capacity.into(),
}
}
#[allow(dead_code)]
pub fn run_contexts_sequentially(&self, contexts: Vec<T>) {
for context in contexts {
(self.f)(context, self.callback.clone());
self.counter.fetch_add(1, Ordering::Release);
(self.f)(context, self.callback.clone(), self.counter.clone());
}
}
#[allow(dead_code)]
pub fn run_contexts_parallel_background(&self, contexts: Vec<T>, max_threads: usize) {
let threads = ThreadPoolBuilder::new()
// If max_threads == 0, then the limit will be determined
@ -43,8 +48,9 @@ where
for context in contexts {
let callback = self.callback.clone();
let counter = self.counter.clone();
let f = self.f.clone();
threads.spawn(move || f(context, callback));
threads.spawn(move || f(context, callback, counter));
}
}
pub fn run_context_parallel(&self, contexts: Vec<T>, max_threads: usize) {
@ -56,20 +62,25 @@ where
threads.scope(|s| {
for context in contexts {
let callback = self.callback.clone();
let counter = self.counter.clone();
let f = self.f.clone();
s.spawn(move |_| {
info!("Running thread");
f(context, callback)
f(context, callback, counter)
});
}
});
info!("Concluded scope");
}
pub fn set_capacity(&self, capacity: usize) {
let mut lock = self.capacity.lock().unwrap();
*lock = capacity;
}
pub fn get_progress(&self) -> usize {
self.counter.load(Ordering::Relaxed)
self.counter.get()
}
// I strongly dislike type casting in my own code, so I've shovelled it into here
pub fn get_progress_percentage<C: Into<f64>>(&self, capacity: C) -> f64 {
(self.get_progress() as f64) / (capacity.into())
pub fn get_progress_percentage(&self) -> f64 {
(self.get_progress() as f64) / (*self.capacity.lock().unwrap() as f64)
}
}

View File

@ -11,7 +11,8 @@ use crate::downloads::download_agent::GameDownloadAgent;
use auth::{auth_initiate, generate_authorization_header, recieve_handshake};
use db::{DatabaseInterface, DATA_ROOT_DIR};
use downloads::download_commands::{
queue_game_download, start_game_downloads, stop_specific_game_download,
get_game_download_progress, queue_game_download, start_game_downloads,
stop_specific_game_download,
};
use env_logger::Env;
use http::{header::*, response::Builder as ResponseBuilder};
@ -96,6 +97,7 @@ pub fn run() {
let mut builder = tauri::Builder::default().plugin(tauri_plugin_dialog::init());
#[cfg(desktop)]
#[allow(unused_variables)]
{
builder = builder.plugin(tauri_plugin_single_instance::init(|_app, argv, _cwd| {
// when defining deep link schemes at runtime, you must also check `argv` here
@ -119,7 +121,8 @@ pub fn run() {
// Downloads
queue_game_download,
start_game_downloads,
stop_specific_game_download
stop_specific_game_download,
get_game_download_progress
])
.plugin(tauri_plugin_shell::init())
.setup(|app| {

View File

@ -6,7 +6,6 @@ use tauri::{AppHandle, Manager};
use crate::db::DatabaseGameStatus;
use crate::db::DatabaseImpls;
use crate::remote::RemoteAccessError;
use crate::{auth::generate_authorization_header, AppState, DB};
#[derive(serde::Serialize)]

View File

@ -1,23 +1,25 @@
use atomic_counter::RelaxedCounter;
use crate::downloads::progress::ProgressChecker;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
#[test]
fn test_progress_sequentially() {
let counter = Arc::new(AtomicUsize::new(0));
let counter = Arc::new(RelaxedCounter::new(0));
let callback = Arc::new(AtomicBool::new(false));
let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback);
let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback, 100);
p.run_contexts_sequentially((1..100).collect());
println!("Progress: {}", p.get_progress_percentage(100));
println!("Progress: {}", p.get_progress_percentage());
}
#[test]
fn test_progress_parallel() {
let counter = Arc::new(AtomicUsize::new(0));
let counter = Arc::new(RelaxedCounter::new(0));
let callback = Arc::new(AtomicBool::new(false));
let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback);
let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback, 100);
p.run_contexts_parallel_background((1..100).collect(), 10);
}
fn test_fn(int: usize, callback: Arc<AtomicBool>) {
fn test_fn(int: usize, _callback: Arc<AtomicBool>, _counter: Arc<RelaxedCounter>) {
println!("{}", int);
}