feat: move to async runtime

This commit is contained in:
DecDuck
2025-07-26 17:37:11 +10:00
parent 682c6e9c0b
commit f92ec38231
30 changed files with 1639 additions and 1197 deletions

View File

@ -15,11 +15,11 @@ use crate::{
use super::download_agent::GameDownloadAgent;
#[tauri::command]
pub fn download_game(
pub async fn download_game(
game_id: String,
game_version: String,
install_dir: usize,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, Mutex<AppState<'_>>>,
) -> Result<(), DownloadManagerError<DownloadManagerSignal>> {
let sender = state.lock().unwrap().download_manager.get_sender();
let game_download_agent = Arc::new(Box::new(GameDownloadAgent::new_from_index(
@ -27,7 +27,7 @@ pub fn download_game(
game_version,
install_dir,
sender,
)) as Box<dyn Downloadable + Send + Sync>);
).await) as Box<dyn Downloadable + Send + Sync>);
Ok(state
.lock()
.unwrap()
@ -36,11 +36,11 @@ pub fn download_game(
}
#[tauri::command]
pub fn resume_download(
pub async fn resume_download(
game_id: String,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, Mutex<AppState<'_>>>,
) -> Result<(), DownloadManagerError<DownloadManagerSignal>> {
let s = borrow_db_checked()
let s = borrow_db_checked().await
.applications
.game_statuses
.get(&game_id)

View File

@ -15,22 +15,28 @@ use crate::games::downloads::manifest::{DropDownloadContext, DropManifest};
use crate::games::downloads::validate::game_validate_logic;
use crate::games::library::{on_game_complete, on_game_incomplete, push_game_update};
use crate::remote::requests::make_request;
use log::{debug, error, info};
use rayon::ThreadPoolBuilder;
use log::{debug, error, info, warn};
use std::collections::HashMap;
use std::fs::{create_dir_all, OpenOptions};
use std::fs::{OpenOptions, create_dir_all};
use std::path::{Path, PathBuf};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tauri::{AppHandle, Emitter};
use tokio::sync::mpsc;
#[cfg(target_os = "linux")]
use rustix::fs::{fallocate, FallocateFlags};
use rustix::fs::{FallocateFlags, fallocate};
use super::download_logic::download_game_chunk;
use super::drop_data::DropData;
// This is cursed but necessary
// See the message where it is used
unsafe fn extend_lifetime<'b, R>(r: &'b R) -> &'static R {
unsafe { std::mem::transmute::<&'b R, &'static R>(r) }
}
pub struct GameDownloadAgent {
pub id: String,
pub version: String,
@ -45,13 +51,13 @@ pub struct GameDownloadAgent {
}
impl GameDownloadAgent {
pub fn new_from_index(
pub async fn new_from_index(
id: String,
version: String,
target_download_dir: usize,
sender: Sender<DownloadManagerSignal>,
) -> Self {
let db_lock = borrow_db_checked();
let db_lock = borrow_db_checked().await;
let base_dir = db_lock.applications.install_dirs[target_download_dir].clone();
drop(db_lock);
@ -87,8 +93,8 @@ impl GameDownloadAgent {
}
// Blocking
pub fn setup_download(&self) -> Result<(), ApplicationDownloadError> {
self.ensure_manifest_exists()?;
pub async fn setup_download(&self) -> Result<(), ApplicationDownloadError> {
self.ensure_manifest_exists().await?;
self.ensure_contexts()?;
@ -98,8 +104,8 @@ impl GameDownloadAgent {
}
// Blocking
pub fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
self.setup_download()?;
pub async fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
self.setup_download().await?;
self.set_progress_object_params();
let timer = Instant::now();
push_game_update(
@ -115,6 +121,7 @@ impl GameDownloadAgent {
);
let res = self
.run()
.await
.map_err(|_| ApplicationDownloadError::DownloadError);
debug!(
@ -125,37 +132,39 @@ impl GameDownloadAgent {
res
}
pub fn ensure_manifest_exists(&self) -> Result<(), ApplicationDownloadError> {
pub async fn ensure_manifest_exists(&self) -> Result<(), ApplicationDownloadError> {
if self.manifest.lock().unwrap().is_some() {
return Ok(());
}
self.download_manifest()
self.download_manifest().await
}
fn download_manifest(&self) -> Result<(), ApplicationDownloadError> {
let header = generate_authorization_header();
let client = reqwest::blocking::Client::new();
async fn download_manifest(&self) -> Result<(), ApplicationDownloadError> {
let header = generate_authorization_header().await;
let client = reqwest::Client::new();
let response = make_request(
&client,
&["/api/v1/client/game/manifest"],
&[("id", &self.id), ("version", &self.version)],
|f| f.header("Authorization", header),
async |f| f.header("Authorization", header),
)
.await
.map_err(ApplicationDownloadError::Communication)?
.send()
.await
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
if response.status() != 200 {
return Err(ApplicationDownloadError::Communication(
RemoteAccessError::ManifestDownloadFailed(
response.status(),
response.text().unwrap(),
response.text().await.unwrap(),
),
));
}
let manifest_download: DropManifest = response.json().unwrap();
let manifest_download: DropManifest = response.json().await.unwrap();
if let Ok(mut manifest) = self.manifest.lock() {
*manifest = Some(manifest_download);
@ -251,30 +260,35 @@ impl GameDownloadAgent {
Ok(())
}
// TODO: Change return value on Err
pub fn run(&self) -> Result<bool, ()> {
let max_download_threads = borrow_db_checked().settings.max_download_threads;
pub async fn run(&self) -> Result<bool, ()> {
let max_download_threads = borrow_db_checked().await.settings.max_download_threads;
debug!(
"downloading game: {} with {} threads",
self.id, max_download_threads
);
let pool = ThreadPoolBuilder::new()
.num_threads(max_download_threads)
let client = &reqwest::Client::new();
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(max_download_threads)
.thread_name("drop-download-thread")
.build()
.unwrap();
.map_err(|e| {
warn!("failed to create download scheduler: {e}");
()
})?;
let completed_contexts = Arc::new(boxcar::Vec::new());
let completed_indexes_loop_arc = completed_contexts.clone();
let (tx, mut rx) = mpsc::channel(32);
// Scope this for safety
{
let contexts = self.contexts.lock().unwrap();
debug!("{contexts:#?}");
let contexts = self.contexts.lock().unwrap();
debug!("{contexts:#?}");
pool.scope(|scope| {
let client = &reqwest::blocking::Client::new();
let context_map = self.context_map.lock().unwrap();
for (index, context) in contexts.iter().enumerate() {
let client = client.clone();
let completed_indexes = completed_indexes_loop_arc.clone();
let progress = self.progress.get(index);
let progress_handle = ProgressHandle::new(progress, self.progress.clone());
@ -287,33 +301,48 @@ impl GameDownloadAgent {
let sender = self.sender.clone();
let request = match make_request(
&client,
&["/api/v1/client/chunk"],
&[
("id", &context.game_id),
("version", &context.version),
("name", &context.file_name),
("chunk", &context.index.to_string()),
],
|r| r,
) {
Ok(request) => request,
Err(e) => {
sender
.send(DownloadManagerSignal::Error(
ApplicationDownloadError::Communication(e),
))
.unwrap();
continue;
}
};
let local_tx = tx.clone();
/*
This lifetime extensions are necessary, because this loop acts like a scope
but Rust doesn't know that.
*/
let context = unsafe { extend_lifetime(context) };
let self_static = unsafe { extend_lifetime(self) };
rt.spawn(async move {
let request = match make_request(
&client,
&["/api/v1/client/chunk"],
&[
("id", &context.game_id),
("version", &context.version),
("name", &context.file_name),
("chunk", &context.index.to_string()),
],
async |r| r,
)
.await
{
Ok(request) => request,
Err(e) => {
sender
.send(DownloadManagerSignal::Error(
ApplicationDownloadError::Communication(e),
))
.unwrap();
return;
}
};
scope.spawn(move |_| {
match download_game_chunk(context, &self.control_flag, progress_handle, request)
match download_game_chunk(
context,
&self_static.control_flag,
progress_handle,
request,
)
.await
{
Ok(true) => {
completed_indexes.push(context.checksum.clone());
local_tx.send(context.checksum.clone()).await.unwrap();
}
Ok(false) => {}
Err(e) => {
@ -323,19 +352,22 @@ impl GameDownloadAgent {
}
});
}
});
}
let newly_completed = completed_contexts.to_owned();
let mut newly_completed = Vec::new();
while let Some(completed_checksum) = rx.recv().await {
println!("completed checksum {}", completed_checksum);
newly_completed.push(completed_checksum);
}
let completed_lock_len = {
let mut context_map_lock = self.context_map.lock().unwrap();
for (_, item) in newly_completed.iter() {
context_map_lock.insert(item.clone(), true);
}
// 'return' from the download
let mut context_map_lock = self.context_map.lock().unwrap();
for item in newly_completed.iter() {
context_map_lock.insert(item.clone(), true);
}
let completed_lock_len = context_map_lock.values().filter(|x| **x).count();
context_map_lock.values().filter(|x| **x).count()
};
let context_map_lock = self.context_map.lock().unwrap();
let contexts = self.contexts.lock().unwrap();
let contexts = contexts
.iter()
.map(|x| {
@ -345,6 +377,7 @@ impl GameDownloadAgent {
)
})
.collect::<Vec<(String, bool)>>();
drop(context_map_lock);
self.stored_manifest.set_contexts(&contexts);
@ -365,20 +398,8 @@ impl GameDownloadAgent {
}
}
#[async_trait::async_trait]
impl Downloadable for GameDownloadAgent {
fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
*self.status.lock().unwrap() = DownloadStatus::Downloading;
self.download(app_handle)
}
fn progress(&self) -> Arc<ProgressObject> {
self.progress.clone()
}
fn control_flag(&self) -> DownloadThreadControl {
self.control_flag.clone()
}
fn metadata(&self) -> DownloadableMetadata {
DownloadableMetadata {
id: self.id.clone(),
@ -387,11 +408,24 @@ impl Downloadable for GameDownloadAgent {
}
}
fn on_initialised(&self, _app_handle: &tauri::AppHandle) {
async fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
*self.status.lock().unwrap() = DownloadStatus::Downloading;
self.download(app_handle).await
}
async fn progress(&self) -> Arc<ProgressObject> {
self.progress.clone()
}
async fn control_flag(&self) -> DownloadThreadControl {
self.control_flag.clone()
}
async fn on_initialised(&self, _app_handle: &tauri::AppHandle) {
*self.status.lock().unwrap() = DownloadStatus::Queued;
}
fn on_error(&self, app_handle: &tauri::AppHandle, error: &ApplicationDownloadError) {
async fn on_error(&self, app_handle: &tauri::AppHandle, error: &ApplicationDownloadError) {
*self.status.lock().unwrap() = DownloadStatus::Error;
app_handle
.emit("download_error", error.to_string())
@ -399,46 +433,49 @@ impl Downloadable for GameDownloadAgent {
error!("error while managing download: {error}");
let mut handle = borrow_db_mut_checked();
let mut handle = borrow_db_mut_checked().await;
handle
.applications
.transient_statuses
.remove(&self.metadata());
}
fn on_complete(&self, app_handle: &tauri::AppHandle) {
async fn on_complete(&self, app_handle: &tauri::AppHandle) {
on_game_complete(
&self.metadata(),
self.stored_manifest.base_path.to_string_lossy().to_string(),
app_handle,
)
.await
.unwrap();
}
// TODO: fix this function. It doesn't restart the download properly, nor does it reset the state properly
fn on_incomplete(&self, app_handle: &tauri::AppHandle) {
async fn on_incomplete(&self, app_handle: &tauri::AppHandle) {
on_game_incomplete(
&self.metadata(),
self.stored_manifest.base_path.to_string_lossy().to_string(),
app_handle,
)
.await
.unwrap();
}
fn on_cancelled(&self, _app_handle: &tauri::AppHandle) {}
async fn on_cancelled(&self, _app_handle: &tauri::AppHandle) {}
fn status(&self) -> DownloadStatus {
async fn status(&self) -> DownloadStatus {
self.status.lock().unwrap().clone()
}
fn validate(&self) -> Result<bool, ApplicationDownloadError> {
async fn validate(&self) -> Result<bool, ApplicationDownloadError> {
*self.status.lock().unwrap() = DownloadStatus::Validating;
let contexts = self.contexts.lock().unwrap().clone();
game_validate_logic(
&self.stored_manifest,
self.contexts.lock().unwrap().clone(),
contexts,
self.progress.clone(),
self.sender.clone(),
&self.control_flag,
)
.await
}
}

View File

@ -7,68 +7,70 @@ use crate::error::drop_server_error::DropServerError;
use crate::error::remote_access_error::RemoteAccessError;
use crate::games::downloads::manifest::DropDownloadContext;
use crate::remote::auth::generate_authorization_header;
use futures::TryStreamExt;
use log::{debug, warn};
use md5::{Context, Digest};
use reqwest::blocking::{RequestBuilder, Response};
use reqwest::RequestBuilder;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use tokio_util::io::StreamReader;
use std::fs::{set_permissions, Permissions};
use std::io::Read;
use std::fs::{Permissions, set_permissions};
use std::io::Write;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use std::{
fs::{File, OpenOptions},
io::{self, BufWriter, Seek, SeekFrom, Write},
io::{self, SeekFrom},
path::PathBuf,
};
pub struct DropWriter<W: Write> {
pub struct DropWriter<W: AsyncWrite> {
hasher: Context,
destination: W,
}
impl DropWriter<File> {
fn new(path: PathBuf) -> Self {
async fn new(path: PathBuf) -> Self {
Self {
destination: OpenOptions::new().write(true).open(path).unwrap(),
destination: OpenOptions::new().write(true).open(path).await.unwrap(),
hasher: Context::new(),
}
}
fn finish(mut self) -> io::Result<Digest> {
self.flush().unwrap();
async fn finish(mut self) -> io::Result<Digest> {
self.flush().await.unwrap();
Ok(self.hasher.compute())
}
}
// Write automatically pushes to file and hasher
impl Write for DropWriter<File> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
async fn write(&mut self, mut buf: &[u8]) -> io::Result<()> {
self.hasher
.write_all(buf)
.map_err(|e| io::Error::other(format!("Unable to write to hasher: {e}")))?;
self.destination.write(buf)
self.destination.write_all_buf(&mut buf).await
}
fn flush(&mut self) -> io::Result<()> {
async fn flush(&mut self) -> io::Result<()> {
self.hasher.flush()?;
self.destination.flush()
self.destination.flush().await
}
}
// Seek moves around destination output
impl Seek for DropWriter<File> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.destination.seek(pos)
async fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.destination.seek(pos).await
}
}
pub struct DropDownloadPipeline<'a, R: Read, W: Write> {
pub struct DropDownloadPipeline<'a, R: AsyncRead, W: AsyncWrite> {
pub source: R,
pub destination: DropWriter<W>,
pub control_flag: &'a DownloadThreadControl,
pub progress: ProgressHandle,
pub size: usize,
}
impl<'a> DropDownloadPipeline<'a, Response, File> {
impl<'a, R> DropDownloadPipeline<'a, R, File>
where
R: AsyncRead + Unpin,
{
fn new(
source: Response,
source: R,
destination: DropWriter<File>,
control_flag: &'a DownloadThreadControl,
progress: ProgressHandle,
@ -83,19 +85,18 @@ impl<'a> DropDownloadPipeline<'a, Response, File> {
}
}
fn copy(&mut self) -> Result<bool, io::Error> {
async fn copy(&mut self) -> Result<bool, io::Error> {
let copy_buf_size = 512;
let mut copy_buf = vec![0; copy_buf_size];
let mut buf_writer = BufWriter::with_capacity(1024 * 1024, &mut self.destination);
let mut current_size = 0;
loop {
if self.control_flag.get() == DownloadThreadControlFlag::Stop {
buf_writer.flush()?;
self.destination.flush().await?;
return Ok(false);
}
let mut bytes_read = self.source.read(&mut copy_buf)?;
let mut bytes_read = self.source.read(&mut copy_buf).await?;
current_size += bytes_read;
if current_size > self.size {
@ -105,7 +106,7 @@ impl<'a> DropDownloadPipeline<'a, Response, File> {
current_size = self.size;
}
buf_writer.write_all(&copy_buf[0..bytes_read])?;
self.destination.write(&copy_buf[0..bytes_read]).await?;
self.progress.add(bytes_read);
if current_size >= self.size {
@ -116,18 +117,18 @@ impl<'a> DropDownloadPipeline<'a, Response, File> {
break;
}
}
buf_writer.flush()?;
self.destination.flush().await?;
Ok(true)
}
fn finish(self) -> Result<Digest, io::Error> {
let checksum = self.destination.finish()?;
async fn finish(self) -> Result<Digest, io::Error> {
let checksum = self.destination.finish().await?;
Ok(checksum)
}
}
pub fn download_game_chunk(
pub async fn download_game_chunk(
ctx: &DropDownloadContext,
control_flag: &DownloadThreadControl,
progress: ProgressHandle,
@ -139,13 +140,14 @@ pub fn download_game_chunk(
return Ok(false);
}
let response = request
.header("Authorization", generate_authorization_header())
.header("Authorization", generate_authorization_header().await)
.send()
.await
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
if response.status() != 200 {
debug!("chunk request got status code: {}", response.status());
let raw_res = response.text().unwrap();
let raw_res = response.text().await.unwrap();
if let Ok(err) = serde_json::from_str::<DropServerError>(&raw_res) {
return Err(ApplicationDownloadError::Communication(
RemoteAccessError::InvalidResponse(err),
@ -156,11 +158,12 @@ pub fn download_game_chunk(
));
}
let mut destination = DropWriter::new(ctx.path.clone());
let mut destination = DropWriter::new(ctx.path.clone()).await;
if ctx.offset != 0 {
destination
.seek(SeekFrom::Start(ctx.offset))
.await
.expect("Failed to seek to file offset");
}
@ -168,7 +171,7 @@ pub fn download_game_chunk(
if content_length.is_none() {
warn!("recieved 0 length content from server");
return Err(ApplicationDownloadError::Communication(
RemoteAccessError::InvalidResponse(response.json().unwrap()),
RemoteAccessError::InvalidResponse(response.json().await.unwrap()),
));
}
@ -178,11 +181,17 @@ pub fn download_game_chunk(
return Err(ApplicationDownloadError::DownloadError);
}
let response_stream = StreamReader::new(
response
.bytes_stream()
.map_err(|e| std::io::Error::other(e)),
);
let mut pipeline =
DropDownloadPipeline::new(response, destination, control_flag, progress, length);
DropDownloadPipeline::new(response_stream, destination, control_flag, progress, length);
let completed = pipeline
.copy()
.await
.map_err(|e| ApplicationDownloadError::IoError(e.kind()))?;
if !completed {
return Ok(false);
@ -197,6 +206,7 @@ pub fn download_game_chunk(
let checksum = pipeline
.finish()
.await
.map_err(|e| ApplicationDownloadError::IoError(e.kind()))?;
let res = hex::encode(checksum.0);

View File

@ -1,12 +1,11 @@
use std::{
fs::File,
io::{self, BufWriter, Read, Seek, SeekFrom, Write},
sync::{mpsc::Sender, Arc},
sync::{Arc, mpsc::Sender},
};
use log::{debug, error, info};
use md5::Context;
use rayon::ThreadPoolBuilder;
use crate::{
database::db::borrow_db_checked,
@ -21,7 +20,7 @@ use crate::{
games::downloads::{drop_data::DropData, manifest::DropDownloadContext},
};
pub fn game_validate_logic(
pub async fn game_validate_logic(
dropdata: &DropData,
contexts: Vec<DropDownloadContext>,
progress: Arc<ProgressObject>,
@ -29,49 +28,47 @@ pub fn game_validate_logic(
control_flag: &DownloadThreadControl,
) -> Result<bool, ApplicationDownloadError> {
progress.reset(contexts.len());
let max_download_threads = borrow_db_checked().settings.max_download_threads;
let max_download_threads = borrow_db_checked().await.settings.max_download_threads;
debug!(
"validating game: {} with {} threads",
dropdata.game_id, max_download_threads
);
let pool = ThreadPoolBuilder::new()
.num_threads(max_download_threads)
.build()
.unwrap();
debug!("{contexts:#?}");
let invalid_chunks = Arc::new(boxcar::Vec::new());
pool.scope(|scope| {
for (index, context) in contexts.iter().enumerate() {
let current_progress = progress.get(index);
let progress_handle = ProgressHandle::new(current_progress, progress.clone());
let invalid_chunks_scoped = invalid_chunks.clone();
let sender = sender.clone();
unsafe {
async_scoped::TokioScope::scope_and_collect(|scope| {
for (index, context) in contexts.iter().enumerate() {
let current_progress = progress.get(index);
let progress_handle = ProgressHandle::new(current_progress, progress.clone());
let invalid_chunks_scoped = invalid_chunks.clone();
let sender = sender.clone();
scope.spawn(move |_| {
match validate_game_chunk(context, control_flag, progress_handle) {
Ok(true) => {
debug!(
"Finished context #{} with checksum {}",
index, context.checksum
);
scope.spawn(async move {
match validate_game_chunk(context, control_flag, progress_handle) {
Ok(true) => {
debug!(
"Finished context #{} with checksum {}",
index, context.checksum
);
}
Ok(false) => {
debug!(
"Didn't finish context #{} with checksum {}",
index, &context.checksum
);
invalid_chunks_scoped.push(context.checksum.clone());
}
Err(e) => {
error!("{e}");
sender.send(DownloadManagerSignal::Error(e)).unwrap();
}
}
Ok(false) => {
debug!(
"Didn't finish context #{} with checksum {}",
index, &context.checksum
);
invalid_chunks_scoped.push(context.checksum.clone());
}
Err(e) => {
error!("{e}");
sender.send(DownloadManagerSignal::Error(e)).unwrap();
}
}
});
}
});
});
}
}).await
};
// If there are any contexts left which are false
if !invalid_chunks.is_empty() {