v2 download API and fixes (#112)

* fix: potential download fixes

* fix: show installed games not on remote

* fix: more download_logic error handling

* partial: move to async

* feat: interactivity improvements

* feat: v2 download API

* fix: download seek offsets

* fix: clippy

* fix: apply clippy suggestion

* fix: performance improvements starting up download

* fix: finished bucket file

* fix: ui tweaks and fixes

* fix: revert version to 0.3.2

* fix: clippy
This commit is contained in:
DecDuck
2025-08-09 15:50:21 +10:00
committed by GitHub
parent 3b830e2a44
commit 16365713cf
27 changed files with 859 additions and 603 deletions

View File

@ -11,13 +11,15 @@ use crate::download_manager::util::download_thread_control_flag::{
use crate::download_manager::util::progress_object::{ProgressHandle, ProgressObject};
use crate::error::application_download_error::ApplicationDownloadError;
use crate::error::remote_access_error::RemoteAccessError;
use crate::games::downloads::manifest::{DropDownloadContext, DropManifest};
use crate::games::downloads::manifest::{
DownloadBucket, DownloadContext, DownloadDrop, DropManifest, DropValidateContext, ManifestBody,
};
use crate::games::downloads::validate::validate_game_chunk;
use crate::games::library::{on_game_complete, push_game_update, set_partially_installed};
use crate::games::state::GameStatusManager;
use crate::process::utils::get_disk_available;
use crate::remote::requests::make_request;
use crate::remote::utils::DROP_CLIENT_SYNC;
use crate::remote::requests::generate_url;
use crate::remote::utils::{DROP_CLIENT_ASYNC, DROP_CLIENT_SYNC};
use log::{debug, error, info, warn};
use rayon::ThreadPoolBuilder;
use std::collections::HashMap;
@ -31,16 +33,18 @@ use tauri::{AppHandle, Emitter};
#[cfg(target_os = "linux")]
use rustix::fs::{FallocateFlags, fallocate};
use super::download_logic::download_game_chunk;
use super::download_logic::download_game_bucket;
use super::drop_data::DropData;
static RETRY_COUNT: usize = 3;
const TARGET_BUCKET_SIZE: usize = 63 * 1000 * 1000;
pub struct GameDownloadAgent {
pub id: String,
pub version: String,
pub control_flag: DownloadThreadControl,
contexts: Mutex<Vec<DropDownloadContext>>,
buckets: Mutex<Vec<DownloadBucket>>,
context_map: Mutex<HashMap<String, bool>>,
pub manifest: Mutex<Option<DropManifest>>,
pub progress: Arc<ProgressObject>,
@ -50,19 +54,21 @@ pub struct GameDownloadAgent {
}
impl GameDownloadAgent {
pub fn new_from_index(
pub async fn new_from_index(
id: String,
version: String,
target_download_dir: usize,
sender: Sender<DownloadManagerSignal>,
) -> Result<Self, ApplicationDownloadError> {
let db_lock = borrow_db_checked();
let base_dir = db_lock.applications.install_dirs[target_download_dir].clone();
drop(db_lock);
let base_dir = {
let db_lock = borrow_db_checked();
Self::new(id, version, base_dir, sender)
db_lock.applications.install_dirs[target_download_dir].clone()
};
Self::new(id, version, base_dir, sender).await
}
pub fn new(
pub async fn new(
id: String,
version: String,
base_dir: PathBuf,
@ -82,7 +88,7 @@ impl GameDownloadAgent {
version,
control_flag,
manifest: Mutex::new(None),
contexts: Mutex::new(Vec::new()),
buckets: Mutex::new(Vec::new()),
context_map: Mutex::new(HashMap::new()),
progress: Arc::new(ProgressObject::new(0, 0, sender.clone())),
sender,
@ -90,7 +96,7 @@ impl GameDownloadAgent {
status: Mutex::new(DownloadStatus::Queued),
};
result.ensure_manifest_exists()?;
result.ensure_manifest_exists().await?;
let required_space = result
.manifest
@ -100,8 +106,7 @@ impl GameDownloadAgent {
.unwrap()
.values()
.map(|e| e.lengths.iter().sum::<usize>())
.sum::<usize>()
as u64;
.sum::<usize>() as u64;
let available_space = get_disk_available(data_base_dir_path)? as u64;
@ -117,26 +122,25 @@ impl GameDownloadAgent {
// Blocking
pub fn setup_download(&self, app_handle: &AppHandle) -> Result<(), ApplicationDownloadError> {
self.ensure_manifest_exists()?;
let mut db_lock = borrow_db_mut_checked();
let status = ApplicationTransientStatus::Downloading {
version_name: self.version.clone(),
};
db_lock
.applications
.transient_statuses
.insert(self.metadata(), status.clone());
// Don't use GameStatusManager because this game isn't installed
push_game_update(app_handle, &self.metadata().id, None, (None, Some(status)));
self.ensure_contexts()?;
if !self.check_manifest_exists() {
return Err(ApplicationDownloadError::NotInitialized);
}
self.ensure_buckets()?;
self.control_flag.set(DownloadThreadControlFlag::Go);
let mut db_lock = borrow_db_mut_checked();
db_lock.applications.transient_statuses.insert(
self.metadata(),
ApplicationTransientStatus::Downloading {
version_name: self.version.clone(),
},
);
push_game_update(
app_handle,
&self.metadata().id,
None,
GameStatusManager::fetch_state(&self.metadata().id, &db_lock),
);
Ok(())
}
@ -147,9 +151,7 @@ impl GameDownloadAgent {
info!("beginning download for {}...", self.metadata().id);
let res = self
.run()
.map_err(|()| ApplicationDownloadError::DownloadError);
let res = self.run().map_err(ApplicationDownloadError::Communication);
debug!(
"{} took {}ms to download",
@ -159,37 +161,43 @@ impl GameDownloadAgent {
res
}
pub fn ensure_manifest_exists(&self) -> Result<(), ApplicationDownloadError> {
pub fn check_manifest_exists(&self) -> bool {
self.manifest.lock().unwrap().is_some()
}
pub async fn ensure_manifest_exists(&self) -> Result<(), ApplicationDownloadError> {
if self.manifest.lock().unwrap().is_some() {
return Ok(());
}
self.download_manifest()
self.download_manifest().await
}
fn download_manifest(&self) -> Result<(), ApplicationDownloadError> {
let header = generate_authorization_header();
let client = DROP_CLIENT_SYNC.clone();
let response = make_request(
&client,
async fn download_manifest(&self) -> Result<(), ApplicationDownloadError> {
let client = DROP_CLIENT_ASYNC.clone();
let url = generate_url(
&["/api/v1/client/game/manifest"],
&[("id", &self.id), ("version", &self.version)],
|f| f.header("Authorization", header),
)
.map_err(ApplicationDownloadError::Communication)?
.send()
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
.map_err(ApplicationDownloadError::Communication)?;
let response = client
.get(url)
.header("Authorization", generate_authorization_header())
.send()
.await
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
if response.status() != 200 {
return Err(ApplicationDownloadError::Communication(
RemoteAccessError::ManifestDownloadFailed(
response.status(),
response.text().unwrap(),
response.text().await.unwrap(),
),
));
}
let manifest_download: DropManifest = response.json().unwrap();
let manifest_download: DropManifest = response.json().await.unwrap();
if let Ok(mut manifest) = self.manifest.lock() {
*manifest = Some(manifest_download);
@ -201,20 +209,23 @@ impl GameDownloadAgent {
// Sets it up for both download and validate
fn setup_progress(&self) {
let contexts = self.contexts.lock().unwrap();
let buckets = self.buckets.lock().unwrap();
let length = contexts.len();
let chunk_count = buckets.iter().map(|e| e.drops.len()).sum();
let chunk_count = contexts.iter().map(|chunk| chunk.length).sum();
let total_length = buckets
.iter()
.map(|bucket| bucket.drops.iter().map(|e| e.length).sum::<usize>())
.sum();
self.progress.set_max(chunk_count);
self.progress.set_size(length);
self.progress.set_max(total_length);
self.progress.set_size(chunk_count);
self.progress.reset();
}
pub fn ensure_contexts(&self) -> Result<(), ApplicationDownloadError> {
if self.contexts.lock().unwrap().is_empty() {
self.generate_contexts()?;
pub fn ensure_buckets(&self) -> Result<(), ApplicationDownloadError> {
if self.buckets.lock().unwrap().is_empty() {
self.generate_buckets()?;
}
*self.context_map.lock().unwrap() = self.dropdata.get_contexts();
@ -222,14 +233,22 @@ impl GameDownloadAgent {
Ok(())
}
pub fn generate_contexts(&self) -> Result<(), ApplicationDownloadError> {
pub fn generate_buckets(&self) -> Result<(), ApplicationDownloadError> {
let manifest = self.manifest.lock().unwrap().clone().unwrap();
let game_id = self.id.clone();
let mut contexts = Vec::new();
let base_path = Path::new(&self.dropdata.base_path);
create_dir_all(base_path).unwrap();
let mut buckets = Vec::new();
let mut current_bucket = DownloadBucket {
game_id: game_id.clone(),
version: self.version.clone(),
drops: Vec::new(),
};
let mut current_bucket_size = 0;
for (raw_path, chunk) in manifest {
let path = base_path.join(Path::new(&raw_path));
@ -244,42 +263,79 @@ impl GameDownloadAgent {
.truncate(false)
.open(path.clone())
.unwrap();
let mut running_offset = 0;
let mut file_running_offset = 0;
for (index, length) in chunk.lengths.iter().enumerate() {
contexts.push(DropDownloadContext {
file_name: raw_path.to_string(),
version: chunk.version_name.to_string(),
offset: running_offset,
index,
game_id: game_id.to_string(),
path: path.clone(),
checksum: chunk.checksums[index].clone(),
let drop = DownloadDrop {
filename: raw_path.to_string(),
start: file_running_offset,
length: *length,
checksum: chunk.checksums[index].clone(),
permissions: chunk.permissions,
});
running_offset += *length as u64;
path: path.clone(),
index,
};
file_running_offset += *length;
if *length >= TARGET_BUCKET_SIZE {
// They get their own bucket
buckets.push(DownloadBucket {
game_id: game_id.clone(),
version: self.version.clone(),
drops: vec![drop],
});
continue;
}
if current_bucket_size + *length >= TARGET_BUCKET_SIZE
&& !current_bucket.drops.is_empty()
{
// Move current bucket into list and make a new one
buckets.push(current_bucket);
current_bucket = DownloadBucket {
game_id: game_id.clone(),
version: self.version.clone(),
drops: Vec::new(),
};
current_bucket_size = 0;
}
current_bucket.drops.push(drop);
current_bucket_size += *length;
}
#[cfg(target_os = "linux")]
if running_offset > 0 && !already_exists {
let _ = fallocate(file, FallocateFlags::empty(), 0, running_offset);
if file_running_offset > 0 && !already_exists {
let _ = fallocate(file, FallocateFlags::empty(), 0, file_running_offset as u64);
}
}
let existing_contexts = self.dropdata.get_completed_contexts();
if !current_bucket.drops.is_empty() {
buckets.push(current_bucket);
}
info!("buckets: {}", buckets.len());
let existing_contexts = self.dropdata.get_contexts();
self.dropdata.set_contexts(
&contexts
&buckets
.iter()
.map(|x| (x.checksum.clone(), existing_contexts.contains(&x.checksum)))
.flat_map(|x| x.drops.iter().map(|v| v.checksum.clone()))
.map(|x| {
let contains = existing_contexts.get(&x).unwrap_or(&false);
(x, *contains)
})
.collect::<Vec<(String, bool)>>(),
);
*self.contexts.lock().unwrap() = contexts;
*self.buckets.lock().unwrap() = buckets;
Ok(())
}
fn run(&self) -> Result<bool, ()> {
fn run(&self) -> Result<bool, RemoteAccessError> {
self.setup_progress();
let max_download_threads = borrow_db_checked().settings.max_download_threads;
@ -295,78 +351,81 @@ impl GameDownloadAgent {
let completed_contexts = Arc::new(boxcar::Vec::new());
let completed_indexes_loop_arc = completed_contexts.clone();
let contexts = self.contexts.lock().unwrap();
let download_context = DROP_CLIENT_SYNC
.post(generate_url(&["/api/v2/client/context"], &[]).unwrap())
.json(&ManifestBody {
game: self.id.clone(),
version: self.version.clone(),
})
.header("Authorization", generate_authorization_header())
.send()?;
if download_context.status() != 200 {
return Err(RemoteAccessError::InvalidResponse(download_context.json()?));
}
let download_context = &download_context.json::<DownloadContext>()?;
info!("download context: {}", download_context.context);
let buckets = self.buckets.lock().unwrap();
pool.scope(|scope| {
let client = &DROP_CLIENT_SYNC.clone();
let context_map = self.context_map.lock().unwrap();
for (index, context) in contexts.iter().enumerate() {
let client = client.clone();
let completed_indexes = completed_indexes_loop_arc.clone();
for (index, bucket) in buckets.iter().enumerate() {
let mut bucket = (*bucket).clone();
let completed_contexts = completed_indexes_loop_arc.clone();
let progress = self.progress.get(index);
let progress_handle = ProgressHandle::new(progress, self.progress.clone());
// If we've done this one already, skip it
// Note to future DecDuck, DropData gets loaded into context_map
if let Some(v) = context_map.get(&context.checksum)
&& *v
{
progress_handle.skip(context.length);
let todo_drops = bucket
.drops
.into_iter()
.filter(|e| {
let todo = !*context_map.get(&e.checksum).unwrap_or(&false);
if !todo {
progress_handle.skip(e.length);
}
todo
})
.collect::<Vec<DownloadDrop>>();
if todo_drops.is_empty() {
continue;
}
};
bucket.drops = todo_drops;
let sender = self.sender.clone();
let request = match make_request(
&client,
&["/api/v1/client/chunk"],
&[
("id", &context.game_id),
("version", &context.version),
("name", &context.file_name),
("chunk", &context.index.to_string()),
],
|r| r,
) {
Ok(request) => request,
Err(e) => {
sender
.send(DownloadManagerSignal::Error(
ApplicationDownloadError::Communication(e),
))
.unwrap();
continue;
}
};
scope.spawn(move |_| {
// 3 attempts
for i in 0..RETRY_COUNT {
let loop_progress_handle = progress_handle.clone();
match download_game_chunk(
context,
match download_game_bucket(
&bucket,
download_context,
&self.control_flag,
loop_progress_handle,
request.try_clone().unwrap(),
) {
Ok(true) => {
completed_indexes.push(context.checksum.clone());
for drop in bucket.drops {
completed_contexts.push(drop.checksum);
}
return;
}
Ok(false) => return,
Err(e) => {
warn!("game download agent error: {e}");
let retry = match &e {
ApplicationDownloadError::Communication(
_remote_access_error,
) => true,
ApplicationDownloadError::Checksum => true,
ApplicationDownloadError::Lock => true,
ApplicationDownloadError::IoError(_error_kind) => false,
ApplicationDownloadError::DownloadError => false,
ApplicationDownloadError::DiskFull(_, _) => false,
};
let retry = matches!(
&e,
ApplicationDownloadError::Communication(_)
| ApplicationDownloadError::Checksum
| ApplicationDownloadError::Lock
);
if i == RETRY_COUNT - 1 || !retry {
warn!("retry logic failed, not re-attempting.");
@ -390,14 +449,14 @@ impl GameDownloadAgent {
context_map_lock.values().filter(|x| **x).count()
};
let context_map_lock = self.context_map.lock().unwrap();
let contexts = contexts
let contexts = buckets
.iter()
.flat_map(|x| x.drops.iter().map(|e| e.checksum.clone()))
.map(|x| {
(
x.checksum.clone(),
context_map_lock.get(&x.checksum).copied().unwrap_or(false),
)
let completed = context_map_lock.get(&x).unwrap_or(&false);
(x, *completed)
})
.collect::<Vec<(String, bool)>>();
drop(context_map_lock);
@ -408,10 +467,11 @@ impl GameDownloadAgent {
// If there are any contexts left which are false
if !contexts.iter().all(|x| x.1) {
info!(
"download agent for {} exited without completing ({}/{})",
"download agent for {} exited without completing ({}/{}) ({} buckets)",
self.id.clone(),
completed_lock_len,
contexts.len(),
buckets.len()
);
return Ok(false);
}
@ -442,13 +502,15 @@ impl GameDownloadAgent {
pub fn validate(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
self.setup_validate(app_handle);
let contexts = self.contexts.lock().unwrap();
let buckets = self.buckets.lock().unwrap();
let contexts: Vec<DropValidateContext> = buckets
.clone()
.into_iter()
.flat_map(|e| -> Vec<DropValidateContext> { e.into() })
.collect();
let max_download_threads = borrow_db_checked().settings.max_download_threads;
debug!(
"validating game: {} with {} threads",
self.dropdata.game_id, max_download_threads
);
info!("{} validation contexts", contexts.len());
let pool = ThreadPoolBuilder::new()
.num_threads(max_download_threads)
.build()
@ -549,6 +611,13 @@ impl Downloadable for GameDownloadAgent {
.applications
.transient_statuses
.remove(&self.metadata());
push_game_update(
app_handle,
&self.id,
None,
GameStatusManager::fetch_state(&self.id, &handle),
);
}
fn on_complete(&self, app_handle: &tauri::AppHandle) {