feat(downloads): Added DownloadThreadControl struct

Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
quexeky
2024-11-11 10:05:49 +11:00
parent f25bfed336
commit 5e05e6873d
4 changed files with 53 additions and 25 deletions

View File

@ -10,30 +10,26 @@ use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::fs::{create_dir_all, File};
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicU64};
use std::sync::{Arc, Mutex};
use urlencoding::encode;
#[cfg(target_os = "linux")]
use rustix::fs::{fallocate, FallocateFlags};
use super::download_logic::download_game_chunk;
use super::download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag};
pub struct GameDownloadAgent {
pub id: String,
pub version: String,
pub control_flag: Arc<DownloadThreadControlFlag>,
pub control_flag: DownloadThreadControl,
pub target_download_dir: usize,
contexts: Mutex<Vec<DropDownloadContext>>,
pub manifest: Mutex<Option<DropManifest>>,
pub progress: ProgressObject,
}
/// Faster alternative to a RwLock Enum.
/// true = Go
/// false = Stop
pub type DownloadThreadControlFlag = AtomicBool;
#[derive(Debug)]
pub enum GameDownloadError {
CommunicationError(RemoteAccessError),
@ -61,7 +57,7 @@ pub struct ProgressObject {
impl GameDownloadAgent {
pub fn new(id: String, version: String, target_download_dir: usize) -> Self {
// Don't run by default
let status = Arc::new(DownloadThreadControlFlag::new(false));
let status = DownloadThreadControl::new(DownloadThreadControlFlag::Stop);
Self {
id,
version,
@ -75,12 +71,6 @@ impl GameDownloadAgent {
},
}
}
pub fn set_control_flag(&self, flag: bool) {
self.control_flag.store(flag, Ordering::Relaxed);
}
pub fn get_control_flag(&self) -> bool {
self.control_flag.load(Ordering::Relaxed)
}
// Blocking
// Requires mutable self
@ -89,7 +79,7 @@ impl GameDownloadAgent {
self.generate_contexts()?;
self.set_control_flag(true);
self.control_flag.set(DownloadThreadControlFlag::Go);
Ok(())
}

View File

@ -3,12 +3,11 @@ use crate::db::DatabaseImpls;
use crate::downloads::manifest::DropDownloadContext;
use crate::remote::RemoteAccessError;
use crate::DB;
use log::{error, info};
use md5::{Context, Digest};
use reqwest::blocking::Response;
use std::io::Read;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::AtomicU64;
use std::{
fs::{File, OpenOptions},
io::{self, BufWriter, ErrorKind, Seek, SeekFrom, Write},
@ -17,7 +16,8 @@ use std::{
};
use urlencoding::encode;
use super::download_agent::{DownloadThreadControlFlag, GameDownloadError};
use super::download_agent::GameDownloadError;
use super::download_thread_control_flag::DownloadThreadControl;
pub struct DropWriter<W: Write> {
hasher: Context,
@ -63,7 +63,7 @@ impl Seek for DropWriter<File> {
pub struct DropDownloadPipeline<R: Read, W: Write> {
pub source: R,
pub destination: DropWriter<W>,
pub control_flag: Arc<DownloadThreadControlFlag>,
pub control_flag: DownloadThreadControl,
pub progress: Arc<AtomicU64>,
pub size: usize,
}
@ -71,7 +71,7 @@ impl DropDownloadPipeline<Response, File> {
fn new(
source: Response,
destination: DropWriter<File>,
control_flag: Arc<DownloadThreadControlFlag>,
control_flag: DownloadThreadControl,
progress: Arc<AtomicU64>,
size: usize,
) -> Self {
@ -91,14 +91,14 @@ impl DropDownloadPipeline<Response, File> {
let mut current_size = 0;
loop {
if self.control_flag.load(Ordering::Relaxed) == false {
if self.control_flag.get() == false {
return Ok(false);
}
let bytes_read = self.source.read(&mut copy_buf)?;
current_size += bytes_read;
buf_writer.write(&copy_buf[0..bytes_read])?;
buf_writer.write_all(&copy_buf[0..bytes_read])?;
self.progress.fetch_add(
bytes_read.try_into().unwrap(),
std::sync::atomic::Ordering::Relaxed,
@ -120,11 +120,11 @@ impl DropDownloadPipeline<Response, File> {
pub fn download_game_chunk(
ctx: DropDownloadContext,
control_flag: Arc<DownloadThreadControlFlag>,
control_flag: DownloadThreadControl,
progress: Arc<AtomicU64>,
) -> Result<bool, GameDownloadError> {
// If we're paused
if control_flag.load(Ordering::Relaxed) {
if control_flag.get() {
return Ok(false);
}

View File

@ -0,0 +1,37 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
pub enum DownloadThreadControlFlag {
Stop,
Go,
}
impl From<DownloadThreadControlFlag> for bool {
fn from(value: DownloadThreadControlFlag) -> Self {
match value {
DownloadThreadControlFlag::Stop => false,
DownloadThreadControlFlag::Go => true,
}
}
}
#[derive(Clone)]
pub struct DownloadThreadControl {
inner: Arc<AtomicBool>,
}
impl DownloadThreadControl {
pub fn new(flag: DownloadThreadControlFlag) -> Self {
Self {
inner: Arc::new(AtomicBool::new(flag.into())),
}
}
pub fn get(&self) -> bool {
self.inner.load(Ordering::Relaxed)
}
pub fn set(&self, flag: DownloadThreadControlFlag) {
self.inner.store(flag.into(), Ordering::Relaxed);
}
}

View File

@ -1,4 +1,5 @@
pub mod download_agent;
pub mod download_commands;
mod download_logic;
mod download_thread_control_flag;
mod manifest;