Some progress on thread terminations

This commit is contained in:
quexeky
2024-10-28 22:06:44 +11:00
parent 270bc8b06b
commit 99beca4dbe
4 changed files with 40 additions and 21 deletions

View File

@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use urlencoding::encode; use urlencoding::encode;
use std::fs::{create_dir_all, File}; use std::fs::{create_dir_all, File};
use std::path::Path; use std::path::Path;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
pub struct GameDownloadAgent { pub struct GameDownloadAgent {
@ -20,6 +20,7 @@ pub struct GameDownloadAgent {
contexts: Mutex<Vec<DropDownloadContext>>, contexts: Mutex<Vec<DropDownloadContext>>,
progress: ProgressChecker<DropDownloadContext>, progress: ProgressChecker<DropDownloadContext>,
pub manifest: Mutex<Option<DropManifest>>, pub manifest: Mutex<Option<DropManifest>>,
pub callback: Arc<AtomicBool>
} }
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq)] #[derive(Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum GameDownloadState { pub enum GameDownloadState {
@ -47,14 +48,17 @@ pub enum SystemError {
impl GameDownloadAgent { impl GameDownloadAgent {
pub fn new(id: String, version: String) -> Self { pub fn new(id: String, version: String) -> Self {
let callback = Arc::new(AtomicBool::new(false));
Self { Self {
id, id,
version, version,
state: Mutex::from(GameDownloadState::Uninitialised), state: Mutex::from(GameDownloadState::Uninitialised),
manifest: Mutex::new(None), manifest: Mutex::new(None),
callback: callback.clone(),
progress: ProgressChecker::new( progress: ProgressChecker::new(
Box::new(download_logic::download_game_chunk), Box::new(download_logic::download_game_chunk),
Arc::new(AtomicUsize::new(0)), Arc::new(AtomicUsize::new(0)),
callback
), ),
contexts: Mutex::new(Vec::new()), contexts: Mutex::new(Vec::new()),
} }

View File

@ -5,18 +5,20 @@ use crate::DB;
use gxhash::{gxhash128, GxHasher}; use gxhash::{gxhash128, GxHasher};
use log::info; use log::info;
use md5::{Context, Digest}; use md5::{Context, Digest};
use std::{fs::{File, OpenOptions}, hash::Hasher, io::{self, Seek, SeekFrom, Write}, path::PathBuf}; use std::{fs::{File, OpenOptions}, hash::Hasher, io::{self, Seek, SeekFrom, Write}, path::PathBuf, sync::{atomic::{AtomicBool, Ordering}, Arc}};
use urlencoding::encode; use urlencoding::encode;
pub struct FileWriter { pub struct DropFileWriter {
file: File, file: File,
hasher: Context, hasher: Context,
callback: Arc<AtomicBool>
} }
impl FileWriter { impl DropFileWriter {
fn new(path: PathBuf) -> Self { fn new(path: PathBuf, callback: Arc<AtomicBool>) -> Self {
Self { Self {
file: OpenOptions::new().write(true).open(path).unwrap(), file: OpenOptions::new().write(true).open(path).unwrap(),
hasher: Context::new(), hasher: Context::new(),
callback
} }
} }
fn finish(mut self) -> io::Result<Digest> { fn finish(mut self) -> io::Result<Digest> {
@ -24,8 +26,11 @@ impl FileWriter {
Ok(self.hasher.compute()) Ok(self.hasher.compute())
} }
} }
impl Write for FileWriter { impl Write for DropFileWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if self.callback.load(Ordering::Acquire) {
}
self.hasher.write_all(buf).unwrap(); self.hasher.write_all(buf).unwrap();
self.file.write(buf) self.file.write(buf)
} }
@ -35,12 +40,15 @@ impl Write for FileWriter {
self.file.flush() self.file.flush()
} }
} }
impl Seek for FileWriter { impl Seek for DropFileWriter {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> { fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
self.file.seek(pos) self.file.seek(pos)
} }
} }
pub fn download_game_chunk(ctx: DropDownloadContext) { pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>) {
if callback.load(Ordering::Acquire) {
return;
}
let base_url = DB.fetch_base_url(); let base_url = DB.fetch_base_url();
let client = reqwest::blocking::Client::new(); let client = reqwest::blocking::Client::new();
@ -63,7 +71,7 @@ pub fn download_game_chunk(ctx: DropDownloadContext) {
.send() .send()
.unwrap(); .unwrap();
let mut file: FileWriter = FileWriter::new(ctx.path); let mut file: DropFileWriter = DropFileWriter::new(ctx.path);
if ctx.offset != 0 { if ctx.offset != 0 {
file file

View File

@ -1,7 +1,7 @@
use rayon::ThreadPoolBuilder; use rayon::ThreadPoolBuilder;
use uuid::timestamp::context; use uuid::timestamp::context;
use std::os::unix::thread; use std::os::unix::thread;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
pub struct ProgressChecker<T> pub struct ProgressChecker<T>
@ -9,7 +9,8 @@ where
T: 'static + Send + Sync, T: 'static + Send + Sync,
{ {
counter: Arc<AtomicUsize>, counter: Arc<AtomicUsize>,
f: Arc<Box<dyn Fn(T) + Send + Sync + 'static>>, f: Arc<Box<dyn Fn(T, Arc<AtomicBool>) + Send + Sync + 'static>>,
callback: Arc<AtomicBool>
} }
impl<T> ProgressChecker<T> impl<T> ProgressChecker<T>
@ -17,24 +18,26 @@ where
T: Send + Sync, T: Send + Sync,
{ {
pub fn new( pub fn new(
f: Box<dyn Fn(T) + Send + Sync + 'static>, f: Box<dyn Fn(T, Arc<AtomicBool>) + Send + Sync + 'static>,
counter_reference: Arc<AtomicUsize>, counter_reference: Arc<AtomicUsize>,
callback: Arc<AtomicBool>
) -> Self { ) -> Self {
Self { Self {
f: f.into(), f: f.into(),
counter: counter_reference, counter: counter_reference,
callback
} }
} }
pub async fn run_contexts_sequentially_async(&self, contexts: Vec<T>) { pub async fn run_contexts_sequentially_async(&self, contexts: Vec<T>) {
for context in contexts { for context in contexts {
(self.f)(context); (self.f)(context, self.callback.clone());
self.counter.fetch_add(1, Ordering::Relaxed); self.counter.fetch_add(1, Ordering::Release);
} }
} }
pub fn run_contexts_sequentially(&self, contexts: Vec<T>) { pub fn run_contexts_sequentially(&self, contexts: Vec<T>) {
for context in contexts { for context in contexts {
(self.f)(context); (self.f)(context, self.callback.clone());
self.counter.fetch_add(1, Ordering::Relaxed); self.counter.fetch_add(1, Ordering::Release);
} }
} }
pub fn run_contexts_parallel_background(&self, contexts: Vec<T>, max_threads: usize) { pub fn run_contexts_parallel_background(&self, contexts: Vec<T>, max_threads: usize) {
@ -46,8 +49,9 @@ where
.unwrap(); .unwrap();
for context in contexts { for context in contexts {
let callback = self.callback.clone();
let f = self.f.clone(); let f = self.f.clone();
threads.spawn(move || f(context)); threads.spawn(move || f(context, callback));
} }
} }
pub async fn run_context_parallel(&self, contexts: Vec<T>, max_threads: usize) { pub async fn run_context_parallel(&self, contexts: Vec<T>, max_threads: usize) {
@ -58,8 +62,9 @@ where
threads.scope(|s| { threads.scope(|s| {
for context in contexts { for context in contexts {
let callback = self.callback.clone();
let f = self.f.clone(); let f = self.f.clone();
s.spawn(move |_| f(context)); s.spawn(move |_| f(context, callback));
} }
}); });

View File

@ -1,18 +1,20 @@
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::{AtomicBool, AtomicUsize};
use crate::downloads::progress::ProgressChecker; use crate::downloads::progress::ProgressChecker;
#[test] #[test]
fn test_progress_sequentially() { fn test_progress_sequentially() {
let counter = Arc::new(AtomicUsize::new(0)); let counter = Arc::new(AtomicUsize::new(0));
let p = ProgressChecker::new(Box::new(test_fn), counter.clone()); let callback = Arc::new(AtomicBool::new(false));
let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback);
p.run_contexts_sequentially((1..100).collect()); p.run_contexts_sequentially((1..100).collect());
println!("Progress: {}", p.get_progress_percentage(100)); println!("Progress: {}", p.get_progress_percentage(100));
} }
#[test] #[test]
fn test_progress_parallel() { fn test_progress_parallel() {
let counter = Arc::new(AtomicUsize::new(0)); let counter = Arc::new(AtomicUsize::new(0));
let p = ProgressChecker::new(Box::new(test_fn), counter.clone()); let callback = Arc::new(AtomicBool::new(false));
let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback);
p.run_contexts_parallel_background((1..100).collect(), 10); p.run_contexts_parallel_background((1..100).collect(), 10);
} }