fix: Downloads when resuming would truncate files which had not been finished

Signed-off-by: quexeky <git@quexeky.dev>
This commit is contained in:
quexeky
2025-06-12 10:23:01 +10:00
parent 47f64a3c68
commit 8c403eb8d7
3 changed files with 67 additions and 36 deletions

View File

@ -5,7 +5,9 @@ use crate::database::models::data::{
}; };
use crate::download_manager::download_manager::{DownloadManagerSignal, DownloadStatus}; use crate::download_manager::download_manager::{DownloadManagerSignal, DownloadStatus};
use crate::download_manager::downloadable::Downloadable; use crate::download_manager::downloadable::Downloadable;
use crate::download_manager::util::download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag}; use crate::download_manager::util::download_thread_control_flag::{
DownloadThreadControl, DownloadThreadControlFlag,
};
use crate::download_manager::util::progress_object::{ProgressHandle, ProgressObject}; use crate::download_manager::util::progress_object::{ProgressHandle, ProgressObject};
use crate::error::application_download_error::ApplicationDownloadError; use crate::error::application_download_error::ApplicationDownloadError;
use crate::error::remote_access_error::RemoteAccessError; use crate::error::remote_access_error::RemoteAccessError;
@ -16,7 +18,7 @@ use crate::DB;
use log::{debug, error, info}; use log::{debug, error, info};
use rayon::ThreadPoolBuilder; use rayon::ThreadPoolBuilder;
use slice_deque::SliceDeque; use slice_deque::SliceDeque;
use std::fs::{create_dir_all, File}; use std::fs::{create_dir_all, File, OpenOptions};
use std::path::Path; use std::path::Path;
use std::sync::mpsc::Sender; use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -202,7 +204,7 @@ impl GameDownloadAgent {
let container = path.parent().unwrap(); let container = path.parent().unwrap();
create_dir_all(container).unwrap(); create_dir_all(container).unwrap();
let file = File::create(path.clone()).unwrap(); let file = OpenOptions::new().read(true).write(true).create(true).open(path.clone()).unwrap();
let mut running_offset = 0; let mut running_offset = 0;
for (index, length) in chunk.lengths.iter().enumerate() { for (index, length) in chunk.lengths.iter().enumerate() {
@ -263,6 +265,8 @@ impl GameDownloadAgent {
continue; continue;
} }
debug!("Continuing download chunk {}", index);
let sender = self.sender.clone(); let sender = self.sender.clone();
let request = match make_request( let request = match make_request(
@ -290,10 +294,18 @@ impl GameDownloadAgent {
scope.spawn(move |_| { scope.spawn(move |_| {
match download_game_chunk(context, &self.control_flag, progress_handle, request) match download_game_chunk(context, &self.control_flag, progress_handle, request)
{ {
Ok(res) => { Ok(true) => {
if res { debug!(
completed_indexes.push(index); "Finished context #{} with checksum {}",
} index, context.checksum
);
completed_indexes.push(index);
}
Ok(false) => {
debug!(
"Didn't finish context #{} with checksum {}",
index, context.checksum
);
} }
Err(e) => { Err(e) => {
error!("{}", e); error!("{}", e);
@ -315,6 +327,10 @@ impl GameDownloadAgent {
completed_contexts_lock.len() completed_contexts_lock.len()
}; };
self.stored_manifest
.set_completed_contexts(self.completed_contexts.lock().unwrap().as_slice());
self.stored_manifest.write();
// If we're not out of contexts, we're not done, so we don't fire completed // If we're not out of contexts, we're not done, so we don't fire completed
if completed_lock_len != contexts.len() { if completed_lock_len != contexts.len() {
info!( info!(
@ -323,12 +339,8 @@ impl GameDownloadAgent {
completed_lock_len, completed_lock_len,
contexts.len(), contexts.len(),
); );
self.stored_manifest
.set_completed_contexts(self.completed_contexts.lock().unwrap().as_slice());
self.stored_manifest.write();
return Ok(false); return Ok(false);
} }
// We've completed // We've completed
self.sender self.sender
.send(DownloadManagerSignal::Completed(self.metadata())) .send(DownloadManagerSignal::Completed(self.metadata()))

View File

@ -90,6 +90,7 @@ impl<'a> DropDownloadPipeline<'a, Response, File> {
let mut current_size = 0; let mut current_size = 0;
loop { loop {
if self.control_flag.get() == DownloadThreadControlFlag::Stop { if self.control_flag.get() == DownloadThreadControlFlag::Stop {
buf_writer.flush()?;
return Ok(false); return Ok(false);
} }
@ -103,6 +104,7 @@ impl<'a> DropDownloadPipeline<'a, Response, File> {
break; break;
} }
} }
buf_writer.flush()?;
Ok(true) Ok(true)
} }
@ -119,6 +121,7 @@ pub fn download_game_chunk(
progress: ProgressHandle, progress: ProgressHandle,
request: RequestBuilder, request: RequestBuilder,
) -> Result<bool, ApplicationDownloadError> { ) -> Result<bool, ApplicationDownloadError> {
debug!("Starting download chunk {}, {}, {} #{}", ctx.file_name, ctx.index, ctx.offset, ctx.checksum);
// If we're paused // If we're paused
if control_flag.get() == DownloadThreadControlFlag::Stop { if control_flag.get() == DownloadThreadControlFlag::Stop {
progress.set(0); progress.set(0);
@ -152,12 +155,18 @@ pub fn download_game_chunk(
)); ));
} }
let length = content_length.unwrap().try_into().unwrap();
if length != ctx.length {
return Err(ApplicationDownloadError::DownloadError);
}
let mut pipeline = DropDownloadPipeline::new( let mut pipeline = DropDownloadPipeline::new(
response, response,
destination, destination,
control_flag, control_flag,
progress, progress,
content_length.unwrap().try_into().unwrap(), length,
); );
let completed = pipeline let completed = pipeline
@ -183,5 +192,7 @@ pub fn download_game_chunk(
return Err(ApplicationDownloadError::Checksum); return Err(ApplicationDownloadError::Checksum);
} }
debug!("Successfully finished download #{}, copied {} bytes", ctx.checksum, length);
Ok(true) Ok(true)
} }

View File

@ -1,33 +1,41 @@
use std::{ use std::{fs::File, io::{Read, Write}, path::PathBuf};
fs::File,
io::{Read, Write},
path::PathBuf,
sync::Mutex,
};
use log::{error, warn}; use log::{error, warn};
use serde::{Deserialize, Serialize}; use native_model::{Decode, Encode};
use serde_binary::binary_stream::Endian; use serde_binary::binary_stream::Endian;
#[derive(Serialize, Deserialize, Debug)] pub type DropData = v1::DropData;
pub struct DropData {
game_id: String,
game_version: String,
pub completed_contexts: Mutex<Vec<usize>>,
pub base_path: PathBuf,
}
static DROP_DATA_PATH: &str = ".dropdata"; static DROP_DATA_PATH: &str = ".dropdata";
impl DropData { pub mod v1 {
pub fn new(game_id: String, game_version: String, base_path: PathBuf) -> Self { use std::{path::PathBuf, sync::Mutex};
Self {
base_path, use native_model::native_model;
game_id, use serde::{Deserialize, Serialize};
game_version,
completed_contexts: Mutex::new(Vec::new()), #[derive(Serialize, Deserialize, Debug)]
#[native_model(id = 9, version = 1, with = native_model::rmp_serde_1_3::RmpSerde)]
pub struct DropData {
game_id: String,
game_version: String,
pub completed_contexts: Mutex<Vec<usize>>,
pub base_path: PathBuf,
}
impl DropData {
pub fn new(game_id: String, game_version: String, base_path: PathBuf) -> Self {
Self {
base_path,
game_id,
game_version,
completed_contexts: Mutex::new(Vec::new()),
}
} }
} }
}
impl DropData {
pub fn generate(game_id: String, game_version: String, base_path: PathBuf) -> Self { pub fn generate(game_id: String, game_version: String, base_path: PathBuf) -> Self {
let mut file = match File::open(base_path.join(DROP_DATA_PATH)) { let mut file = match File::open(base_path.join(DROP_DATA_PATH)) {
Ok(file) => file, Ok(file) => file,
@ -43,7 +51,7 @@ impl DropData {
} }
}; };
match serde_binary::from_vec::<DropData>(s, Endian::Little) { match native_model::rmp_serde_1_3::RmpSerde::decode(s) {
Ok(manifest) => manifest, Ok(manifest) => manifest,
Err(e) => { Err(e) => {
warn!("{}", e); warn!("{}", e);
@ -52,8 +60,8 @@ impl DropData {
} }
} }
pub fn write(&self) { pub fn write(&self) {
let manifest_raw = match serde_binary::to_vec(&self, Endian::Little) { let manifest_raw = match native_model::rmp_serde_1_3::RmpSerde::encode(&self) {
Ok(json) => json, Ok(data) => data,
Err(_) => return, Err(_) => return,
}; };