use crate::auth::generate_authorization_header; use crate::db::{set_game_status, ApplicationTransientStatus, DatabaseImpls}; use crate::download_manager::application_download_error::ApplicationDownloadError; use crate::download_manager::download_manager::{DownloadManagerSignal, DownloadStatus}; use crate::download_manager::download_thread_control_flag::{ DownloadThreadControl, DownloadThreadControlFlag, }; use crate::download_manager::downloadable::Downloadable; use crate::download_manager::downloadable_metadata::{DownloadType, DownloadableMetadata}; use crate::download_manager::progress_object::{ProgressHandle, ProgressObject}; use crate::games::downloads::manifest::{DropDownloadContext, DropManifest}; use crate::games::library::{on_game_complete, push_game_update}; use crate::remote::RemoteAccessError; use crate::DB; use log::{debug, error, info}; use rayon::ThreadPoolBuilder; use std::collections::VecDeque; use std::fs::{create_dir_all, File}; use std::path::Path; use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; use std::time::Instant; use tauri::{AppHandle, Emitter}; use urlencoding::encode; #[cfg(target_os = "linux")] use rustix::fs::{fallocate, FallocateFlags}; use super::download_logic::download_game_chunk; use super::stored_manifest::StoredManifest; pub struct GameDownloadAgent { pub id: String, pub version: String, pub control_flag: DownloadThreadControl, contexts: Mutex>, completed_contexts: Mutex>, pub manifest: Mutex>, pub progress: Arc, sender: Sender, pub stored_manifest: StoredManifest, status: Mutex, } impl GameDownloadAgent { pub fn new( id: String, version: String, target_download_dir: usize, sender: Sender, ) -> Self { // Don't run by default let control_flag = DownloadThreadControl::new(DownloadThreadControlFlag::Stop); let db_lock = DB.borrow_data().unwrap(); let base_dir = db_lock.applications.install_dirs[target_download_dir].clone(); drop(db_lock); let base_dir_path = Path::new(&base_dir); let data_base_dir_path = base_dir_path.join(id.clone()); let stored_manifest = StoredManifest::generate(id.clone(), version.clone(), data_base_dir_path.clone()); Self { id, version, control_flag, manifest: Mutex::new(None), contexts: Mutex::new(Vec::new()), completed_contexts: Mutex::new(VecDeque::new()), progress: Arc::new(ProgressObject::new(0, 0, sender.clone())), sender, stored_manifest, status: Mutex::new(DownloadStatus::Queued), } } // Blocking pub fn setup_download(&self) -> Result<(), ApplicationDownloadError> { self.ensure_manifest_exists()?; info!("Ensured manifest exists"); self.ensure_contexts()?; info!("Ensured contexts exists"); self.control_flag.set(DownloadThreadControlFlag::Go); Ok(()) } // Blocking pub fn download(&self, app_handle: &AppHandle) -> Result { info!("Setting up download"); self.setup_download()?; info!("Setting progress object params"); self.set_progress_object_params(); info!("Running"); let timer = Instant::now(); push_game_update( app_handle, &self.metadata(), ( None, Some(ApplicationTransientStatus::Downloading { version_name: self.version.clone(), }), ), ); let res = self .run() .map_err(|_| ApplicationDownloadError::DownloadError); info!( "{} took {}ms to download", self.id, timer.elapsed().as_millis() ); res } pub fn ensure_manifest_exists(&self) -> Result<(), ApplicationDownloadError> { if self.manifest.lock().unwrap().is_some() { return Ok(()); } self.download_manifest() } fn download_manifest(&self) -> Result<(), ApplicationDownloadError> { let base_url = DB.fetch_base_url(); let manifest_url = base_url .join( format!( "/api/v1/client/metadata/manifest?id={}&version={}", self.id, encode(&self.version) ) .as_str(), ) .unwrap(); let header = generate_authorization_header(); let client = reqwest::blocking::Client::new(); let response = client .get(manifest_url.to_string()) .header("Authorization", header) .send() .unwrap(); if response.status() != 200 { return Err(ApplicationDownloadError::Communication( RemoteAccessError::ManifestDownloadFailed( response.status(), response.text().unwrap(), ), )); } let manifest_download = response.json::().unwrap(); if let Ok(mut manifest) = self.manifest.lock() { *manifest = Some(manifest_download); return Ok(()); } Err(ApplicationDownloadError::Lock) } fn set_progress_object_params(&self) { // Avoid re-setting it if self.progress.get_max() != 0 { return; } let contexts = self.contexts.lock().unwrap(); let length = contexts.len(); let chunk_count = contexts.iter().map(|chunk| chunk.length).sum(); debug!("Setting ProgressObject max to {}", chunk_count); self.progress.set_max(chunk_count); debug!("Setting ProgressObject size to {}", length); self.progress.set_size(length); debug!("Setting ProgressObject time to now"); self.progress.set_time_now(); } pub fn ensure_contexts(&self) -> Result<(), ApplicationDownloadError> { if !self.contexts.lock().unwrap().is_empty() { return Ok(()); } self.generate_contexts()?; Ok(()) } pub fn generate_contexts(&self) -> Result<(), ApplicationDownloadError> { let manifest = self.manifest.lock().unwrap().clone().unwrap(); let game_id = self.id.clone(); let mut contexts = Vec::new(); let base_path = Path::new(&self.stored_manifest.base_path); create_dir_all(base_path).unwrap(); { let mut completed_contexts_lock = self.completed_contexts.lock().unwrap(); completed_contexts_lock.clear(); completed_contexts_lock.extend(self.stored_manifest.get_completed_contexts()); } for (raw_path, chunk) in manifest { let path = base_path.join(Path::new(&raw_path)); let container = path.parent().unwrap(); create_dir_all(container).unwrap(); let file = File::create(path.clone()).unwrap(); let mut running_offset = 0; for (index, length) in chunk.lengths.iter().enumerate() { contexts.push(DropDownloadContext { file_name: raw_path.to_string(), version: chunk.version_name.to_string(), offset: running_offset, index, game_id: game_id.to_string(), path: path.clone(), checksum: chunk.checksums[index].clone(), length: *length, permissions: chunk.permissions, }); running_offset += *length as u64; } #[cfg(target_os = "linux")] if running_offset > 0 { let _ = fallocate(file, FallocateFlags::empty(), 0, running_offset); } } *self.contexts.lock().unwrap() = contexts; Ok(()) } pub fn run(&self) -> Result { info!("downloading game: {}", self.id); const DOWNLOAD_MAX_THREADS: usize = 1; let pool = ThreadPoolBuilder::new() .num_threads(DOWNLOAD_MAX_THREADS) .build() .unwrap(); let completed_indexes = Arc::new(boxcar::Vec::new()); let completed_indexes_loop_arc = completed_indexes.clone(); let base_url = DB.fetch_base_url(); let contexts = self.contexts.lock().unwrap(); pool.scope(|scope| { let client = &reqwest::blocking::Client::new(); for (index, context) in contexts.iter().enumerate() { let client = client.clone(); let completed_indexes = completed_indexes_loop_arc.clone(); let progress = self.progress.get(index); let progress_handle = ProgressHandle::new(progress, self.progress.clone()); // If we've done this one already, skip it if self.completed_contexts.lock().unwrap().contains(&index) { progress_handle.add(context.length); continue; } let sender = self.sender.clone(); let request = generate_request(&base_url, client, context); scope.spawn(move |_| { match download_game_chunk(context, &self.control_flag, progress_handle, request) { Ok(res) => { if res { completed_indexes.push(index); } } Err(e) => { error!("{}", e); sender.send(DownloadManagerSignal::Error(e)).unwrap(); } } info!("Completed context id {}", index); }); } }); let newly_completed = completed_indexes.to_owned(); let completed_lock_len = { let mut completed_contexts_lock = self.completed_contexts.lock().unwrap(); for (item, _) in newly_completed.iter() { completed_contexts_lock.push_front(item); } completed_contexts_lock.len() }; info!("Got newly completed"); // If we're not out of contexts, we're not done, so we don't fire completed if completed_lock_len != contexts.len() { info!("da for {} exited without completing", self.id.clone()); self.stored_manifest .set_completed_contexts(&self.completed_contexts.lock().unwrap().clone().into()); info!("Setting completed contexts"); self.stored_manifest.write(); info!("Wrote completed contexts"); return Ok(false); } info!("Sending completed signal"); // We've completed self.sender .send(DownloadManagerSignal::Completed(self.metadata())) .unwrap(); Ok(true) } } fn generate_request( base_url: &url::Url, client: reqwest::blocking::Client, context: &DropDownloadContext, ) -> reqwest::blocking::RequestBuilder { let chunk_url = base_url .join(&format!( "/api/v1/client/chunk?id={}&version={}&name={}&chunk={}", // Encode the parts we don't trust context.game_id, encode(&context.version), encode(&context.file_name), context.index )) .unwrap(); let header = generate_authorization_header(); client.get(chunk_url).header("Authorization", header) } impl Downloadable for GameDownloadAgent { fn download(&self, app_handle: &AppHandle) -> Result { *self.status.lock().unwrap() = DownloadStatus::Downloading; self.download(app_handle) } fn progress(&self) -> Arc { self.progress.clone() } fn control_flag(&self) -> DownloadThreadControl { self.control_flag.clone() } fn metadata(&self) -> DownloadableMetadata { DownloadableMetadata { id: self.id.clone(), version: Some(self.version.clone()), download_type: DownloadType::Game, } } fn on_initialised(&self, _app_handle: &tauri::AppHandle) { *self.status.lock().unwrap() = DownloadStatus::Queued; } fn on_error(&self, app_handle: &tauri::AppHandle, error: ApplicationDownloadError) { *self.status.lock().unwrap() = DownloadStatus::Error; app_handle .emit("download_error", error.to_string()) .unwrap(); error!("error while managing download: {}", error); set_game_status(app_handle, self.metadata(), |db_handle, meta| { db_handle.applications.transient_statuses.remove(meta); }); } fn on_complete(&self, app_handle: &tauri::AppHandle) { on_game_complete( &self.metadata(), self.stored_manifest.base_path.to_string_lossy().to_string(), app_handle, ) .unwrap(); } fn on_incomplete(&self, _app_handle: &tauri::AppHandle) { *self.status.lock().unwrap() = DownloadStatus::Queued; } fn on_cancelled(&self, _app_handle: &tauri::AppHandle) {} fn status(&self) -> DownloadStatus { self.status.lock().unwrap().clone() } }