mirror of
https://github.com/Drop-OSS/drop-app.git
synced 2025-11-13 08:12:44 +10:00
Lesson learned: Wrappers are the bane of my existence. Also here's the download cancelling logic.
Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
@ -9,10 +9,16 @@
|
|||||||
<input placeholder="VERSION NAME" v-model="versionName" />
|
<input placeholder="VERSION NAME" v-model="versionName" />
|
||||||
<button
|
<button
|
||||||
class="w-full rounded-md p-4 bg-blue-600 text-white"
|
class="w-full rounded-md p-4 bg-blue-600 text-white"
|
||||||
@click="startGameDownloads"
|
@click="startGameDownloadsWrapper"
|
||||||
>
|
>
|
||||||
Start Game Downloads
|
Start Game Downloads
|
||||||
</button>
|
</button>
|
||||||
|
<button
|
||||||
|
class="w-full rounded-md p-4 bg-blue-600 text-white"
|
||||||
|
@click="cancelGameDownloadWrapper"
|
||||||
|
>
|
||||||
|
Cancel game download
|
||||||
|
</button>
|
||||||
</template>
|
</template>
|
||||||
<script setup lang="ts">
|
<script setup lang="ts">
|
||||||
import { invoke } from "@tauri-apps/api/core";
|
import { invoke } from "@tauri-apps/api/core";
|
||||||
@ -39,6 +45,7 @@ function queueGameWrapper() {
|
|||||||
async function startGameDownloads() {
|
async function startGameDownloads() {
|
||||||
console.log("Downloading Games");
|
console.log("Downloading Games");
|
||||||
await invoke("start_game_downloads", { maxThreads: 4 })
|
await invoke("start_game_downloads", { maxThreads: 4 })
|
||||||
|
console.log("Finished downloading games");
|
||||||
}
|
}
|
||||||
function startGameDownloadsWrapper() {
|
function startGameDownloadsWrapper() {
|
||||||
startGameDownloads()
|
startGameDownloads()
|
||||||
@ -47,4 +54,16 @@ function startGameDownloadsWrapper() {
|
|||||||
console.log(e)
|
console.log(e)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
async function cancelGameDownload() {
|
||||||
|
console.log("Cancelling game download");
|
||||||
|
await invoke("stop_specific_game_download", { gameId: gameId.value })
|
||||||
|
}
|
||||||
|
function cancelGameDownloadWrapper() {
|
||||||
|
console.log("Triggered game cancel wrapper");
|
||||||
|
cancelGameDownload()
|
||||||
|
.then(() => {})
|
||||||
|
.catch((e) => {
|
||||||
|
console.log(e)
|
||||||
|
})
|
||||||
|
}
|
||||||
</script>
|
</script>
|
||||||
|
|||||||
@ -68,30 +68,30 @@ impl GameDownloadAgent {
|
|||||||
if self.manifest.lock().unwrap().is_none() {
|
if self.manifest.lock().unwrap().is_none() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
self.ensure_manifest_exists().await
|
self.ensure_manifest_exists()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn begin_download(&self, max_threads: usize) -> Result<(), GameDownloadError> {
|
pub fn begin_download(&self, max_threads: usize) -> Result<(), GameDownloadError> {
|
||||||
self.change_state(GameDownloadState::Downloading);
|
self.change_state(GameDownloadState::Downloading);
|
||||||
// TODO we're coping the whole context thing
|
// TODO we're coping the whole context thing
|
||||||
// It's not necessary, I just can't figure out to make the borrow checker happy
|
// It's not necessary, I just can't figure out to make the borrow checker happy
|
||||||
{
|
{
|
||||||
let lock = self.contexts.lock().unwrap().to_vec();
|
let lock = self.contexts.lock().unwrap().to_vec();
|
||||||
self.progress
|
self.progress
|
||||||
.run_context_parallel(lock, max_threads).await;
|
.run_context_parallel(lock, max_threads);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn ensure_manifest_exists(&self) -> Result<(), GameDownloadError> {
|
pub fn ensure_manifest_exists(&self) -> Result<(), GameDownloadError> {
|
||||||
if self.manifest.lock().unwrap().is_some() {
|
if self.manifest.lock().unwrap().is_some() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
self.download_manifest().await
|
self.download_manifest()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn download_manifest(&self) -> Result<(), GameDownloadError> {
|
fn download_manifest(&self) -> Result<(), GameDownloadError> {
|
||||||
let base_url = DB.fetch_base_url();
|
let base_url = DB.fetch_base_url();
|
||||||
let manifest_url = base_url
|
let manifest_url = base_url
|
||||||
.join(
|
.join(
|
||||||
@ -106,12 +106,11 @@ impl GameDownloadAgent {
|
|||||||
let header = generate_authorization_header();
|
let header = generate_authorization_header();
|
||||||
|
|
||||||
info!("Generating & sending client");
|
info!("Generating & sending client");
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::blocking::Client::new();
|
||||||
let response = client
|
let response = client
|
||||||
.get(manifest_url.to_string())
|
.get(manifest_url.to_string())
|
||||||
.header("Authorization", header)
|
.header("Authorization", header)
|
||||||
.send()
|
.send()
|
||||||
.await
|
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
if response.status() != 200 {
|
if response.status() != 200 {
|
||||||
@ -119,7 +118,7 @@ impl GameDownloadAgent {
|
|||||||
return Err(GameDownloadError::Status(response.status().as_u16()));
|
return Err(GameDownloadError::Status(response.status().as_u16()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let manifest_download = response.json::<DropManifest>().await.unwrap();
|
let manifest_download = response.json::<DropManifest>().unwrap();
|
||||||
if let Ok(mut manifest) = self.manifest.lock() {
|
if let Ok(mut manifest) = self.manifest.lock() {
|
||||||
*manifest = Some(manifest_download)
|
*manifest = Some(manifest_download)
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
use std::sync::{atomic::Ordering, Arc, Mutex};
|
use std::{sync::{atomic::Ordering, Arc, Mutex}, thread};
|
||||||
|
|
||||||
use log::info;
|
use log::info;
|
||||||
|
|
||||||
@ -27,41 +27,46 @@ pub async fn start_game_downloads(
|
|||||||
state: tauri::State<'_, Mutex<AppState>>,
|
state: tauri::State<'_, Mutex<AppState>>,
|
||||||
) -> Result<(), GameDownloadError> {
|
) -> Result<(), GameDownloadError> {
|
||||||
info!("Downloading Games");
|
info!("Downloading Games");
|
||||||
loop {
|
let lock = state.lock().unwrap();
|
||||||
let mut current_id = String::new();
|
let mut game_downloads = lock.game_downloads.clone();
|
||||||
let mut download_agent = None;
|
drop(lock);
|
||||||
{
|
thread::spawn(move || {
|
||||||
let lock = state.lock().unwrap();
|
loop {
|
||||||
for (id, agent) in &lock.game_downloads {
|
let mut current_id = String::new();
|
||||||
if agent.get_state() == GameDownloadState::Queued {
|
let mut download_agent = None;
|
||||||
download_agent = Some(agent.clone());
|
{
|
||||||
current_id = id.clone();
|
for (id, agent) in &game_downloads {
|
||||||
info!("Got queued game to download");
|
if agent.get_state() == GameDownloadState::Queued {
|
||||||
break;
|
download_agent = Some(agent.clone());
|
||||||
|
current_id = id.clone();
|
||||||
|
info!("Got queued game to download");
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if download_agent.is_none() {
|
||||||
|
info!("No more games left to download");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
info!("Downloading game");
|
||||||
|
{
|
||||||
|
start_game_download(max_threads, download_agent.unwrap()).unwrap();
|
||||||
|
game_downloads.remove_entry(¤t_id);
|
||||||
}
|
}
|
||||||
if download_agent.is_none() {
|
}
|
||||||
info!("No more games left to download");
|
});
|
||||||
return Ok(())
|
info!("Spawned download");
|
||||||
}
|
return Ok(())
|
||||||
};
|
|
||||||
info!("Downloading game");
|
|
||||||
{
|
|
||||||
start_game_download(max_threads, download_agent.unwrap()).await?;
|
|
||||||
let mut lock = state.lock().unwrap();
|
|
||||||
lock.game_downloads.remove_entry(¤t_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_game_download(
|
pub fn start_game_download(
|
||||||
max_threads: usize,
|
max_threads: usize,
|
||||||
download_agent: Arc<GameDownloadAgent>
|
download_agent: Arc<GameDownloadAgent>
|
||||||
) -> Result<(), GameDownloadError> {
|
) -> Result<(), GameDownloadError> {
|
||||||
info!("Triggered Game Download");
|
info!("Triggered Game Download");
|
||||||
|
|
||||||
|
|
||||||
download_agent.ensure_manifest_exists().await?;
|
download_agent.ensure_manifest_exists()?;
|
||||||
|
|
||||||
let local_manifest = {
|
let local_manifest = {
|
||||||
let manifest = download_agent.manifest.lock().unwrap();
|
let manifest = download_agent.manifest.lock().unwrap();
|
||||||
@ -70,19 +75,21 @@ pub async fn start_game_download(
|
|||||||
|
|
||||||
download_agent.generate_job_contexts(&local_manifest, download_agent.version.clone(), download_agent.id.clone()).unwrap();
|
download_agent.generate_job_contexts(&local_manifest, download_agent.version.clone(), download_agent.id.clone()).unwrap();
|
||||||
|
|
||||||
download_agent.begin_download(max_threads).await?;
|
download_agent.begin_download(max_threads).unwrap();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tauri::command]
|
#[tauri::command]
|
||||||
pub async fn stop_specific_game_download(state: tauri::State<'_, Mutex<AppState>>, game_id: String) -> Result<(), String> {
|
pub async fn stop_specific_game_download(state: tauri::State<'_, Mutex<AppState>>, game_id: String) -> Result<(), String> {
|
||||||
|
info!("called stop_specific_game_download");
|
||||||
let lock = state.lock().unwrap();
|
let lock = state.lock().unwrap();
|
||||||
let download_agent = lock.game_downloads.get(&game_id).unwrap();
|
let download_agent = lock.game_downloads.get(&game_id).unwrap();
|
||||||
|
|
||||||
let callback = download_agent.callback.clone();
|
let callback = download_agent.callback.clone();
|
||||||
drop(lock);
|
drop(lock);
|
||||||
|
|
||||||
|
info!("Stopping callback");
|
||||||
callback.store(true, Ordering::Release);
|
callback.store(true, Ordering::Release);
|
||||||
|
|
||||||
return Ok(())
|
return Ok(())
|
||||||
|
|||||||
@ -5,7 +5,8 @@ use crate::DB;
|
|||||||
use gxhash::{gxhash128, GxHasher};
|
use gxhash::{gxhash128, GxHasher};
|
||||||
use log::info;
|
use log::info;
|
||||||
use md5::{Context, Digest};
|
use md5::{Context, Digest};
|
||||||
use std::{fs::{File, OpenOptions}, hash::Hasher, io::{self, BufWriter, Error, ErrorKind, Seek, SeekFrom, Write}, path::PathBuf, sync::{atomic::{AtomicBool, Ordering}, Arc}};
|
use reqwest::blocking::Response;
|
||||||
|
use std::{fs::{File, OpenOptions}, hash::Hasher, io::{self, BufReader, BufWriter, Error, ErrorKind, Read, Seek, SeekFrom, Write}, path::PathBuf, sync::{atomic::{AtomicBool, Ordering}, Arc}};
|
||||||
use urlencoding::encode;
|
use urlencoding::encode;
|
||||||
|
|
||||||
pub struct DropFileWriter {
|
pub struct DropFileWriter {
|
||||||
@ -30,8 +31,10 @@ impl DropFileWriter {
|
|||||||
impl Write for DropFileWriter {
|
impl Write for DropFileWriter {
|
||||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||||
if self.callback.load(Ordering::Acquire) {
|
if self.callback.load(Ordering::Acquire) {
|
||||||
return Err(Error::new(ErrorKind::Interrupted, "Interrupt command recieved"));
|
return Err(Error::new(ErrorKind::ConnectionAborted, "Interrupt command recieved"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//info!("Writing data to writer");
|
||||||
self.hasher.write_all(buf).unwrap();
|
self.hasher.write_all(buf).unwrap();
|
||||||
self.file.write(buf)
|
self.file.write(buf)
|
||||||
}
|
}
|
||||||
@ -48,6 +51,7 @@ impl Seek for DropFileWriter {
|
|||||||
}
|
}
|
||||||
pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>) {
|
pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>) {
|
||||||
if callback.load(Ordering::Acquire) {
|
if callback.load(Ordering::Acquire) {
|
||||||
|
info!("Callback stopped download at start");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let base_url = DB.fetch_base_url();
|
let base_url = DB.fetch_base_url();
|
||||||
@ -83,16 +87,14 @@ pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>)
|
|||||||
// Writing everything to disk directly is probably slightly faster because it balances out the writes,
|
// Writing everything to disk directly is probably slightly faster because it balances out the writes,
|
||||||
// but this is better than the performance loss from constantly reading the callbacks
|
// but this is better than the performance loss from constantly reading the callbacks
|
||||||
|
|
||||||
let mut writer = BufWriter::with_capacity(1024 * 1024, file);
|
//let mut writer = BufWriter::with_capacity(1024 * 1024, file);
|
||||||
|
|
||||||
match response.copy_to(&mut writer) {
|
//copy_to_drop_file_writer(&mut response, &mut file);
|
||||||
|
match io::copy(&mut response, &mut file) {
|
||||||
Ok(_) => {},
|
Ok(_) => {},
|
||||||
Err(_) => { println!("Stopped printing chunk {}", ctx.file_name); return; }
|
Err(e) => { info!("Copy errored with error {}", e)},
|
||||||
};
|
}
|
||||||
let file = match writer.into_inner() {
|
|
||||||
Ok(inner) => inner,
|
|
||||||
Err(_) => panic!("Failed to get BufWriter inner"),
|
|
||||||
};
|
|
||||||
let res = hex::encode(file.finish().unwrap().0);
|
let res = hex::encode(file.finish().unwrap().0);
|
||||||
if res != ctx.checksum {
|
if res != ctx.checksum {
|
||||||
info!("Checksum failed. Original: {}, Calculated: {} for {}", ctx.checksum, res, ctx.file_name);
|
info!("Checksum failed. Original: {}, Calculated: {} for {}", ctx.checksum, res, ctx.file_name);
|
||||||
@ -100,3 +102,23 @@ pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>)
|
|||||||
|
|
||||||
// stream.flush().unwrap();
|
// stream.flush().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn copy_to_drop_file_writer(response: &mut Response, writer: &mut DropFileWriter) {
|
||||||
|
loop {
|
||||||
|
info!("Writing to file writer");
|
||||||
|
let mut buf = [0u8; 1024];
|
||||||
|
response.read(&mut buf).unwrap();
|
||||||
|
match writer.write_all(&buf) {
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(e) => {
|
||||||
|
match e.kind() {
|
||||||
|
ErrorKind::Interrupted => {
|
||||||
|
info!("Interrupted");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
_ => { println!("{}", e); return;}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,6 +1,5 @@
|
|||||||
|
use log::info;
|
||||||
use rayon::ThreadPoolBuilder;
|
use rayon::ThreadPoolBuilder;
|
||||||
use uuid::timestamp::context;
|
|
||||||
use std::os::unix::thread;
|
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@ -28,12 +27,6 @@ where
|
|||||||
callback
|
callback
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub async fn run_contexts_sequentially_async(&self, contexts: Vec<T>) {
|
|
||||||
for context in contexts {
|
|
||||||
(self.f)(context, self.callback.clone());
|
|
||||||
self.counter.fetch_add(1, Ordering::Release);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub fn run_contexts_sequentially(&self, contexts: Vec<T>) {
|
pub fn run_contexts_sequentially(&self, contexts: Vec<T>) {
|
||||||
for context in contexts {
|
for context in contexts {
|
||||||
(self.f)(context, self.callback.clone());
|
(self.f)(context, self.callback.clone());
|
||||||
@ -54,7 +47,7 @@ where
|
|||||||
threads.spawn(move || f(context, callback));
|
threads.spawn(move || f(context, callback));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub async fn run_context_parallel(&self, contexts: Vec<T>, max_threads: usize) {
|
pub fn run_context_parallel(&self, contexts: Vec<T>, max_threads: usize) {
|
||||||
let threads = ThreadPoolBuilder::new()
|
let threads = ThreadPoolBuilder::new()
|
||||||
.num_threads(max_threads)
|
.num_threads(max_threads)
|
||||||
.build()
|
.build()
|
||||||
@ -64,9 +57,10 @@ where
|
|||||||
for context in contexts {
|
for context in contexts {
|
||||||
let callback = self.callback.clone();
|
let callback = self.callback.clone();
|
||||||
let f = self.f.clone();
|
let f = self.f.clone();
|
||||||
s.spawn(move |_| f(context, callback));
|
s.spawn(move |_| {info!("Running thread"); f(context, callback)});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
info!("Concluded scope");
|
||||||
|
|
||||||
}
|
}
|
||||||
pub fn get_progress(&self) -> usize {
|
pub fn get_progress(&self) -> usize {
|
||||||
|
|||||||
Reference in New Issue
Block a user