Fix for multi-version downloads (#125)

* fix: multi version downloads

* fix: remove debug utils

* fix: clippy
This commit is contained in:
DecDuck
2025-08-28 17:39:47 +10:00
parent 4f5fccf0c1
commit 44a1be6991
3 changed files with 74 additions and 34 deletions

View File

@ -23,6 +23,7 @@ pub enum RemoteAccessError {
ManifestDownloadFailed(StatusCode, String), ManifestDownloadFailed(StatusCode, String),
OutOfSync, OutOfSync,
Cache(std::io::Error), Cache(std::io::Error),
CorruptedState,
} }
impl Display for RemoteAccessError { impl Display for RemoteAccessError {
@ -81,6 +82,10 @@ impl Display for RemoteAccessError {
"server's and client's time are out of sync. Please ensure they are within at least 30 seconds of each other" "server's and client's time are out of sync. Please ensure they are within at least 30 seconds of each other"
), ),
RemoteAccessError::Cache(error) => write!(f, "Cache Error: {error}"), RemoteAccessError::Cache(error) => write!(f, "Cache Error: {error}"),
RemoteAccessError::CorruptedState => write!(
f,
"Drop encountered a corrupted internal state. Please report this to the developers, with details of reproduction."
),
} }
} }
} }

View File

@ -22,8 +22,8 @@ use crate::remote::requests::generate_url;
use crate::remote::utils::{DROP_CLIENT_ASYNC, DROP_CLIENT_SYNC}; use crate::remote::utils::{DROP_CLIENT_ASYNC, DROP_CLIENT_SYNC};
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use rayon::ThreadPoolBuilder; use rayon::ThreadPoolBuilder;
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::fs::{OpenOptions, create_dir_all}; use std::fs::{create_dir_all, OpenOptions};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::mpsc::Sender; use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -242,12 +242,8 @@ impl GameDownloadAgent {
let mut buckets = Vec::new(); let mut buckets = Vec::new();
let mut current_bucket = DownloadBucket { let mut current_buckets = HashMap::<String, DownloadBucket>::new();
game_id: game_id.clone(), let mut current_bucket_sizes = HashMap::<String, usize>::new();
version: self.version.clone(),
drops: Vec::new(),
};
let mut current_bucket_size = 0;
for (raw_path, chunk) in manifest { for (raw_path, chunk) in manifest {
let path = base_path.join(Path::new(&raw_path)); let path = base_path.join(Path::new(&raw_path));
@ -282,28 +278,41 @@ impl GameDownloadAgent {
buckets.push(DownloadBucket { buckets.push(DownloadBucket {
game_id: game_id.clone(), game_id: game_id.clone(),
version: self.version.clone(), version: chunk.version_name.clone(),
drops: vec![drop], drops: vec![drop],
}); });
continue; continue;
} }
if current_bucket_size + *length >= TARGET_BUCKET_SIZE let current_bucket_size = current_bucket_sizes
.entry(chunk.version_name.clone())
.or_insert_with(|| 0);
let c_version_name = chunk.version_name.clone();
let c_game_id = game_id.clone();
let current_bucket = current_buckets
.entry(chunk.version_name.clone())
.or_insert_with(|| DownloadBucket {
game_id: c_game_id,
version: c_version_name,
drops: vec![],
});
if *current_bucket_size + length >= TARGET_BUCKET_SIZE
&& !current_bucket.drops.is_empty() && !current_bucket.drops.is_empty()
{ {
// Move current bucket into list and make a new one // Move current bucket into list and make a new one
buckets.push(current_bucket); buckets.push(current_bucket.clone());
current_bucket = DownloadBucket { *current_bucket = DownloadBucket {
game_id: game_id.clone(), game_id: game_id.clone(),
version: self.version.clone(), version: chunk.version_name.clone(),
drops: Vec::new(), drops: vec![],
}; };
current_bucket_size = 0; *current_bucket_size = 0;
} }
current_bucket.drops.push(drop); current_bucket.drops.push(drop);
current_bucket_size += *length; *current_bucket_size += *length;
} }
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
@ -312,8 +321,10 @@ impl GameDownloadAgent {
} }
} }
if !current_bucket.drops.is_empty() { for (_, bucket) in current_buckets.into_iter() {
buckets.push(current_bucket); if !bucket.drops.is_empty() {
buckets.push(bucket);
}
} }
info!("buckets: {}", buckets.len()); info!("buckets: {}", buckets.len());
@ -348,27 +359,46 @@ impl GameDownloadAgent {
.build() .build()
.unwrap(); .unwrap();
let buckets = self.buckets.lock().unwrap();
let mut download_contexts = HashMap::<String, DownloadContext>::new();
let versions = buckets
.iter()
.map(|e| &e.version)
.collect::<HashSet<_>>()
.into_iter().cloned()
.collect::<Vec<String>>();
info!("downloading across these versions: {versions:?}");
let completed_contexts = Arc::new(boxcar::Vec::new()); let completed_contexts = Arc::new(boxcar::Vec::new());
let completed_indexes_loop_arc = completed_contexts.clone(); let completed_indexes_loop_arc = completed_contexts.clone();
let download_context = DROP_CLIENT_SYNC for version in versions {
.post(generate_url(&["/api/v2/client/context"], &[]).unwrap()) let download_context = DROP_CLIENT_SYNC
.json(&ManifestBody { .post(generate_url(&["/api/v2/client/context"], &[]).unwrap())
game: self.id.clone(), .json(&ManifestBody {
version: self.version.clone(), game: self.id.clone(),
}) version: version.clone(),
.header("Authorization", generate_authorization_header()) })
.send()?; .header("Authorization", generate_authorization_header())
.send()?;
if download_context.status() != 200 { if download_context.status() != 200 {
return Err(RemoteAccessError::InvalidResponse(download_context.json()?)); return Err(RemoteAccessError::InvalidResponse(download_context.json()?));
}
let download_context = download_context.json::<DownloadContext>()?;
info!(
"download context: ({}) {}",
&version, download_context.context
);
download_contexts.insert(version, download_context);
} }
let download_context = &download_context.json::<DownloadContext>()?; let download_contexts = &download_contexts;
info!("download context: {}", download_context.context);
let buckets = self.buckets.lock().unwrap();
pool.scope(|scope| { pool.scope(|scope| {
let context_map = self.context_map.lock().unwrap(); let context_map = self.context_map.lock().unwrap();
for (index, bucket) in buckets.iter().enumerate() { for (index, bucket) in buckets.iter().enumerate() {
@ -400,6 +430,11 @@ impl GameDownloadAgent {
let sender = self.sender.clone(); let sender = self.sender.clone();
let download_context = download_contexts
.get(&bucket.version)
.ok_or(RemoteAccessError::CorruptedState)
.unwrap();
scope.spawn(move |_| { scope.spawn(move |_| {
// 3 attempts // 3 attempts
for i in 0..RETRY_COUNT { for i in 0..RETRY_COUNT {

View File

@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
#[derive(Debug, Clone)] #[derive(Debug, Clone, Serialize)]
// Drops go in buckets // Drops go in buckets
pub struct DownloadDrop { pub struct DownloadDrop {
pub index: usize, pub index: usize,
@ -14,7 +14,7 @@ pub struct DownloadDrop {
pub permissions: u32, pub permissions: u32,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone, Serialize)]
pub struct DownloadBucket { pub struct DownloadBucket {
pub game_id: String, pub game_id: String,
pub version: String, pub version: String,