mirror of
https://github.com/Drop-OSS/drop-app.git
synced 2025-11-14 00:31:33 +10:00
feat(downloads): reduce scope of download agent
due to a miscommunication, the scope of the download agent has grown too much. this commit reduces that scopes, and intends for a lot of the heavy lifting to be done by the soon-to-be-implemented download manager.
This commit is contained in:
@ -1,122 +1,136 @@
|
||||
use crate::auth::generate_authorization_header;
|
||||
use crate::db::DatabaseImpls;
|
||||
use crate::downloads::manifest::DropDownloadContext;
|
||||
use crate::remote::RemoteAccessError;
|
||||
use crate::DB;
|
||||
use crate::{auth::generate_authorization_header, GAME_PAUSE_CHECK_INTERVAL};
|
||||
use atomic_counter::{AtomicCounter, RelaxedCounter};
|
||||
use log::{error, info};
|
||||
use md5::{Context, Digest};
|
||||
use reqwest::blocking::Response;
|
||||
use serde::de::Error;
|
||||
|
||||
use std::io::Read;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::RwLock;
|
||||
use std::{
|
||||
fs::{File, OpenOptions},
|
||||
io::{self, BufWriter, Error, ErrorKind, Seek, SeekFrom, Write},
|
||||
io::{self, BufWriter, ErrorKind, Seek, SeekFrom, Write},
|
||||
path::PathBuf,
|
||||
sync::{Arc, RwLock},
|
||||
thread::sleep,
|
||||
sync::Arc,
|
||||
};
|
||||
use urlencoding::encode;
|
||||
|
||||
use super::download_agent::GameDownloadState;
|
||||
use super::download_agent::{DownloadThreadControlFlag, GameDownloadError};
|
||||
|
||||
pub struct DropFileWriter {
|
||||
file: File,
|
||||
pub struct DropWriter<W: Write> {
|
||||
hasher: Context,
|
||||
progress: Arc<RelaxedCounter>,
|
||||
status: Arc<RwLock<GameDownloadState>>,
|
||||
destination: W,
|
||||
}
|
||||
impl DropFileWriter {
|
||||
fn new(
|
||||
path: PathBuf,
|
||||
status: Arc<RwLock<GameDownloadState>>,
|
||||
progress: Arc<RelaxedCounter>,
|
||||
) -> Self {
|
||||
impl DropWriter<File> {
|
||||
fn new(path: PathBuf) -> Self {
|
||||
Self {
|
||||
file: OpenOptions::new().write(true).open(path).unwrap(),
|
||||
destination: OpenOptions::new().write(true).open(path).unwrap(),
|
||||
hasher: Context::new(),
|
||||
progress,
|
||||
status,
|
||||
}
|
||||
}
|
||||
|
||||
fn finish(mut self) -> io::Result<Digest> {
|
||||
self.flush().unwrap();
|
||||
Ok(self.hasher.compute())
|
||||
}
|
||||
|
||||
fn manage_state(&mut self) -> Option<Result<usize, Error>> {
|
||||
match self.status.read().unwrap().clone() {
|
||||
GameDownloadState::Uninitialised => todo!(),
|
||||
GameDownloadState::Queued => {
|
||||
return Some(Err(Error::new(
|
||||
ErrorKind::NotConnected,
|
||||
"Download has not yet been started",
|
||||
)))
|
||||
}
|
||||
GameDownloadState::Manifest => {
|
||||
return Some(Err(Error::new(
|
||||
ErrorKind::NotFound,
|
||||
"Manifest still not finished downloading",
|
||||
)))
|
||||
}
|
||||
GameDownloadState::Downloading => {}
|
||||
GameDownloadState::Finished => {
|
||||
return Some(Err(Error::new(
|
||||
ErrorKind::AlreadyExists,
|
||||
"Download already finished",
|
||||
)))
|
||||
}
|
||||
GameDownloadState::Stalled => {
|
||||
return Some(Err(Error::new(ErrorKind::Interrupted, "Download Stalled")))
|
||||
}
|
||||
GameDownloadState::Failed => {
|
||||
return Some(Err(Error::new(ErrorKind::BrokenPipe, "Download Failed")))
|
||||
}
|
||||
GameDownloadState::Cancelled => {
|
||||
return Some(Err(Error::new(
|
||||
ErrorKind::ConnectionAborted,
|
||||
"Interrupt command recieved",
|
||||
)));
|
||||
}
|
||||
GameDownloadState::Paused => {
|
||||
info!("Game download paused");
|
||||
sleep(GAME_PAUSE_CHECK_INTERVAL);
|
||||
}
|
||||
};
|
||||
None
|
||||
}
|
||||
}
|
||||
// TODO: Implement error handling
|
||||
impl Write for DropFileWriter {
|
||||
// Write automatically pushes to file and hasher
|
||||
impl Write for DropWriter<File> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
// TODO: Tidy up these error messages / types because these ones don't really seem to fit
|
||||
if let Some(value) = self.manage_state() {
|
||||
return value;
|
||||
}
|
||||
let len = buf.len();
|
||||
self.progress.add(len);
|
||||
|
||||
//info!("Writing data to writer");
|
||||
self.hasher.write_all(buf).unwrap();
|
||||
self.file.write(buf)
|
||||
self.hasher.write_all(buf).map_err(|e| {
|
||||
io::Error::new(
|
||||
ErrorKind::Other,
|
||||
format!("Unable to write to hasher: {}", e),
|
||||
)
|
||||
})?;
|
||||
self.destination.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.hasher.flush()?;
|
||||
self.file.flush()
|
||||
self.destination.flush()
|
||||
}
|
||||
}
|
||||
impl Seek for DropFileWriter {
|
||||
// Seek moves around destination output
|
||||
impl Seek for DropWriter<File> {
|
||||
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
|
||||
self.file.seek(pos)
|
||||
self.destination.seek(pos)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DropDownloadPipeline<R: Read, W: Write> {
|
||||
pub source: R,
|
||||
pub destination: DropWriter<W>,
|
||||
pub control_flag: Arc<RwLock<DownloadThreadControlFlag>>,
|
||||
pub progress: Arc<AtomicU64>,
|
||||
pub size: usize,
|
||||
}
|
||||
impl DropDownloadPipeline<Response, File> {
|
||||
fn new(
|
||||
source: Response,
|
||||
destination: DropWriter<File>,
|
||||
control_flag: Arc<RwLock<DownloadThreadControlFlag>>,
|
||||
progress: Arc<AtomicU64>,
|
||||
size: usize,
|
||||
) -> Self {
|
||||
return Self {
|
||||
source,
|
||||
destination,
|
||||
control_flag,
|
||||
progress,
|
||||
size,
|
||||
};
|
||||
}
|
||||
|
||||
fn copy(&mut self) -> Result<bool, io::Error> {
|
||||
let copy_buf_size = 512;
|
||||
let mut copy_buf = vec![0; copy_buf_size];
|
||||
let mut buf_writer = BufWriter::with_capacity(1024 * 1024, &mut self.destination);
|
||||
|
||||
let mut current_size = 0;
|
||||
loop {
|
||||
if *self.control_flag.read().unwrap() == DownloadThreadControlFlag::Stop {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let bytes_read = self.source.read(&mut copy_buf)?;
|
||||
current_size += bytes_read;
|
||||
|
||||
buf_writer.write(©_buf[0..bytes_read])?;
|
||||
self.progress.fetch_add(
|
||||
bytes_read.try_into().unwrap(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
|
||||
if current_size == self.size {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn finish(self) -> Result<Digest, io::Error> {
|
||||
let checksum = self.destination.finish()?;
|
||||
return Ok(checksum);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn download_game_chunk(
|
||||
ctx: DropDownloadContext,
|
||||
status: Arc<RwLock<GameDownloadState>>,
|
||||
progress: Arc<RelaxedCounter>,
|
||||
) {
|
||||
if *status.read().unwrap() == GameDownloadState::Cancelled {
|
||||
info!("Callback stopped download at start");
|
||||
return;
|
||||
control_flag: Arc<RwLock<DownloadThreadControlFlag>>,
|
||||
progress: Arc<AtomicU64>,
|
||||
) -> Result<bool, GameDownloadError> {
|
||||
// If we're paused
|
||||
if *control_flag.read().unwrap() == DownloadThreadControlFlag::Stop {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let base_url = DB.fetch_base_url();
|
||||
|
||||
let client = reqwest::blocking::Client::new();
|
||||
@ -133,47 +147,48 @@ pub fn download_game_chunk(
|
||||
|
||||
let header = generate_authorization_header();
|
||||
|
||||
let mut response = match client.get(chunk_url).header("Authorization", header).send() {
|
||||
Ok(response) => response,
|
||||
Err(e) => {
|
||||
info!("{}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let response = client
|
||||
.get(chunk_url)
|
||||
.header("Authorization", header)
|
||||
.send()
|
||||
.map_err(|e| GameDownloadError::CommunicationError(RemoteAccessError::FetchError(e)))?;
|
||||
|
||||
let mut file: DropFileWriter = DropFileWriter::new(ctx.path, status, progress);
|
||||
let mut destination = DropWriter::new(ctx.path);
|
||||
|
||||
if ctx.offset != 0 {
|
||||
file.seek(SeekFrom::Start(ctx.offset))
|
||||
destination
|
||||
.seek(SeekFrom::Start(ctx.offset))
|
||||
.expect("Failed to seek to file offset");
|
||||
}
|
||||
|
||||
// Writing everything to disk directly is probably slightly faster in terms of disk
|
||||
// speed because it balances out the writes, but this is better than the performance
|
||||
// loss from constantly reading the callbacks
|
||||
|
||||
let mut writer = BufWriter::with_capacity(1024 * 1024, file);
|
||||
|
||||
match io::copy(&mut response, &mut writer) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
info!("Copy errored with error {}", e)
|
||||
}
|
||||
let content_length = response.content_length();
|
||||
if content_length.is_none() {
|
||||
return Err(GameDownloadError::CommunicationError(
|
||||
RemoteAccessError::GenericErrror(
|
||||
"Invalid download endpoint, missing Content-Length header.".to_owned(),
|
||||
),
|
||||
));
|
||||
}
|
||||
writer.flush().unwrap();
|
||||
let file = match writer.into_inner() {
|
||||
Ok(file) => file,
|
||||
Err(_) => {
|
||||
error!("Failed to acquire writer from BufWriter");
|
||||
return;
|
||||
}
|
||||
|
||||
let mut pipeline = DropDownloadPipeline::new(
|
||||
response,
|
||||
destination,
|
||||
control_flag,
|
||||
progress,
|
||||
content_length.unwrap().try_into().unwrap(),
|
||||
);
|
||||
|
||||
let completed = pipeline.copy().unwrap();
|
||||
if !completed {
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
let res = hex::encode(file.finish().unwrap().0);
|
||||
let checksum = pipeline.finish().unwrap();
|
||||
|
||||
let res = hex::encode(checksum.0);
|
||||
if res != ctx.checksum {
|
||||
info!(
|
||||
"Checksum failed. Original: {}, Calculated: {} for {}",
|
||||
ctx.checksum, res, ctx.file_name
|
||||
);
|
||||
return Err(GameDownloadError::ChecksumError);
|
||||
}
|
||||
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user