feat(downloads): Separated chunk updates into individual counters

Also added a From<bool> for DownloadThreadControlFlag because I accidentally was calling the wrong one before and had meant to add it anyway

Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
quexeky
2024-11-11 18:07:45 +11:00
parent 5e05e6873d
commit 7d3c6011df
7 changed files with 68 additions and 36 deletions

View File

@ -2,7 +2,6 @@ use crate::auth::generate_authorization_header;
use crate::db::DatabaseImpls;
use crate::downloads::manifest::{DropDownloadContext, DropManifest};
use crate::remote::RemoteAccessError;
use crate::settings::DOWNLOAD_MAX_THREADS;
use crate::DB;
use log::info;
use rayon::ThreadPoolBuilder;
@ -10,7 +9,7 @@ use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::fs::{create_dir_all, File};
use std::path::Path;
use std::sync::atomic::{AtomicU64};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex};
use urlencoding::encode;
@ -19,6 +18,7 @@ use rustix::fs::{fallocate, FallocateFlags};
use super::download_logic::download_game_chunk;
use super::download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag};
use super::progress_object::ProgressObject;
pub struct GameDownloadAgent {
pub id: String,
@ -49,11 +49,6 @@ impl Display for GameDownloadError {
}
}
pub struct ProgressObject {
pub max: u64,
pub current: Arc<AtomicU64>,
}
impl GameDownloadAgent {
pub fn new(id: String, version: String, target_download_dir: usize) -> Self {
// Don't run by default
@ -65,10 +60,7 @@ impl GameDownloadAgent {
manifest: Mutex::new(None),
target_download_dir,
contexts: Mutex::new(Vec::new()),
progress: ProgressObject {
max: 0,
current: Arc::new(AtomicU64::new(0)),
},
progress: ProgressObject::new(0, 0),
}
}
@ -76,8 +68,10 @@ impl GameDownloadAgent {
// Requires mutable self
pub fn setup_download(&mut self) -> Result<(), GameDownloadError> {
self.ensure_manifest_exists()?;
info!("Ensured manifest exists");
self.generate_contexts()?;
info!("Generated contexts");
self.control_flag.set(DownloadThreadControlFlag::Go);
@ -140,7 +134,10 @@ impl GameDownloadAgent {
return chunk.lengths.iter().sum::<usize>();
})
.sum::<usize>();
self.progress.max = length.try_into().unwrap();
let chunk_count = manifest_download.iter().map(|(_, chunk)| {
chunk.lengths.len()
}).sum();
self.progress = ProgressObject::new(length.try_into().unwrap(), chunk_count);
if let Ok(mut manifest) = self.manifest.lock() {
*manifest = Some(manifest_download);
@ -204,6 +201,8 @@ impl GameDownloadAgent {
}
pub fn run(&self) {
const DOWNLOAD_MAX_THREADS: usize = 4;
let pool = ThreadPoolBuilder::new()
.num_threads(DOWNLOAD_MAX_THREADS)
.build()
@ -212,16 +211,16 @@ impl GameDownloadAgent {
pool.scope(move |scope| {
let contexts = self.contexts.lock().unwrap();
for context in contexts.iter() {
for (index, context) in contexts.iter().enumerate() {
let context = context.clone();
let control_flag = self.control_flag.clone(); // Clone arcs
let progress = self.progress.current.clone(); // Clone arcs
info!(
"starting download for file {} {}",
context.file_name, context.index
);
let progress = self.progress.get(index); // Clone arcs
scope.spawn(move |_| {
info!(
"starting download for file {} {}",
context.file_name, context.index
);
download_game_chunk(context, control_flag, progress).unwrap();
});
}

View File

@ -1,4 +1,4 @@
use std::sync::{Arc, Mutex};
use std::sync::{atomic::Ordering, Arc, Mutex};
use log::info;
use rayon::spawn;
@ -39,14 +39,8 @@ pub fn get_game_download_progress(
let da = use_download_agent(state, game_id)?;
let progress = &da.progress;
let current: f64 = progress
.current
.fetch_add(0, std::sync::atomic::Ordering::Relaxed) as f64;
let max = progress.max as f64;
let current_progress = current / max;
Ok(current_progress)
Ok(progress.get_progress())
}
fn use_download_agent(

View File

@ -3,11 +3,12 @@ use crate::db::DatabaseImpls;
use crate::downloads::manifest::DropDownloadContext;
use crate::remote::RemoteAccessError;
use crate::DB;
use log::info;
use md5::{Context, Digest};
use reqwest::blocking::Response;
use std::io::Read;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::{
fs::{File, OpenOptions},
io::{self, BufWriter, ErrorKind, Seek, SeekFrom, Write},
@ -17,7 +18,7 @@ use std::{
use urlencoding::encode;
use super::download_agent::GameDownloadError;
use super::download_thread_control_flag::DownloadThreadControl;
use super::download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag};
pub struct DropWriter<W: Write> {
hasher: Context,
@ -64,7 +65,7 @@ pub struct DropDownloadPipeline<R: Read, W: Write> {
pub source: R,
pub destination: DropWriter<W>,
pub control_flag: DownloadThreadControl,
pub progress: Arc<AtomicU64>,
pub progress: Arc<AtomicUsize>,
pub size: usize,
}
impl DropDownloadPipeline<Response, File> {
@ -72,7 +73,7 @@ impl DropDownloadPipeline<Response, File> {
source: Response,
destination: DropWriter<File>,
control_flag: DownloadThreadControl,
progress: Arc<AtomicU64>,
progress: Arc<AtomicUsize>,
size: usize,
) -> Self {
return Self {
@ -91,7 +92,7 @@ impl DropDownloadPipeline<Response, File> {
let mut current_size = 0;
loop {
if self.control_flag.get() == false {
if self.control_flag.get() == DownloadThreadControlFlag::Stop {
return Ok(false);
}
@ -121,10 +122,10 @@ impl DropDownloadPipeline<Response, File> {
pub fn download_game_chunk(
ctx: DropDownloadContext,
control_flag: DownloadThreadControl,
progress: Arc<AtomicU64>,
progress: Arc<AtomicUsize>,
) -> Result<bool, GameDownloadError> {
// If we're paused
if control_flag.get() {
if control_flag.get() == DownloadThreadControlFlag::Stop {
return Ok(false);
}

View File

@ -3,6 +3,7 @@ use std::sync::{
Arc,
};
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub enum DownloadThreadControlFlag {
Stop,
Go,
@ -15,6 +16,14 @@ impl From<DownloadThreadControlFlag> for bool {
}
}
}
impl From<bool> for DownloadThreadControlFlag {
fn from(value: bool) -> Self {
match value {
true => DownloadThreadControlFlag::Go,
false => DownloadThreadControlFlag::Stop,
}
}
}
#[derive(Clone)]
@ -28,8 +37,8 @@ impl DownloadThreadControl {
inner: Arc::new(AtomicBool::new(flag.into())),
}
}
pub fn get(&self) -> bool {
self.inner.load(Ordering::Relaxed)
pub fn get(&self) -> DownloadThreadControlFlag {
self.inner.load(Ordering::Relaxed).into()
}
pub fn set(&self, flag: DownloadThreadControlFlag) {
self.inner.store(flag.into(), Ordering::Relaxed);

View File

@ -3,3 +3,4 @@ pub mod download_commands;
mod download_logic;
mod download_thread_control_flag;
mod manifest;
mod progress_object;

View File

@ -0,0 +1,29 @@
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
#[derive(Clone)]
pub struct ProgressObject {
max: usize,
progress_instances: Arc<Vec<Arc<AtomicUsize>>>,
}
impl ProgressObject {
pub fn new(max: usize, length: usize) -> Self {
let arr = (0..length).map(|_| { Arc::new(AtomicUsize::new(0)) }).collect();
Self {
max,
progress_instances: Arc::new(arr)
}
}
pub fn sum(&self) -> usize {
self.progress_instances.iter().map(|instance| {
instance.load(Ordering::Relaxed)
}).sum()
}
pub fn get_progress(&self) -> f64 {
self.sum() as f64 / self.max as f64
}
pub fn get(&self, index: usize) -> Arc<AtomicUsize> {
self.progress_instances[index].clone()
}
}

View File

@ -1 +0,0 @@
pub const DOWNLOAD_MAX_THREADS: usize = 4;