mirror of
https://github.com/Drop-OSS/drop-app.git
synced 2025-11-16 01:31:22 +10:00
chore: Preparation to merge from 156
Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
26
src-tauri/games/Cargo.toml
Normal file
26
src-tauri/games/Cargo.toml
Normal file
@ -0,0 +1,26 @@
|
||||
[package]
|
||||
name = "games"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
atomic-instant-full = "0.1.0"
|
||||
bitcode = "0.6.7"
|
||||
boxcar = "0.2.14"
|
||||
database = { version = "0.1.0", path = "../database" }
|
||||
download_manager = { version = "0.1.0", path = "../download_manager" }
|
||||
hex = "0.4.3"
|
||||
log = "0.4.28"
|
||||
md5 = "0.8.0"
|
||||
rayon = "1.11.0"
|
||||
remote = { version = "0.1.0", path = "../remote" }
|
||||
reqwest = "0.12.23"
|
||||
rustix = "1.1.2"
|
||||
serde = { version = "1.0.228", features = ["derive"] }
|
||||
serde_with = "3.15.0"
|
||||
sysinfo = "0.37.2"
|
||||
tauri = "2.8.5"
|
||||
throttle_my_fn = "0.2.6"
|
||||
utils = { version = "0.1.0", path = "../utils" }
|
||||
native_model = { version = "0.6.4", features = ["rmp_serde_1_3"], git = "https://github.com/Drop-OSS/native_model.git"}
|
||||
serde_json = "1.0.145"
|
||||
24
src-tauri/games/src/collections/collection.rs
Normal file
24
src-tauri/games/src/collections/collection.rs
Normal file
@ -0,0 +1,24 @@
|
||||
use bitcode::{Decode, Encode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::library::Game;
|
||||
|
||||
pub type Collections = Vec<Collection>;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default, Encode, Decode)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Collection {
|
||||
id: String,
|
||||
name: String,
|
||||
is_default: bool,
|
||||
user_id: String,
|
||||
entries: Vec<CollectionObject>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default, Encode, Decode)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CollectionObject {
|
||||
collection_id: String,
|
||||
game_id: String,
|
||||
game: Game,
|
||||
}
|
||||
1
src-tauri/games/src/collections/mod.rs
Normal file
1
src-tauri/games/src/collections/mod.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod collection;
|
||||
716
src-tauri/games/src/downloads/download_agent.rs
Normal file
716
src-tauri/games/src/downloads/download_agent.rs
Normal file
@ -0,0 +1,716 @@
|
||||
use database::{
|
||||
ApplicationTransientStatus, DownloadType, DownloadableMetadata, borrow_db_checked,
|
||||
borrow_db_mut_checked,
|
||||
};
|
||||
use download_manager::download_manager_frontend::{DownloadManagerSignal, DownloadStatus};
|
||||
use download_manager::downloadable::Downloadable;
|
||||
use download_manager::error::ApplicationDownloadError;
|
||||
use download_manager::util::download_thread_control_flag::{
|
||||
DownloadThreadControl, DownloadThreadControlFlag,
|
||||
};
|
||||
use download_manager::util::progress_object::{ProgressHandle, ProgressObject};
|
||||
use log::{debug, error, info, warn};
|
||||
use rayon::ThreadPoolBuilder;
|
||||
use remote::auth::generate_authorization_header;
|
||||
use remote::error::RemoteAccessError;
|
||||
use remote::requests::generate_url;
|
||||
use remote::utils::{DROP_CLIENT_ASYNC, DROP_CLIENT_SYNC};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs::{OpenOptions, create_dir_all};
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Instant;
|
||||
use tauri::AppHandle;
|
||||
use utils::{app_emit, lock, send};
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
use rustix::fs::{FallocateFlags, fallocate};
|
||||
|
||||
use crate::downloads::manifest::{
|
||||
DownloadBucket, DownloadContext, DownloadDrop, DropManifest, DropValidateContext, ManifestBody,
|
||||
};
|
||||
use crate::downloads::utils::get_disk_available;
|
||||
use crate::downloads::validate::validate_game_chunk;
|
||||
use crate::library::{on_game_complete, push_game_update, set_partially_installed};
|
||||
use crate::state::GameStatusManager;
|
||||
|
||||
use super::download_logic::download_game_bucket;
|
||||
use super::drop_data::DropData;
|
||||
|
||||
static RETRY_COUNT: usize = 3;
|
||||
|
||||
const TARGET_BUCKET_SIZE: usize = 63 * 1000 * 1000;
|
||||
const MAX_FILES_PER_BUCKET: usize = (1024 / 4) - 1;
|
||||
|
||||
pub struct GameDownloadAgent {
|
||||
pub id: String,
|
||||
pub version: String,
|
||||
pub control_flag: DownloadThreadControl,
|
||||
buckets: Mutex<Vec<DownloadBucket>>,
|
||||
context_map: Mutex<HashMap<String, bool>>,
|
||||
pub manifest: Mutex<Option<DropManifest>>,
|
||||
pub progress: Arc<ProgressObject>,
|
||||
sender: Sender<DownloadManagerSignal>,
|
||||
pub dropdata: DropData,
|
||||
status: Mutex<DownloadStatus>,
|
||||
}
|
||||
|
||||
impl GameDownloadAgent {
|
||||
pub async fn new_from_index(
|
||||
id: String,
|
||||
version: String,
|
||||
target_download_dir: usize,
|
||||
sender: Sender<DownloadManagerSignal>,
|
||||
) -> Result<Self, ApplicationDownloadError> {
|
||||
let base_dir = {
|
||||
let db_lock = borrow_db_checked();
|
||||
|
||||
db_lock.applications.install_dirs[target_download_dir].clone()
|
||||
};
|
||||
|
||||
Self::new(id, version, base_dir, sender).await
|
||||
}
|
||||
pub async fn new(
|
||||
id: String,
|
||||
version: String,
|
||||
base_dir: PathBuf,
|
||||
sender: Sender<DownloadManagerSignal>,
|
||||
) -> Result<Self, ApplicationDownloadError> {
|
||||
// Don't run by default
|
||||
let control_flag = DownloadThreadControl::new(DownloadThreadControlFlag::Stop);
|
||||
|
||||
let base_dir_path = Path::new(&base_dir);
|
||||
let data_base_dir_path = base_dir_path.join(id.clone());
|
||||
|
||||
let stored_manifest =
|
||||
DropData::generate(id.clone(), version.clone(), data_base_dir_path.clone());
|
||||
|
||||
let context_lock = stored_manifest.contexts.lock().unwrap().clone();
|
||||
|
||||
let result = Self {
|
||||
id,
|
||||
version,
|
||||
control_flag,
|
||||
manifest: Mutex::new(None),
|
||||
buckets: Mutex::new(Vec::new()),
|
||||
context_map: Mutex::new(HashMap::new()),
|
||||
progress: Arc::new(ProgressObject::new(0, 0, sender.clone())),
|
||||
sender,
|
||||
dropdata: stored_manifest,
|
||||
status: Mutex::new(DownloadStatus::Queued),
|
||||
};
|
||||
|
||||
result.ensure_manifest_exists().await?;
|
||||
|
||||
let required_space = lock!(result.manifest)
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.values()
|
||||
.map(|e| {
|
||||
e.lengths
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(i, _)| *context_lock.get(&e.checksums[*i]).unwrap_or(&false))
|
||||
.map(|(_, v)| v)
|
||||
.sum::<usize>()
|
||||
})
|
||||
.sum::<usize>() as u64;
|
||||
|
||||
let available_space = get_disk_available(data_base_dir_path)? as u64;
|
||||
|
||||
if required_space > available_space {
|
||||
return Err(ApplicationDownloadError::DiskFull(
|
||||
required_space,
|
||||
available_space,
|
||||
));
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// Blocking
|
||||
pub fn setup_download(&self, app_handle: &AppHandle) -> Result<(), ApplicationDownloadError> {
|
||||
let mut db_lock = borrow_db_mut_checked();
|
||||
let status = ApplicationTransientStatus::Downloading {
|
||||
version_name: self.version.clone(),
|
||||
};
|
||||
db_lock
|
||||
.applications
|
||||
.transient_statuses
|
||||
.insert(self.metadata(), status.clone());
|
||||
// Don't use GameStatusManager because this game isn't installed
|
||||
push_game_update(app_handle, &self.metadata().id, None, (None, Some(status)));
|
||||
|
||||
if !self.check_manifest_exists() {
|
||||
return Err(ApplicationDownloadError::NotInitialized);
|
||||
}
|
||||
|
||||
self.ensure_buckets()?;
|
||||
|
||||
self.control_flag.set(DownloadThreadControlFlag::Go);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Blocking
|
||||
pub fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
|
||||
self.setup_download(app_handle)?;
|
||||
let timer = Instant::now();
|
||||
|
||||
info!("beginning download for {}...", self.metadata().id);
|
||||
|
||||
let res = self.run().map_err(ApplicationDownloadError::Communication);
|
||||
|
||||
debug!(
|
||||
"{} took {}ms to download",
|
||||
self.id,
|
||||
timer.elapsed().as_millis()
|
||||
);
|
||||
res
|
||||
}
|
||||
|
||||
pub fn check_manifest_exists(&self) -> bool {
|
||||
lock!(self.manifest).is_some()
|
||||
}
|
||||
|
||||
pub async fn ensure_manifest_exists(&self) -> Result<(), ApplicationDownloadError> {
|
||||
if lock!(self.manifest).is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.download_manifest().await
|
||||
}
|
||||
|
||||
async fn download_manifest(&self) -> Result<(), ApplicationDownloadError> {
|
||||
let client = DROP_CLIENT_ASYNC.clone();
|
||||
let url = generate_url(
|
||||
&["/api/v1/client/game/manifest"],
|
||||
&[("id", &self.id), ("version", &self.version)],
|
||||
)
|
||||
.map_err(ApplicationDownloadError::Communication)?;
|
||||
|
||||
let response = client
|
||||
.get(url)
|
||||
.header("Authorization", generate_authorization_header())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
|
||||
|
||||
if response.status() != 200 {
|
||||
return Err(ApplicationDownloadError::Communication(
|
||||
RemoteAccessError::ManifestDownloadFailed(
|
||||
response.status(),
|
||||
response.text().await.unwrap(),
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let manifest_download: DropManifest = response
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
|
||||
|
||||
if let Ok(mut manifest) = self.manifest.lock() {
|
||||
*manifest = Some(manifest_download);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err(ApplicationDownloadError::Lock)
|
||||
}
|
||||
|
||||
// Sets it up for both download and validate
|
||||
fn setup_progress(&self) {
|
||||
let buckets = lock!(self.buckets);
|
||||
|
||||
let chunk_count = buckets.iter().map(|e| e.drops.len()).sum();
|
||||
|
||||
let total_length = buckets
|
||||
.iter()
|
||||
.map(|bucket| bucket.drops.iter().map(|e| e.length).sum::<usize>())
|
||||
.sum();
|
||||
|
||||
self.progress.set_max(total_length);
|
||||
self.progress.set_size(chunk_count);
|
||||
self.progress.reset();
|
||||
}
|
||||
|
||||
pub fn ensure_buckets(&self) -> Result<(), ApplicationDownloadError> {
|
||||
if lock!(self.buckets).is_empty() {
|
||||
self.generate_buckets()?;
|
||||
}
|
||||
|
||||
*lock!(self.context_map) = self.dropdata.get_contexts();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn generate_buckets(&self) -> Result<(), ApplicationDownloadError> {
|
||||
let manifest = lock!(self.manifest)
|
||||
.clone()
|
||||
.ok_or(ApplicationDownloadError::NotInitialized)?;
|
||||
let game_id = self.id.clone();
|
||||
|
||||
let base_path = Path::new(&self.dropdata.base_path);
|
||||
create_dir_all(base_path)?;
|
||||
|
||||
let mut buckets = Vec::new();
|
||||
|
||||
let mut current_buckets = HashMap::<String, DownloadBucket>::new();
|
||||
let mut current_bucket_sizes = HashMap::<String, usize>::new();
|
||||
|
||||
for (raw_path, chunk) in manifest {
|
||||
let path = base_path.join(Path::new(&raw_path));
|
||||
|
||||
let container = path
|
||||
.parent()
|
||||
.ok_or(ApplicationDownloadError::IoError(Arc::new(io::Error::new(
|
||||
io::ErrorKind::NotFound,
|
||||
"no parent directory",
|
||||
))))?;
|
||||
create_dir_all(container)?;
|
||||
|
||||
let already_exists = path.exists();
|
||||
let file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.open(&path)?;
|
||||
let mut file_running_offset = 0;
|
||||
|
||||
for (index, length) in chunk.lengths.iter().enumerate() {
|
||||
let drop = DownloadDrop {
|
||||
filename: raw_path.to_string(),
|
||||
start: file_running_offset,
|
||||
length: *length,
|
||||
checksum: chunk.checksums[index].clone(),
|
||||
permissions: chunk.permissions,
|
||||
path: path.clone(),
|
||||
index,
|
||||
};
|
||||
file_running_offset += *length;
|
||||
|
||||
if *length >= TARGET_BUCKET_SIZE {
|
||||
// They get their own bucket
|
||||
|
||||
buckets.push(DownloadBucket {
|
||||
game_id: game_id.clone(),
|
||||
version: chunk.version_name.clone(),
|
||||
drops: vec![drop],
|
||||
});
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
let current_bucket_size = current_bucket_sizes
|
||||
.entry(chunk.version_name.clone())
|
||||
.or_insert_with(|| 0);
|
||||
let c_version_name = chunk.version_name.clone();
|
||||
let c_game_id = game_id.clone();
|
||||
let current_bucket = current_buckets
|
||||
.entry(chunk.version_name.clone())
|
||||
.or_insert_with(|| DownloadBucket {
|
||||
game_id: c_game_id,
|
||||
version: c_version_name,
|
||||
drops: vec![],
|
||||
});
|
||||
|
||||
if (*current_bucket_size + length >= TARGET_BUCKET_SIZE
|
||||
|| current_bucket.drops.len() >= MAX_FILES_PER_BUCKET)
|
||||
&& !current_bucket.drops.is_empty()
|
||||
{
|
||||
// Move current bucket into list and make a new one
|
||||
buckets.push(current_bucket.clone());
|
||||
*current_bucket = DownloadBucket {
|
||||
game_id: game_id.clone(),
|
||||
version: chunk.version_name.clone(),
|
||||
drops: vec![],
|
||||
};
|
||||
*current_bucket_size = 0;
|
||||
}
|
||||
|
||||
current_bucket.drops.push(drop);
|
||||
*current_bucket_size += *length;
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
if file_running_offset > 0 && !already_exists {
|
||||
let _ = fallocate(file, FallocateFlags::empty(), 0, file_running_offset as u64);
|
||||
}
|
||||
}
|
||||
|
||||
for (_, bucket) in current_buckets.into_iter() {
|
||||
if !bucket.drops.is_empty() {
|
||||
buckets.push(bucket);
|
||||
}
|
||||
}
|
||||
|
||||
info!("buckets: {}", buckets.len());
|
||||
|
||||
let existing_contexts = self.dropdata.get_contexts();
|
||||
self.dropdata.set_contexts(
|
||||
&buckets
|
||||
.iter()
|
||||
.flat_map(|x| x.drops.iter().map(|v| v.checksum.clone()))
|
||||
.map(|x| {
|
||||
let contains = existing_contexts.get(&x).unwrap_or(&false);
|
||||
(x, *contains)
|
||||
})
|
||||
.collect::<Vec<(String, bool)>>(),
|
||||
);
|
||||
|
||||
*lock!(self.buckets) = buckets;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run(&self) -> Result<bool, RemoteAccessError> {
|
||||
self.setup_progress();
|
||||
let max_download_threads = borrow_db_checked().settings.max_download_threads;
|
||||
|
||||
debug!(
|
||||
"downloading game: {} with {} threads",
|
||||
self.id, max_download_threads
|
||||
);
|
||||
let pool = ThreadPoolBuilder::new()
|
||||
.num_threads(max_download_threads)
|
||||
.build()
|
||||
.unwrap_or_else(|_| {
|
||||
panic!("failed to build thread pool with {max_download_threads} threads")
|
||||
});
|
||||
|
||||
let buckets = lock!(self.buckets);
|
||||
|
||||
let mut download_contexts = HashMap::<String, DownloadContext>::new();
|
||||
|
||||
let versions = buckets
|
||||
.iter()
|
||||
.map(|e| &e.version)
|
||||
.collect::<HashSet<_>>()
|
||||
.into_iter()
|
||||
.cloned()
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
info!("downloading across these versions: {versions:?}");
|
||||
|
||||
let completed_contexts = Arc::new(boxcar::Vec::new());
|
||||
let completed_indexes_loop_arc = completed_contexts.clone();
|
||||
|
||||
for version in versions {
|
||||
let download_context = DROP_CLIENT_SYNC
|
||||
.post(generate_url(&["/api/v2/client/context"], &[])?)
|
||||
.json(&ManifestBody {
|
||||
game: self.id.clone(),
|
||||
version: version.clone(),
|
||||
})
|
||||
.header("Authorization", generate_authorization_header())
|
||||
.send()?;
|
||||
|
||||
if download_context.status() != 200 {
|
||||
return Err(RemoteAccessError::InvalidResponse(download_context.json()?));
|
||||
}
|
||||
|
||||
let download_context = download_context.json::<DownloadContext>()?;
|
||||
info!(
|
||||
"download context: ({}) {}",
|
||||
&version, download_context.context
|
||||
);
|
||||
download_contexts.insert(version, download_context);
|
||||
}
|
||||
|
||||
let download_contexts = &download_contexts;
|
||||
|
||||
pool.scope(|scope| {
|
||||
let context_map = lock!(self.context_map);
|
||||
for (index, bucket) in buckets.iter().enumerate() {
|
||||
let mut bucket = (*bucket).clone();
|
||||
let completed_contexts = completed_indexes_loop_arc.clone();
|
||||
|
||||
let progress = self.progress.get(index);
|
||||
let progress_handle = ProgressHandle::new(progress, self.progress.clone());
|
||||
|
||||
// If we've done this one already, skip it
|
||||
// Note to future DecDuck, DropData gets loaded into context_map
|
||||
let todo_drops = bucket
|
||||
.drops
|
||||
.into_iter()
|
||||
.filter(|e| {
|
||||
let todo = !*context_map.get(&e.checksum).unwrap_or(&false);
|
||||
if !todo {
|
||||
progress_handle.skip(e.length);
|
||||
}
|
||||
todo
|
||||
})
|
||||
.collect::<Vec<DownloadDrop>>();
|
||||
|
||||
if todo_drops.is_empty() {
|
||||
continue;
|
||||
};
|
||||
|
||||
bucket.drops = todo_drops;
|
||||
|
||||
let sender = self.sender.clone();
|
||||
|
||||
let download_context =
|
||||
download_contexts.get(&bucket.version).unwrap_or_else(|| {
|
||||
panic!(
|
||||
"Could not get bucket version {}. Corrupted state.",
|
||||
bucket.version
|
||||
)
|
||||
});
|
||||
|
||||
scope.spawn(move |_| {
|
||||
// 3 attempts
|
||||
for i in 0..RETRY_COUNT {
|
||||
let loop_progress_handle = progress_handle.clone();
|
||||
match download_game_bucket(
|
||||
&bucket,
|
||||
download_context,
|
||||
&self.control_flag,
|
||||
loop_progress_handle,
|
||||
) {
|
||||
Ok(true) => {
|
||||
for drop in bucket.drops {
|
||||
completed_contexts.push(drop.checksum);
|
||||
}
|
||||
return;
|
||||
}
|
||||
Ok(false) => return,
|
||||
Err(e) => {
|
||||
warn!("game download agent error: {e}");
|
||||
|
||||
let retry = matches!(
|
||||
&e,
|
||||
ApplicationDownloadError::Communication(_)
|
||||
| ApplicationDownloadError::Checksum
|
||||
| ApplicationDownloadError::Lock
|
||||
| ApplicationDownloadError::IoError(_)
|
||||
);
|
||||
|
||||
if i == RETRY_COUNT - 1 || !retry {
|
||||
warn!("retry logic failed, not re-attempting.");
|
||||
send!(sender, DownloadManagerSignal::Error(e));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
let newly_completed = completed_contexts.clone();
|
||||
|
||||
let completed_lock_len = {
|
||||
let mut context_map_lock = lock!(self.context_map);
|
||||
for (_, item) in newly_completed.iter() {
|
||||
context_map_lock.insert(item.clone(), true);
|
||||
}
|
||||
|
||||
context_map_lock.values().filter(|x| **x).count()
|
||||
};
|
||||
|
||||
let context_map_lock = lock!(self.context_map);
|
||||
let contexts = buckets
|
||||
.iter()
|
||||
.flat_map(|x| x.drops.iter().map(|e| e.checksum.clone()))
|
||||
.map(|x| {
|
||||
let completed = context_map_lock.get(&x).unwrap_or(&false);
|
||||
(x, *completed)
|
||||
})
|
||||
.collect::<Vec<(String, bool)>>();
|
||||
drop(context_map_lock);
|
||||
|
||||
self.dropdata.set_contexts(&contexts);
|
||||
self.dropdata.write();
|
||||
|
||||
// If there are any contexts left which are false
|
||||
if !contexts.iter().all(|x| x.1) {
|
||||
info!(
|
||||
"download agent for {} exited without completing ({}/{}) ({} buckets)",
|
||||
self.id.clone(),
|
||||
completed_lock_len,
|
||||
contexts.len(),
|
||||
buckets.len()
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn setup_validate(&self, app_handle: &AppHandle) {
|
||||
self.setup_progress();
|
||||
|
||||
self.control_flag.set(DownloadThreadControlFlag::Go);
|
||||
|
||||
let status = ApplicationTransientStatus::Validating {
|
||||
version_name: self.version.clone(),
|
||||
};
|
||||
|
||||
let mut db_lock = borrow_db_mut_checked();
|
||||
db_lock
|
||||
.applications
|
||||
.transient_statuses
|
||||
.insert(self.metadata(), status.clone());
|
||||
push_game_update(app_handle, &self.metadata().id, None, (None, Some(status)));
|
||||
}
|
||||
|
||||
pub fn validate(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
|
||||
self.setup_validate(app_handle);
|
||||
|
||||
let buckets = lock!(self.buckets);
|
||||
let contexts: Vec<DropValidateContext> = buckets
|
||||
.clone()
|
||||
.into_iter()
|
||||
.flat_map(|e| -> Vec<DropValidateContext> { e.into() })
|
||||
.collect();
|
||||
let max_download_threads = borrow_db_checked().settings.max_download_threads;
|
||||
|
||||
info!("{} validation contexts", contexts.len());
|
||||
let pool = ThreadPoolBuilder::new()
|
||||
.num_threads(max_download_threads)
|
||||
.build()
|
||||
.unwrap_or_else(|_| {
|
||||
panic!("failed to build thread pool with {max_download_threads} threads")
|
||||
});
|
||||
|
||||
let invalid_chunks = Arc::new(boxcar::Vec::new());
|
||||
pool.scope(|scope| {
|
||||
for (index, context) in contexts.iter().enumerate() {
|
||||
let current_progress = self.progress.get(index);
|
||||
let progress_handle = ProgressHandle::new(current_progress, self.progress.clone());
|
||||
let invalid_chunks_scoped = invalid_chunks.clone();
|
||||
let sender = self.sender.clone();
|
||||
|
||||
scope.spawn(move |_| {
|
||||
match validate_game_chunk(context, &self.control_flag, progress_handle) {
|
||||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
invalid_chunks_scoped.push(context.checksum.clone());
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{e}");
|
||||
send!(sender, DownloadManagerSignal::Error(e));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// If there are any contexts left which are false
|
||||
if !invalid_chunks.is_empty() {
|
||||
info!("validation of game id {} failed", self.id);
|
||||
|
||||
for context in invalid_chunks.iter() {
|
||||
self.dropdata.set_context(context.1.clone(), false);
|
||||
}
|
||||
|
||||
self.dropdata.write();
|
||||
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn cancel(&self, app_handle: &AppHandle) {
|
||||
// See docs on usage
|
||||
set_partially_installed(
|
||||
&self.metadata(),
|
||||
self.dropdata.base_path.display().to_string(),
|
||||
Some(app_handle),
|
||||
);
|
||||
|
||||
self.dropdata.write();
|
||||
}
|
||||
}
|
||||
|
||||
impl Downloadable for GameDownloadAgent {
|
||||
fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
|
||||
*lock!(self.status) = DownloadStatus::Downloading;
|
||||
self.download(app_handle)
|
||||
}
|
||||
|
||||
fn validate(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
|
||||
*lock!(self.status) = DownloadStatus::Validating;
|
||||
self.validate(app_handle)
|
||||
}
|
||||
|
||||
fn progress(&self) -> Arc<ProgressObject> {
|
||||
self.progress.clone()
|
||||
}
|
||||
|
||||
fn control_flag(&self) -> DownloadThreadControl {
|
||||
self.control_flag.clone()
|
||||
}
|
||||
|
||||
fn metadata(&self) -> DownloadableMetadata {
|
||||
DownloadableMetadata {
|
||||
id: self.id.clone(),
|
||||
version: Some(self.version.clone()),
|
||||
download_type: DownloadType::Game,
|
||||
}
|
||||
}
|
||||
|
||||
fn on_queued(&self, app_handle: &tauri::AppHandle) {
|
||||
*self.status.lock().unwrap() = DownloadStatus::Queued;
|
||||
let mut db_lock = borrow_db_mut_checked();
|
||||
let status = ApplicationTransientStatus::Queued {
|
||||
version_name: self.version.clone(),
|
||||
};
|
||||
db_lock
|
||||
.applications
|
||||
.transient_statuses
|
||||
.insert(self.metadata(), status.clone());
|
||||
push_game_update(app_handle, &self.id, None, (None, Some(status)));
|
||||
}
|
||||
|
||||
fn on_error(&self, app_handle: &tauri::AppHandle, error: &ApplicationDownloadError) {
|
||||
*lock!(self.status) = DownloadStatus::Error;
|
||||
app_emit!(app_handle, "download_error", error.to_string());
|
||||
|
||||
error!("error while managing download: {error:?}");
|
||||
|
||||
let mut handle = borrow_db_mut_checked();
|
||||
handle
|
||||
.applications
|
||||
.transient_statuses
|
||||
.remove(&self.metadata());
|
||||
|
||||
push_game_update(
|
||||
app_handle,
|
||||
&self.id,
|
||||
None,
|
||||
GameStatusManager::fetch_state(&self.id, &handle),
|
||||
);
|
||||
}
|
||||
|
||||
fn on_complete(&self, app_handle: &tauri::AppHandle) {
|
||||
match on_game_complete(
|
||||
&self.metadata(),
|
||||
self.dropdata.base_path.to_string_lossy().to_string(),
|
||||
app_handle,
|
||||
) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("could not mark game as complete: {e}");
|
||||
send!(
|
||||
self.sender,
|
||||
DownloadManagerSignal::Error(ApplicationDownloadError::DownloadError(e))
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_cancelled(&self, app_handle: &tauri::AppHandle) {
|
||||
info!("cancelled {}", self.id);
|
||||
self.cancel(app_handle);
|
||||
}
|
||||
|
||||
fn status(&self) -> DownloadStatus {
|
||||
lock!(self.status).clone()
|
||||
}
|
||||
}
|
||||
295
src-tauri/games/src/downloads/download_logic.rs
Normal file
295
src-tauri/games/src/downloads/download_logic.rs
Normal file
@ -0,0 +1,295 @@
|
||||
use std::fs::{Permissions, set_permissions};
|
||||
use std::io::Read;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use std::{
|
||||
fs::{File, OpenOptions},
|
||||
io::{self, BufWriter, Seek, SeekFrom, Write},
|
||||
path::PathBuf,
|
||||
};
|
||||
|
||||
use download_manager::error::ApplicationDownloadError;
|
||||
use download_manager::util::download_thread_control_flag::{
|
||||
DownloadThreadControl, DownloadThreadControlFlag,
|
||||
};
|
||||
use download_manager::util::progress_object::ProgressHandle;
|
||||
use log::{debug, info, warn};
|
||||
use md5::{Context, Digest};
|
||||
use remote::auth::generate_authorization_header;
|
||||
use remote::error::{DropServerError, RemoteAccessError};
|
||||
use remote::requests::generate_url;
|
||||
use remote::utils::DROP_CLIENT_SYNC;
|
||||
use reqwest::blocking::Response;
|
||||
|
||||
use crate::downloads::manifest::{ChunkBody, DownloadBucket, DownloadContext, DownloadDrop};
|
||||
|
||||
static MAX_PACKET_LENGTH: usize = 4096 * 4;
|
||||
static BUMP_SIZE: usize = 4096 * 16;
|
||||
|
||||
pub struct DropWriter<W: Write> {
|
||||
hasher: Context,
|
||||
destination: BufWriter<W>,
|
||||
progress: ProgressHandle,
|
||||
}
|
||||
impl DropWriter<File> {
|
||||
fn new(path: PathBuf, progress: ProgressHandle) -> Result<Self, io::Error> {
|
||||
let destination = OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.open(&path)?;
|
||||
Ok(Self {
|
||||
destination: BufWriter::with_capacity(1024 * 1024, destination),
|
||||
hasher: Context::new(),
|
||||
progress,
|
||||
})
|
||||
}
|
||||
|
||||
fn finish(mut self) -> io::Result<Digest> {
|
||||
self.flush()?;
|
||||
Ok(self.hasher.finalize())
|
||||
}
|
||||
}
|
||||
// Write automatically pushes to file and hasher
|
||||
impl Write for DropWriter<File> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.hasher
|
||||
.write_all(buf)
|
||||
.map_err(|e| io::Error::other(format!("Unable to write to hasher: {e}")))?;
|
||||
let bytes_written = self.destination.write(buf)?;
|
||||
self.progress.add(bytes_written);
|
||||
|
||||
Ok(bytes_written)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.hasher.flush()?;
|
||||
self.destination.flush()
|
||||
}
|
||||
}
|
||||
// Seek moves around destination output
|
||||
impl Seek for DropWriter<File> {
|
||||
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
|
||||
self.destination.seek(pos)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DropDownloadPipeline<'a, R: Read, W: Write> {
|
||||
pub source: R,
|
||||
pub drops: Vec<DownloadDrop>,
|
||||
pub destination: Vec<DropWriter<W>>,
|
||||
pub control_flag: &'a DownloadThreadControl,
|
||||
#[allow(dead_code)]
|
||||
progress: ProgressHandle,
|
||||
}
|
||||
|
||||
impl<'a> DropDownloadPipeline<'a, Response, File> {
|
||||
fn new(
|
||||
source: Response,
|
||||
drops: Vec<DownloadDrop>,
|
||||
control_flag: &'a DownloadThreadControl,
|
||||
progress: ProgressHandle,
|
||||
) -> Result<Self, io::Error> {
|
||||
Ok(Self {
|
||||
source,
|
||||
destination: drops
|
||||
.iter()
|
||||
.map(|drop| DropWriter::new(drop.path.clone(), progress.clone()))
|
||||
.try_collect()?,
|
||||
drops,
|
||||
control_flag,
|
||||
progress,
|
||||
})
|
||||
}
|
||||
|
||||
fn copy(&mut self) -> Result<bool, io::Error> {
|
||||
let mut copy_buffer = [0u8; MAX_PACKET_LENGTH];
|
||||
for (index, drop) in self.drops.iter().enumerate() {
|
||||
let destination = self
|
||||
.destination
|
||||
.get_mut(index)
|
||||
.ok_or(io::Error::other("no destination"))?;
|
||||
let mut remaining = drop.length;
|
||||
if drop.start != 0 {
|
||||
destination.seek(SeekFrom::Start(drop.start as u64))?;
|
||||
}
|
||||
let mut last_bump = 0;
|
||||
loop {
|
||||
let size = MAX_PACKET_LENGTH.min(remaining);
|
||||
let size = self
|
||||
.source
|
||||
.read(&mut copy_buffer[0..size])
|
||||
.inspect_err(|_| {
|
||||
info!("got error from {}", drop.filename);
|
||||
})?;
|
||||
remaining -= size;
|
||||
last_bump += size;
|
||||
|
||||
destination.write_all(©_buffer[0..size])?;
|
||||
|
||||
if last_bump > BUMP_SIZE {
|
||||
last_bump -= BUMP_SIZE;
|
||||
if self.control_flag.get() == DownloadThreadControlFlag::Stop {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
if remaining == 0 {
|
||||
break;
|
||||
};
|
||||
}
|
||||
|
||||
if self.control_flag.get() == DownloadThreadControlFlag::Stop {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn debug_skip_checksum(self) {
|
||||
self.destination
|
||||
.into_iter()
|
||||
.for_each(|mut e| e.flush().unwrap());
|
||||
}
|
||||
|
||||
fn finish(self) -> Result<Vec<Digest>, io::Error> {
|
||||
let checksums = self
|
||||
.destination
|
||||
.into_iter()
|
||||
.map(|e| e.finish())
|
||||
.try_collect()?;
|
||||
Ok(checksums)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn download_game_bucket(
|
||||
bucket: &DownloadBucket,
|
||||
ctx: &DownloadContext,
|
||||
control_flag: &DownloadThreadControl,
|
||||
progress: ProgressHandle,
|
||||
) -> Result<bool, ApplicationDownloadError> {
|
||||
// If we're paused
|
||||
if control_flag.get() == DownloadThreadControlFlag::Stop {
|
||||
progress.set(0);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
let header = generate_authorization_header();
|
||||
|
||||
let url = generate_url(&["/api/v2/client/chunk"], &[])
|
||||
.map_err(ApplicationDownloadError::Communication)?;
|
||||
|
||||
let body = ChunkBody::create(ctx, &bucket.drops);
|
||||
|
||||
let response = DROP_CLIENT_SYNC
|
||||
.post(url)
|
||||
.json(&body)
|
||||
.header("Authorization", header)
|
||||
.send()
|
||||
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
|
||||
|
||||
if response.status() != 200 {
|
||||
info!("chunk request got status code: {}", response.status());
|
||||
let raw_res = response.text().map_err(|e| {
|
||||
ApplicationDownloadError::Communication(RemoteAccessError::FetchError(e.into()))
|
||||
})?;
|
||||
info!("{raw_res}");
|
||||
if let Ok(err) = serde_json::from_str::<DropServerError>(&raw_res) {
|
||||
return Err(ApplicationDownloadError::Communication(
|
||||
RemoteAccessError::InvalidResponse(err),
|
||||
));
|
||||
}
|
||||
return Err(ApplicationDownloadError::Communication(
|
||||
RemoteAccessError::UnparseableResponse(raw_res),
|
||||
));
|
||||
}
|
||||
|
||||
let lengths = response
|
||||
.headers()
|
||||
.get("Content-Lengths")
|
||||
.ok_or(ApplicationDownloadError::Communication(
|
||||
RemoteAccessError::UnparseableResponse("missing Content-Lengths header".to_owned()),
|
||||
))?
|
||||
.to_str()
|
||||
.map_err(|e| {
|
||||
ApplicationDownloadError::Communication(RemoteAccessError::UnparseableResponse(
|
||||
e.to_string(),
|
||||
))
|
||||
})?;
|
||||
|
||||
for (i, raw_length) in lengths.split(",").enumerate() {
|
||||
let length = raw_length.parse::<usize>().unwrap_or(0);
|
||||
let Some(drop) = bucket.drops.get(i) else {
|
||||
warn!("invalid number of Content-Lengths recieved: {i}, {lengths}");
|
||||
return Err(ApplicationDownloadError::DownloadError(
|
||||
RemoteAccessError::InvalidResponse(DropServerError {
|
||||
status_code: 400,
|
||||
status_message: format!(
|
||||
"invalid number of Content-Lengths recieved: {i}, {lengths}"
|
||||
),
|
||||
}),
|
||||
));
|
||||
};
|
||||
if drop.length != length {
|
||||
warn!(
|
||||
"for {}, expected {}, got {} ({})",
|
||||
drop.filename, drop.length, raw_length, length
|
||||
);
|
||||
return Err(ApplicationDownloadError::DownloadError(
|
||||
RemoteAccessError::InvalidResponse(DropServerError {
|
||||
status_code: 400,
|
||||
status_message: format!(
|
||||
"for {}, expected {}, got {} ({})",
|
||||
drop.filename, drop.length, raw_length, length
|
||||
),
|
||||
}),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let timestep = start.elapsed().as_millis();
|
||||
|
||||
debug!("took {}ms to start downloading", timestep);
|
||||
|
||||
let mut pipeline =
|
||||
DropDownloadPipeline::new(response, bucket.drops.clone(), control_flag, progress)
|
||||
.map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?;
|
||||
|
||||
let completed = pipeline
|
||||
.copy()
|
||||
.map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?;
|
||||
if !completed {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// If we complete the file, set the permissions (if on Linux)
|
||||
#[cfg(unix)]
|
||||
{
|
||||
for drop in bucket.drops.iter() {
|
||||
let permissions = Permissions::from_mode(drop.permissions);
|
||||
set_permissions(drop.path.clone(), permissions)
|
||||
.map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?;
|
||||
}
|
||||
}
|
||||
|
||||
let checksums = pipeline
|
||||
.finish()
|
||||
.map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?;
|
||||
|
||||
for (index, drop) in bucket.drops.iter().enumerate() {
|
||||
let res = hex::encode(**checksums.get(index).unwrap());
|
||||
if res != drop.checksum {
|
||||
warn!("context didn't match... doing nothing because we will validate later.");
|
||||
// return Ok(false);
|
||||
// return Err(ApplicationDownloadError::Checksum);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
94
src-tauri/games/src/downloads/drop_data.rs
Normal file
94
src-tauri/games/src/downloads/drop_data.rs
Normal file
@ -0,0 +1,94 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fs::File,
|
||||
io::{self, Read, Write},
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use log::error;
|
||||
use native_model::{Decode, Encode};
|
||||
use utils::lock;
|
||||
|
||||
pub type DropData = v1::DropData;
|
||||
|
||||
pub static DROP_DATA_PATH: &str = ".dropdata";
|
||||
|
||||
pub mod v1 {
|
||||
use std::{collections::HashMap, path::PathBuf, sync::Mutex};
|
||||
|
||||
use native_model::native_model;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[native_model(id = 9, version = 1, with = native_model::rmp_serde_1_3::RmpSerde)]
|
||||
pub struct DropData {
|
||||
pub game_id: String,
|
||||
pub game_version: String,
|
||||
pub contexts: Mutex<HashMap<String, bool>>,
|
||||
pub base_path: PathBuf,
|
||||
}
|
||||
|
||||
impl DropData {
|
||||
pub fn new(game_id: String, game_version: String, base_path: PathBuf) -> Self {
|
||||
Self {
|
||||
base_path,
|
||||
game_id,
|
||||
game_version,
|
||||
contexts: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DropData {
|
||||
pub fn generate(game_id: String, game_version: String, base_path: PathBuf) -> Self {
|
||||
match DropData::read(&base_path) {
|
||||
Ok(v) => v,
|
||||
Err(_) => DropData::new(game_id, game_version, base_path),
|
||||
}
|
||||
}
|
||||
pub fn read(base_path: &Path) -> Result<Self, io::Error> {
|
||||
let mut file = File::open(base_path.join(DROP_DATA_PATH))?;
|
||||
|
||||
let mut s = Vec::new();
|
||||
file.read_to_end(&mut s)?;
|
||||
|
||||
native_model::rmp_serde_1_3::RmpSerde::decode(s).map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!("Failed to decode drop data: {e}"),
|
||||
)
|
||||
})
|
||||
}
|
||||
pub fn write(&self) {
|
||||
let manifest_raw = match native_model::rmp_serde_1_3::RmpSerde::encode(&self) {
|
||||
Ok(data) => data,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
let mut file = match File::create(self.base_path.join(DROP_DATA_PATH)) {
|
||||
Ok(file) => file,
|
||||
Err(e) => {
|
||||
error!("{e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match file.write_all(&manifest_raw) {
|
||||
Ok(()) => {}
|
||||
Err(e) => error!("{e}"),
|
||||
}
|
||||
}
|
||||
pub fn set_contexts(&self, completed_contexts: &[(String, bool)]) {
|
||||
*lock!(self.contexts) = completed_contexts
|
||||
.iter()
|
||||
.map(|s| (s.0.clone(), s.1))
|
||||
.collect();
|
||||
}
|
||||
pub fn set_context(&self, context: String, state: bool) {
|
||||
lock!(self.contexts).entry(context).insert_entry(state);
|
||||
}
|
||||
pub fn get_contexts(&self) -> HashMap<String, bool> {
|
||||
lock!(self.contexts).clone()
|
||||
}
|
||||
}
|
||||
29
src-tauri/games/src/downloads/error.rs
Normal file
29
src-tauri/games/src/downloads/error.rs
Normal file
@ -0,0 +1,29 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
use serde_with::SerializeDisplay;
|
||||
|
||||
#[derive(SerializeDisplay)]
|
||||
pub enum LibraryError {
|
||||
MetaNotFound(String),
|
||||
VersionNotFound(String),
|
||||
}
|
||||
impl Display for LibraryError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}",
|
||||
match self {
|
||||
LibraryError::MetaNotFound(id) => {
|
||||
format!(
|
||||
"Could not locate any installed version of game ID {id} in the database"
|
||||
)
|
||||
}
|
||||
LibraryError::VersionNotFound(game_id) => {
|
||||
format!(
|
||||
"Could not locate any installed version for game id {game_id} in the database"
|
||||
)
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
98
src-tauri/games/src/downloads/manifest.rs
Normal file
98
src-tauri/games/src/downloads/manifest.rs
Normal file
@ -0,0 +1,98 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
// Drops go in buckets
|
||||
pub struct DownloadDrop {
|
||||
pub index: usize,
|
||||
pub filename: String,
|
||||
pub path: PathBuf,
|
||||
pub start: usize,
|
||||
pub length: usize,
|
||||
pub checksum: String,
|
||||
pub permissions: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct DownloadBucket {
|
||||
pub game_id: String,
|
||||
pub version: String,
|
||||
pub drops: Vec<DownloadDrop>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct DownloadContext {
|
||||
pub context: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ChunkBodyFile {
|
||||
filename: String,
|
||||
chunk_index: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ChunkBody {
|
||||
pub context: String,
|
||||
pub files: Vec<ChunkBodyFile>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct ManifestBody {
|
||||
pub game: String,
|
||||
pub version: String,
|
||||
}
|
||||
|
||||
impl ChunkBody {
|
||||
pub fn create(context: &DownloadContext, drops: &[DownloadDrop]) -> ChunkBody {
|
||||
Self {
|
||||
context: context.context.clone(),
|
||||
files: drops
|
||||
.iter()
|
||||
.map(|e| ChunkBodyFile {
|
||||
filename: e.filename.clone(),
|
||||
chunk_index: e.index,
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type DropManifest = HashMap<String, DropChunk>;
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DropChunk {
|
||||
pub permissions: u32,
|
||||
pub ids: Vec<String>,
|
||||
pub checksums: Vec<String>,
|
||||
pub lengths: Vec<usize>,
|
||||
pub version_name: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct DropValidateContext {
|
||||
pub index: usize,
|
||||
pub offset: usize,
|
||||
pub path: PathBuf,
|
||||
pub checksum: String,
|
||||
pub length: usize,
|
||||
}
|
||||
|
||||
impl From<DownloadBucket> for Vec<DropValidateContext> {
|
||||
fn from(value: DownloadBucket) -> Self {
|
||||
value
|
||||
.drops
|
||||
.into_iter()
|
||||
.map(|e| DropValidateContext {
|
||||
index: e.index,
|
||||
offset: e.start,
|
||||
path: e.path,
|
||||
checksum: e.checksum,
|
||||
length: e.length,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
7
src-tauri/games/src/downloads/mod.rs
Normal file
7
src-tauri/games/src/downloads/mod.rs
Normal file
@ -0,0 +1,7 @@
|
||||
pub mod download_agent;
|
||||
mod download_logic;
|
||||
pub mod drop_data;
|
||||
pub mod error;
|
||||
mod manifest;
|
||||
pub mod utils;
|
||||
pub mod validate;
|
||||
25
src-tauri/games/src/downloads/utils.rs
Normal file
25
src-tauri/games/src/downloads/utils.rs
Normal file
@ -0,0 +1,25 @@
|
||||
use std::{io, path::PathBuf, sync::Arc};
|
||||
|
||||
use download_manager::error::ApplicationDownloadError;
|
||||
use sysinfo::{Disk, DiskRefreshKind, Disks};
|
||||
|
||||
pub fn get_disk_available(mount_point: PathBuf) -> Result<u64, ApplicationDownloadError> {
|
||||
let disks = Disks::new_with_refreshed_list_specifics(DiskRefreshKind::nothing().with_storage());
|
||||
|
||||
let mut disk_iter = disks.into_iter().collect::<Vec<&Disk>>();
|
||||
disk_iter.sort_by(|a, b| {
|
||||
b.mount_point()
|
||||
.to_string_lossy()
|
||||
.len()
|
||||
.cmp(&a.mount_point().to_string_lossy().len())
|
||||
});
|
||||
|
||||
for disk in disk_iter {
|
||||
if mount_point.starts_with(disk.mount_point()) {
|
||||
return Ok(disk.available_space());
|
||||
}
|
||||
}
|
||||
Err(ApplicationDownloadError::IoError(Arc::new(
|
||||
io::Error::other("could not find disk of path"),
|
||||
)))
|
||||
}
|
||||
104
src-tauri/games/src/downloads/validate.rs
Normal file
104
src-tauri/games/src/downloads/validate.rs
Normal file
@ -0,0 +1,104 @@
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{self, BufWriter, Read, Seek, SeekFrom, Write},
|
||||
};
|
||||
|
||||
use download_manager::{
|
||||
error::ApplicationDownloadError,
|
||||
util::{
|
||||
download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag},
|
||||
progress_object::ProgressHandle,
|
||||
},
|
||||
};
|
||||
use log::debug;
|
||||
use md5::Context;
|
||||
|
||||
use crate::downloads::manifest::DropValidateContext;
|
||||
|
||||
pub fn validate_game_chunk(
|
||||
ctx: &DropValidateContext,
|
||||
control_flag: &DownloadThreadControl,
|
||||
progress: ProgressHandle,
|
||||
) -> Result<bool, ApplicationDownloadError> {
|
||||
debug!(
|
||||
"Starting chunk validation {}, {}, {} #{}",
|
||||
ctx.path.display(),
|
||||
ctx.index,
|
||||
ctx.offset,
|
||||
ctx.checksum
|
||||
);
|
||||
// If we're paused
|
||||
if control_flag.get() == DownloadThreadControlFlag::Stop {
|
||||
progress.set(0);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let Ok(mut source) = File::open(&ctx.path) else {
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
if ctx.offset != 0 {
|
||||
source
|
||||
.seek(SeekFrom::Start(ctx.offset as u64))
|
||||
.expect("Failed to seek to file offset");
|
||||
}
|
||||
|
||||
let mut hasher = md5::Context::new();
|
||||
|
||||
let completed = validate_copy(&mut source, &mut hasher, ctx.length, control_flag, progress)?;
|
||||
if !completed {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let res = hex::encode(hasher.finalize().0);
|
||||
if res != ctx.checksum {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
debug!(
|
||||
"Successfully finished verification #{}, copied {} bytes",
|
||||
ctx.checksum, ctx.length
|
||||
);
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn validate_copy(
|
||||
source: &mut File,
|
||||
dest: &mut Context,
|
||||
size: usize,
|
||||
control_flag: &DownloadThreadControl,
|
||||
progress: ProgressHandle,
|
||||
) -> Result<bool, io::Error> {
|
||||
let copy_buf_size = 512;
|
||||
let mut copy_buf = vec![0; copy_buf_size];
|
||||
let mut buf_writer = BufWriter::with_capacity(1024 * 1024, dest);
|
||||
let mut total_bytes = 0;
|
||||
|
||||
loop {
|
||||
if control_flag.get() == DownloadThreadControlFlag::Stop {
|
||||
buf_writer.flush()?;
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let mut bytes_read = source.read(&mut copy_buf)?;
|
||||
total_bytes += bytes_read;
|
||||
|
||||
// If we read over (likely), truncate our read to
|
||||
// the right size
|
||||
if total_bytes > size {
|
||||
let over = total_bytes - size;
|
||||
bytes_read -= over;
|
||||
total_bytes = size;
|
||||
}
|
||||
|
||||
buf_writer.write_all(©_buf[0..bytes_read])?;
|
||||
progress.add(bytes_read);
|
||||
|
||||
if total_bytes >= size {
|
||||
break;
|
||||
}
|
||||
}
|
||||
buf_writer.flush()?;
|
||||
Ok(true)
|
||||
}
|
||||
7
src-tauri/games/src/lib.rs
Normal file
7
src-tauri/games/src/lib.rs
Normal file
@ -0,0 +1,7 @@
|
||||
#![feature(iterator_try_collect)]
|
||||
|
||||
pub mod collections;
|
||||
pub mod downloads;
|
||||
pub mod library;
|
||||
pub mod scan;
|
||||
pub mod state;
|
||||
300
src-tauri/games/src/library.rs
Normal file
300
src-tauri/games/src/library.rs
Normal file
@ -0,0 +1,300 @@
|
||||
use bitcode::{Decode, Encode};
|
||||
use database::{
|
||||
ApplicationTransientStatus, Database, DownloadableMetadata, GameDownloadStatus, GameVersion,
|
||||
borrow_db_checked, borrow_db_mut_checked,
|
||||
};
|
||||
use log::{debug, error, warn};
|
||||
use remote::{
|
||||
auth::generate_authorization_header, error::RemoteAccessError, requests::generate_url,
|
||||
utils::DROP_CLIENT_SYNC,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::remove_dir_all;
|
||||
use std::thread::spawn;
|
||||
use tauri::AppHandle;
|
||||
use utils::app_emit;
|
||||
|
||||
use crate::state::{GameStatusManager, GameStatusWithTransient};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct FetchGameStruct {
|
||||
game: Game,
|
||||
status: GameStatusWithTransient,
|
||||
version: Option<GameVersion>,
|
||||
}
|
||||
|
||||
impl FetchGameStruct {
|
||||
pub fn new(game: Game, status: GameStatusWithTransient, version: Option<GameVersion>) -> Self {
|
||||
Self {
|
||||
game,
|
||||
status,
|
||||
version,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Default, Encode, Decode)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Game {
|
||||
id: String,
|
||||
m_name: String,
|
||||
m_short_description: String,
|
||||
m_description: String,
|
||||
// mDevelopers
|
||||
// mPublishers
|
||||
m_icon_object_id: String,
|
||||
m_banner_object_id: String,
|
||||
m_cover_object_id: String,
|
||||
m_image_library_object_ids: Vec<String>,
|
||||
m_image_carousel_object_ids: Vec<String>,
|
||||
}
|
||||
impl Game {
|
||||
pub fn id(&self) -> &String {
|
||||
&self.id
|
||||
}
|
||||
}
|
||||
#[derive(serde::Serialize, Clone)]
|
||||
pub struct GameUpdateEvent {
|
||||
pub game_id: String,
|
||||
pub status: (
|
||||
Option<GameDownloadStatus>,
|
||||
Option<ApplicationTransientStatus>,
|
||||
),
|
||||
pub version: Option<GameVersion>,
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by:
|
||||
* - on_cancel, when cancelled, for obvious reasons
|
||||
* - when downloading, so if drop unexpectedly quits, we can resume the download. hidden by the "Downloading..." transient state, though
|
||||
* - when scanning, to import the game
|
||||
*/
|
||||
pub fn set_partially_installed(
|
||||
meta: &DownloadableMetadata,
|
||||
install_dir: String,
|
||||
app_handle: Option<&AppHandle>,
|
||||
) {
|
||||
set_partially_installed_db(&mut borrow_db_mut_checked(), meta, install_dir, app_handle);
|
||||
}
|
||||
|
||||
pub fn set_partially_installed_db(
|
||||
db_lock: &mut Database,
|
||||
meta: &DownloadableMetadata,
|
||||
install_dir: String,
|
||||
app_handle: Option<&AppHandle>,
|
||||
) {
|
||||
db_lock.applications.transient_statuses.remove(meta);
|
||||
db_lock.applications.game_statuses.insert(
|
||||
meta.id.clone(),
|
||||
GameDownloadStatus::PartiallyInstalled {
|
||||
version_name: meta.version.as_ref().unwrap().clone(),
|
||||
install_dir,
|
||||
},
|
||||
);
|
||||
db_lock
|
||||
.applications
|
||||
.installed_game_version
|
||||
.insert(meta.id.clone(), meta.clone());
|
||||
|
||||
if let Some(app_handle) = app_handle {
|
||||
push_game_update(
|
||||
app_handle,
|
||||
&meta.id,
|
||||
None,
|
||||
GameStatusManager::fetch_state(&meta.id, db_lock),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn uninstall_game_logic(meta: DownloadableMetadata, app_handle: &AppHandle) {
|
||||
debug!("triggered uninstall for agent");
|
||||
let mut db_handle = borrow_db_mut_checked();
|
||||
db_handle
|
||||
.applications
|
||||
.transient_statuses
|
||||
.insert(meta.clone(), ApplicationTransientStatus::Uninstalling {});
|
||||
|
||||
push_game_update(
|
||||
app_handle,
|
||||
&meta.id,
|
||||
None,
|
||||
GameStatusManager::fetch_state(&meta.id, &db_handle),
|
||||
);
|
||||
|
||||
let previous_state = db_handle.applications.game_statuses.get(&meta.id).cloned();
|
||||
|
||||
let previous_state = if let Some(state) = previous_state {
|
||||
state
|
||||
} else {
|
||||
warn!("uninstall job doesn't have previous state, failing silently");
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some((_, install_dir)) = match previous_state {
|
||||
GameDownloadStatus::Installed {
|
||||
version_name,
|
||||
install_dir,
|
||||
} => Some((version_name, install_dir)),
|
||||
GameDownloadStatus::SetupRequired {
|
||||
version_name,
|
||||
install_dir,
|
||||
} => Some((version_name, install_dir)),
|
||||
GameDownloadStatus::PartiallyInstalled {
|
||||
version_name,
|
||||
install_dir,
|
||||
} => Some((version_name, install_dir)),
|
||||
_ => None,
|
||||
} {
|
||||
db_handle
|
||||
.applications
|
||||
.transient_statuses
|
||||
.insert(meta.clone(), ApplicationTransientStatus::Uninstalling {});
|
||||
|
||||
drop(db_handle);
|
||||
|
||||
let app_handle = app_handle.clone();
|
||||
spawn(move || {
|
||||
if let Err(e) = remove_dir_all(install_dir) {
|
||||
error!("{e}");
|
||||
} else {
|
||||
let mut db_handle = borrow_db_mut_checked();
|
||||
db_handle.applications.transient_statuses.remove(&meta);
|
||||
db_handle
|
||||
.applications
|
||||
.installed_game_version
|
||||
.remove(&meta.id);
|
||||
db_handle
|
||||
.applications
|
||||
.game_statuses
|
||||
.insert(meta.id.clone(), GameDownloadStatus::Remote {});
|
||||
let _ = db_handle.applications.transient_statuses.remove(&meta);
|
||||
|
||||
push_game_update(
|
||||
&app_handle,
|
||||
&meta.id,
|
||||
None,
|
||||
GameStatusManager::fetch_state(&meta.id, &db_handle),
|
||||
);
|
||||
|
||||
debug!("uninstalled game id {}", &meta.id);
|
||||
app_emit!(&app_handle, "update_library", ());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
warn!("invalid previous state for uninstall, failing silently.");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_current_meta(game_id: &String) -> Option<DownloadableMetadata> {
|
||||
borrow_db_checked()
|
||||
.applications
|
||||
.installed_game_version
|
||||
.get(game_id)
|
||||
.cloned()
|
||||
}
|
||||
|
||||
pub fn on_game_complete(
|
||||
meta: &DownloadableMetadata,
|
||||
install_dir: String,
|
||||
app_handle: &AppHandle,
|
||||
) -> Result<(), RemoteAccessError> {
|
||||
// Fetch game version information from remote
|
||||
if meta.version.is_none() {
|
||||
return Err(RemoteAccessError::GameNotFound(meta.id.clone()));
|
||||
}
|
||||
|
||||
let client = DROP_CLIENT_SYNC.clone();
|
||||
let response = generate_url(
|
||||
&["/api/v1/client/game/version"],
|
||||
&[
|
||||
("id", &meta.id),
|
||||
("version", meta.version.as_ref().unwrap()),
|
||||
],
|
||||
)?;
|
||||
let response = client
|
||||
.get(response)
|
||||
.header("Authorization", generate_authorization_header())
|
||||
.send()?;
|
||||
|
||||
let game_version: GameVersion = response.json()?;
|
||||
|
||||
let mut handle = borrow_db_mut_checked();
|
||||
handle
|
||||
.applications
|
||||
.game_versions
|
||||
.entry(meta.id.clone())
|
||||
.or_default()
|
||||
.insert(meta.version.clone().unwrap(), game_version.clone());
|
||||
handle
|
||||
.applications
|
||||
.installed_game_version
|
||||
.insert(meta.id.clone(), meta.clone());
|
||||
|
||||
drop(handle);
|
||||
|
||||
let status = if game_version.setup_command.is_empty() {
|
||||
GameDownloadStatus::Installed {
|
||||
version_name: meta.version.clone().unwrap(),
|
||||
install_dir,
|
||||
}
|
||||
} else {
|
||||
GameDownloadStatus::SetupRequired {
|
||||
version_name: meta.version.clone().unwrap(),
|
||||
install_dir,
|
||||
}
|
||||
};
|
||||
|
||||
let mut db_handle = borrow_db_mut_checked();
|
||||
db_handle
|
||||
.applications
|
||||
.game_statuses
|
||||
.insert(meta.id.clone(), status.clone());
|
||||
drop(db_handle);
|
||||
app_emit!(
|
||||
app_handle,
|
||||
&format!("update_game/{}", meta.id),
|
||||
GameUpdateEvent {
|
||||
game_id: meta.id.clone(),
|
||||
status: (Some(status), None),
|
||||
version: Some(game_version),
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn push_game_update(
|
||||
app_handle: &AppHandle,
|
||||
game_id: &String,
|
||||
version: Option<GameVersion>,
|
||||
status: GameStatusWithTransient,
|
||||
) {
|
||||
if let Some(GameDownloadStatus::Installed { .. } | GameDownloadStatus::SetupRequired { .. }) =
|
||||
&status.0
|
||||
&& version.is_none()
|
||||
{
|
||||
panic!("pushed game for installed game that doesn't have version information");
|
||||
}
|
||||
|
||||
app_emit!(
|
||||
app_handle,
|
||||
&format!("update_game/{game_id}"),
|
||||
GameUpdateEvent {
|
||||
game_id: game_id.clone(),
|
||||
status,
|
||||
version,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct FrontendGameOptions {
|
||||
launch_string: String,
|
||||
}
|
||||
|
||||
impl FrontendGameOptions {
|
||||
pub fn launch_string(&self) -> &String {
|
||||
&self.launch_string
|
||||
}
|
||||
}
|
||||
47
src-tauri/games/src/scan.rs
Normal file
47
src-tauri/games/src/scan.rs
Normal file
@ -0,0 +1,47 @@
|
||||
use std::fs;
|
||||
|
||||
use database::{DownloadType, DownloadableMetadata, borrow_db_mut_checked};
|
||||
use log::warn;
|
||||
|
||||
use crate::{
|
||||
downloads::drop_data::{DROP_DATA_PATH, DropData},
|
||||
library::set_partially_installed_db,
|
||||
};
|
||||
|
||||
pub fn scan_install_dirs() {
|
||||
let mut db_lock = borrow_db_mut_checked();
|
||||
for install_dir in db_lock.applications.install_dirs.clone() {
|
||||
let Ok(files) = fs::read_dir(install_dir) else {
|
||||
continue;
|
||||
};
|
||||
for game in files.into_iter().flatten() {
|
||||
let drop_data_file = game.path().join(DROP_DATA_PATH);
|
||||
if !drop_data_file.exists() {
|
||||
continue;
|
||||
}
|
||||
let game_id = game.file_name().display().to_string();
|
||||
let Ok(drop_data) = DropData::read(&game.path()) else {
|
||||
warn!(
|
||||
".dropdata exists for {}, but couldn't read it. is it corrupted?",
|
||||
game.file_name().display()
|
||||
);
|
||||
continue;
|
||||
};
|
||||
if db_lock.applications.game_statuses.contains_key(&game_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let metadata = DownloadableMetadata::new(
|
||||
drop_data.game_id,
|
||||
Some(drop_data.game_version),
|
||||
DownloadType::Game,
|
||||
);
|
||||
set_partially_installed_db(
|
||||
&mut db_lock,
|
||||
&metadata,
|
||||
drop_data.base_path.to_str().unwrap().to_string(),
|
||||
None,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
35
src-tauri/games/src/state.rs
Normal file
35
src-tauri/games/src/state.rs
Normal file
@ -0,0 +1,35 @@
|
||||
use database::models::data::{
|
||||
ApplicationTransientStatus, Database, DownloadType, DownloadableMetadata, GameDownloadStatus,
|
||||
};
|
||||
|
||||
pub type GameStatusWithTransient = (
|
||||
Option<GameDownloadStatus>,
|
||||
Option<ApplicationTransientStatus>,
|
||||
);
|
||||
pub struct GameStatusManager {}
|
||||
|
||||
impl GameStatusManager {
|
||||
pub fn fetch_state(game_id: &String, database: &Database) -> GameStatusWithTransient {
|
||||
let online_state = database
|
||||
.applications
|
||||
.transient_statuses
|
||||
.get(&DownloadableMetadata {
|
||||
id: game_id.to_string(),
|
||||
download_type: DownloadType::Game,
|
||||
version: None,
|
||||
})
|
||||
.cloned();
|
||||
|
||||
let offline_state = database.applications.game_statuses.get(game_id).cloned();
|
||||
|
||||
if online_state.is_some() {
|
||||
return (None, online_state);
|
||||
}
|
||||
|
||||
if offline_state.is_some() {
|
||||
return (offline_state, None);
|
||||
}
|
||||
|
||||
(None, None)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user