feat(downloads): lockless tracking of downloaded chunks

This commit is contained in:
DecDuck
2024-12-26 17:41:10 +11:00
parent a135b1321c
commit 21835858f1
4 changed files with 34 additions and 23 deletions

7
src-tauri/Cargo.lock generated
View File

@ -355,6 +355,12 @@ dependencies = [
"piper", "piper",
] ]
[[package]]
name = "boxcar"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f839cdf7e2d3198ac6ca003fd8ebc61715755f41c1cad15ff13df67531e00ed"
[[package]] [[package]]
name = "brotli" name = "brotli"
version = "7.0.0" version = "7.0.0"
@ -965,6 +971,7 @@ dependencies = [
name = "drop-app" name = "drop-app"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"boxcar",
"chrono", "chrono",
"directories", "directories",
"hex", "hex",

View File

@ -42,6 +42,7 @@ urlencoding = "2.1.3"
md5 = "0.7.0" md5 = "0.7.0"
chrono = "0.4.38" chrono = "0.4.38"
tauri-plugin-os = "2" tauri-plugin-os = "2"
boxcar = "0.2.7"
[dependencies.tauri] [dependencies.tauri]
version = "2.1.1" version = "2.1.1"

View File

@ -9,6 +9,8 @@ use log::{debug, error, info};
use rayon::ThreadPoolBuilder; use rayon::ThreadPoolBuilder;
use serde::ser::{Error, SerializeMap}; use serde::ser::{Error, SerializeMap};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::borrow::BorrowMut;
use std::collections::VecDeque;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::fs::{create_dir_all, File}; use std::fs::{create_dir_all, File};
use std::io; use std::io;
@ -32,7 +34,7 @@ pub struct GameDownloadAgent {
pub version: String, pub version: String,
pub control_flag: DownloadThreadControl, pub control_flag: DownloadThreadControl,
contexts: Vec<DropDownloadContext>, contexts: Vec<DropDownloadContext>,
completed_contexts: Mutex<Vec<usize>>, completed_contexts: VecDeque<usize>,
pub manifest: Mutex<Option<DropManifest>>, pub manifest: Mutex<Option<DropManifest>>,
pub progress: Arc<ProgressObject>, pub progress: Arc<ProgressObject>,
sender: Sender<DownloadManagerSignal>, sender: Sender<DownloadManagerSignal>,
@ -101,7 +103,7 @@ impl GameDownloadAgent {
control_flag, control_flag,
manifest: Mutex::new(None), manifest: Mutex::new(None),
contexts: Vec::new(), contexts: Vec::new(),
completed_contexts: Mutex::new(Vec::new()), completed_contexts: VecDeque::new(),
progress: Arc::new(ProgressObject::new(0, 0, sender.clone())), progress: Arc::new(ProgressObject::new(0, 0, sender.clone())),
sender, sender,
stored_manifest, stored_manifest,
@ -219,7 +221,9 @@ impl GameDownloadAgent {
let base_path = Path::new(&self.stored_manifest.base_path); let base_path = Path::new(&self.stored_manifest.base_path);
create_dir_all(base_path).unwrap(); create_dir_all(base_path).unwrap();
*self.completed_contexts.lock().unwrap() = self.stored_manifest.get_completed_contexts(); self.completed_contexts.clear();
self.completed_contexts
.extend(self.stored_manifest.get_completed_contexts());
for (raw_path, chunk) in manifest { for (raw_path, chunk) in manifest {
let path = base_path.join(Path::new(&raw_path)); let path = base_path.join(Path::new(&raw_path));
@ -255,7 +259,7 @@ impl GameDownloadAgent {
Ok(()) Ok(())
} }
pub fn run(&self) -> Result<(), ()> { pub fn run(&mut self) -> Result<(), ()> {
info!("downloading game: {}", self.id); info!("downloading game: {}", self.id);
const DOWNLOAD_MAX_THREADS: usize = 1; const DOWNLOAD_MAX_THREADS: usize = 1;
@ -264,58 +268,57 @@ impl GameDownloadAgent {
.build() .build()
.unwrap(); .unwrap();
let completed_indexes = Arc::new(Mutex::new(Vec::new())); let completed_indexes = Arc::new(boxcar::Vec::new());
let completed_indexes_loop_arc = completed_indexes.clone(); let completed_indexes_loop_arc = completed_indexes.clone();
pool.scope(move |scope| { pool.scope(|scope| {
let completed_lock = self.completed_contexts.lock().unwrap();
let count = self.contexts.len();
for (index, context) in self.contexts.iter().enumerate() { for (index, context) in self.contexts.iter().enumerate() {
let completed_indexes = completed_indexes_loop_arc.clone();
let progress = self.progress.get(index); // Clone arcs let progress = self.progress.get(index); // Clone arcs
let progress_handle = ProgressHandle::new(progress, self.progress.clone()); let progress_handle = ProgressHandle::new(progress, self.progress.clone());
// If we've done this one already, skip it // If we've done this one already, skip it
if completed_lock.contains(&index) { if self.completed_contexts.contains(&index) {
progress_handle.add(context.length); progress_handle.add(context.length);
continue; continue;
} }
let context = context.clone(); let context = context.clone();
let control_flag = self.control_flag.clone(); // Clone arcs let control_flag = self.control_flag.clone(); // Clone arcs
let completed_indexes_ref = completed_indexes_loop_arc.clone();
let sender = self.sender.clone();
scope.spawn(move |_| { scope.spawn(move |_| {
match download_game_chunk(context.clone(), control_flag, progress_handle) { match download_game_chunk(context.clone(), control_flag, progress_handle) {
Ok(res) => { Ok(res) => {
if res { if res {
let mut lock = completed_indexes_ref.lock().unwrap(); completed_indexes.push(index);
lock.push(index);
} }
} }
Err(e) => { Err(e) => {
error!("{}", e); error!("{}", e);
self.sender.send(DownloadManagerSignal::Error(e)).unwrap(); sender.send(DownloadManagerSignal::Error(e)).unwrap();
} }
} }
}); });
} }
}); });
let newly_completed = completed_indexes.to_owned();
let completed_lock_len = { let completed_lock_len = {
let mut completed_lock = self.completed_contexts.lock().unwrap(); for (item, item_ref) in newly_completed.iter() {
let newly_completed_lock = completed_indexes.lock().unwrap(); self.completed_contexts.push_front(item);
}
completed_lock.extend(newly_completed_lock.iter()); self.completed_contexts.len()
completed_lock.len()
}; };
// If we're not out of contexts, we're not done, so we don't fire completed // If we're not out of contexts, we're not done, so we don't fire completed
if completed_lock_len != self.contexts.len() { if completed_lock_len != self.contexts.len() {
info!("da for {} exited without completing", self.id.clone()); info!("da for {} exited without completing", self.id.clone());
self.stored_manifest self.stored_manifest
.set_completed_contexts(&self.completed_contexts); .set_completed_contexts(&self.completed_contexts.clone().into());
info!("Setting completed contexts"); info!("Setting completed contexts");
self.stored_manifest.write(); self.stored_manifest.write();
info!("Wrote completed contexts"); info!("Wrote completed contexts");

View File

@ -73,8 +73,8 @@ impl StoredManifest {
Err(e) => error!("{}", e), Err(e) => error!("{}", e),
}; };
} }
pub fn set_completed_contexts(&self, completed_contexts: &Mutex<Vec<usize>>) { pub fn set_completed_contexts(&self, completed_contexts: &Vec<usize>) {
*self.completed_contexts.lock().unwrap() = completed_contexts.lock().unwrap().clone(); *self.completed_contexts.lock().unwrap() = completed_contexts.clone();
} }
pub fn get_completed_contexts(&self) -> Vec<usize> { pub fn get_completed_contexts(&self) -> Vec<usize> {
self.completed_contexts.lock().unwrap().clone() self.completed_contexts.lock().unwrap().clone()