diff --git a/pages/store/index.vue b/pages/store/index.vue index 8ee2ae7..1b3ea21 100644 --- a/pages/store/index.vue +++ b/pages/store/index.vue @@ -9,10 +9,16 @@ + diff --git a/src-tauri/src/downloads/download_agent.rs b/src-tauri/src/downloads/download_agent.rs index b8fc0d5..093f1cc 100644 --- a/src-tauri/src/downloads/download_agent.rs +++ b/src-tauri/src/downloads/download_agent.rs @@ -68,30 +68,30 @@ impl GameDownloadAgent { if self.manifest.lock().unwrap().is_none() { return Ok(()); } - self.ensure_manifest_exists().await + self.ensure_manifest_exists() } - pub async fn begin_download(&self, max_threads: usize) -> Result<(), GameDownloadError> { + pub fn begin_download(&self, max_threads: usize) -> Result<(), GameDownloadError> { self.change_state(GameDownloadState::Downloading); // TODO we're coping the whole context thing // It's not necessary, I just can't figure out to make the borrow checker happy { let lock = self.contexts.lock().unwrap().to_vec(); self.progress - .run_context_parallel(lock, max_threads).await; + .run_context_parallel(lock, max_threads); } Ok(()) } - pub async fn ensure_manifest_exists(&self) -> Result<(), GameDownloadError> { + pub fn ensure_manifest_exists(&self) -> Result<(), GameDownloadError> { if self.manifest.lock().unwrap().is_some() { return Ok(()); } - self.download_manifest().await + self.download_manifest() } - async fn download_manifest(&self) -> Result<(), GameDownloadError> { + fn download_manifest(&self) -> Result<(), GameDownloadError> { let base_url = DB.fetch_base_url(); let manifest_url = base_url .join( @@ -106,12 +106,11 @@ impl GameDownloadAgent { let header = generate_authorization_header(); info!("Generating & sending client"); - let client = reqwest::Client::new(); + let client = reqwest::blocking::Client::new(); let response = client .get(manifest_url.to_string()) .header("Authorization", header) .send() - .await .unwrap(); if response.status() != 200 { @@ -119,7 +118,7 @@ impl GameDownloadAgent { return Err(GameDownloadError::Status(response.status().as_u16())); } - let manifest_download = response.json::().await.unwrap(); + let manifest_download = response.json::().unwrap(); if let Ok(mut manifest) = self.manifest.lock() { *manifest = Some(manifest_download) } else { diff --git a/src-tauri/src/downloads/download_commands.rs b/src-tauri/src/downloads/download_commands.rs index a4f0f01..550d837 100644 --- a/src-tauri/src/downloads/download_commands.rs +++ b/src-tauri/src/downloads/download_commands.rs @@ -1,4 +1,4 @@ -use std::sync::{atomic::Ordering, Arc, Mutex}; +use std::{sync::{atomic::Ordering, Arc, Mutex}, thread}; use log::info; @@ -27,41 +27,46 @@ pub async fn start_game_downloads( state: tauri::State<'_, Mutex>, ) -> Result<(), GameDownloadError> { info!("Downloading Games"); - loop { - let mut current_id = String::new(); - let mut download_agent = None; - { - let lock = state.lock().unwrap(); - for (id, agent) in &lock.game_downloads { - if agent.get_state() == GameDownloadState::Queued { - download_agent = Some(agent.clone()); - current_id = id.clone(); - info!("Got queued game to download"); - break; + let lock = state.lock().unwrap(); + let mut game_downloads = lock.game_downloads.clone(); + drop(lock); + thread::spawn(move || { + loop { + let mut current_id = String::new(); + let mut download_agent = None; + { + for (id, agent) in &game_downloads { + if agent.get_state() == GameDownloadState::Queued { + download_agent = Some(agent.clone()); + current_id = id.clone(); + info!("Got queued game to download"); + break; + } } + if download_agent.is_none() { + info!("No more games left to download"); + return; + } + }; + info!("Downloading game"); + { + start_game_download(max_threads, download_agent.unwrap()).unwrap(); + game_downloads.remove_entry(¤t_id); } - if download_agent.is_none() { - info!("No more games left to download"); - return Ok(()) - } - }; - info!("Downloading game"); - { - start_game_download(max_threads, download_agent.unwrap()).await?; - let mut lock = state.lock().unwrap(); - lock.game_downloads.remove_entry(¤t_id); - } - } + } + }); + info!("Spawned download"); + return Ok(()) } -pub async fn start_game_download( +pub fn start_game_download( max_threads: usize, download_agent: Arc ) -> Result<(), GameDownloadError> { info!("Triggered Game Download"); - download_agent.ensure_manifest_exists().await?; + download_agent.ensure_manifest_exists()?; let local_manifest = { let manifest = download_agent.manifest.lock().unwrap(); @@ -70,19 +75,21 @@ pub async fn start_game_download( download_agent.generate_job_contexts(&local_manifest, download_agent.version.clone(), download_agent.id.clone()).unwrap(); - download_agent.begin_download(max_threads).await?; + download_agent.begin_download(max_threads).unwrap(); Ok(()) } #[tauri::command] pub async fn stop_specific_game_download(state: tauri::State<'_, Mutex>, game_id: String) -> Result<(), String> { + info!("called stop_specific_game_download"); let lock = state.lock().unwrap(); let download_agent = lock.game_downloads.get(&game_id).unwrap(); let callback = download_agent.callback.clone(); drop(lock); + info!("Stopping callback"); callback.store(true, Ordering::Release); return Ok(()) diff --git a/src-tauri/src/downloads/download_logic.rs b/src-tauri/src/downloads/download_logic.rs index 8825c68..d432780 100644 --- a/src-tauri/src/downloads/download_logic.rs +++ b/src-tauri/src/downloads/download_logic.rs @@ -5,7 +5,8 @@ use crate::DB; use gxhash::{gxhash128, GxHasher}; use log::info; use md5::{Context, Digest}; -use std::{fs::{File, OpenOptions}, hash::Hasher, io::{self, BufWriter, Error, ErrorKind, Seek, SeekFrom, Write}, path::PathBuf, sync::{atomic::{AtomicBool, Ordering}, Arc}}; +use reqwest::blocking::Response; +use std::{fs::{File, OpenOptions}, hash::Hasher, io::{self, BufReader, BufWriter, Error, ErrorKind, Read, Seek, SeekFrom, Write}, path::PathBuf, sync::{atomic::{AtomicBool, Ordering}, Arc}}; use urlencoding::encode; pub struct DropFileWriter { @@ -30,8 +31,10 @@ impl DropFileWriter { impl Write for DropFileWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { if self.callback.load(Ordering::Acquire) { - return Err(Error::new(ErrorKind::Interrupted, "Interrupt command recieved")); + return Err(Error::new(ErrorKind::ConnectionAborted, "Interrupt command recieved")); } + + //info!("Writing data to writer"); self.hasher.write_all(buf).unwrap(); self.file.write(buf) } @@ -48,6 +51,7 @@ impl Seek for DropFileWriter { } pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc) { if callback.load(Ordering::Acquire) { + info!("Callback stopped download at start"); return; } let base_url = DB.fetch_base_url(); @@ -83,16 +87,14 @@ pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc) // Writing everything to disk directly is probably slightly faster because it balances out the writes, // but this is better than the performance loss from constantly reading the callbacks - let mut writer = BufWriter::with_capacity(1024 * 1024, file); + //let mut writer = BufWriter::with_capacity(1024 * 1024, file); - match response.copy_to(&mut writer) { + //copy_to_drop_file_writer(&mut response, &mut file); + match io::copy(&mut response, &mut file) { Ok(_) => {}, - Err(_) => { println!("Stopped printing chunk {}", ctx.file_name); return; } - }; - let file = match writer.into_inner() { - Ok(inner) => inner, - Err(_) => panic!("Failed to get BufWriter inner"), - }; + Err(e) => { info!("Copy errored with error {}", e)}, + } + let res = hex::encode(file.finish().unwrap().0); if res != ctx.checksum { info!("Checksum failed. Original: {}, Calculated: {} for {}", ctx.checksum, res, ctx.file_name); @@ -100,3 +102,23 @@ pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc) // stream.flush().unwrap(); } + +pub fn copy_to_drop_file_writer(response: &mut Response, writer: &mut DropFileWriter) { + loop { + info!("Writing to file writer"); + let mut buf = [0u8; 1024]; + response.read(&mut buf).unwrap(); + match writer.write_all(&buf) { + Ok(_) => {}, + Err(e) => { + match e.kind() { + ErrorKind::Interrupted => { + info!("Interrupted"); + return; + } + _ => { println!("{}", e); return;} + } + }, + } + } +} \ No newline at end of file diff --git a/src-tauri/src/downloads/progress.rs b/src-tauri/src/downloads/progress.rs index 78d56ed..671ba34 100644 --- a/src-tauri/src/downloads/progress.rs +++ b/src-tauri/src/downloads/progress.rs @@ -1,6 +1,5 @@ +use log::info; use rayon::ThreadPoolBuilder; -use uuid::timestamp::context; -use std::os::unix::thread; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -28,12 +27,6 @@ where callback } } - pub async fn run_contexts_sequentially_async(&self, contexts: Vec) { - for context in contexts { - (self.f)(context, self.callback.clone()); - self.counter.fetch_add(1, Ordering::Release); - } - } pub fn run_contexts_sequentially(&self, contexts: Vec) { for context in contexts { (self.f)(context, self.callback.clone()); @@ -54,7 +47,7 @@ where threads.spawn(move || f(context, callback)); } } - pub async fn run_context_parallel(&self, contexts: Vec, max_threads: usize) { + pub fn run_context_parallel(&self, contexts: Vec, max_threads: usize) { let threads = ThreadPoolBuilder::new() .num_threads(max_threads) .build() @@ -64,9 +57,10 @@ where for context in contexts { let callback = self.callback.clone(); let f = self.f.clone(); - s.spawn(move |_| f(context, callback)); + s.spawn(move |_| {info!("Running thread"); f(context, callback)}); } }); + info!("Concluded scope"); } pub fn get_progress(&self) -> usize {