use std::fs::{Permissions, set_permissions}; use std::io::Read; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; use std::sync::Arc; use std::time::Instant; use std::{ fs::{File, OpenOptions}, io::{self, BufWriter, Seek, SeekFrom, Write}, path::PathBuf, }; use download_manager::error::ApplicationDownloadError; use download_manager::util::download_thread_control_flag::{ DownloadThreadControl, DownloadThreadControlFlag, }; use download_manager::util::progress_object::ProgressHandle; use log::{debug, info, warn}; use md5::{Context, Digest}; use remote::auth::generate_authorization_header; use remote::error::{DropServerError, RemoteAccessError}; use remote::requests::generate_url; use remote::utils::DROP_CLIENT_SYNC; use reqwest::blocking::Response; use crate::downloads::manifest::{ChunkBody, DownloadBucket, DownloadContext, DownloadDrop}; static MAX_PACKET_LENGTH: usize = 4096 * 4; static BUMP_SIZE: usize = 4096 * 16; pub struct DropWriter { hasher: Context, destination: BufWriter, progress: ProgressHandle, } impl DropWriter { fn new(path: PathBuf, progress: ProgressHandle) -> Result { let destination = OpenOptions::new() .write(true) .create(true) .truncate(false) .open(&path)?; Ok(Self { destination: BufWriter::with_capacity(1024 * 1024, destination), hasher: Context::new(), progress, }) } fn finish(mut self) -> io::Result { self.flush()?; Ok(self.hasher.finalize()) } } // Write automatically pushes to file and hasher impl Write for DropWriter { fn write(&mut self, buf: &[u8]) -> io::Result { self.hasher .write_all(buf) .map_err(|e| io::Error::other(format!("Unable to write to hasher: {e}")))?; let bytes_written = self.destination.write(buf)?; self.progress.add(bytes_written); Ok(bytes_written) } fn flush(&mut self) -> io::Result<()> { self.hasher.flush()?; self.destination.flush() } } // Seek moves around destination output impl Seek for DropWriter { fn seek(&mut self, pos: SeekFrom) -> io::Result { self.destination.seek(pos) } } pub struct DropDownloadPipeline<'a, R: Read, W: Write> { pub source: R, pub drops: Vec, pub destination: Vec>, pub control_flag: &'a DownloadThreadControl, #[allow(dead_code)] progress: ProgressHandle, } impl<'a> DropDownloadPipeline<'a, Response, File> { fn new( source: Response, drops: Vec, control_flag: &'a DownloadThreadControl, progress: ProgressHandle, ) -> Result { Ok(Self { source, destination: drops .iter() .map(|drop| DropWriter::new(drop.path.clone(), progress.clone())) .try_collect()?, drops, control_flag, progress, }) } fn copy(&mut self) -> Result { let mut copy_buffer = [0u8; MAX_PACKET_LENGTH]; for (index, drop) in self.drops.iter().enumerate() { let destination = self .destination .get_mut(index) .ok_or(io::Error::other("no destination"))?; let mut remaining = drop.length; if drop.start != 0 { destination.seek(SeekFrom::Start(drop.start as u64))?; } let mut last_bump = 0; loop { let size = MAX_PACKET_LENGTH.min(remaining); let size = self .source .read(&mut copy_buffer[0..size]) .inspect_err(|_| { info!("got error from {}", drop.filename); })?; remaining -= size; last_bump += size; destination.write_all(©_buffer[0..size])?; if last_bump > BUMP_SIZE { last_bump -= BUMP_SIZE; if self.control_flag.get() == DownloadThreadControlFlag::Stop { return Ok(false); } } if remaining == 0 { break; }; } if self.control_flag.get() == DownloadThreadControlFlag::Stop { return Ok(false); } } Ok(true) } #[allow(dead_code)] fn debug_skip_checksum(self) { self.destination .into_iter() .for_each(|mut e| e.flush().unwrap()); } fn finish(self) -> Result, io::Error> { let checksums = self .destination .into_iter() .map(|e| e.finish()) .try_collect()?; Ok(checksums) } } pub fn download_game_bucket( bucket: &DownloadBucket, ctx: &DownloadContext, control_flag: &DownloadThreadControl, progress: ProgressHandle, ) -> Result { // If we're paused if control_flag.get() == DownloadThreadControlFlag::Stop { progress.set(0); return Ok(false); } let start = Instant::now(); let header = generate_authorization_header(); let url = generate_url(&["/api/v2/client/chunk"], &[]) .map_err(ApplicationDownloadError::Communication)?; let body = ChunkBody::create(ctx, &bucket.drops); let response = DROP_CLIENT_SYNC .post(url) .json(&body) .header("Authorization", header) .send() .map_err(|e| ApplicationDownloadError::Communication(e.into()))?; if response.status() != 200 { info!("chunk request got status code: {}", response.status()); let raw_res = response.text().map_err(|e| { ApplicationDownloadError::Communication(RemoteAccessError::FetchError(e.into())) })?; info!("{raw_res}"); if let Ok(err) = serde_json::from_str::(&raw_res) { return Err(ApplicationDownloadError::Communication( RemoteAccessError::InvalidResponse(err), )); } return Err(ApplicationDownloadError::Communication( RemoteAccessError::UnparseableResponse(raw_res), )); } let lengths = response .headers() .get("Content-Lengths") .ok_or(ApplicationDownloadError::Communication( RemoteAccessError::UnparseableResponse("missing Content-Lengths header".to_owned()), ))? .to_str() .map_err(|e| { ApplicationDownloadError::Communication(RemoteAccessError::UnparseableResponse( e.to_string(), )) })?; for (i, raw_length) in lengths.split(",").enumerate() { let length = raw_length.parse::().unwrap_or(0); let Some(drop) = bucket.drops.get(i) else { warn!("invalid number of Content-Lengths recieved: {i}, {lengths}"); return Err(ApplicationDownloadError::DownloadError( RemoteAccessError::InvalidResponse(DropServerError { status_code: 400, status_message: format!( "invalid number of Content-Lengths recieved: {i}, {lengths}" ), }), )); }; if drop.length != length { warn!( "for {}, expected {}, got {} ({})", drop.filename, drop.length, raw_length, length ); return Err(ApplicationDownloadError::DownloadError( RemoteAccessError::InvalidResponse(DropServerError { status_code: 400, status_message: format!( "for {}, expected {}, got {} ({})", drop.filename, drop.length, raw_length, length ), }), )); } } let timestep = start.elapsed().as_millis(); debug!("took {}ms to start downloading", timestep); let mut pipeline = DropDownloadPipeline::new(response, bucket.drops.clone(), control_flag, progress) .map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?; let completed = pipeline .copy() .map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?; if !completed { return Ok(false); } // If we complete the file, set the permissions (if on Linux) #[cfg(unix)] { for drop in bucket.drops.iter() { let permissions = Permissions::from_mode(drop.permissions); set_permissions(drop.path.clone(), permissions) .map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?; } } let checksums = pipeline .finish() .map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?; for (index, drop) in bucket.drops.iter().enumerate() { let res = hex::encode(**checksums.get(index).unwrap()); if res != drop.checksum { warn!("context didn't match... doing nothing because we will validate later."); // return Ok(false); // return Err(ApplicationDownloadError::Checksum); } } Ok(true) }