feat: temporary queue ui and flamegraph instructions

This commit is contained in:
DecDuck
2024-12-07 20:21:22 +11:00
parent de52dac0ab
commit 5cbeb3bdb6
12 changed files with 201 additions and 42 deletions

3
.gitignore vendored
View File

@ -24,3 +24,6 @@ dist-ssr
*.sw?
.nuxt
.output
src-tauri/flamegraph.svg
src-tuair/perf*

15
DEBUG.md Normal file
View File

@ -0,0 +1,15 @@
# How to create Flamegraph
Run this in `src-tauri`:
```
WEBKIT_DISABLE_DMABUF_RENDERER=1 CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --release
```
You can leave out `WEBKIT_DISABLE_DMABUF_RENDERER=1` if you're not on NVIDIA/Linux
And then run this in the root dir:
```
yarn dev --port 1432
```
And then do what you want, and it'll create the flamegraph for you

View File

@ -5,6 +5,8 @@
</template>
<script setup lang="ts">
import "~/composables/queue";
import { invoke } from "@tauri-apps/api/core";
import { AppStatus } from "~/types";
import { listen } from "@tauri-apps/api/event";

13
composables/queue.ts Normal file
View File

@ -0,0 +1,13 @@
import { listen } from "@tauri-apps/api/event";
export type QueueState = {
queue: Array<{ id: string; status: string }>;
};
export const useQueueState = () =>
useState<QueueState>("queue", () => ({ queue: [] }));
listen("update_queue", (event) => {
const queue = useQueueState();
queue.value = event.payload as QueueState;
});

View File

@ -2,7 +2,13 @@
<div class="flex flex-col bg-zinc-900 overflow-hidden">
<Header class="select-none" />
<div class="grow overflow-y-auto">
<span class="text-white">{{ queueState }}</span>
<slot />
</div>
</div>
</template>
<script setup lang="ts">
const queueState = useQueueState();
</script>

View File

@ -1,6 +1,7 @@
use crate::auth::generate_authorization_header;
use crate::db::DatabaseImpls;
use crate::downloads::manifest::{DropDownloadContext, DropManifest};
use crate::downloads::progress_object::ProgressHandle;
use crate::remote::RemoteAccessError;
use crate::DB;
use log::{debug, error, info};
@ -28,7 +29,7 @@ pub struct GameDownloadAgent {
pub target_download_dir: usize,
contexts: Mutex<Vec<DropDownloadContext>>,
pub manifest: Mutex<Option<DropManifest>>,
pub progress: ProgressObject,
pub progress: Arc<ProgressObject>,
sender: Sender<DownloadManagerSignal>,
}
@ -76,7 +77,7 @@ impl GameDownloadAgent {
manifest: Mutex::new(None),
target_download_dir,
contexts: Mutex::new(Vec::new()),
progress: ProgressObject::new(0, 0),
progress: Arc::new(ProgressObject::new(0, 0, sender.clone())),
sender,
}
}
@ -234,7 +235,7 @@ impl GameDownloadAgent {
pub fn run(&self) -> Result<(), ()> {
info!("downloading game: {}", self.id);
const DOWNLOAD_MAX_THREADS: usize = 4;
const DOWNLOAD_MAX_THREADS: usize = 1;
let pool = ThreadPoolBuilder::new()
.num_threads(DOWNLOAD_MAX_THREADS)
@ -251,10 +252,11 @@ impl GameDownloadAgent {
let context = context.clone();
let control_flag = self.control_flag.clone(); // Clone arcs
let progress = self.progress.get(index); // Clone arcs
let progress_handle = ProgressHandle::new(progress, self.progress.clone());
let completed_indexes_ref = completed_indexes_loop_arc.clone();
scope.spawn(move |_| {
match download_game_chunk(context.clone(), control_flag, progress) {
match download_game_chunk(context.clone(), control_flag, progress_handle) {
Ok(res) => match res {
true => {
let mut lock = completed_indexes_ref.lock().unwrap();

View File

@ -19,6 +19,7 @@ use urlencoding::encode;
use super::download_agent::GameDownloadError;
use super::download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag};
use super::progress_object::{ProgressHandle, ProgressObject};
pub struct DropWriter<W: Write> {
hasher: Context,
@ -65,7 +66,7 @@ pub struct DropDownloadPipeline<R: Read, W: Write> {
pub source: R,
pub destination: DropWriter<W>,
pub control_flag: DownloadThreadControl,
pub progress: Arc<AtomicUsize>,
pub progress: ProgressHandle,
pub size: usize,
}
impl DropDownloadPipeline<Response, File> {
@ -73,7 +74,7 @@ impl DropDownloadPipeline<Response, File> {
source: Response,
destination: DropWriter<File>,
control_flag: DownloadThreadControl,
progress: Arc<AtomicUsize>,
progress: ProgressHandle,
size: usize,
) -> Self {
Self {
@ -100,8 +101,7 @@ impl DropDownloadPipeline<Response, File> {
current_size += bytes_read;
buf_writer.write_all(&copy_buf[0..bytes_read])?;
self.progress
.fetch_add(bytes_read, std::sync::atomic::Ordering::Relaxed);
self.progress.add(bytes_read);
if current_size == self.size {
break;
@ -120,11 +120,11 @@ impl DropDownloadPipeline<Response, File> {
pub fn download_game_chunk(
ctx: DropDownloadContext,
control_flag: DownloadThreadControl,
progress: Arc<AtomicUsize>,
progress: ProgressHandle,
) -> Result<bool, GameDownloadError> {
// If we're paused
if control_flag.get() == DownloadThreadControlFlag::Stop {
progress.store(0, Ordering::Relaxed);
progress.set(0);
return Ok(false);
}

View File

@ -9,9 +9,11 @@ use std::{
};
use log::info;
use serde::Serialize;
use super::{
download_agent::{GameDownloadAgent, GameDownloadError},
download_manager_builder::CurrentProgressObject,
progress_object::ProgressObject,
queue::Queue,
};
@ -32,6 +34,8 @@ pub enum DownloadManagerSignal {
Cancel(String),
/// Any error which occurs in the agent
Error(GameDownloadError),
/// Pushes UI update
Update,
}
pub enum DownloadManagerStatus {
Downloading,
@ -39,10 +43,12 @@ pub enum DownloadManagerStatus {
Empty,
Error(GameDownloadError),
}
#[derive(Serialize, Clone)]
pub enum GameDownloadStatus {
Queued,
Downloading,
Paused,
Uninitialised,
Error,
}
@ -59,18 +65,20 @@ pub enum GameDownloadStatus {
pub struct DownloadManager {
terminator: JoinHandle<Result<(), ()>>,
download_queue: Queue,
progress: Arc<Mutex<Option<ProgressObject>>>,
progress: CurrentProgressObject,
command_sender: Sender<DownloadManagerSignal>,
}
pub struct AgentInterfaceData {
pub struct GameDownloadAgentQueueStandin {
pub id: String,
pub status: Mutex<GameDownloadStatus>,
pub progress: Arc<ProgressObject>,
}
impl From<Arc<GameDownloadAgent>> for AgentInterfaceData {
impl From<Arc<GameDownloadAgent>> for GameDownloadAgentQueueStandin {
fn from(value: Arc<GameDownloadAgent>) -> Self {
Self {
id: value.id.clone(),
status: Mutex::from(GameDownloadStatus::Uninitialised),
status: Mutex::from(GameDownloadStatus::Queued),
progress: value.progress.clone(),
}
}
}
@ -79,7 +87,7 @@ impl DownloadManager {
pub fn new(
terminator: JoinHandle<Result<(), ()>>,
download_queue: Queue,
progress: Arc<Mutex<Option<ProgressObject>>>,
progress: CurrentProgressObject,
command_sender: Sender<DownloadManagerSignal>,
) -> Self {
Self {
@ -109,10 +117,10 @@ impl DownloadManager {
.send(DownloadManagerSignal::Cancel(game_id))
.unwrap();
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<AgentInterfaceData>>> {
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<GameDownloadAgentQueueStandin>>> {
self.download_queue.edit()
}
pub fn read_queue(&self) -> VecDeque<Arc<AgentInterfaceData>> {
pub fn read_queue(&self) -> VecDeque<Arc<GameDownloadAgentQueueStandin>> {
self.download_queue.read()
}
pub fn get_current_game_download_progress(&self) -> Option<f64> {
@ -157,7 +165,7 @@ impl DownloadManager {
/// Takes in the locked value from .edit() and attempts to
/// get the index of whatever game_id is passed in
fn get_index_from_id(
queue: &mut MutexGuard<'_, VecDeque<Arc<AgentInterfaceData>>>,
queue: &mut MutexGuard<'_, VecDeque<Arc<GameDownloadAgentQueueStandin>>>,
id: String,
) -> Option<usize> {
queue

View File

@ -7,21 +7,20 @@ use std::{
thread::spawn,
};
use log::{error, info, warn};
use rustbreak::Database;
use log::{error, info};
use tauri::{AppHandle, Emitter};
use crate::{
db::DatabaseGameStatus,
library::{on_game_complete, GameUpdateEvent},
library::{on_game_complete, GameUpdateEvent, QueueUpdateEvent, QueueUpdateEventQueueData},
DB,
};
use super::{
download_agent::{GameDownloadAgent, GameDownloadError},
download_manager::{
AgentInterfaceData, DownloadManager, DownloadManagerSignal, DownloadManagerStatus,
GameDownloadStatus,
DownloadManager, DownloadManagerSignal, DownloadManagerStatus,
GameDownloadAgentQueueStandin, GameDownloadStatus,
},
download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag},
progress_object::ProgressObject,
@ -65,16 +64,19 @@ Behold, my madness - quexeky
*/
// Refactored to consolidate this type. It's a monster.
pub type CurrentProgressObject = Arc<Mutex<Option<Arc<ProgressObject>>>>;
pub struct DownloadManagerBuilder {
download_agent_registry: HashMap<String, Arc<GameDownloadAgent>>,
download_queue: Queue,
command_receiver: Receiver<DownloadManagerSignal>,
sender: Sender<DownloadManagerSignal>,
progress: Arc<Mutex<Option<ProgressObject>>>,
progress: CurrentProgressObject,
status: Arc<Mutex<DownloadManagerStatus>>,
app_handle: AppHandle,
current_game_interface: Option<Arc<AgentInterfaceData>>, // Should be the only game download agent in the map with the "Go" flag
current_game_interface: Option<Arc<GameDownloadAgentQueueStandin>>, // Should be the only game download agent in the map with the "Go" flag
active_control_flag: Option<DownloadThreadControl>,
}
@ -121,6 +123,21 @@ impl DownloadManagerBuilder {
.unwrap();
}
fn push_manager_update(&self) {
let queue = self.download_queue.read();
let queue_objs: Vec<QueueUpdateEventQueueData> = queue
.iter()
.map(|interface| QueueUpdateEventQueueData {
id: interface.id.clone(),
status: interface.status.lock().unwrap().clone(),
progress: interface.progress.get_progress(),
})
.collect();
let event_data = QueueUpdateEvent { queue: queue_objs };
self.app_handle.emit("update_queue", event_data).unwrap();
}
fn manage_queue(mut self) -> Result<(), ()> {
loop {
let signal = match self.command_receiver.recv() {
@ -153,6 +170,9 @@ impl DownloadManagerBuilder {
DownloadManagerSignal::Cancel(id) => {
self.manage_cancel_signal(id);
}
DownloadManagerSignal::Update => {
self.push_manager_update();
}
};
}
}
@ -175,9 +195,18 @@ impl DownloadManagerBuilder {
self.active_control_flag = None;
*self.progress.lock().unwrap() = None;
on_game_complete(game_id, download_agent.version.clone(), &self.app_handle);
if let Err(error) =
on_game_complete(game_id, download_agent.version.clone(), &self.app_handle)
{
self.sender
.send(DownloadManagerSignal::Error(
GameDownloadError::Communication(error),
))
.unwrap();
}
}
}
self.sender.send(DownloadManagerSignal::Update).unwrap();
self.sender.send(DownloadManagerSignal::Go).unwrap();
}
@ -189,10 +218,11 @@ impl DownloadManagerBuilder {
target_download_dir,
self.sender.clone(),
));
let agent_status = GameDownloadStatus::Uninitialised;
let interface_data = AgentInterfaceData {
let agent_status = GameDownloadStatus::Queued;
let interface_data = GameDownloadAgentQueueStandin {
id: id.clone(),
status: Mutex::new(agent_status),
progress: download_agent.progress.clone(),
};
let version_name = download_agent.version.clone();
self.download_agent_registry
@ -200,6 +230,7 @@ impl DownloadManagerBuilder {
self.download_queue.append(interface_data);
self.set_game_status(id, DatabaseGameStatus::Queued { version_name });
self.sender.send(DownloadManagerSignal::Update).unwrap();
}
fn manage_go_signal(&mut self) {
@ -217,6 +248,8 @@ impl DownloadManagerBuilder {
.unwrap()
.clone();
self.current_game_interface = Some(agent_data);
// Cloning option should be okay because it only clones the Arc inside, not the AgentInterfaceData
let agent_data = self.current_game_interface.clone().unwrap();
let version_name = download_agent.version.clone();
@ -243,6 +276,11 @@ impl DownloadManagerBuilder {
};
});
// Set status for game
let mut status_handle = agent_data.status.lock().unwrap();
*status_handle = GameDownloadStatus::Downloading;
// Set flags for download manager
active_control_flag.set(DownloadThreadControlFlag::Go);
self.set_status(DownloadManagerStatus::Downloading);
self.set_game_status(
@ -260,6 +298,8 @@ impl DownloadManagerBuilder {
self.current_game_interface.as_ref().unwrap().id.clone(),
DatabaseGameStatus::Remote {},
);
self.sender.send(DownloadManagerSignal::Update).unwrap();
}
fn manage_cancel_signal(&mut self, game_id: String) {
if let Some(current_flag) = &self.active_control_flag {

View File

@ -1,27 +1,84 @@
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
mpsc::Sender,
Arc, Mutex,
},
time::Instant,
};
use log::info;
use super::download_manager::DownloadManagerSignal;
#[derive(Clone)]
pub struct ProgressObject {
max: Arc<Mutex<usize>>,
progress_instances: Arc<Mutex<Vec<Arc<AtomicUsize>>>>,
start: Arc<Mutex<Instant>>,
sender: Sender<DownloadManagerSignal>,
points_towards_update: Arc<AtomicUsize>,
points_to_push_update: Arc<Mutex<usize>>,
}
pub struct ProgressHandle {
progress: Arc<AtomicUsize>,
progress_object: Arc<ProgressObject>,
}
impl ProgressHandle {
pub fn new(progress: Arc<AtomicUsize>, progress_object: Arc<ProgressObject>) -> Self {
Self {
progress,
progress_object,
}
}
pub fn set(&self, amount: usize) {
self.progress.store(amount, Ordering::Relaxed);
}
pub fn add(&self, amount: usize) {
self.progress
.fetch_add(amount, std::sync::atomic::Ordering::Relaxed);
self.progress_object.check_push_update(amount);
}
}
static PROGRESS_UPDATES: usize = 100;
impl ProgressObject {
pub fn new(max: usize, length: usize) -> Self {
pub fn new(max: usize, length: usize, sender: Sender<DownloadManagerSignal>) -> Self {
let arr = Mutex::new((0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect());
// TODO: consolidate this calculate with the set_max function below
let points_to_push_update = max / PROGRESS_UPDATES;
Self {
max: Arc::new(Mutex::new(max)),
progress_instances: Arc::new(arr),
start: Arc::new(Mutex::new(Instant::now())),
sender,
points_towards_update: Arc::new(AtomicUsize::new(0)),
points_to_push_update: Arc::new(Mutex::new(points_to_push_update)),
}
}
pub fn check_push_update(&self, amount_added: usize) {
let current_amount = self
.points_towards_update
.fetch_add(amount_added, Ordering::Relaxed);
let to_update_handle = self.points_to_push_update.lock().unwrap();
let to_update = to_update_handle.clone();
drop(to_update_handle);
if current_amount < to_update {
return;
}
self.points_towards_update
.fetch_sub(to_update, Ordering::Relaxed);
self.sender.send(DownloadManagerSignal::Update).unwrap();
}
pub fn set_time_now(&self) {
*self.start.lock().unwrap() = Instant::now();
}
@ -37,13 +94,14 @@ impl ProgressObject {
*self.max.lock().unwrap()
}
pub fn set_max(&self, new_max: usize) {
*self.max.lock().unwrap() = new_max
*self.max.lock().unwrap() = new_max;
*self.points_to_push_update.lock().unwrap() = new_max / PROGRESS_UPDATES;
info!("points to push update: {}", new_max / PROGRESS_UPDATES);
}
pub fn set_size(&self, length: usize) {
*self.progress_instances.lock().unwrap() =
(0..length).map(|_| Arc::new(AtomicUsize::new(0))).collect();
}
pub fn get_progress(&self) -> f64 {
self.sum() as f64 / self.get_max() as f64
}

View File

@ -3,11 +3,11 @@ use std::{
sync::{Arc, Mutex, MutexGuard},
};
use super::download_manager::AgentInterfaceData;
use super::download_manager::GameDownloadAgentQueueStandin;
#[derive(Clone)]
pub struct Queue {
inner: Arc<Mutex<VecDeque<Arc<AgentInterfaceData>>>>,
inner: Arc<Mutex<VecDeque<Arc<GameDownloadAgentQueueStandin>>>>,
}
impl Queue {
@ -16,13 +16,13 @@ impl Queue {
inner: Arc::new(Mutex::new(VecDeque::new())),
}
}
pub fn read(&self) -> VecDeque<Arc<AgentInterfaceData>> {
pub fn read(&self) -> VecDeque<Arc<GameDownloadAgentQueueStandin>> {
self.inner.lock().unwrap().clone()
}
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<AgentInterfaceData>>> {
pub fn edit(&self) -> MutexGuard<'_, VecDeque<Arc<GameDownloadAgentQueueStandin>>> {
self.inner.lock().unwrap()
}
pub fn pop_front(&self) -> Option<Arc<AgentInterfaceData>> {
pub fn pop_front(&self) -> Option<Arc<GameDownloadAgentQueueStandin>> {
self.edit().pop_front()
}
pub fn empty(&self) -> bool {
@ -30,17 +30,17 @@ impl Queue {
}
/// Either inserts `interface` at the specified index, or appends to
/// the back of the deque if index is greater than the length of the deque
pub fn insert(&self, interface: AgentInterfaceData, index: usize) {
pub fn insert(&self, interface: GameDownloadAgentQueueStandin, index: usize) {
if self.read().len() > index {
self.append(interface);
} else {
self.edit().insert(index, Arc::new(interface));
}
}
pub fn append(&self, interface: AgentInterfaceData) {
pub fn append(&self, interface: GameDownloadAgentQueueStandin) {
self.edit().push_back(Arc::new(interface));
}
pub fn pop_front_if_equal(&self, game_id: String) -> Option<Arc<AgentInterfaceData>> {
pub fn pop_front_if_equal(&self, game_id: String) -> Option<Arc<GameDownloadAgentQueueStandin>> {
let mut queue = self.edit();
let front = match queue.front() {
Some(front) => front,

View File

@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::format;
use std::sync::Mutex;
@ -42,6 +42,18 @@ pub struct GameUpdateEvent {
pub status: DatabaseGameStatus,
}
#[derive(Serialize, Clone)]
pub struct QueueUpdateEventQueueData {
pub id: String,
pub status: GameDownloadStatus,
pub progress: f64,
}
#[derive(serde::Serialize, Clone)]
pub struct QueueUpdateEvent {
pub queue: Vec<QueueUpdateEventQueueData>,
}
// Game version with some fields missing and size information
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]