Compare commits

..

3 Commits

Author SHA1 Message Date
9a85f68d3a chore: commit changes, still bad 2025-07-26 19:56:38 +10:00
aad2d964eb fix: app states 2025-07-26 17:57:12 +10:00
f92ec38231 feat: move to async runtime 2025-07-26 17:37:11 +10:00
47 changed files with 1923 additions and 1655 deletions

View File

@ -1,5 +1,4 @@
<template>
<LoadingIndicator />
<NuxtLayout class="select-none w-screen h-screen">
<NuxtPage />
<ModalStack />
@ -23,6 +22,7 @@ const router = useRouter();
const state = useAppState();
try {
state.value = JSON.parse(await invoke("fetch_state"));
console.log(state.value)
} catch (e) {
console.error("failed to parse state", e);
}

View File

@ -23,7 +23,7 @@
<MenuItems
class="absolute bg-zinc-900 right-0 top-10 z-50 w-56 origin-top-right focus:outline-none shadow-md"
>
<div class="flex-col gap-y-2">
<PanelWidget class="flex-col gap-y-2">
<NuxtLink
to="/id/me"
class="transition inline-flex items-center w-full py-3 px-4 hover:bg-zinc-800"
@ -65,7 +65,7 @@
</button>
</MenuItem>
</div>
</div>
</PanelWidget>
</MenuItems>
</transition>
</Menu>

View File

@ -13,7 +13,11 @@
<div class="max-w-lg">
<slot />
<div class="mt-10">
<div>
<button
@click="() => authWrapper_wrapper()"
:disabled="loading"
class="text-sm text-left font-semibold leading-7 text-blue-600"
>
<div v-if="loading" role="status">
<svg
aria-hidden="true"
@ -33,19 +37,10 @@
</svg>
<span class="sr-only">Loading...</span>
</div>
<span class="inline-flex gap-x-8 items-center" v-else>
<button
@click="() => authWrapper_wrapper()"
:disabled="loading"
class="px-3 py-1 inline-flex items-center gap-x-2 bg-zinc-700 rounded text-sm text-left font-semibold leading-7 text-white"
>
Sign in with your browser <ArrowTopRightOnSquareIcon class="size-4" />
</button>
<NuxtLink href="/auth/code" class="text-zinc-100 text-sm hover:text-zinc-300">
Use a code &rarr;
</NuxtLink>
<span v-else>
Sign in with your browser <span aria-hidden="true">&rarr;</span>
</span>
</div>
</button>
<div class="mt-5" v-if="offerManual">
<h1 class="text-zinc-100 font-semibold">Having trouble?</h1>
@ -126,7 +121,6 @@
<script setup lang="ts">
import { XCircleIcon } from "@heroicons/vue/16/solid";
import { ArrowTopRightOnSquareIcon } from "@heroicons/vue/20/solid";
import { invoke } from "@tauri-apps/api/core";
const loading = ref(false);

View File

@ -1,7 +0,0 @@
<template></template>
<script setup lang="ts">
const loading = useLoadingIndicator();
watch(loading.isLoading, console.log);
</script>

View File

@ -1,6 +1,6 @@
<template>
<div
class="h-16 cursor-pointer flex flex-row items-center justify-between bg-zinc-950"
class="h-10 cursor-pointer flex flex-row items-center justify-between bg-zinc-950"
>
<div class="px-5 py-3 grow" @mousedown="() => window.startDragging()">
<Wordmark class="mt-1" />

View File

@ -1,7 +1,7 @@
{
"name": "drop-app",
"private": true,
"version": "0.3.1",
"version": "0.3.0-rc-8",
"type": "module",
"scripts": {
"build": "nuxt build",

View File

@ -1,37 +0,0 @@
<template>
<div class="min-h-full w-full flex items-center justify-center">
<div class="flex flex-col items-center">
<div class="text-center">
<h1 class="text-3xl font-semibold font-display leading-6 text-zinc-100">
Device authorization
</h1>
<div class="mt-4">
<p class="text-sm text-zinc-400 max-w-md mx-auto">
Open Drop on another one of your devices, and use your account
dropdown to "Authorize client", and enter the code below.
</p>
<div
class="mt-8 flex items-center justify-center gap-x-5 text-8xl font-bold text-zinc-100"
>
<span v-for="letter in code.split('')">{{ letter }}</span>
</div>
</div>
<div class="mt-10 flex items-center justify-center gap-x-6">
<NuxtLink href="/auth" class="text-sm font-semibold text-blue-600"
><span aria-hidden="true">&larr;</span> Use a different method
</NuxtLink>
</div>
</div>
</div>
</div>
</template>
<script setup lang="ts">
import { invoke } from "@tauri-apps/api/core";
const code = await invoke<string>("auth_initiate_code");
definePageMeta({
layout: "mini",
});
</script>

View File

@ -76,8 +76,9 @@
<TransitionGroup name="slide" tag="div" class="h-full">
<img
v-for="(url, index) in mediaUrls"
:key="index"
:key="url"
:src="url"
loading="lazy"
class="absolute inset-0 w-full h-full object-cover"
v-show="index === currentImageIndex"
/>
@ -157,8 +158,8 @@
<template #default>
<div class="sm:flex sm:items-start">
<div class="mt-3 text-center sm:mt-0 sm:text-left">
<h3 class="text-base font-semibold text-zinc-100">
Install {{ game.mName }}?
<h3 class="text-base font-semibold text-zinc-100"
>Install {{ game.mName }}?
</h3>
<div class="mt-2">
<p class="text-sm text-zinc-400">
@ -350,7 +351,9 @@
<template #buttons>
<LoadingButton
@click="() => install()"
:disabled="!(versionOptions && versionOptions.length > 0)"
:disabled="
!(versionOptions && versionOptions.length > 0)
"
:loading="installLoading"
type="submit"
class="ml-2 w-full sm:w-fit"
@ -368,11 +371,7 @@
</template>
</ModalTemplate>
<GameOptionsModal
v-if="status.type === GameStatusEnum.Installed"
v-model="configureModalOpen"
:game-id="game.id"
/>
<GameOptionsModal v-if="status.type === GameStatusEnum.Installed" v-model="configureModalOpen" :game-id="game.id" />
<Transition
enter="transition ease-out duration-300"
@ -422,7 +421,7 @@
<img
v-for="(url, index) in mediaUrls"
v-show="currentImageIndex === index"
:key="index"
:key="url"
:src="url"
class="max-h-[90vh] max-w-[90vw] object-contain"
:alt="`${game.mName} screenshot ${index + 1}`"
@ -443,6 +442,10 @@
<script setup lang="ts">
import {
Dialog,
DialogTitle,
TransitionChild,
TransitionRoot,
Listbox,
ListboxButton,
ListboxLabel,
@ -480,10 +483,7 @@ const bannerUrl = await useObject(game.value.mBannerObjectId);
// Get all available images
const mediaUrls = await Promise.all(
game.value.mImageCarouselObjectIds.map(async (v) => {
const src = await useObject(v);
return src;
})
game.value.mImageCarouselObjectIds.map((id) => useObject(id))
);
const htmlDescription = micromark(game.value.mDescription);
@ -497,6 +497,7 @@ const currentImageIndex = ref(0);
const configureModalOpen = ref(false);
async function installFlow() {
installFlowOpen.value = true;
versionOptions.value = undefined;
@ -535,11 +536,12 @@ async function install() {
}
async function resumeDownload() {
try {
await invoke("resume_download", { gameId: game.value.id });
} catch (e) {
console.error(e);
}
try {
await invoke("resume_download", { gameId: game.value.id })
}
catch(e) {
console.error(e)
}
}
async function launch() {

1224
src-tauri/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package]
name = "drop-app"
version = "0.3.1"
version = "0.3.0-rc-8"
description = "The client application for the open-source, self-hosted game distribution platform Drop"
authors = ["Drop OSS"]
edition = "2024"
@ -25,7 +25,6 @@ tauri-build = { version = "2.0.0", features = [] }
[dependencies]
tauri-plugin-shell = "2.2.1"
serde_json = "1"
rayon = "1.10.0"
webbrowser = "1.0.2"
url = "2.5.2"
tauri-plugin-deep-link = "2"
@ -68,8 +67,11 @@ known-folders = "1.2.0"
native_model = { version = "0.6.1", features = ["rmp_serde_1_3"] }
tauri-plugin-opener = "2.4.0"
bitcode = "0.6.6"
reqwest-websocket = "0.5.0"
futures-lite = "2.6.0"
async-trait = "0.1.88"
futures = "0.3.31"
tokio-util = { version = "0.7.15", features = ["io"] }
async-scoped = { version = "0.9.0", features = ["use-tokio"] }
async-once-cell = "0.5.4"
# tailscale = { path = "./tailscale" }
[dependencies.dynfmt]
@ -96,14 +98,14 @@ features = ["fs"]
version = "1.10.0"
features = ["v4", "fast-rng", "macro-diagnostics"]
[dependencies.rustbreak]
version = "2"
features = ["other_errors"] # You can also use "yaml_enc" or "bin_enc"
[dependencies.dropbreak]
git = "https://github.com/Drop-OSS/dropbreak.git"
features = ["other_errors"] # You can also use "yaml_enc" or "bin_enc"
[dependencies.reqwest]
version = "0.12"
default-features = false
features = ["json", "http2", "blocking", "rustls-tls-webpki-roots"]
features = ["json", "http2", "rustls-tls-webpki-roots", "stream"]
[dependencies.serde]
version = "1"

View File

@ -1,3 +1,3 @@
fn main() {
tauri_build::build();
tauri_build::build()
}

View File

@ -3,7 +3,7 @@ use log::debug;
use tauri::AppHandle;
use tauri_plugin_autostart::ManagerExt;
pub fn toggle_autostart_logic(app: AppHandle, enabled: bool) -> Result<(), String> {
pub async fn toggle_autostart_logic(app: AppHandle, enabled: bool) -> Result<(), String> {
let manager = app.autolaunch();
if enabled {
manager.enable().map_err(|e| e.to_string())?;
@ -14,16 +14,18 @@ pub fn toggle_autostart_logic(app: AppHandle, enabled: bool) -> Result<(), Strin
}
// Store the state in DB
let mut db_handle = borrow_db_mut_checked();
let mut db_handle = borrow_db_mut_checked().await;
db_handle.settings.autostart = enabled;
drop(db_handle);
Ok(())
}
pub fn get_autostart_enabled_logic(app: AppHandle) -> Result<bool, tauri_plugin_autostart::Error> {
pub async fn get_autostart_enabled_logic(
app: AppHandle,
) -> Result<bool, tauri_plugin_autostart::Error> {
// First check DB state
let db_handle = borrow_db_checked();
let db_handle = borrow_db_checked().await;
let db_state = db_handle.settings.autostart;
drop(db_handle);
@ -44,8 +46,8 @@ pub fn get_autostart_enabled_logic(app: AppHandle) -> Result<bool, tauri_plugin_
}
// New function to sync state on startup
pub fn sync_autostart_on_startup(app: &AppHandle) -> Result<(), String> {
let db_handle = borrow_db_checked();
pub async fn sync_autostart_on_startup(app: &AppHandle) -> Result<(), String> {
let db_handle = borrow_db_checked().await;
let should_be_enabled = db_handle.settings.autostart;
drop(db_handle);
@ -65,11 +67,11 @@ pub fn sync_autostart_on_startup(app: &AppHandle) -> Result<(), String> {
Ok(())
}
#[tauri::command]
pub fn toggle_autostart(app: AppHandle, enabled: bool) -> Result<(), String> {
toggle_autostart_logic(app, enabled)
pub async fn toggle_autostart(app: AppHandle, enabled: bool) -> Result<(), String> {
toggle_autostart_logic(app, enabled).await
}
#[tauri::command]
pub fn get_autostart_enabled(app: AppHandle) -> Result<bool, tauri_plugin_autostart::Error> {
get_autostart_enabled_logic(app)
pub async fn get_autostart_enabled(app: AppHandle) -> Result<bool, tauri_plugin_autostart::Error> {
get_autostart_enabled_logic(app).await
}

View File

@ -1,20 +1,25 @@
use log::{debug, error};
use tauri::AppHandle;
use crate::AppState;
use crate::DropFunctionState;
#[tauri::command]
pub fn quit(app: tauri::AppHandle, state: tauri::State<'_, std::sync::Mutex<AppState<'_>>>) {
cleanup_and_exit(&app, &state);
pub async fn quit<>(app: tauri::AppHandle, state: tauri::State<'_, DropFunctionState<'_>>) -> Result<(), ()> {
cleanup_and_exit(&app, &state).await;
Ok(())
}
pub fn cleanup_and_exit(app: &AppHandle, state: &tauri::State<'_, std::sync::Mutex<AppState<'_>>>) {
pub async fn cleanup_and_exit(
app: &AppHandle,
state: &tauri::State<'_, DropFunctionState<'_>>,
) {
debug!("cleaning up and exiting application");
let download_manager = state.lock().unwrap().download_manager.clone();
match download_manager.ensure_terminated() {
let download_manager = state.lock().await.download_manager.clone();
match download_manager.ensure_terminated().await {
Ok(res) => match res {
Ok(()) => debug!("download manager terminated correctly"),
Err(()) => error!("download manager failed to terminate correctly"),
Ok(_) => debug!("download manager terminated correctly"),
Err(_) => error!("download manager failed to terminate correctly"),
},
Err(e) => panic!("{e:?}"),
}

View File

@ -1,10 +1,10 @@
use crate::AppState;
use crate::DropFunctionState;
#[tauri::command]
pub fn fetch_state(
state: tauri::State<'_, std::sync::Mutex<AppState<'_>>>,
pub async fn fetch_state(
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<String, String> {
let guard = state.lock().unwrap();
let guard = state.lock().await;
let cloned_state = serde_json::to_string(&guard.clone()).map_err(|e| e.to_string())?;
drop(guard);
Ok(cloned_state)

View File

@ -11,7 +11,7 @@ use crate::{
};
use super::{
db::{borrow_db_checked, DATA_ROOT_DIR},
db::{DATA_ROOT_DIR, borrow_db_checked},
debug::SystemData,
models::data::Settings,
};
@ -19,19 +19,19 @@ use super::{
// Will, in future, return disk/remaining size
// Just returns the directories that have been set up
#[tauri::command]
pub fn fetch_download_dir_stats() -> Vec<PathBuf> {
let lock = borrow_db_checked();
pub async fn fetch_download_dir_stats() -> Vec<PathBuf> {
let lock = borrow_db_checked().await;
lock.applications.install_dirs.clone()
}
#[tauri::command]
pub fn delete_download_dir(index: usize) {
let mut lock = borrow_db_mut_checked();
pub async fn delete_download_dir(index: usize) {
let mut lock = borrow_db_mut_checked().await;
lock.applications.install_dirs.remove(index);
}
#[tauri::command]
pub fn add_download_dir(new_dir: PathBuf) -> Result<(), DownloadManagerError<()>> {
pub async fn add_download_dir(new_dir: PathBuf) -> Result<(), DownloadManagerError<()>> {
// Check the new directory is all good
let new_dir_path = Path::new(&new_dir);
if new_dir_path.exists() {
@ -48,7 +48,7 @@ pub fn add_download_dir(new_dir: PathBuf) -> Result<(), DownloadManagerError<()>
}
// Add it to the dictionary
let mut lock = borrow_db_mut_checked();
let mut lock = borrow_db_mut_checked().await;
if lock.applications.install_dirs.contains(&new_dir) {
return Err(Error::new(
ErrorKind::AlreadyExists,
@ -63,8 +63,8 @@ pub fn add_download_dir(new_dir: PathBuf) -> Result<(), DownloadManagerError<()>
}
#[tauri::command]
pub fn update_settings(new_settings: Value) {
let mut db_lock = borrow_db_mut_checked();
pub async fn update_settings(new_settings: Value) {
let mut db_lock = borrow_db_mut_checked().await;
let mut current_settings = serde_json::to_value(db_lock.settings.clone()).unwrap();
for (key, value) in new_settings.as_object().unwrap() {
current_settings[key] = value.clone();
@ -73,12 +73,12 @@ pub fn update_settings(new_settings: Value) {
db_lock.settings = new_settings;
}
#[tauri::command]
pub fn fetch_settings() -> Settings {
borrow_db_checked().settings.clone()
pub async fn fetch_settings() -> Settings {
borrow_db_checked().await.settings.clone()
}
#[tauri::command]
pub fn fetch_system_data() -> SystemData {
let db_handle = borrow_db_checked();
pub async fn fetch_system_data() -> SystemData {
let db_handle = borrow_db_checked().await;
SystemData::new(
db_handle.auth.as_ref().unwrap().client_id.clone(),
db_handle.base_url.clone(),

View File

@ -3,14 +3,19 @@ use std::{
mem::ManuallyDrop,
ops::{Deref, DerefMut},
path::PathBuf,
sync::{Arc, LazyLock, RwLockReadGuard, RwLockWriteGuard},
sync::{Arc, LazyLock},
};
use async_once_cell::OnceCell;
use chrono::Utc;
use log::{debug, error, info, warn};
use dropbreak::{DeSerError, DeSerializer, PathDatabase, RustbreakError};
use log::{debug, info, warn};
use native_model::{Decode, Encode};
use rustbreak::{DeSerError, DeSerializer, PathDatabase, RustbreakError};
use serde::{de::DeserializeOwned, Serialize};
use serde::{Serialize, de::DeserializeOwned};
use tokio::{
spawn,
sync::{RwLockReadGuard, RwLockWriteGuard},
};
use url::Url;
use crate::DB;
@ -27,15 +32,15 @@ pub struct DropDatabaseSerializer;
impl<T: native_model::Model + Serialize + DeserializeOwned> DeSerializer<T>
for DropDatabaseSerializer
{
fn serialize(&self, val: &T) -> rustbreak::error::DeSerResult<Vec<u8>> {
fn serialize(&self, val: &T) -> dropbreak::error::DeSerResult<Vec<u8>> {
native_model::rmp_serde_1_3::RmpSerde::encode(val)
.map_err(|e| DeSerError::Internal(e.to_string()))
}
fn deserialize<R: std::io::Read>(&self, mut s: R) -> rustbreak::error::DeSerResult<T> {
fn deserialize<R: std::io::Read>(&self, mut s: R) -> dropbreak::error::DeSerResult<T> {
let mut buf = Vec::new();
s.read_to_end(&mut buf)
.map_err(|e| rustbreak::error::DeSerError::Other(e.into()))?;
.map_err(|e| dropbreak::error::DeSerError::Other(e.into()))?;
let val = native_model::rmp_serde_1_3::RmpSerde::decode(buf)
.map_err(|e| DeSerError::Internal(e.to_string()))?;
Ok(val)
@ -43,15 +48,33 @@ impl<T: native_model::Model + Serialize + DeserializeOwned> DeSerializer<T>
}
pub type DatabaseInterface =
rustbreak::Database<Database, rustbreak::backend::PathBackend, DropDatabaseSerializer>;
dropbreak::Database<Database, dropbreak::backend::PathBackend, DropDatabaseSerializer>;
pub struct OnceCellDatabase(OnceCell<DatabaseInterface>);
impl OnceCellDatabase {
pub const fn new() -> Self {
Self(OnceCell::new())
}
pub async fn init(&self, init: impl Future<Output = DatabaseInterface>) {
self.0.get_or_init(init).await;
}
}
impl<'a> Deref for OnceCellDatabase {
type Target = DatabaseInterface;
fn deref(&self) -> &Self::Target {
self.0.get().unwrap()
}
}
pub trait DatabaseImpls {
fn set_up_database() -> DatabaseInterface;
fn database_is_set_up(&self) -> bool;
fn fetch_base_url(&self) -> Url;
async fn set_up_database() -> DatabaseInterface;
async fn database_is_set_up(&self) -> bool;
async fn fetch_base_url(&self) -> Url;
}
impl DatabaseImpls for DatabaseInterface {
fn set_up_database() -> DatabaseInterface {
async fn set_up_database() -> DatabaseInterface {
let db_path = DATA_ROOT_DIR.join("drop.db");
let games_base_dir = DATA_ROOT_DIR.join("games");
let logs_root_dir = DATA_ROOT_DIR.join("logs");
@ -67,37 +90,40 @@ impl DatabaseImpls for DatabaseInterface {
let exists = fs::exists(db_path.clone()).unwrap();
if exists { match PathDatabase::load_from_path(db_path.clone()) {
Ok(db) => db,
Err(e) => handle_invalid_database(e, db_path, games_base_dir, cache_dir),
} } else {
let default = Database::new(games_base_dir, None, cache_dir);
debug!(
"Creating database at path {}",
db_path.as_os_str().to_str().unwrap()
);
PathDatabase::create_at_path(db_path, default)
.expect("Database could not be created")
match exists {
true => match PathDatabase::load_from_path(db_path.clone()).await {
Ok(db) => db,
Err(e) => handle_invalid_database(e, db_path, games_base_dir, cache_dir).await,
},
false => {
let default = Database::new(games_base_dir, None, cache_dir);
debug!(
"Creating database at path {}",
db_path.as_os_str().to_str().unwrap()
);
PathDatabase::create_at_path(db_path, default)
.await
.expect("Database could not be created")
}
}
}
fn database_is_set_up(&self) -> bool {
!self.borrow_data().unwrap().base_url.is_empty()
async fn database_is_set_up(&self) -> bool {
!self.borrow_data().await.base_url.is_empty()
}
fn fetch_base_url(&self) -> Url {
let handle = self.borrow_data().unwrap();
async fn fetch_base_url(&self) -> Url {
let handle = self.borrow_data().await;
Url::parse(&handle.base_url).unwrap()
}
}
// TODO: Make the error relelvant rather than just assume that it's a Deserialize error
fn handle_invalid_database(
async fn handle_invalid_database(
_e: RustbreakError,
db_path: PathBuf,
games_base_dir: PathBuf,
cache_dir: PathBuf,
) -> rustbreak::Database<Database, rustbreak::backend::PathBackend, DropDatabaseSerializer> {
) -> dropbreak::Database<Database, dropbreak::backend::PathBackend, DropDatabaseSerializer> {
warn!("{_e}");
let new_path = {
let time = Utc::now().timestamp();
@ -114,7 +140,9 @@ fn handle_invalid_database(
cache_dir,
);
PathDatabase::create_at_path(db_path, db).expect("Database could not be created")
PathDatabase::create_at_path(db_path, db)
.await
.expect("Database could not be created")
}
// To automatically save the database upon drop
@ -134,42 +162,31 @@ impl<'a> Deref for DBRead<'a> {
&self.0
}
}
impl DerefMut for DBWrite<'_> {
impl<'a> DerefMut for DBWrite<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl Drop for DBWrite<'_> {
impl<'a> Drop for DBWrite<'a> {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.0);
}
match DB.save() {
Ok(()) => {}
Err(e) => {
error!("database failed to save with error {e}");
panic!("database failed to save with error {e}")
spawn(async {
match DB.save().await {
Ok(_) => {}
Err(e) => {
panic!("database failed to save with error {e}")
}
}
}
});
}
}
pub fn borrow_db_checked<'a>() -> DBRead<'a> {
match DB.borrow_data() {
Ok(data) => DBRead(data),
Err(e) => {
error!("database borrow failed with error {e}");
panic!("database borrow failed with error {e}");
}
}
pub async fn borrow_db_checked<'a>() -> DBRead<'a> {
DBRead(DB.borrow_data().await)
}
pub fn borrow_db_mut_checked<'a>() -> DBWrite<'a> {
match DB.borrow_data_mut() {
Ok(data) => DBWrite(ManuallyDrop::new(data)),
Err(e) => {
error!("database borrow mut failed with error {e}");
panic!("database borrow mut failed with error {e}");
}
}
pub async fn borrow_db_mut_checked<'a>() -> DBWrite<'a> {
DBWrite(ManuallyDrop::new(DB.borrow_data_mut().await))
}

View File

@ -1,4 +1,4 @@
pub mod commands;
pub mod db;
pub mod debug;
pub mod models;
pub mod models;

View File

@ -27,7 +27,7 @@ pub mod data {
use serde_with::serde_as;
use std::{collections::HashMap, path::PathBuf};
use super::{Serialize, Deserialize, native_model};
use super::*;
fn default_template() -> String {
"{}".to_owned()
@ -174,7 +174,7 @@ pub mod data {
use serde_with::serde_as;
use super::{Serialize, Deserialize, native_model, Settings, DatabaseAuth, v1, GameVersion, DownloadableMetadata, ApplicationTransientStatus};
use super::*;
#[native_model(id = 1, version = 2, with = native_model::rmp_serde_1_3::RmpSerde)]
#[derive(Serialize, Deserialize, Clone, Default)]
@ -283,7 +283,7 @@ pub mod data {
mod v3 {
use std::path::PathBuf;
use super::{Serialize, Deserialize, native_model, Settings, DatabaseAuth, DatabaseApplications, DatabaseCompatInfo, v2};
use super::*;
#[native_model(id = 1, version = 3, with = native_model::rmp_serde_1_3::RmpSerde)]
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct Database {
@ -336,7 +336,7 @@ pub mod data {
transient_statuses: HashMap::new(),
},
prev_database,
base_url: String::new(),
base_url: "".to_owned(),
auth: None,
settings: Settings::default(),
cache_dir,

View File

@ -1,31 +1,33 @@
use std::sync::Mutex;
use crate::{database::models::data::DownloadableMetadata, AppState};
use crate::{database::models::data::DownloadableMetadata, DropFunctionState};
#[tauri::command]
pub fn pause_downloads(state: tauri::State<'_, Mutex<AppState>>) {
state.lock().unwrap().download_manager.pause_downloads();
pub async fn pause_downloads(state: tauri::State<'_, DropFunctionState<'_>>) -> Result<(), ()> {
state.lock().await.download_manager.pause_downloads();
Ok(())
}
#[tauri::command]
pub fn resume_downloads(state: tauri::State<'_, Mutex<AppState>>) {
state.lock().unwrap().download_manager.resume_downloads();
pub async fn resume_downloads(state: tauri::State<'_, DropFunctionState<'_>>) -> Result<(), ()> {
state.lock().await.download_manager.resume_downloads();
Ok(())
}
#[tauri::command]
pub fn move_download_in_queue(
state: tauri::State<'_, Mutex<AppState>>,
pub async fn move_download_in_queue(
state: tauri::State<'_, DropFunctionState<'_>>,
old_index: usize,
new_index: usize,
) {
) -> Result<(), ()> {
state
.lock()
.unwrap()
.await
.download_manager
.rearrange(old_index, new_index);
Ok(())
}
#[tauri::command]
pub fn cancel_game(state: tauri::State<'_, Mutex<AppState>>, meta: DownloadableMetadata) {
state.lock().unwrap().download_manager.cancel(meta);
pub async fn cancel_game(state: tauri::State<'_, DropFunctionState<'_>>, meta: DownloadableMetadata) -> Result<(), ()> {
state.lock().await.download_manager.cancel(meta);
Ok(())
}

View File

@ -1,14 +1,15 @@
use std::{
collections::HashMap,
sync::{
Arc, Mutex,
Arc,
mpsc::{Receiver, Sender, channel},
},
thread::{JoinHandle, spawn},
};
use ::futures::future::join_all;
use log::{debug, error, info, warn};
use tauri::{AppHandle, Emitter};
use tokio::{runtime::Runtime, sync::Mutex};
use crate::{
database::models::data::DownloadableMetadata,
@ -69,14 +70,15 @@ Behold, my madness - quexeky
pub struct DownloadManagerBuilder {
download_agent_registry: HashMap<DownloadableMetadata, DownloadAgent>,
download_queue: Queue,
command_receiver: Receiver<DownloadManagerSignal>,
command_receiver: Mutex<Receiver<DownloadManagerSignal>>,
sender: Sender<DownloadManagerSignal>,
progress: CurrentProgressObject,
status: Arc<Mutex<DownloadManagerStatus>>,
app_handle: AppHandle,
runtime: Runtime,
current_download_agent: Option<DownloadAgent>, // Should be the only download agent in the map with the "Go" flag
current_download_thread: Mutex<Option<JoinHandle<()>>>,
current_download_thread: Mutex<Option<tokio::task::JoinHandle<()>>>,
active_control_flag: Option<DownloadThreadControl>,
}
impl DownloadManagerBuilder {
@ -89,97 +91,110 @@ impl DownloadManagerBuilder {
let manager = Self {
download_agent_registry: HashMap::new(),
download_queue: queue.clone(),
command_receiver,
command_receiver: Mutex::new(command_receiver),
status: status.clone(),
sender: command_sender.clone(),
progress: active_progress.clone(),
app_handle,
runtime: tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap(),
current_download_agent: None,
current_download_thread: Mutex::new(None),
active_control_flag: None,
};
let terminator = spawn(|| manager.manage_queue());
let terminator = tauri::async_runtime::spawn(async {
if let Err(_err) = manager.manage_queue().await {
panic!("download manager exited with error");
} else {
Ok(())
}
});
DownloadManager::new(terminator, queue, active_progress, command_sender)
}
fn set_status(&self, status: DownloadManagerStatus) {
*self.status.lock().unwrap() = status;
async fn set_status(&self, status: DownloadManagerStatus) {
*self.status.lock().await = status;
}
fn remove_and_cleanup_front_download(&mut self, meta: &DownloadableMetadata) -> DownloadAgent {
async fn remove_and_cleanup_front_download(
&mut self,
meta: &DownloadableMetadata,
) -> DownloadAgent {
self.download_queue.pop_front();
let download_agent = self.download_agent_registry.remove(meta).unwrap();
self.cleanup_current_download();
self.cleanup_current_download().await;
download_agent
}
// CAREFUL WITH THIS FUNCTION
// Make sure the download thread is terminated
fn cleanup_current_download(&mut self) {
async fn cleanup_current_download(&mut self) {
self.active_control_flag = None;
*self.progress.lock().unwrap() = None;
*self.progress.lock().await = None;
self.current_download_agent = None;
let mut download_thread_lock = self.current_download_thread.lock().unwrap();
let mut download_thread_lock = self.current_download_thread.lock().await;
*download_thread_lock = None;
drop(download_thread_lock);
}
fn stop_and_wait_current_download(&self) {
self.set_status(DownloadManagerStatus::Paused);
async fn stop_and_wait_current_download(&self) {
self.set_status(DownloadManagerStatus::Paused).await;
if let Some(current_flag) = &self.active_control_flag {
current_flag.set(DownloadThreadControlFlag::Stop);
}
let mut download_thread_lock = self.current_download_thread.lock().unwrap();
let mut download_thread_lock = self.current_download_thread.lock().await;
if let Some(current_download_thread) = download_thread_lock.take() {
current_download_thread.join().unwrap();
current_download_thread.await.unwrap();
}
}
fn manage_queue(mut self) -> Result<(), ()> {
async fn manage_queue(mut self) -> Result<(), ()> {
loop {
let signal = match self.command_receiver.recv() {
let signal = match self.command_receiver.lock().await.recv() {
Ok(signal) => signal,
Err(_) => return Err(()),
};
match signal {
DownloadManagerSignal::Go => {
self.manage_go_signal();
self.manage_go_signal().await;
}
DownloadManagerSignal::Stop => {
self.manage_stop_signal();
self.manage_stop_signal().await;
}
DownloadManagerSignal::Completed(meta) => {
self.manage_completed_signal(meta);
self.manage_completed_signal(meta).await;
}
DownloadManagerSignal::Queue(download_agent) => {
self.manage_queue_signal(download_agent);
self.manage_queue_signal(download_agent).await;
}
DownloadManagerSignal::Error(e) => {
self.manage_error_signal(e);
self.manage_error_signal(e).await;
}
DownloadManagerSignal::UpdateUIQueue => {
self.push_ui_queue_update();
self.push_ui_queue_update().await;
}
DownloadManagerSignal::UpdateUIStats(kbs, time) => {
self.push_ui_stats_update(kbs, time);
}
DownloadManagerSignal::Finish => {
self.stop_and_wait_current_download();
self.stop_and_wait_current_download().await;
return Ok(());
}
DownloadManagerSignal::Cancel(meta) => {
self.manage_cancel_signal(&meta);
self.manage_cancel_signal(&meta).await;
}
}
};
}
}
fn manage_queue_signal(&mut self, download_agent: DownloadAgent) {
async fn manage_queue_signal(&mut self, download_agent: DownloadAgent) {
debug!("got signal Queue");
let meta = download_agent.metadata();
@ -190,7 +205,7 @@ impl DownloadManagerBuilder {
return;
}
download_agent.on_initialised(&self.app_handle);
download_agent.on_initialised(&self.app_handle).await;
self.download_queue.append(meta.clone());
self.download_agent_registry.insert(meta, download_agent);
@ -199,7 +214,7 @@ impl DownloadManagerBuilder {
.unwrap();
}
fn manage_go_signal(&mut self) {
async fn manage_go_signal(&mut self) {
debug!("got signal Go");
if self.download_agent_registry.is_empty() {
debug!(
@ -233,110 +248,117 @@ impl DownloadManagerBuilder {
.unwrap()
.clone();
self.active_control_flag = Some(download_agent.control_flag());
self.active_control_flag = Some(download_agent.control_flag().await);
self.current_download_agent = Some(download_agent.clone());
info!("fetched control flag");
let sender = self.sender.clone();
let mut download_thread_lock = self.current_download_thread.lock().unwrap();
info!("cloned sender");
let mut download_thread_lock = self.current_download_thread.lock().await;
info!("acquired download thread lock");
let app_handle = self.app_handle.clone();
*download_thread_lock = Some(spawn(move || {
loop {
let download_result = match download_agent.download(&app_handle) {
// Ok(true) is for completed and exited properly
Ok(v) => v,
Err(e) => {
error!("download {:?} has error {}", download_agent.metadata(), &e);
download_agent.on_error(&app_handle, &e);
sender.send(DownloadManagerSignal::Error(e)).unwrap();
return;
}
};
info!("starting download agent thread");
// If the download gets cancel
if !download_result {
download_agent.on_incomplete(&app_handle);
return;
*download_thread_lock = Some(self.runtime.spawn(async move {
info!("started download agent thread");
match download_agent.download(&app_handle).await {
// Ok(true) is for completed and exited properly
Ok(true) => {
debug!("download {:?} has completed", download_agent.metadata());
match download_agent.validate().await {
Ok(true) => {
download_agent.on_complete(&app_handle).await;
sender
.send(DownloadManagerSignal::Completed(download_agent.metadata()))
.unwrap();
}
Ok(false) => {
download_agent.on_incomplete(&app_handle).await;
}
Err(e) => {
error!(
"download {:?} has validation error {}",
download_agent.metadata(),
&e
);
download_agent.on_error(&app_handle, &e).await;
sender.send(DownloadManagerSignal::Error(e)).unwrap();
}
}
}
let validate_result = match download_agent.validate() {
Ok(v) => v,
Err(e) => {
error!(
"download {:?} has validation error {}",
download_agent.metadata(),
&e
);
download_agent.on_error(&app_handle, &e);
sender.send(DownloadManagerSignal::Error(e)).unwrap();
return;
}
};
if validate_result {
download_agent.on_complete(&app_handle);
sender
.send(DownloadManagerSignal::Completed(download_agent.metadata()))
.unwrap();
sender.send(DownloadManagerSignal::UpdateUIQueue).unwrap();
return;
// Ok(false) is for incomplete but exited properly
Ok(false) => {
debug!("Donwload agent finished incomplete");
download_agent.on_incomplete(&app_handle).await;
}
Err(e) => {
error!("download {:?} has error {}", download_agent.metadata(), &e);
download_agent.on_error(&app_handle, &e).await;
sender.send(DownloadManagerSignal::Error(e)).unwrap();
}
}
sender.send(DownloadManagerSignal::UpdateUIQueue).unwrap();
}));
self.set_status(DownloadManagerStatus::Downloading);
self.set_status(DownloadManagerStatus::Downloading).await;
let active_control_flag = self.active_control_flag.clone().unwrap();
active_control_flag.set(DownloadThreadControlFlag::Go);
// download_thread_lock.take().unwrap().await;
}
fn manage_stop_signal(&mut self) {
async fn manage_stop_signal(&mut self) {
debug!("got signal Stop");
if let Some(active_control_flag) = self.active_control_flag.clone() {
self.set_status(DownloadManagerStatus::Paused);
self.set_status(DownloadManagerStatus::Paused).await;
active_control_flag.set(DownloadThreadControlFlag::Stop);
}
}
fn manage_completed_signal(&mut self, meta: DownloadableMetadata) {
async fn manage_completed_signal(&mut self, meta: DownloadableMetadata) {
debug!("got signal Completed");
if let Some(interface) = &self.current_download_agent
&& interface.metadata() == meta
{
self.remove_and_cleanup_front_download(&meta);
self.remove_and_cleanup_front_download(&meta).await;
}
self.push_ui_queue_update();
self.push_ui_queue_update().await;
self.sender.send(DownloadManagerSignal::Go).unwrap();
}
fn manage_error_signal(&mut self, error: ApplicationDownloadError) {
async fn manage_error_signal(&mut self, error: ApplicationDownloadError) {
debug!("got signal Error");
if let Some(current_agent) = self.current_download_agent.clone() {
current_agent.on_error(&self.app_handle, &error);
current_agent.on_error(&self.app_handle, &error).await;
self.stop_and_wait_current_download();
self.remove_and_cleanup_front_download(&current_agent.metadata());
self.stop_and_wait_current_download().await;
self.remove_and_cleanup_front_download(&current_agent.metadata())
.await;
}
self.set_status(DownloadManagerStatus::Error);
self.set_status(DownloadManagerStatus::Error).await;
}
fn manage_cancel_signal(&mut self, meta: &DownloadableMetadata) {
async fn manage_cancel_signal(&mut self, meta: &DownloadableMetadata) {
debug!("got signal Cancel");
if let Some(current_download) = &self.current_download_agent {
if &current_download.metadata() == meta {
self.set_status(DownloadManagerStatus::Paused);
current_download.on_cancelled(&self.app_handle);
self.stop_and_wait_current_download();
self.set_status(DownloadManagerStatus::Paused).await;
current_download.on_cancelled(&self.app_handle).await;
self.stop_and_wait_current_download().await;
self.download_queue.pop_front();
self.cleanup_current_download();
self.cleanup_current_download().await;
debug!("current download queue: {:?}", self.download_queue.read());
}
// TODO: Collapse these two into a single if statement somehow
else if let Some(download_agent) = self.download_agent_registry.get(meta) {
let index = self.download_queue.get_by_meta(meta);
if let Some(index) = index {
download_agent.on_cancelled(&self.app_handle);
download_agent.on_cancelled(&self.app_handle).await;
let _ = self.download_queue.edit().remove(index).unwrap();
let removed = self.download_agent_registry.remove(meta);
debug!(
@ -349,7 +371,7 @@ impl DownloadManagerBuilder {
} else if let Some(download_agent) = self.download_agent_registry.get(meta) {
let index = self.download_queue.get_by_meta(meta);
if let Some(index) = index {
download_agent.on_cancelled(&self.app_handle);
download_agent.on_cancelled(&self.app_handle).await;
let _ = self.download_queue.edit().remove(index).unwrap();
let removed = self.download_agent_registry.remove(meta);
debug!(
@ -359,28 +381,26 @@ impl DownloadManagerBuilder {
);
}
}
self.push_ui_queue_update();
self.push_ui_queue_update().await;
}
fn push_ui_stats_update(&self, kbs: usize, time: usize) {
let event_data = StatsUpdateEvent { speed: kbs, time };
self.app_handle.emit("update_stats", event_data).unwrap();
}
fn push_ui_queue_update(&self) {
async fn push_ui_queue_update(&self) {
let queue = &self.download_queue.read();
let queue_objs = queue
.iter()
.map(|key| {
let val = self.download_agent_registry.get(key).unwrap();
QueueUpdateEventQueueData {
meta: DownloadableMetadata::clone(key),
status: val.status(),
progress: val.progress().get_progress(),
current: val.progress().sum(),
max: val.progress().get_max(),
}
})
.collect();
let queue_objs = join_all(queue.iter().map(async |key| {
let val = self.download_agent_registry.get(key).unwrap();
QueueUpdateEventQueueData {
meta: DownloadableMetadata::clone(key),
status: val.status().await,
progress: val.progress().await.get_progress(),
current: val.progress().await.sum(),
max: val.progress().await.get_max(),
}
}))
.await;
let event_data = QueueUpdateEvent { queue: queue_objs };
self.app_handle.emit("update_queue", event_data).unwrap();

View File

@ -1,16 +1,15 @@
use std::{
any::Any,
collections::VecDeque,
fmt::Debug,
sync::{
mpsc::{SendError, Sender},
Mutex, MutexGuard,
},
thread::JoinHandle,
};
use log::{debug, info};
use serde::Serialize;
use tauri::async_runtime::JoinHandle;
use crate::{
database::models::data::DownloadableMetadata,
@ -23,13 +22,13 @@ use super::{
};
pub enum DownloadManagerSignal {
/// Resumes (or starts) the `DownloadManager`
/// Resumes (or starts) the DownloadManager
Go,
/// Pauses the `DownloadManager`
/// Pauses the DownloadManager
Stop,
/// Called when a `DownloadAgent` has fully completed a download.
/// Called when a DownloadAgent has fully completed a download.
Completed(DownloadableMetadata),
/// Generates and appends a `DownloadAgent`
/// Generates and appends a DownloadAgent
/// to the registry and queue
Queue(DownloadAgent),
/// Tells the Manager to stop the current
@ -70,14 +69,14 @@ pub enum DownloadStatus {
Error,
}
/// Accessible front-end for the `DownloadManager`
/// Accessible front-end for the DownloadManager
///
/// The system works entirely through signals, both internally and externally,
/// all of which are accessible through the `DownloadManagerSignal` type, but
/// all of which are accessible through the DownloadManagerSignal type, but
/// should not be used directly. Rather, signals are abstracted through this
/// interface.
///
/// The actual download queue may be accessed through the .`edit()` function,
/// The actual download queue may be accessed through the .edit() function,
/// which provides raw access to the underlying queue.
/// THIS EDITING IS BLOCKING!!!
pub struct DownloadManager {
@ -118,8 +117,8 @@ impl DownloadManager {
pub fn read_queue(&self) -> VecDeque<DownloadableMetadata> {
self.download_queue.read()
}
pub fn get_current_download_progress(&self) -> Option<f64> {
let progress_object = (*self.progress.lock().unwrap()).clone()?;
pub async fn get_current_download_progress(&self) -> Option<f64> {
let progress_object = (*self.progress.lock().await).clone()?;
Some(progress_object.get_progress())
}
pub fn rearrange_string(&self, meta: &DownloadableMetadata, new_index: usize) {
@ -139,7 +138,7 @@ impl DownloadManager {
pub fn rearrange(&self, current_index: usize, new_index: usize) {
if current_index == new_index {
return;
}
};
let needs_pause = current_index == 0 || new_index == 0;
if needs_pause {
@ -171,19 +170,19 @@ impl DownloadManager {
pub fn resume_downloads(&self) {
self.command_sender.send(DownloadManagerSignal::Go).unwrap();
}
pub fn ensure_terminated(&self) -> Result<Result<(), ()>, Box<dyn Any + Send>> {
pub async fn ensure_terminated(&self) -> Result<Result<(), ()>, tauri::Error> {
self.command_sender
.send(DownloadManagerSignal::Finish)
.unwrap();
let terminator = self.terminator.lock().unwrap().take();
terminator.unwrap().join()
terminator.unwrap().await
}
pub fn get_sender(&self) -> Sender<DownloadManagerSignal> {
self.command_sender.clone()
}
}
/// Takes in the locked value from .`edit()` and attempts to
/// Takes in the locked value from .edit() and attempts to
/// get the index of whatever id is passed in
fn get_index_from_id(
queue: &mut MutexGuard<'_, VecDeque<DownloadableMetadata>>,

View File

@ -12,16 +12,17 @@ use super::{
util::{download_thread_control_flag::DownloadThreadControl, progress_object::ProgressObject},
};
#[async_trait::async_trait]
pub trait Downloadable: Send + Sync {
fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError>;
fn progress(&self) -> Arc<ProgressObject>;
fn control_flag(&self) -> DownloadThreadControl;
fn validate(&self) -> Result<bool, ApplicationDownloadError>;
fn status(&self) -> DownloadStatus;
async fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError>;
async fn progress(&self) -> Arc<ProgressObject>;
async fn control_flag(&self) -> DownloadThreadControl;
async fn validate(&self) -> Result<bool, ApplicationDownloadError>;
async fn status(&self) -> DownloadStatus;
fn metadata(&self) -> DownloadableMetadata;
fn on_initialised(&self, app_handle: &AppHandle);
fn on_error(&self, app_handle: &AppHandle, error: &ApplicationDownloadError);
fn on_complete(&self, app_handle: &AppHandle);
fn on_incomplete(&self, app_handle: &AppHandle);
fn on_cancelled(&self, app_handle: &AppHandle);
async fn on_initialised(&self, app_handle: &AppHandle);
async fn on_error(&self, app_handle: &AppHandle, error: &ApplicationDownloadError);
async fn on_complete(&self, app_handle: &AppHandle);
async fn on_incomplete(&self, app_handle: &AppHandle);
async fn on_cancelled(&self, app_handle: &AppHandle);
}

View File

@ -22,7 +22,10 @@ impl From<DownloadThreadControlFlag> for bool {
/// false => Stop
impl From<bool> for DownloadThreadControlFlag {
fn from(value: bool) -> Self {
if value { DownloadThreadControlFlag::Go } else { DownloadThreadControlFlag::Stop }
match value {
true => DownloadThreadControlFlag::Go,
false => DownloadThreadControlFlag::Stop,
}
}
}

View File

@ -11,7 +11,7 @@ pub struct RollingProgressWindow<const S: usize> {
impl<const S: usize> RollingProgressWindow<S> {
pub fn new() -> Self {
Self {
window: Arc::new([(); S].map(|()| AtomicUsize::new(0))),
window: Arc::new([(); S].map(|_| AtomicUsize::new(0))),
current: Arc::new(AtomicUsize::new(0)),
}
}

View File

@ -4,6 +4,7 @@ use serde_with::SerializeDisplay;
#[derive(SerializeDisplay)]
pub enum ProcessError {
SetupRequired,
NotInstalled,
AlreadyRunning,
NotDownloaded,
@ -18,6 +19,7 @@ pub enum ProcessError {
impl Display for ProcessError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
ProcessError::SetupRequired => "Game not set up",
ProcessError::NotInstalled => "Game not installed",
ProcessError::AlreadyRunning => "Game already running",
ProcessError::NotDownloaded => "Game not downloaded",

View File

@ -13,7 +13,6 @@ use super::drop_server_error::DropServerError;
#[derive(Debug, SerializeDisplay)]
pub enum RemoteAccessError {
FetchError(Arc<reqwest::Error>),
FetchErrorWS(Arc<reqwest_websocket::Error>),
ParsingError(ParseError),
InvalidEndpoint,
HandshakeFailed(String),
@ -22,7 +21,7 @@ pub enum RemoteAccessError {
UnparseableResponse(String),
ManifestDownloadFailed(StatusCode, String),
OutOfSync,
Cache(std::io::Error),
Cache(cacache::Error),
}
impl Display for RemoteAccessError {
@ -30,10 +29,7 @@ impl Display for RemoteAccessError {
match self {
RemoteAccessError::FetchError(error) => {
if error.is_connect() {
return write!(
f,
"Failed to connect to Drop server. Check if you access Drop through a browser, and then try again."
);
return write!(f, "Failed to connect to Drop server. Check if you access Drop through a browser, and then try again.");
}
write!(
@ -42,44 +38,24 @@ impl Display for RemoteAccessError {
error,
error
.source()
.map(std::string::ToString::to_string)
.map(|e| e.to_string())
.or_else(|| Some("Unknown error".to_string()))
.unwrap()
)
}
RemoteAccessError::FetchErrorWS(error) => write!(
f,
"{}: {}",
error,
error
.source()
.map(|e| e.to_string())
.or_else(|| Some("Unknown error".to_string()))
.unwrap()
),
},
RemoteAccessError::ParsingError(parse_error) => {
write!(f, "{parse_error}")
}
RemoteAccessError::InvalidEndpoint => write!(f, "invalid drop endpoint"),
RemoteAccessError::HandshakeFailed(message) => {
write!(f, "failed to complete handshake: {message}")
}
RemoteAccessError::HandshakeFailed(message) => write!(f, "failed to complete handshake: {message}"),
RemoteAccessError::GameNotFound(id) => write!(f, "could not find game on server: {id}"),
RemoteAccessError::InvalidResponse(error) => write!(
RemoteAccessError::InvalidResponse(error) => write!(f, "server returned an invalid response: {}, {}", error.status_code, error.status_message),
RemoteAccessError::UnparseableResponse(error) => write!(f, "server returned an invalid response: {error}"),
RemoteAccessError::ManifestDownloadFailed(status, response) => write!(
f,
"server returned an invalid response: {}, {}",
error.status_code, error.status_message
),
RemoteAccessError::UnparseableResponse(error) => {
write!(f, "server returned an invalid response: {error}")
}
RemoteAccessError::ManifestDownloadFailed(status, response) => {
write!(f, "failed to download game manifest: {status} {response}")
}
RemoteAccessError::OutOfSync => write!(
f,
"server's and client's time are out of sync. Please ensure they are within at least 30 seconds of each other"
"failed to download game manifest: {status} {response}"
),
RemoteAccessError::OutOfSync => write!(f, "server's and client's time are out of sync. Please ensure they are within at least 30 seconds of each other"),
RemoteAccessError::Cache(error) => write!(f, "Cache Error: {error}"),
}
}
@ -90,11 +66,6 @@ impl From<reqwest::Error> for RemoteAccessError {
RemoteAccessError::FetchError(Arc::new(err))
}
}
impl From<reqwest_websocket::Error> for RemoteAccessError {
fn from(err: reqwest_websocket::Error) -> Self {
RemoteAccessError::FetchErrorWS(Arc::new(err))
}
}
impl From<ParseError> for RemoteAccessError {
fn from(err: ParseError) -> Self {
RemoteAccessError::ParsingError(err)

View File

@ -1,110 +1,116 @@
use reqwest::blocking::Client;
use reqwest::Client;
use serde_json::json;
use url::Url;
use crate::{
DB,
database::db::DatabaseImpls,
error::remote_access_error::RemoteAccessError,
remote::{auth::generate_authorization_header, requests::make_request},
DB,
};
use super::collection::{Collection, Collections};
#[tauri::command]
pub fn fetch_collections() -> Result<Collections, RemoteAccessError> {
pub async fn fetch_collections() -> Result<Collections, RemoteAccessError> {
let client = Client::new();
let response = make_request(&client, &["/api/v1/client/collection"], &[], |r| {
r.header("Authorization", generate_authorization_header())
})?
.send()?;
let response = make_request(&client, &["/api/v1/client/collection"], &[], async |r| {
r.header("Authorization", generate_authorization_header().await)
})
.await?
.send()
.await?;
Ok(response.json()?)
Ok(response.json().await?)
}
#[tauri::command]
pub fn fetch_collection(collection_id: String) -> Result<Collection, RemoteAccessError> {
pub async fn fetch_collection(collection_id: String) -> Result<Collection, RemoteAccessError> {
let client = Client::new();
let response = make_request(
&client,
&["/api/v1/client/collection/", &collection_id],
&[],
|r| r.header("Authorization", generate_authorization_header()),
)?
.send()?;
async |r| r.header("Authorization", generate_authorization_header().await),
)
.await?
.send()
.await?;
Ok(response.json()?)
Ok(response.json().await?)
}
#[tauri::command]
pub fn create_collection(name: String) -> Result<Collection, RemoteAccessError> {
pub async fn create_collection(name: String) -> Result<Collection, RemoteAccessError> {
let client = Client::new();
let base_url = DB.fetch_base_url();
let base_url = DB.fetch_base_url().await;
let base_url = Url::parse(&format!("{base_url}api/v1/client/collection/"))?;
let response = client
.post(base_url)
.header("Authorization", generate_authorization_header())
.header("Authorization", generate_authorization_header().await)
.json(&json!({"name": name}))
.send()?;
.send()
.await?;
Ok(response.json()?)
Ok(response.json().await?)
}
#[tauri::command]
pub fn add_game_to_collection(
pub async fn add_game_to_collection(
collection_id: String,
game_id: String,
) -> Result<(), RemoteAccessError> {
let client = Client::new();
let url = Url::parse(&format!(
"{}api/v1/client/collection/{}/entry/",
DB.fetch_base_url(),
DB.fetch_base_url().await,
collection_id
))?;
client
.post(url)
.header("Authorization", generate_authorization_header())
.header("Authorization", generate_authorization_header().await)
.json(&json!({"id": game_id}))
.send()?;
.send()
.await?;
Ok(())
}
#[tauri::command]
pub fn delete_collection(collection_id: String) -> Result<bool, RemoteAccessError> {
pub async fn delete_collection(collection_id: String) -> Result<(), RemoteAccessError> {
let client = Client::new();
let base_url = Url::parse(&format!(
"{}api/v1/client/collection/{}",
DB.fetch_base_url(),
DB.fetch_base_url().await,
collection_id
))?;
let response = client
client
.delete(base_url)
.header("Authorization", generate_authorization_header())
.send()?;
.header("Authorization", generate_authorization_header().await)
.send().await?;
Ok(response.json()?)
Ok(())
}
#[tauri::command]
pub fn delete_game_in_collection(
pub async fn delete_game_in_collection(
collection_id: String,
game_id: String,
) -> Result<(), RemoteAccessError> {
let client = Client::new();
let base_url = Url::parse(&format!(
"{}api/v1/client/collection/{}/entry",
DB.fetch_base_url(),
DB.fetch_base_url().await,
collection_id
))?;
client
.delete(base_url)
.header("Authorization", generate_authorization_header())
.header("Authorization", generate_authorization_header().await)
.json(&json!({"id": game_id}))
.send()?;
.send().await?;
Ok(())
}

View File

@ -1,28 +1,24 @@
use std::sync::Mutex;
use serde::Deserialize;
use tauri::AppHandle;
use crate::{
database::models::data::GameVersion,
error::{library_error::LibraryError, remote_access_error::RemoteAccessError},
games::library::{
database::{db::borrow_db_mut_checked, models::data::GameVersion}, error::{library_error::LibraryError, remote_access_error::RemoteAccessError}, games::library::{
fetch_game_logic_offline, fetch_library_logic_offline, get_current_meta,
uninstall_game_logic,
},
offline, AppState,
}, offline, DropFunctionState
};
use super::{
library::{
fetch_game_logic, fetch_game_verion_options_logic, fetch_library_logic, FetchGameStruct,
Game,
FetchGameStruct, Game, fetch_game_logic, fetch_game_verion_options_logic,
fetch_library_logic,
},
state::{GameStatusManager, GameStatusWithTransient},
};
#[tauri::command]
pub fn fetch_library(
state: tauri::State<'_, Mutex<AppState>>,
pub async fn fetch_library(
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<Vec<Game>, RemoteAccessError> {
offline!(
state,
@ -30,12 +26,13 @@ pub fn fetch_library(
fetch_library_logic_offline,
state
)
.await
}
#[tauri::command]
pub fn fetch_game(
pub async fn fetch_game(
game_id: String,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<FetchGameStruct, RemoteAccessError> {
offline!(
state,
@ -44,28 +41,74 @@ pub fn fetch_game(
game_id,
state
)
.await
}
#[tauri::command]
pub fn fetch_game_status(id: String) -> GameStatusWithTransient {
GameStatusManager::fetch_state(&id)
pub async fn fetch_game_status(id: String) -> GameStatusWithTransient {
GameStatusManager::fetch_state(&id).await
}
#[tauri::command]
pub fn uninstall_game(game_id: String, app_handle: AppHandle) -> Result<(), LibraryError> {
let meta = match get_current_meta(&game_id) {
pub async fn uninstall_game(game_id: String, app_handle: AppHandle) -> Result<(), LibraryError> {
let meta = match get_current_meta(&game_id).await {
Some(data) => data,
None => return Err(LibraryError::MetaNotFound(game_id)),
};
uninstall_game_logic(meta, &app_handle);
uninstall_game_logic(meta, &app_handle).await;
Ok(())
}
#[tauri::command]
pub fn fetch_game_verion_options(
pub async fn fetch_game_verion_options(
game_id: String,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<Vec<GameVersion>, RemoteAccessError> {
fetch_game_verion_options_logic(game_id, state)
fetch_game_verion_options_logic(game_id, state).await
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FrontendGameOptions {
launch_string: String,
}
#[tauri::command]
pub async fn update_game_configuration(
game_id: String,
options: FrontendGameOptions,
) -> Result<(), LibraryError> {
let mut handle = borrow_db_mut_checked().await;
let installed_version = handle
.applications
.installed_game_version
.get(&game_id)
.ok_or(LibraryError::MetaNotFound(game_id))?;
let id = installed_version.id.clone();
let version = installed_version.version.clone().unwrap();
let mut existing_configuration = handle
.applications
.game_versions
.get(&id)
.unwrap()
.get(&version)
.unwrap()
.clone();
// Add more options in here
existing_configuration.launch_command_template = options.launch_string;
// Add no more options past here
handle
.applications
.game_versions
.get_mut(&id)
.unwrap()
.insert(version.to_string(), existing_configuration);
Ok(())
}

View File

@ -1,6 +1,6 @@
use std::{
path::PathBuf,
sync::{Arc, Mutex},
sync::Arc,
};
use crate::{
@ -8,39 +8,38 @@ use crate::{
download_manager::{
download_manager_frontend::DownloadManagerSignal, downloadable::Downloadable,
},
error::download_manager_error::DownloadManagerError,
AppState,
error::download_manager_error::DownloadManagerError, DropFunctionState,
};
use super::download_agent::GameDownloadAgent;
#[tauri::command]
pub fn download_game(
pub async fn download_game(
game_id: String,
game_version: String,
install_dir: usize,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<(), DownloadManagerError<DownloadManagerSignal>> {
let sender = state.lock().unwrap().download_manager.get_sender();
let sender = state.lock().await.download_manager.get_sender();
let game_download_agent = Arc::new(Box::new(GameDownloadAgent::new_from_index(
game_id,
game_version,
install_dir,
sender,
)) as Box<dyn Downloadable + Send + Sync>);
).await) as Box<dyn Downloadable + Send + Sync>);
Ok(state
.lock()
.unwrap()
.await
.download_manager
.queue_download(game_download_agent)?)
}
#[tauri::command]
pub fn resume_download(
pub async fn resume_download(
game_id: String,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<(), DownloadManagerError<DownloadManagerSignal>> {
let s = borrow_db_checked()
let s = borrow_db_checked().await
.applications
.game_statuses
.get(&game_id)
@ -56,7 +55,7 @@ pub fn resume_download(
install_dir,
} => (version_name, install_dir),
};
let sender = state.lock().unwrap().download_manager.get_sender();
let sender = state.lock().await.download_manager.get_sender();
let parent_dir: PathBuf = install_dir.into();
let game_download_agent = Arc::new(Box::new(GameDownloadAgent::new(
game_id,
@ -66,7 +65,7 @@ pub fn resume_download(
)) as Box<dyn Downloadable + Send + Sync>);
Ok(state
.lock()
.unwrap()
.await
.download_manager
.queue_download(game_download_agent)?)
}

View File

@ -1,5 +1,6 @@
use crate::DB;
use crate::auth::generate_authorization_header;
use crate::database::db::{borrow_db_checked, borrow_db_mut_checked};
use crate::database::db::{DatabaseImpls, borrow_db_checked, borrow_db_mut_checked};
use crate::database::models::data::{
ApplicationTransientStatus, DownloadType, DownloadableMetadata,
};
@ -15,8 +16,7 @@ use crate::games::downloads::manifest::{DropDownloadContext, DropManifest};
use crate::games::downloads::validate::game_validate_logic;
use crate::games::library::{on_game_complete, on_game_incomplete, push_game_update};
use crate::remote::requests::make_request;
use log::{debug, error, info};
use rayon::ThreadPoolBuilder;
use log::{debug, error, info, warn};
use std::collections::HashMap;
use std::fs::{OpenOptions, create_dir_all};
use std::path::{Path, PathBuf};
@ -24,6 +24,7 @@ use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tauri::{AppHandle, Emitter};
use tokio::sync::mpsc;
#[cfg(target_os = "linux")]
use rustix::fs::{FallocateFlags, fallocate};
@ -31,6 +32,12 @@ use rustix::fs::{FallocateFlags, fallocate};
use super::download_logic::download_game_chunk;
use super::drop_data::DropData;
// This is cursed but necessary
// See the message where it is used
unsafe fn extend_lifetime<'b, R>(r: &'b R) -> &'static R {
unsafe { std::mem::transmute::<&'b R, &'static R>(r) }
}
pub struct GameDownloadAgent {
pub id: String,
pub version: String,
@ -45,13 +52,13 @@ pub struct GameDownloadAgent {
}
impl GameDownloadAgent {
pub fn new_from_index(
pub async fn new_from_index(
id: String,
version: String,
target_download_dir: usize,
sender: Sender<DownloadManagerSignal>,
) -> Self {
let db_lock = borrow_db_checked();
let db_lock = borrow_db_checked().await;
let base_dir = db_lock.applications.install_dirs[target_download_dir].clone();
drop(db_lock);
@ -87,8 +94,8 @@ impl GameDownloadAgent {
}
// Blocking
pub fn setup_download(&self) -> Result<(), ApplicationDownloadError> {
self.ensure_manifest_exists()?;
pub async fn setup_download(&self) -> Result<(), ApplicationDownloadError> {
self.ensure_manifest_exists().await?;
self.ensure_contexts()?;
@ -98,8 +105,9 @@ impl GameDownloadAgent {
}
// Blocking
pub fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
self.setup_download()?;
pub async fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
debug!("starting download");
self.setup_download().await?;
self.set_progress_object_params();
let timer = Instant::now();
push_game_update(
@ -115,7 +123,8 @@ impl GameDownloadAgent {
);
let res = self
.run()
.map_err(|()| ApplicationDownloadError::DownloadError);
.await
.map_err(|_| ApplicationDownloadError::DownloadError);
debug!(
"{} took {}ms to download",
@ -125,37 +134,39 @@ impl GameDownloadAgent {
res
}
pub fn ensure_manifest_exists(&self) -> Result<(), ApplicationDownloadError> {
pub async fn ensure_manifest_exists(&self) -> Result<(), ApplicationDownloadError> {
if self.manifest.lock().unwrap().is_some() {
return Ok(());
}
self.download_manifest()
self.download_manifest().await
}
fn download_manifest(&self) -> Result<(), ApplicationDownloadError> {
let header = generate_authorization_header();
let client = reqwest::blocking::Client::new();
async fn download_manifest(&self) -> Result<(), ApplicationDownloadError> {
let header = generate_authorization_header().await;
let client = reqwest::Client::new();
let response = make_request(
&client,
&["/api/v1/client/game/manifest"],
&[("id", &self.id), ("version", &self.version)],
|f| f.header("Authorization", header),
async |f| f.header("Authorization", header),
)
.await
.map_err(ApplicationDownloadError::Communication)?
.send()
.await
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
if response.status() != 200 {
return Err(ApplicationDownloadError::Communication(
RemoteAccessError::ManifestDownloadFailed(
response.status(),
response.text().unwrap(),
response.text().await.unwrap(),
),
));
}
let manifest_download: DropManifest = response.json().unwrap();
let manifest_download: DropManifest = response.json().await.unwrap();
if let Ok(mut manifest) = self.manifest.lock() {
*manifest = Some(manifest_download);
@ -187,7 +198,10 @@ impl GameDownloadAgent {
self.generate_contexts()?;
}
*self.context_map.lock().unwrap() = self.stored_manifest.get_contexts();
self.context_map
.lock()
.unwrap()
.extend(self.stored_manifest.get_contexts());
Ok(())
}
@ -248,31 +262,42 @@ impl GameDownloadAgent {
Ok(())
}
// TODO: Change return value on Err
pub fn run(&self) -> Result<bool, ()> {
let max_download_threads = borrow_db_checked().settings.max_download_threads;
pub async fn run(&self) -> Result<bool, ()> {
let max_download_threads = borrow_db_checked().await.settings.max_download_threads;
debug!(
"downloading game: {} with {} threads",
self.id, max_download_threads
);
let pool = ThreadPoolBuilder::new()
.num_threads(max_download_threads)
.build()
let base_url = DB
.fetch_base_url()
.await
.join("/api/v1/client/chunk")
.unwrap();
let client = reqwest::Client::new();
let client_ref = unsafe { extend_lifetime(&client) };
let completed_contexts = Arc::new(boxcar::Vec::new());
let completed_indexes_loop_arc = completed_contexts.clone();
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(max_download_threads)
.thread_name("drop-download-thread")
.enable_io()
.enable_time()
.build()
.map_err(|e| {
warn!("failed to create download scheduler: {e}");
()
})?;
let (tx, mut rx) = mpsc::channel(32);
// Scope this for safety
{
let contexts = self.contexts.lock().unwrap();
debug!("{contexts:#?}");
let contexts = self.contexts.lock().unwrap();
debug!("{contexts:#?}");
pool.scope(|scope| {
let client = &reqwest::blocking::Client::new();
let context_map = self.context_map.lock().unwrap();
for (index, context) in contexts.iter().enumerate() {
let client = client.clone();
let completed_indexes = completed_indexes_loop_arc.clone();
let progress = self.progress.get(index);
let progress_handle = ProgressHandle::new(progress, self.progress.clone());
@ -284,33 +309,40 @@ impl GameDownloadAgent {
let sender = self.sender.clone();
let request = match make_request(
&client,
&["/api/v1/client/chunk"],
&[
("id", &context.game_id),
("version", &context.version),
("name", &context.file_name),
("chunk", &context.index.to_string()),
],
|r| r,
) {
Ok(request) => request,
Err(e) => {
sender
.send(DownloadManagerSignal::Error(
ApplicationDownloadError::Communication(e),
))
.unwrap();
continue;
let local_tx = tx.clone();
/*
This lifetime extensions are necessary, because this loop acts like a scope
but Rust doesn't know that.
*/
let context = unsafe { extend_lifetime(context) };
let self_static = unsafe { extend_lifetime(self) };
let mut base_url = base_url.clone();
rt.spawn(async move {
{
let mut query = base_url.query_pairs_mut();
let query_params = [
("id", &context.game_id),
("version", &context.version),
("name", &context.file_name),
("chunk", &context.index.to_string()),
];
for (param, val) in query_params {
query.append_pair(param.as_ref(), val.as_ref());
}
}
};
scope.spawn(move |_| {
match download_game_chunk(context, &self.control_flag, progress_handle, request)
let request = client_ref.get(base_url);
match download_game_chunk(
context,
&self_static.control_flag,
progress_handle,
request,
)
.await
{
Ok(true) => {
completed_indexes.push(context.checksum.clone());
local_tx.send(context.checksum.clone()).await.unwrap();
}
Ok(false) => {}
Err(e) => {
@ -320,28 +352,31 @@ impl GameDownloadAgent {
}
});
}
});
}
let newly_completed = completed_contexts.clone();
let mut newly_completed = Vec::new();
while let Some(completed_checksum) = rx.recv().await {
newly_completed.push(completed_checksum);
}
let completed_lock_len = {
let mut context_map_lock = self.context_map.lock().unwrap();
for (_, item) in newly_completed.iter() {
context_map_lock.insert(item.clone(), true);
}
// 'return' from the download
let mut context_map_lock = self.context_map.lock().unwrap();
for item in newly_completed.iter() {
context_map_lock.insert(item.clone(), true);
}
let completed_lock_len = context_map_lock.values().filter(|x| **x).count();
context_map_lock.values().filter(|x| **x).count()
};
let context_map_lock = self.context_map.lock().unwrap();
let contexts = self.contexts.lock().unwrap();
let contexts = contexts
.iter()
.map(|x| {
(
x.checksum.clone(),
context_map_lock.get(&x.checksum).copied().unwrap_or(false),
context_map_lock.get(&x.checksum).cloned().unwrap_or(false),
)
})
.collect::<Vec<(String, bool)>>();
drop(context_map_lock);
self.stored_manifest.set_contexts(&contexts);
@ -362,20 +397,8 @@ impl GameDownloadAgent {
}
}
#[async_trait::async_trait]
impl Downloadable for GameDownloadAgent {
fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
*self.status.lock().unwrap() = DownloadStatus::Downloading;
self.download(app_handle)
}
fn progress(&self) -> Arc<ProgressObject> {
self.progress.clone()
}
fn control_flag(&self) -> DownloadThreadControl {
self.control_flag.clone()
}
fn metadata(&self) -> DownloadableMetadata {
DownloadableMetadata {
id: self.id.clone(),
@ -384,11 +407,25 @@ impl Downloadable for GameDownloadAgent {
}
}
fn on_initialised(&self, _app_handle: &tauri::AppHandle) {
async fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
debug!("starting download from downloadable trait");
*self.status.lock().unwrap() = DownloadStatus::Downloading;
self.download(app_handle).await
}
async fn progress(&self) -> Arc<ProgressObject> {
self.progress.clone()
}
async fn control_flag(&self) -> DownloadThreadControl {
self.control_flag.clone()
}
async fn on_initialised(&self, _app_handle: &tauri::AppHandle) {
*self.status.lock().unwrap() = DownloadStatus::Queued;
}
fn on_error(&self, app_handle: &tauri::AppHandle, error: &ApplicationDownloadError) {
async fn on_error(&self, app_handle: &tauri::AppHandle, error: &ApplicationDownloadError) {
*self.status.lock().unwrap() = DownloadStatus::Error;
app_handle
.emit("download_error", error.to_string())
@ -396,47 +433,49 @@ impl Downloadable for GameDownloadAgent {
error!("error while managing download: {error}");
let mut handle = borrow_db_mut_checked();
let mut handle = borrow_db_mut_checked().await;
handle
.applications
.transient_statuses
.remove(&self.metadata());
}
fn on_complete(&self, app_handle: &tauri::AppHandle) {
async fn on_complete(&self, app_handle: &tauri::AppHandle) {
on_game_complete(
&self.metadata(),
self.stored_manifest.base_path.to_string_lossy().to_string(),
app_handle,
)
.await
.unwrap();
}
// TODO: fix this function. It doesn't restart the download properly, nor does it reset the state properly
fn on_incomplete(&self, app_handle: &tauri::AppHandle) {
async fn on_incomplete(&self, app_handle: &tauri::AppHandle) {
on_game_incomplete(
&self.metadata(),
self.stored_manifest.base_path.to_string_lossy().to_string(),
app_handle,
)
.await
.unwrap();
println!("Attempting to redownload");
}
fn on_cancelled(&self, _app_handle: &tauri::AppHandle) {}
async fn on_cancelled(&self, _app_handle: &tauri::AppHandle) {}
fn status(&self) -> DownloadStatus {
async fn status(&self) -> DownloadStatus {
self.status.lock().unwrap().clone()
}
fn validate(&self) -> Result<bool, ApplicationDownloadError> {
async fn validate(&self) -> Result<bool, ApplicationDownloadError> {
*self.status.lock().unwrap() = DownloadStatus::Validating;
let contexts = self.contexts.lock().unwrap().clone();
game_validate_logic(
&self.stored_manifest,
self.contexts.lock().unwrap().clone(),
contexts,
self.progress.clone(),
self.sender.clone(),
&self.control_flag,
)
.await
}
}

View File

@ -6,70 +6,71 @@ use crate::error::application_download_error::ApplicationDownloadError;
use crate::error::drop_server_error::DropServerError;
use crate::error::remote_access_error::RemoteAccessError;
use crate::games::downloads::manifest::DropDownloadContext;
use crate::remote::auth::generate_authorization_header;
use log::{debug, warn};
use crate::remote::auth::{generate_authorization_header, generate_authorization_header_part};
use futures::TryStreamExt;
use log::{debug, info, warn};
use md5::{Context, Digest};
use reqwest::blocking::{RequestBuilder, Response};
use reqwest::RequestBuilder;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use tokio_util::io::StreamReader;
use std::fs::{set_permissions, Permissions};
use std::io::Read;
use std::fs::{Permissions, set_permissions};
use std::io::Write;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use std::{
fs::{File, OpenOptions},
io::{self, BufWriter, Seek, SeekFrom, Write},
io::{self, SeekFrom},
path::PathBuf,
};
pub struct DropWriter<W: Write> {
pub struct DropWriter<W: AsyncWrite> {
hasher: Context,
destination: W,
}
impl DropWriter<File> {
fn new(path: PathBuf) -> Self {
let destination = OpenOptions::new().write(true).create(true).truncate(false).open(&path).unwrap();
async fn new(path: PathBuf) -> Self {
Self {
destination,
destination: OpenOptions::new().write(true).open(path).await.unwrap(),
hasher: Context::new(),
}
}
fn finish(mut self) -> io::Result<Digest> {
self.flush().unwrap();
async fn finish(mut self) -> io::Result<Digest> {
self.flush().await.unwrap();
Ok(self.hasher.compute())
}
}
// Write automatically pushes to file and hasher
impl Write for DropWriter<File> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
async fn write(&mut self, mut buf: &[u8]) -> io::Result<()> {
self.hasher
.write_all(buf)
.map_err(|e| io::Error::other(format!("Unable to write to hasher: {e}")))?;
self.destination.write(buf)
self.destination.write_all_buf(&mut buf).await
}
fn flush(&mut self) -> io::Result<()> {
async fn flush(&mut self) -> io::Result<()> {
self.hasher.flush()?;
self.destination.flush()
self.destination.flush().await
}
}
// Seek moves around destination output
impl Seek for DropWriter<File> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.destination.seek(pos)
async fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.destination.seek(pos).await
}
}
pub struct DropDownloadPipeline<'a, R: Read, W: Write> {
pub struct DropDownloadPipeline<'a, R: AsyncRead, W: AsyncWrite> {
pub source: R,
pub destination: DropWriter<W>,
pub control_flag: &'a DownloadThreadControl,
pub progress: ProgressHandle,
pub size: usize,
}
impl<'a> DropDownloadPipeline<'a, Response, File> {
impl<'a, R> DropDownloadPipeline<'a, R, File>
where
R: AsyncRead + Unpin,
{
fn new(
source: Response,
source: R,
destination: DropWriter<File>,
control_flag: &'a DownloadThreadControl,
progress: ProgressHandle,
@ -84,19 +85,19 @@ impl<'a> DropDownloadPipeline<'a, Response, File> {
}
}
fn copy(&mut self) -> Result<bool, io::Error> {
async fn copy(&mut self) -> Result<bool, io::Error> {
let copy_buf_size = 512;
let mut copy_buf = vec![0; copy_buf_size];
let mut buf_writer = BufWriter::with_capacity(1024 * 1024, &mut self.destination);
let mut current_size = 0;
loop {
if self.control_flag.get() == DownloadThreadControlFlag::Stop {
buf_writer.flush()?;
self.destination.flush().await?;
return Ok(false);
}
let mut bytes_read = self.source.read(&mut copy_buf)?;
let mut bytes_read = self.source.read(&mut copy_buf).await?;
info!("read {}", bytes_read);
current_size += bytes_read;
if current_size > self.size {
@ -106,7 +107,7 @@ impl<'a> DropDownloadPipeline<'a, Response, File> {
current_size = self.size;
}
buf_writer.write_all(&copy_buf[0..bytes_read])?;
self.destination.write(&copy_buf[0..bytes_read]).await?;
self.progress.add(bytes_read);
if current_size >= self.size {
@ -117,18 +118,18 @@ impl<'a> DropDownloadPipeline<'a, Response, File> {
break;
}
}
buf_writer.flush()?;
self.destination.flush().await?;
Ok(true)
}
fn finish(self) -> Result<Digest, io::Error> {
let checksum = self.destination.finish()?;
async fn finish(self) -> Result<Digest, io::Error> {
let checksum = self.destination.finish().await?;
Ok(checksum)
}
}
pub fn download_game_chunk(
pub async fn download_game_chunk(
ctx: &DropDownloadContext,
control_flag: &DownloadThreadControl,
progress: ProgressHandle,
@ -139,29 +140,34 @@ pub fn download_game_chunk(
progress.set(0);
return Ok(false);
}
let header_generator = generate_authorization_header_part().await;
let response = request
.header("Authorization", generate_authorization_header())
.header("Authorization", header_generator())
.send()
.await
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
if response.status() != 200 {
debug!("chunk request got status code: {}", response.status());
let raw_res = response.text().unwrap();
let raw_res = response.text().await.unwrap();
if let Ok(err) = serde_json::from_str::<DropServerError>(&raw_res) {
return Err(ApplicationDownloadError::Communication(
RemoteAccessError::InvalidResponse(err),
));
}
};
return Err(ApplicationDownloadError::Communication(
RemoteAccessError::UnparseableResponse(raw_res),
));
}
let mut destination = DropWriter::new(ctx.path.clone());
let mut destination = DropWriter::new(ctx.path.clone()).await;
if ctx.offset != 0 {
destination
.seek(SeekFrom::Start(ctx.offset))
.await
.expect("Failed to seek to file offset");
}
@ -169,7 +175,7 @@ pub fn download_game_chunk(
if content_length.is_none() {
warn!("recieved 0 length content from server");
return Err(ApplicationDownloadError::Communication(
RemoteAccessError::InvalidResponse(response.json().unwrap()),
RemoteAccessError::InvalidResponse(response.json().await.unwrap()),
));
}
@ -179,15 +185,21 @@ pub fn download_game_chunk(
return Err(ApplicationDownloadError::DownloadError);
}
let response_stream = StreamReader::new(
response
.bytes_stream()
.map_err(|e| std::io::Error::other(e)),
);
let mut pipeline =
DropDownloadPipeline::new(response, destination, control_flag, progress, length);
DropDownloadPipeline::new(response_stream, destination, control_flag, progress, length);
let completed = pipeline
.copy()
.await
.map_err(|e| ApplicationDownloadError::IoError(e.kind()))?;
if !completed {
return Ok(false);
}
};
// If we complete the file, set the permissions (if on Linux)
#[cfg(unix)]
@ -198,6 +210,7 @@ pub fn download_game_chunk(
let checksum = pipeline
.finish()
.await
.map_err(|e| ApplicationDownloadError::IoError(e.kind()))?;
let res = hex::encode(checksum.0);

View File

@ -1,5 +1,7 @@
use std::{
collections::HashMap, fs::File, io::{Read, Write}, path::PathBuf
fs::File,
io::{Read, Write},
path::PathBuf,
};
use log::{debug, error, info, warn};
@ -10,7 +12,7 @@ pub type DropData = v1::DropData;
static DROP_DATA_PATH: &str = ".dropdata";
pub mod v1 {
use std::{collections::HashMap, path::PathBuf, sync::Mutex};
use std::{path::PathBuf, sync::Mutex};
use native_model::native_model;
use serde::{Deserialize, Serialize};
@ -20,7 +22,7 @@ pub mod v1 {
pub struct DropData {
pub game_id: String,
pub game_version: String,
pub contexts: Mutex<HashMap<String, bool>>,
pub contexts: Mutex<Vec<(String, bool)>>,
pub base_path: PathBuf,
}
@ -30,7 +32,7 @@ pub mod v1 {
base_path,
game_id,
game_version,
contexts: Mutex::new(HashMap::new()),
contexts: Mutex::new(Vec::new()),
}
}
}
@ -38,9 +40,12 @@ pub mod v1 {
impl DropData {
pub fn generate(game_id: String, game_version: String, base_path: PathBuf) -> Self {
let mut file = if let Ok(file) = File::open(base_path.join(DROP_DATA_PATH)) { file } else {
debug!("Generating new dropdata for game {game_id}");
return DropData::new(game_id, game_version, base_path);
let mut file = match File::open(base_path.join(DROP_DATA_PATH)) {
Ok(file) => file,
Err(_) => {
debug!("Generating new dropdata for game {game_id}");
return DropData::new(game_id, game_version, base_path);
}
};
let mut s = Vec::new();
@ -50,7 +55,7 @@ impl DropData {
error!("{e}");
return DropData::new(game_id, game_version, base_path);
}
}
};
match native_model::rmp_serde_1_3::RmpSerde::decode(s) {
Ok(manifest) => manifest,
@ -75,28 +80,25 @@ impl DropData {
};
match file.write_all(&manifest_raw) {
Ok(()) => {}
Ok(_) => {}
Err(e) => error!("{e}"),
}
};
}
pub fn set_contexts(&self, completed_contexts: &[(String, bool)]) {
*self.contexts.lock().unwrap() = completed_contexts.iter().map(|s| (s.0.clone(), s.1)).collect();
}
pub fn set_context(&self, context: String, state: bool) {
self.contexts.lock().unwrap().entry(context).insert_entry(state);
*self.contexts.lock().unwrap() = completed_contexts.to_owned();
}
pub fn get_completed_contexts(&self) -> Vec<String> {
self.contexts
.lock()
.unwrap()
.iter()
.filter_map(|x| if *x.1 { Some(x.0.clone()) } else { None })
.filter_map(|x| if x.1 { Some(x.0.clone()) } else { None })
.collect()
}
pub fn get_contexts(&self) -> HashMap<String, bool> {
pub fn get_contexts(&self) -> Vec<(String, bool)> {
info!(
"Any contexts which are complete? {}",
self.contexts.lock().unwrap().iter().any(|x| *x.1)
self.contexts.lock().unwrap().iter().any(|x| x.1)
);
self.contexts.lock().unwrap().clone()
}

View File

@ -6,7 +6,6 @@ use std::{
use log::{debug, error, info};
use md5::Context;
use rayon::ThreadPoolBuilder;
use crate::{
database::db::borrow_db_checked,
@ -21,7 +20,7 @@ use crate::{
games::downloads::{drop_data::DropData, manifest::DropDownloadContext},
};
pub fn game_validate_logic(
pub async fn game_validate_logic(
dropdata: &DropData,
contexts: Vec<DropDownloadContext>,
progress: Arc<ProgressObject>,
@ -29,50 +28,47 @@ pub fn game_validate_logic(
control_flag: &DownloadThreadControl,
) -> Result<bool, ApplicationDownloadError> {
progress.reset(contexts.len());
let max_download_threads = borrow_db_checked().settings.max_download_threads;
let max_download_threads = borrow_db_checked().await.settings.max_download_threads;
debug!(
"validating game: {} with {} threads",
dropdata.game_id, max_download_threads
);
let pool = ThreadPoolBuilder::new()
.num_threads(max_download_threads)
.build()
.unwrap();
debug!("{contexts:#?}");
let invalid_chunks = Arc::new(boxcar::Vec::new());
pool.scope(|scope| {
for (index, context) in contexts.iter().enumerate() {
let current_progress = progress.get(index);
let progress_handle = ProgressHandle::new(current_progress, progress.clone());
let invalid_chunks_scoped = invalid_chunks.clone();
let sender = sender.clone();
unsafe {
async_scoped::TokioScope::scope_and_collect(|scope| {
for (index, context) in contexts.iter().enumerate() {
let current_progress = progress.get(index);
let progress_handle = ProgressHandle::new(current_progress, progress.clone());
let invalid_chunks_scoped = invalid_chunks.clone();
let sender = sender.clone();
scope.spawn(move |_| {
match validate_game_chunk(context, control_flag, progress_handle) {
Ok(true) => {
debug!(
"Finished context #{} with checksum {}",
index, context.checksum
);
scope.spawn(async move {
match validate_game_chunk(context, control_flag, progress_handle) {
Ok(true) => {
debug!(
"Finished context #{} with checksum {}",
index, context.checksum
);
}
Ok(false) => {
debug!(
"Didn't finish context #{} with checksum {}",
index, &context.checksum
);
invalid_chunks_scoped.push(context.checksum.clone());
}
Err(e) => {
error!("{e}");
sender.send(DownloadManagerSignal::Error(e)).unwrap();
}
}
Ok(false) => {
debug!(
"Didn't finish context #{} with checksum {}",
index, &context.checksum
);
invalid_chunks_scoped.push(context.checksum.clone());
}
Err(e) => {
error!("{e}");
sender.send(DownloadManagerSignal::Error(e)).unwrap();
}
}
});
}
});
});
}
}).await
};
// If there are any contexts left which are false
if !invalid_chunks.is_empty() {
@ -81,13 +77,6 @@ pub fn game_validate_logic(
dropdata.game_id.clone(),
invalid_chunks
);
for context in invalid_chunks.iter() {
dropdata.set_context(context.1.clone(), false);
}
dropdata.write();
return Ok(false);
}
@ -109,9 +98,7 @@ pub fn validate_game_chunk(
return Ok(false);
}
let Ok(mut source) = File::open(&ctx.path) else {
return Ok(false);
};
let mut source = File::open(&ctx.path).unwrap();
if ctx.offset != 0 {
source
@ -125,7 +112,7 @@ pub fn validate_game_chunk(
validate_copy(&mut source, &mut hasher, ctx.length, control_flag, progress).unwrap();
if !completed {
return Ok(false);
}
};
let res = hex::encode(hasher.compute().0);
if res != ctx.checksum {

View File

@ -1,25 +1,23 @@
use std::fs::remove_dir_all;
use std::sync::Mutex;
use std::thread::spawn;
use log::{debug, error, warn};
use serde::{Deserialize, Serialize};
use tauri::AppHandle;
use tauri::Emitter;
use tokio::spawn;
use crate::database::db::{borrow_db_checked, borrow_db_mut_checked};
use crate::database::models::data::{
ApplicationTransientStatus, DownloadableMetadata, GameDownloadStatus, GameVersion,
};
use crate::download_manager::download_manager_frontend::DownloadStatus;
use crate::error::library_error::LibraryError;
use crate::error::remote_access_error::RemoteAccessError;
use crate::games::state::{GameStatusManager, GameStatusWithTransient};
use crate::remote::auth::generate_authorization_header;
use crate::remote::cache::{cache_object, get_cached_object, get_cached_object_db};
use crate::remote::requests::make_request;
use crate::AppState;
use bitcode::{Encode, Decode};
use crate::DropFunctionState;
use bitcode::{Decode, Encode};
#[derive(Serialize, Deserialize, Debug)]
pub struct FetchGameStruct {
@ -73,30 +71,32 @@ pub struct StatsUpdateEvent {
pub time: usize,
}
pub fn fetch_library_logic(
state: tauri::State<'_, Mutex<AppState>>,
pub async fn fetch_library_logic(
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<Vec<Game>, RemoteAccessError> {
let header = generate_authorization_header();
let header = generate_authorization_header().await;
let client = reqwest::blocking::Client::new();
let response = make_request(&client, &["/api/v1/client/user/library"], &[], |f| {
let client = reqwest::Client::new();
let response = make_request(&client, &["/api/v1/client/user/library"], &[], async |f| {
f.header("Authorization", header)
})?
.send()?;
})
.await?
.send()
.await?;
if response.status() != 200 {
let err = response.json().unwrap();
let err = response.json().await.unwrap();
warn!("{err:?}");
return Err(RemoteAccessError::InvalidResponse(err));
}
let mut games: Vec<Game> = response.json()?;
let mut games: Vec<Game> = response.json().await?;
let mut handle = state.lock().unwrap();
let mut handle = state.lock().await;
let mut db_handle = borrow_db_mut_checked();
let mut db_handle = borrow_db_mut_checked().await;
for game in &games {
for game in games.iter() {
handle.games.insert(game.id.clone(), game.clone());
if !db_handle.applications.game_statuses.contains_key(&game.id) {
db_handle
@ -113,22 +113,22 @@ pub fn fetch_library_logic(
}
// We should always have a cache of the object
// Pass db_handle because otherwise we get a gridlock
let game = get_cached_object_db::<Game>(&meta.id.clone(), &db_handle)?;
let game = get_cached_object_db::<String, Game>(meta.id.clone(), &db_handle).await?;
games.push(game);
}
drop(handle);
drop(db_handle);
cache_object("library", &games)?;
cache_object("library", &games).await?;
Ok(games)
}
pub fn fetch_library_logic_offline(
_state: tauri::State<'_, Mutex<AppState>>,
pub async fn fetch_library_logic_offline(
_state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<Vec<Game>, RemoteAccessError> {
let mut games: Vec<Game> = get_cached_object("library")?;
let mut games: Vec<Game> = get_cached_object("library").await?;
let db_handle = borrow_db_checked();
let db_handle = borrow_db_checked().await;
games.retain(|game| {
db_handle
@ -139,13 +139,13 @@ pub fn fetch_library_logic_offline(
Ok(games)
}
pub fn fetch_game_logic(
pub async fn fetch_game_logic(
id: String,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<FetchGameStruct, RemoteAccessError> {
let mut state_handle = state.lock().unwrap();
let mut state_handle = state.lock().await;
let handle = borrow_db_checked();
let handle = borrow_db_checked().await;
let metadata_option = handle.applications.installed_game_version.get(&id);
let version = match metadata_option {
@ -165,7 +165,7 @@ pub fn fetch_game_logic(
let game = state_handle.games.get(&id);
if let Some(game) = game {
let status = GameStatusManager::fetch_state(&id);
let status = GameStatusManager::fetch_state(&id).await;
let data = FetchGameStruct {
game: game.clone(),
@ -173,29 +173,31 @@ pub fn fetch_game_logic(
version,
};
cache_object(&id, game)?;
cache_object(id, game).await?;
return Ok(data);
}
let client = reqwest::blocking::Client::new();
let response = make_request(&client, &["/api/v1/client/game/", &id], &[], |r| {
r.header("Authorization", generate_authorization_header())
})?
.send()?;
let client = reqwest::Client::new();
let response = make_request(&client, &["/api/v1/client/game/", &id], &[], async |r| {
r.header("Authorization", generate_authorization_header().await)
})
.await?
.send()
.await?;
if response.status() == 404 {
return Err(RemoteAccessError::GameNotFound(id));
}
if response.status() != 200 {
let err = response.json().unwrap();
let err = response.json().await.unwrap();
warn!("{err:?}");
return Err(RemoteAccessError::InvalidResponse(err));
}
let game: Game = response.json()?;
let game: Game = response.json().await?;
state_handle.games.insert(id.clone(), game.clone());
let mut db_handle = borrow_db_mut_checked();
let mut db_handle = borrow_db_mut_checked().await;
db_handle
.applications
@ -204,7 +206,7 @@ pub fn fetch_game_logic(
.or_insert(GameDownloadStatus::Remote {});
drop(db_handle);
let status = GameStatusManager::fetch_state(&id);
let status = GameStatusManager::fetch_state(&id).await;
let data = FetchGameStruct {
game: game.clone(),
@ -212,16 +214,16 @@ pub fn fetch_game_logic(
version,
};
cache_object(&id, &game)?;
cache_object(id, &game).await?;
Ok(data)
}
pub fn fetch_game_logic_offline(
pub async fn fetch_game_logic_offline(
id: String,
_state: tauri::State<'_, Mutex<AppState>>,
_state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<FetchGameStruct, RemoteAccessError> {
let handle = borrow_db_checked();
let handle = borrow_db_checked().await;
let metadata_option = handle.applications.installed_game_version.get(&id);
let version = match metadata_option {
None => None,
@ -238,8 +240,8 @@ pub fn fetch_game_logic_offline(
};
drop(handle);
let status = GameStatusManager::fetch_state(&id);
let game = get_cached_object::<Game>(&id)?;
let status = GameStatusManager::fetch_state(&id).await;
let game = get_cached_object::<String, Game>(id).await?;
Ok(FetchGameStruct {
game,
@ -248,30 +250,32 @@ pub fn fetch_game_logic_offline(
})
}
pub fn fetch_game_verion_options_logic(
pub async fn fetch_game_verion_options_logic(
game_id: String,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<Vec<GameVersion>, RemoteAccessError> {
let client = reqwest::blocking::Client::new();
let client = reqwest::Client::new();
let response = make_request(
&client,
&["/api/v1/client/game/versions"],
&[("id", &game_id)],
|r| r.header("Authorization", generate_authorization_header()),
)?
.send()?;
async |r| r.header("Authorization", generate_authorization_header().await),
)
.await?
.send()
.await?;
if response.status() != 200 {
let err = response.json().unwrap();
let err = response.json().await.unwrap();
warn!("{err:?}");
return Err(RemoteAccessError::InvalidResponse(err));
}
let data: Vec<GameVersion> = response.json()?;
let data: Vec<GameVersion> = response.json().await?;
let state_lock = state.lock().unwrap();
let process_manager_lock = state_lock.process_manager.lock().unwrap();
let state_lock = state.lock().await;
let process_manager_lock = state_lock.process_manager.lock().await;
let data: Vec<GameVersion> = data
.into_iter()
.filter(|v| process_manager_lock.valid_platform(&v.platform).unwrap())
@ -282,9 +286,9 @@ pub fn fetch_game_verion_options_logic(
Ok(data)
}
pub fn uninstall_game_logic(meta: DownloadableMetadata, app_handle: &AppHandle) {
pub async fn uninstall_game_logic(meta: DownloadableMetadata, app_handle: &AppHandle) {
debug!("triggered uninstall for agent");
let mut db_handle = borrow_db_mut_checked();
let mut db_handle = borrow_db_mut_checked().await;
db_handle
.applications
.transient_statuses
@ -329,46 +333,52 @@ pub fn uninstall_game_logic(meta: DownloadableMetadata, app_handle: &AppHandle)
drop(db_handle);
let app_handle = app_handle.clone();
spawn(move || if let Err(e) = remove_dir_all(install_dir) {
error!("{e}");
} else {
let mut db_handle = borrow_db_mut_checked();
db_handle.applications.transient_statuses.remove(&meta);
db_handle
.applications
.installed_game_version
.remove(&meta.id);
db_handle
.applications
.game_statuses
.entry(meta.id.clone())
.and_modify(|e| *e = GameDownloadStatus::Remote {});
drop(db_handle);
spawn(async move {
match remove_dir_all(install_dir) {
Err(e) => {
error!("{e}");
}
Ok(_) => {
let mut db_handle = borrow_db_mut_checked().await;
db_handle.applications.transient_statuses.remove(&meta);
db_handle
.applications
.installed_game_version
.remove(&meta.id);
db_handle
.applications
.game_statuses
.entry(meta.id.clone())
.and_modify(|e| *e = GameDownloadStatus::Remote {});
drop(db_handle);
debug!("uninstalled game id {}", &meta.id);
app_handle.emit("update_library", ()).unwrap();
debug!("uninstalled game id {}", &meta.id);
app_handle.emit("update_library", ()).unwrap();
push_game_update(
&app_handle,
&meta.id,
None,
(Some(GameDownloadStatus::Remote {}), None),
);
push_game_update(
&app_handle,
&meta.id,
None,
(Some(GameDownloadStatus::Remote {}), None),
);
}
}
});
} else {
warn!("invalid previous state for uninstall, failing silently.");
warn!("invalid previous state for uninstall, failing silently.")
}
}
pub fn get_current_meta(game_id: &String) -> Option<DownloadableMetadata> {
pub async fn get_current_meta(game_id: &String) -> Option<DownloadableMetadata> {
borrow_db_checked()
.await
.applications
.installed_game_version
.get(game_id)
.cloned()
}
pub fn on_game_incomplete(
pub async fn on_game_incomplete(
meta: &DownloadableMetadata,
install_dir: String,
app_handle: &AppHandle,
@ -378,7 +388,7 @@ pub fn on_game_incomplete(
return Err(RemoteAccessError::GameNotFound(meta.id.clone()));
}
let client = reqwest::blocking::Client::new();
let client = reqwest::Client::new();
let response = make_request(
&client,
&["/api/v1/client/game/version"],
@ -386,13 +396,15 @@ pub fn on_game_incomplete(
("id", &meta.id),
("version", meta.version.as_ref().unwrap()),
],
|f| f.header("Authorization", generate_authorization_header()),
)?
.send()?;
async |f| f.header("Authorization", generate_authorization_header().await),
)
.await?
.send()
.await?;
let game_version: GameVersion = response.json()?;
let game_version: GameVersion = response.json().await?;
let mut handle = borrow_db_mut_checked();
let mut handle = borrow_db_mut_checked().await;
handle
.applications
.game_versions
@ -428,7 +440,7 @@ pub fn on_game_incomplete(
Ok(())
}
pub fn on_game_complete(
pub async fn on_game_complete(
meta: &DownloadableMetadata,
install_dir: String,
app_handle: &AppHandle,
@ -438,9 +450,9 @@ pub fn on_game_complete(
return Err(RemoteAccessError::GameNotFound(meta.id.clone()));
}
let header = generate_authorization_header();
let header = generate_authorization_header().await;
let client = reqwest::blocking::Client::new();
let client = reqwest::Client::new();
let response = make_request(
&client,
&["/api/v1/client/game/version"],
@ -448,13 +460,15 @@ pub fn on_game_complete(
("id", &meta.id),
("version", meta.version.as_ref().unwrap()),
],
|f| f.header("Authorization", header),
)?
.send()?;
async |f| f.header("Authorization", header),
)
.await?
.send()
.await?;
let game_version: GameVersion = response.json()?;
let game_version: GameVersion = response.json().await?;
let mut handle = borrow_db_mut_checked();
let mut handle = borrow_db_mut_checked().await;
handle
.applications
.game_versions
@ -480,7 +494,7 @@ pub fn on_game_complete(
}
};
let mut db_handle = borrow_db_mut_checked();
let mut db_handle = borrow_db_mut_checked().await;
db_handle
.applications
.game_statuses
@ -517,48 +531,3 @@ pub fn push_game_update(
)
.unwrap();
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FrontendGameOptions {
launch_string: String,
}
#[tauri::command]
pub fn update_game_configuration(
game_id: String,
options: FrontendGameOptions,
) -> Result<(), LibraryError> {
let mut handle = borrow_db_mut_checked();
let installed_version = handle
.applications
.installed_game_version
.get(&game_id)
.ok_or(LibraryError::MetaNotFound(game_id))?;
let id = installed_version.id.clone();
let version = installed_version.version.clone().unwrap();
let mut existing_configuration = handle
.applications
.game_versions
.get(&id)
.unwrap()
.get(&version)
.unwrap()
.clone();
// Add more options in here
existing_configuration.launch_command_template = options.launch_string;
// Add no more options past here
handle
.applications
.game_versions
.get_mut(&id)
.unwrap()
.insert(version.to_string(), existing_configuration);
Ok(())
}

View File

@ -10,8 +10,8 @@ pub type GameStatusWithTransient = (
pub struct GameStatusManager {}
impl GameStatusManager {
pub fn fetch_state(game_id: &String) -> GameStatusWithTransient {
let db_lock = borrow_db_checked();
pub async fn fetch_state(game_id: &String) -> GameStatusWithTransient {
let db_lock = borrow_db_checked().await;
let online_state = match db_lock.applications.installed_game_version.get(game_id) {
Some(meta) => db_lock.applications.transient_statuses.get(meta).cloned(),
None => None,

View File

@ -1,5 +1,6 @@
#![feature(fn_traits)]
#![feature(duration_constructors)]
#![feature(impl_trait_in_assoc_type)]
#![deny(clippy::all)]
mod database;
@ -11,8 +12,9 @@ mod error;
mod process;
mod remote;
use crate::database::db::OnceCellDatabase;
use crate::games::commands::update_game_configuration;
use crate::process::commands::open_process_logs;
use crate::remote::commands::auth_initiate_code;
use crate::{database::db::DatabaseImpls, games::downloads::commands::resume_download};
use bitcode::{Decode, Encode};
use client::commands::fetch_state;
@ -39,7 +41,7 @@ use games::commands::{
fetch_game, fetch_game_status, fetch_game_verion_options, fetch_library, uninstall_game,
};
use games::downloads::commands::download_game;
use games::library::{Game, update_game_configuration};
use games::library::Game;
use log::{LevelFilter, debug, info, warn};
use log4rs::Config;
use log4rs::append::console::ConsoleAppender;
@ -53,9 +55,10 @@ use remote::commands::{
auth_initiate, fetch_drop_object, gen_drop_url, manual_recieve_handshake, retry_connect,
sign_out, use_remote,
};
use remote::fetch_object::fetch_object;
use remote::fetch_object::{fetch_object, fetch_object_offline};
use remote::server_proto::{handle_server_proto, handle_server_proto_offline};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::panic::PanicHookInfo;
@ -63,16 +66,13 @@ use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use std::{
collections::HashMap,
sync::{LazyLock, Mutex},
};
use std::{env, panic};
use tauri::menu::{Menu, MenuItem, PredefinedMenuItem};
use tauri::tray::TrayIconBuilder;
use tauri::{AppHandle, Manager, RunEvent, WindowEvent};
use tauri_plugin_deep_link::DeepLinkExt;
use tauri_plugin_dialog::DialogExt;
use tokio::sync::Mutex;
#[derive(Clone, Copy, Serialize, Eq, PartialEq)]
pub enum AppStatus {
@ -108,7 +108,7 @@ pub struct AppState<'a> {
process_manager: Arc<Mutex<ProcessManager<'a>>>,
}
fn setup(handle: AppHandle) -> AppState<'static> {
async fn setup(handle: AppHandle) -> AppState<'static> {
let logfile = FileAppender::builder()
.encoder(Box::new(PatternEncoder::new(
"{d} | {l} | {f}:{L} - {m}{n}",
@ -144,7 +144,7 @@ fn setup(handle: AppHandle) -> AppState<'static> {
let process_manager = Arc::new(Mutex::new(ProcessManager::new(handle.clone())));
debug!("checking if database is set up");
let is_set_up = DB.database_is_set_up();
let is_set_up = DB.database_is_set_up().await;
if !is_set_up {
return AppState {
status: AppStatus::NotConfigured,
@ -158,13 +158,13 @@ fn setup(handle: AppHandle) -> AppState<'static> {
debug!("database is set up");
// TODO: Account for possible failure
let (app_status, user) = auth::setup();
let (app_status, user) = auth::setup().await;
let db_handle = borrow_db_checked();
let db_handle = borrow_db_checked().await;
let mut missing_games = Vec::new();
let statuses = db_handle.applications.game_statuses.clone();
drop(db_handle);
for (game_id, status) in statuses {
for (game_id, status) in statuses.into_iter() {
match status {
GameDownloadStatus::Remote {} => {}
GameDownloadStatus::PartiallyInstalled { .. } => {}
@ -191,7 +191,7 @@ fn setup(handle: AppHandle) -> AppState<'static> {
info!("detected games missing: {missing_games:?}");
let mut db_handle = borrow_db_mut_checked();
let mut db_handle = borrow_db_mut_checked().await;
for game_id in missing_games {
db_handle
.applications
@ -205,7 +205,7 @@ fn setup(handle: AppHandle) -> AppState<'static> {
debug!("finished setup!");
// Sync autostart state
if let Err(e) = sync_autostart_on_startup(&handle) {
if let Err(e) = sync_autostart_on_startup(&handle).await {
warn!("failed to sync autostart state: {e}");
}
@ -218,7 +218,8 @@ fn setup(handle: AppHandle) -> AppState<'static> {
}
}
pub static DB: LazyLock<DatabaseInterface> = LazyLock::new(DatabaseInterface::set_up_database);
pub static DB: OnceCellDatabase = OnceCellDatabase::new();
pub type DropFunctionState<'a> = Mutex<AppState<'a>>;
pub fn custom_panic_handler(e: &PanicHookInfo) -> Option<()> {
let crash_file = DATA_ROOT_DIR.join(format!(
@ -268,7 +269,6 @@ pub fn run() {
fetch_settings,
// Auth
auth_initiate,
auth_initiate_code,
retry_connect,
manual_recieve_handshake,
sign_out,
@ -314,109 +314,142 @@ pub fn run() {
Some(vec!["--minimize"]),
))
.setup(|app| {
let handle = app.handle().clone();
let state = setup(handle);
debug!("initialized drop client");
app.manage(Mutex::new(state));
let app = app.handle().clone();
{
use tauri_plugin_deep_link::DeepLinkExt;
let _ = app.deep_link().register_all();
debug!("registered all pre-defined deep links");
}
tauri::async_runtime::spawn(async move {
DB.init(async { DatabaseInterface::set_up_database().await })
.await;
let handle = app.handle().clone();
let _main_window = tauri::WebviewWindowBuilder::new(
&handle,
"main", // BTW this is not the name of the window, just the label. Keep this 'main', there are permissions & configs that depend on it
tauri::WebviewUrl::App("index.html".into()),
)
.title("Drop Desktop App")
.min_inner_size(1000.0, 500.0)
.inner_size(1536.0, 864.0)
.decorations(false)
.shadow(false)
.data_directory(DATA_ROOT_DIR.join(".webview"))
.build()
.unwrap();
app.deep_link().on_open_url(move |event| {
debug!("handling drop:// url");
let binding = event.urls();
let url = binding.first().unwrap();
if url.host_str().unwrap() == "handshake" {
recieve_handshake(handle.clone(), url.path().to_string());
let state = setup(app.clone()).await;
info!("initialized drop client");
if !app.manage(Mutex::new(state)) {
panic!("failed to setup drop state before Tauri does")
}
});
let menu = Menu::with_items(
app,
&[
&MenuItem::with_id(app, "open", "Open", true, None::<&str>)?,
&PredefinedMenuItem::separator(app)?,
/*
&MenuItem::with_id(app, "show_library", "Library", true, None::<&str>)?,
&MenuItem::with_id(app, "show_settings", "Settings", true, None::<&str>)?,
&PredefinedMenuItem::separator(app)?,
*/
&MenuItem::with_id(app, "quit", "Quit", true, None::<&str>)?,
],
)?;
run_on_tray(|| {
TrayIconBuilder::new()
.icon(app.default_window_icon().unwrap().clone())
.menu(&menu)
.on_menu_event(|app, event| match event.id.as_ref() {
"open" => {
app.webview_windows().get("main").unwrap().show().unwrap();
}
"quit" => {
cleanup_and_exit(app, &app.state());
}
_ => {
warn!("menu event not handled: {:?}", event.id);
}
})
.build(app)
.expect("error while setting up tray menu");
});
{
let mut db_handle = borrow_db_mut_checked();
if let Some(original) = db_handle.prev_database.take() {
warn!(
"Database corrupted. Original file at {}",
original.canonicalize().unwrap().to_string_lossy()
);
app.dialog()
.message(
"Database corrupted. A copy has been saved at: ".to_string()
+ original.to_str().unwrap(),
)
.title("Database corrupted")
.show(|_| {});
{
use tauri_plugin_deep_link::DeepLinkExt;
let _ = app.deep_link().register_all();
debug!("registered all pre-defined deep links");
}
}
let deep_link_handle = app.clone();
app.deep_link().on_open_url(move |event| {
let deep_link_handle = deep_link_handle.clone();
tauri::async_runtime::block_on(async move {
debug!("handling drop:// url");
let binding = event.urls();
let url = binding.first().unwrap();
if url.host_str().unwrap() == "handshake" {
recieve_handshake(deep_link_handle, url.path().to_string()).await
}
});
});
let menu = Menu::with_items(
&app,
&[
&MenuItem::with_id(&app, "open", "Open", true, None::<&str>).unwrap(),
&PredefinedMenuItem::separator(&app).unwrap(),
/*
&MenuItem::with_id(app, "show_library", "Library", true, None::<&str>)?,
&MenuItem::with_id(app, "show_settings", "Settings", true, None::<&str>)?,
&PredefinedMenuItem::separator(app)?,
*/
&MenuItem::with_id(&app, "quit", "Quit", true, None::<&str>).unwrap(),
],
)
.unwrap();
let tray_app_handle = app.clone();
run_on_tray(move || {
TrayIconBuilder::new()
.icon(tray_app_handle.default_window_icon().unwrap().clone())
.menu(&menu)
.on_menu_event(|app, event| {
let tray_app_handle = app.clone();
tauri::async_runtime::block_on(async move {
match event.id.as_ref() {
"open" => {
app.webview_windows().get("main").unwrap().show().unwrap();
}
"quit" => {
cleanup_and_exit(
&tray_app_handle,
&tray_app_handle.state(),
)
.await;
}
_ => {
warn!("menu event not handled: {:?}", event.id);
}
}
})
})
.build(&tray_app_handle)
.expect("error while setting up tray menu");
});
{
let mut db_handle = borrow_db_mut_checked().await;
if let Some(original) = db_handle.prev_database.take() {
warn!(
"Database corrupted. Original file at {}",
original.canonicalize().unwrap().to_string_lossy()
);
app.dialog()
.message(
"Database corrupted. A copy has been saved at: ".to_string()
+ original.to_str().unwrap(),
)
.title("Database corrupted")
.show(|_| {});
}
};
let _main_window = tauri::WebviewWindowBuilder::new(
&app,
"main", // BTW this is not the name of the window, just the label. Keep this 'main', there are permissions & configs that depend on it
tauri::WebviewUrl::App("index.html".into()),
)
.title("Drop Desktop App")
.min_inner_size(1000.0, 500.0)
.inner_size(1536.0, 864.0)
.decorations(false)
.shadow(false)
.data_directory(DATA_ROOT_DIR.join(".webview"))
.build()
.unwrap();
});
Ok(())
})
.register_asynchronous_uri_scheme_protocol("object", move |_ctx, request, responder| {
tauri::async_runtime::spawn(async move {
fetch_object(request, responder).await;
.register_asynchronous_uri_scheme_protocol("object", move |ctx, request, responder| {
tauri::async_runtime::block_on(async move {
let state = ctx.app_handle().state::<DropFunctionState<'_>>();
offline!(
state,
fetch_object,
fetch_object_offline,
request,
responder
)
.await;
});
})
.register_asynchronous_uri_scheme_protocol("server", move |ctx, request, responder| {
let state: tauri::State<'_, Mutex<AppState>> = ctx.app_handle().state();
offline!(
state,
handle_server_proto,
handle_server_proto_offline,
request,
responder
);
tauri::async_runtime::block_on(async move {
let state = ctx.app_handle().state::<DropFunctionState<'_>>();
offline!(
state,
handle_server_proto,
handle_server_proto_offline,
request,
responder
)
.await;
});
})
.on_window_event(|window, event| {
if let WindowEvent::CloseRequested { api, .. } = event {

View File

@ -2,5 +2,11 @@
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
fn main() {
drop_app_lib::run();
let global_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(32)
.enable_all()
.build()
.unwrap();
tauri::async_runtime::set(global_runtime.handle().clone());
drop_app_lib::run()
}

View File

@ -1,14 +1,12 @@
use std::sync::Mutex;
use crate::{error::process_error::ProcessError, AppState};
use crate::{error::process_error::ProcessError, DropFunctionState};
#[tauri::command]
pub fn launch_game(
pub async fn launch_game(
id: String,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<(), ProcessError> {
let state_lock = state.lock().unwrap();
let mut process_manager_lock = state_lock.process_manager.lock().unwrap();
let state_lock = state.lock().await;
let mut process_manager_lock = state_lock.process_manager.lock().await;
//let meta = DownloadableMetadata {
// id,
@ -16,10 +14,10 @@ pub fn launch_game(
// download_type: DownloadType::Game,
//};
match process_manager_lock.launch_process(id) {
Ok(()) => {}
match process_manager_lock.launch_process(id).await {
Ok(_) => {}
Err(e) => return Err(e),
}
};
drop(process_manager_lock);
drop(state_lock);
@ -28,23 +26,23 @@ pub fn launch_game(
}
#[tauri::command]
pub fn kill_game(
pub async fn kill_game(
game_id: String,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<(), ProcessError> {
let state_lock = state.lock().unwrap();
let mut process_manager_lock = state_lock.process_manager.lock().unwrap();
let state_lock = state.lock().await;
let mut process_manager_lock = state_lock.process_manager.lock().await;
process_manager_lock
.kill_game(game_id)
.map_err(ProcessError::IOError)
}
#[tauri::command]
pub fn open_process_logs(
pub async fn open_process_logs(
game_id: String,
state: tauri::State<'_, Mutex<AppState>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<(), ProcessError> {
let state_lock = state.lock().unwrap();
let mut process_manager_lock = state_lock.process_manager.lock().unwrap();
let state_lock = state.lock().await;
let mut process_manager_lock = state_lock.process_manager.lock().await;
process_manager_lock.open_process_logs(game_id)
}

View File

@ -5,8 +5,7 @@ use std::{
path::PathBuf,
process::{Command, ExitStatus},
str::FromStr,
sync::{Arc, Mutex},
thread::spawn,
sync::{Arc},
time::{Duration, SystemTime},
};
@ -17,18 +16,16 @@ use serde::{Deserialize, Serialize};
use shared_child::SharedChild;
use tauri::{AppHandle, Emitter, Manager};
use tauri_plugin_opener::OpenerExt;
use tokio::spawn;
use crate::{
AppState, DB,
database::{
db::{DATA_ROOT_DIR, borrow_db_mut_checked},
db::{borrow_db_mut_checked, DATA_ROOT_DIR},
models::data::{
ApplicationTransientStatus, DownloadType, DownloadableMetadata, GameDownloadStatus,
GameVersion,
},
},
error::process_error::ProcessError,
games::{library::push_game_update, state::GameStatusManager},
}, error::process_error::ProcessError, games::{library::push_game_update, state::GameStatusManager}, DropFunctionState, DB
};
pub struct RunningProcess {
@ -108,7 +105,7 @@ impl ProcessManager<'_> {
Ok(())
}
fn on_process_finish(&mut self, game_id: String, result: Result<ExitStatus, std::io::Error>) {
async fn on_process_finish(&mut self, game_id: String, result: Result<ExitStatus, std::io::Error>) {
if !self.processes.contains_key(&game_id) {
warn!(
"process on_finish was called, but game_id is no longer valid. finished with result: {result:?}"
@ -120,7 +117,7 @@ impl ProcessManager<'_> {
let process = self.processes.remove(&game_id).unwrap();
let mut db_handle = borrow_db_mut_checked();
let mut db_handle = borrow_db_mut_checked().await;
let meta = db_handle
.applications
.installed_game_version
@ -158,7 +155,7 @@ impl ProcessManager<'_> {
let _ = self.app_handle.emit("launch_external_error", &game_id);
}
let status = GameStatusManager::fetch_state(&game_id);
let status = GameStatusManager::fetch_state(&game_id).await;
push_game_update(&self.app_handle, &game_id, None, status);
}
@ -167,21 +164,23 @@ impl ProcessManager<'_> {
Ok(self.game_launchers.contains_key(&(*current, *platform)))
}
pub fn launch_process(&mut self, game_id: String) -> Result<(), ProcessError> {
pub async fn launch_process(&mut self, game_id: String) -> Result<(), ProcessError> {
if self.processes.contains_key(&game_id) {
return Err(ProcessError::AlreadyRunning);
}
let version = match DB
.borrow_data()
.unwrap()
.await
.applications
.game_statuses
.get(&game_id)
.cloned()
{
Some(GameDownloadStatus::Installed { version_name, .. }) => version_name,
Some(GameDownloadStatus::SetupRequired { version_name, .. }) => version_name,
Some(GameDownloadStatus::SetupRequired { .. }) => {
return Err(ProcessError::SetupRequired);
}
_ => return Err(ProcessError::NotInstalled),
};
let meta = DownloadableMetadata {
@ -190,7 +189,7 @@ impl ProcessManager<'_> {
download_type: DownloadType::Game,
};
let mut db_lock = borrow_db_mut_checked();
let mut db_lock = borrow_db_mut_checked().await;
debug!(
"Launching process {:?} with games {:?}",
&game_id, db_lock.applications.game_versions
@ -281,7 +280,7 @@ impl ProcessManager<'_> {
let launch_string = game_launcher.create_launch_process(
&meta,
launch.to_string(),
args.clone(),
args.to_vec(),
game_version,
install_dir,
);
@ -332,14 +331,14 @@ impl ProcessManager<'_> {
let wait_thread_apphandle = self.app_handle.clone();
let wait_thread_game_id = meta.clone();
spawn(move || {
spawn(async move {
let result: Result<ExitStatus, std::io::Error> = launch_process_handle.wait();
let app_state = wait_thread_apphandle.state::<Mutex<AppState>>();
let app_state_handle = app_state.lock().unwrap();
let app_state = wait_thread_apphandle.state::<tauri::State<'_, DropFunctionState<'_>>>();
let app_state_handle = app_state.lock().await;
let mut process_manager_handle = app_state_handle.process_manager.lock().unwrap();
process_manager_handle.on_process_finish(wait_thread_game_id.id, result);
let mut process_manager_handle = app_state_handle.process_manager.lock().await;
process_manager_handle.on_process_finish(wait_thread_game_id.id, result).await;
// As everything goes out of scope, they should get dropped
// But just to explicit about it

View File

@ -1,15 +1,15 @@
use std::{collections::HashMap, env, sync::Mutex};
use std::{collections::HashMap, env};
use chrono::Utc;
use droplet_rs::ssl::sign_nonce;
use gethostname::gethostname;
use log::{debug, error, warn};
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use tauri::{AppHandle, Emitter, Manager};
use url::Url;
use crate::{
AppState, AppStatus, User,
AppStatus, DropFunctionState, User,
database::{
db::{borrow_db_checked, borrow_db_mut_checked},
models::data::DatabaseAuth,
@ -32,7 +32,6 @@ struct InitiateRequestBody {
name: String,
platform: String,
capabilities: HashMap<String, CapabilityConfiguration>,
mode: String,
}
#[derive(Serialize)]
@ -50,29 +49,38 @@ struct HandshakeResponse {
id: String,
}
pub fn generate_authorization_header() -> String {
pub async fn generate_authorization_header() -> String {
let func = generate_authorization_header_part().await;
func()
}
pub async fn generate_authorization_header_part() -> Box<dyn FnOnce() -> String> {
let certs = {
let db = borrow_db_checked();
let db = borrow_db_checked().await;
db.auth.clone().unwrap()
};
let nonce = Utc::now().timestamp_millis().to_string();
Box::new(move || {
let nonce = Utc::now().timestamp_millis().to_string();
let signature = sign_nonce(certs.private, nonce.clone()).unwrap();
let signature = sign_nonce(certs.private, nonce.clone()).unwrap();
format!("Nonce {} {} {}", certs.client_id, nonce, signature)
format!("Nonce {} {} {}", certs.client_id, nonce, signature)
})
}
pub fn fetch_user() -> Result<User, RemoteAccessError> {
let header = generate_authorization_header();
pub async fn fetch_user() -> Result<User, RemoteAccessError> {
let header = generate_authorization_header().await;
let client = reqwest::blocking::Client::new();
let response = make_request(&client, &["/api/v1/client/user"], &[], |f| {
let client = reqwest::Client::new();
let response = make_request(&client, &["/api/v1/client/user"], &[], async |f| {
f.header("Authorization", header)
})?
.send()?;
})
.await?
.send()
.await?;
if response.status() != 200 {
let err: DropServerError = response.json()?;
let err: DropServerError = response.json().await?;
warn!("{err:?}");
if err.status_message == "Nonce expired" {
@ -82,11 +90,11 @@ pub fn fetch_user() -> Result<User, RemoteAccessError> {
return Err(RemoteAccessError::InvalidResponse(err));
}
response.json::<User>().map_err(std::convert::Into::into)
response.json::<User>().await.map_err(|e| e.into())
}
fn recieve_handshake_logic(app: &AppHandle, path: String) -> Result<(), RemoteAccessError> {
let path_chunks: Vec<&str> = path.split('/').collect();
async fn recieve_handshake_logic(app: &AppHandle, path: String) -> Result<(), RemoteAccessError> {
let path_chunks: Vec<&str> = path.split("/").collect();
if path_chunks.len() != 3 {
app.emit("auth/failed", ()).unwrap();
return Err(RemoteAccessError::HandshakeFailed(
@ -95,28 +103,28 @@ fn recieve_handshake_logic(app: &AppHandle, path: String) -> Result<(), RemoteAc
}
let base_url = {
let handle = borrow_db_checked();
let handle = borrow_db_checked().await;
Url::parse(handle.base_url.as_str())?
};
let client_id = path_chunks.get(1).unwrap();
let token = path_chunks.get(2).unwrap();
let body = HandshakeRequestBody {
client_id: (*client_id).to_string(),
token: (*token).to_string(),
client_id: client_id.to_string(),
token: token.to_string(),
};
let endpoint = base_url.join("/api/v1/client/auth/handshake")?;
let client = reqwest::blocking::Client::new();
let response = client.post(endpoint).json(&body).send()?;
let client = reqwest::Client::new();
let response = client.post(endpoint).json(&body).send().await?;
debug!("handshake responsded with {}", response.status().as_u16());
if !response.status().is_success() {
return Err(RemoteAccessError::InvalidResponse(response.json()?));
return Err(RemoteAccessError::InvalidResponse(response.json().await?));
}
let response_struct: HandshakeResponse = response.json()?;
let response_struct: HandshakeResponse = response.json().await?;
{
let mut handle = borrow_db_mut_checked();
let mut handle = borrow_db_mut_checked().await;
handle.auth = Some(DatabaseAuth {
private: response_struct.private,
cert: response_struct.certificate,
@ -126,50 +134,49 @@ fn recieve_handshake_logic(app: &AppHandle, path: String) -> Result<(), RemoteAc
}
let web_token = {
let header = generate_authorization_header();
let header = generate_authorization_header().await;
let token = client
.post(base_url.join("/api/v1/client/user/webtoken").unwrap())
.header("Authorization", header)
.send()
.await
.unwrap();
token.text().unwrap()
token.text().await.unwrap()
};
let mut handle = borrow_db_mut_checked();
let mut handle = borrow_db_mut_checked().await;
let mut_auth = handle.auth.as_mut().unwrap();
mut_auth.web_token = Some(web_token);
Ok(())
}
pub fn recieve_handshake(app: AppHandle, path: String) {
pub async fn recieve_handshake(app: AppHandle, path: String) {
// Tell the app we're processing
app.emit("auth/processing", ()).unwrap();
let handshake_result = recieve_handshake_logic(&app, path);
let handshake_result = recieve_handshake_logic(&app, path).await;
if let Err(e) = handshake_result {
warn!("error with authentication: {e}");
app.emit("auth/failed", e.to_string()).unwrap();
return;
}
let app_state = app.state::<Mutex<AppState>>();
let mut state_lock = app_state.lock().unwrap();
let app_state = app.state::<DropFunctionState<'_>>();
let (app_status, user) = setup();
let (app_status, user) = setup().await;
let mut state_lock = app_state.lock().await;
state_lock.status = app_status;
state_lock.user = user;
drop(state_lock);
app.emit("auth/finished", ()).unwrap();
}
pub fn auth_initiate_logic(mode: String) -> Result<String, RemoteAccessError> {
pub async fn auth_initiate_logic() -> Result<(), RemoteAccessError> {
let base_url = {
let db_lock = borrow_db_checked();
let db_lock = borrow_db_checked().await;
Url::parse(&db_lock.base_url.clone())?
};
@ -183,39 +190,42 @@ pub fn auth_initiate_logic(mode: String) -> Result<String, RemoteAccessError> {
("peerAPI".to_owned(), CapabilityConfiguration {}),
("cloudSaves".to_owned(), CapabilityConfiguration {}),
]),
mode,
};
let client = reqwest::blocking::Client::new();
let response = client.post(endpoint.to_string()).json(&body).send()?;
let client = reqwest::Client::new();
let response = client.post(endpoint.to_string()).json(&body).send().await?;
if response.status() != 200 {
let data: DropServerError = response.json()?;
let data: DropServerError = response.json().await?;
error!("could not start handshake: {}", data.status_message);
return Err(RemoteAccessError::HandshakeFailed(data.status_message));
}
let response = response.text()?;
let redir_url = response.text().await?;
let complete_redir_url = base_url.join(&redir_url)?;
Ok(response)
info!("opening web browser to continue authentication: {}", complete_redir_url);
webbrowser::open(complete_redir_url.as_ref()).unwrap();
Ok(())
}
pub fn setup() -> (AppStatus, Option<User>) {
let data = borrow_db_checked();
pub async fn setup() -> (AppStatus, Option<User>) {
let data = borrow_db_checked().await;
let auth = data.auth.clone();
drop(data);
if auth.is_some() {
let user_result = match fetch_user() {
let user_result = match fetch_user().await {
Ok(data) => data,
Err(RemoteAccessError::FetchError(_)) => {
let user = get_cached_object::<User>("user").unwrap();
let user = get_cached_object::<_, User>("user").await.unwrap();
return (AppStatus::Offline, Some(user));
}
Err(_) => return (AppStatus::SignedInNeedsReauth, None),
};
cache_object("user", &user_result).unwrap();
cache_object("user", &user_result).await.unwrap();
return (AppStatus::SignedIn, Some(user_result));
}

View File

@ -1,76 +1,65 @@
use std::{
fs::File,
io::{self, Write},
path::{Path, PathBuf},
time::SystemTime,
fmt::Display,
time::{Duration, SystemTime},
};
use crate::{
database::{db::borrow_db_checked, models::data::Database},
database::{
db::borrow_db_checked,
models::data::Database,
},
error::remote_access_error::RemoteAccessError,
};
use bitcode::{Decode, DecodeOwned, Encode};
use cacache::Integrity;
use http::{Response, header::CONTENT_TYPE, response::Builder as ResponseBuilder};
use log::debug;
use log::info;
#[macro_export]
macro_rules! offline {
($var:expr, $func1:expr, $func2:expr, $( $arg:expr ),* ) => {
if $crate::borrow_db_checked().settings.force_offline || $var.lock().unwrap().status == $crate::AppStatus::Offline {
$func2( $( $arg ), *)
(async || if $crate::borrow_db_checked().await.settings.force_offline || $var.lock().await.status == $crate::AppStatus::Offline {
$func2( $( $arg ), *).await
} else {
$func1( $( $arg ), *)
}
$func1( $( $arg ), *).await
})()
}
}
fn get_sys_time_in_secs() -> u64 {
match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => panic!("SystemTime before UNIX EPOCH!"),
}
}
fn get_cache_path(base: &Path, key: &str) -> PathBuf {
let key_hash = hex::encode(md5::compute(key.as_bytes()).0);
base.join(key_hash)
}
fn write_sync(base: &Path, key: &str, data: Vec<u8>) -> io::Result<()> {
let cache_path = get_cache_path(base, key);
let mut file = File::create(cache_path)?;
file.write_all(&data)?;
Ok(())
}
fn read_sync(base: &Path, key: &str) -> io::Result<Vec<u8>> {
let cache_path = get_cache_path(base, key);
let file = std::fs::read(cache_path)?;
Ok(file)
}
pub fn cache_object<D: Encode>(key: &str, data: &D) -> Result<(), RemoteAccessError> {
pub async fn cache_object<K: AsRef<str>, D: Encode>(
key: K,
data: &D,
) -> Result<Integrity, RemoteAccessError> {
let bytes = bitcode::encode(data);
write_sync(&borrow_db_checked().cache_dir, key, bytes).map_err(RemoteAccessError::Cache)
cacache::write_sync(&borrow_db_checked().await.cache_dir, key, bytes)
.map_err(RemoteAccessError::Cache)
}
pub fn get_cached_object<D: Encode + DecodeOwned>(key: &str) -> Result<D, RemoteAccessError> {
get_cached_object_db::<D>(key, &borrow_db_checked())
pub async fn get_cached_object<K: AsRef<str> + Display, D: Encode + DecodeOwned>(
key: K,
) -> Result<D, RemoteAccessError> {
get_cached_object_db::<K, D>(key, &&(borrow_db_checked().await)).await
}
pub fn get_cached_object_db<D: DecodeOwned>(
key: &str,
pub async fn get_cached_object_db<'a, K: AsRef<str> + Display, D: DecodeOwned>(
key: K,
db: &Database,
) -> Result<D, RemoteAccessError> {
let start = SystemTime::now();
let bytes = read_sync(&db.cache_dir, key).map_err(RemoteAccessError::Cache)?;
let bytes = cacache::read(&db.cache_dir, &key)
.await
.map_err(RemoteAccessError::Cache)?;
let read = start.elapsed().unwrap();
let data =
bitcode::decode::<D>(&bytes).map_err(|e| RemoteAccessError::Cache(io::Error::other(e)))?;
let decode = start.elapsed().unwrap();
debug!(
"cache object took: r:{}, d:{}, b:{}",
let data = bitcode::decode::<D>(&bytes).map_err(|_| {
RemoteAccessError::Cache(cacache::Error::EntryNotFound(
db.cache_dir.clone(),
key.to_string(),
))
})?;
let parse = start.elapsed().unwrap().abs_diff(read);
info!(
"read object: r: {}, p: {}, b: {}",
read.as_millis(),
read.abs_diff(decode).as_millis(),
parse.as_millis(),
bytes.len()
);
Ok(data)
@ -79,13 +68,17 @@ pub fn get_cached_object_db<D: DecodeOwned>(
pub struct ObjectCache {
content_type: String,
body: Vec<u8>,
expiry: u64,
expiry: u128,
}
impl ObjectCache {
pub fn has_expired(&self) -> bool {
let current = get_sys_time_in_secs();
self.expiry < current
let duration = Duration::from_millis(self.expiry.try_into().unwrap());
SystemTime::UNIX_EPOCH
.checked_add(duration)
.unwrap()
.elapsed()
.is_err()
}
}
@ -100,7 +93,12 @@ impl From<Response<Vec<u8>>> for ObjectCache {
.unwrap()
.to_owned(),
body: value.body().clone(),
expiry: get_sys_time_in_secs() + 60 * 60 * 24,
expiry: SystemTime::now()
.checked_add(Duration::from_days(1))
.unwrap()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis(),
}
}
}

View File

@ -1,15 +1,10 @@
use std::sync::Mutex;
use futures_lite::StreamExt;
use log::{debug, warn};
use reqwest::blocking::Client;
use reqwest_websocket::{Message, RequestBuilderExt};
use serde::Deserialize;
use log::debug;
use reqwest::Client;
use tauri::{AppHandle, Emitter, Manager};
use url::Url;
use crate::{
AppState, AppStatus,
AppStatus, DropFunctionState,
database::db::{borrow_db_checked, borrow_db_mut_checked},
error::remote_access_error::RemoteAccessError,
remote::{auth::generate_authorization_header, requests::make_request},
@ -22,17 +17,17 @@ use super::{
};
#[tauri::command]
pub fn use_remote(
pub async fn use_remote(
url: String,
state: tauri::State<'_, Mutex<AppState<'_>>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<(), RemoteAccessError> {
use_remote_logic(url, state)
use_remote_logic(url, state).await
}
#[tauri::command]
pub fn gen_drop_url(path: String) -> Result<String, RemoteAccessError> {
pub async fn gen_drop_url(path: String) -> Result<String, RemoteAccessError> {
let base_url = {
let handle = borrow_db_checked();
let handle = borrow_db_checked().await;
Url::parse(&handle.base_url).map_err(RemoteAccessError::ParsingError)?
};
@ -43,37 +38,39 @@ pub fn gen_drop_url(path: String) -> Result<String, RemoteAccessError> {
}
#[tauri::command]
pub fn fetch_drop_object(path: String) -> Result<Vec<u8>, RemoteAccessError> {
let _drop_url = gen_drop_url(path.clone())?;
let req = make_request(&Client::new(), &[&path], &[], |r| {
r.header("Authorization", generate_authorization_header())
})?
.send();
pub async fn fetch_drop_object(path: String) -> Result<Vec<u8>, RemoteAccessError> {
let _drop_url = gen_drop_url(path.clone()).await?;
let req = make_request(&Client::new(), &[&path], &[], async |r| {
r.header("Authorization", generate_authorization_header().await)
})
.await?
.send()
.await;
match req {
Ok(data) => {
let data = data.bytes()?.to_vec();
cache_object(&path, &data)?;
let data = data.bytes().await?.to_vec();
cache_object(&path, &data).await?;
Ok(data)
}
Err(e) => {
debug!("{e}");
get_cached_object::<Vec<u8>>(&path)
get_cached_object::<&str, Vec<u8>>(&path).await
}
}
}
#[tauri::command]
pub fn sign_out(app: AppHandle) {
pub async fn sign_out(app: AppHandle) {
// Clear auth from database
{
let mut handle = borrow_db_mut_checked();
let mut handle = borrow_db_mut_checked().await;
handle.auth = None;
}
// Update app state
{
let app_state = app.state::<Mutex<AppState>>();
let mut app_state_handle = app_state.lock().unwrap();
let app_state = app.state::<DropFunctionState<'_>>();
let mut app_state_handle = app_state.lock().await;
app_state_handle.status = AppStatus::SignedOut;
app_state_handle.user = None;
}
@ -83,92 +80,23 @@ pub fn sign_out(app: AppHandle) {
}
#[tauri::command]
pub fn retry_connect(state: tauri::State<'_, Mutex<AppState>>) {
let (app_status, user) = setup();
pub async fn retry_connect(state: tauri::State<'_, DropFunctionState<'_>>) -> Result<(), ()> {
let (app_status, user) = setup().await;
let mut guard = state.lock().unwrap();
let mut guard = state.lock().await;
guard.status = app_status;
guard.user = user;
drop(guard);
}
#[tauri::command]
pub fn auth_initiate() -> Result<(), RemoteAccessError> {
let base_url = {
let db_lock = borrow_db_checked();
Url::parse(&db_lock.base_url.clone())?
};
let redir_url = auth_initiate_logic("callback".to_string())?;
let complete_redir_url = base_url.join(&redir_url)?;
debug!("opening web browser to continue authentication");
webbrowser::open(complete_redir_url.as_ref()).unwrap();
Ok(())
}
#[derive(Deserialize)]
struct CodeWebsocketResponse {
#[serde(rename = "type")]
response_type: String,
value: String,
#[tauri::command]
pub async fn auth_initiate() -> Result<(), RemoteAccessError> {
auth_initiate_logic().await
}
#[tauri::command]
pub fn auth_initiate_code(app: AppHandle) -> Result<String, RemoteAccessError> {
let base_url = {
let db_lock = borrow_db_checked();
Url::parse(&db_lock.base_url.clone())?
};
let code = auth_initiate_logic("code".to_string())?;
let header_code = code.clone();
tauri::async_runtime::spawn(async move {
let load = async || -> Result<(), RemoteAccessError> {
let ws_url = base_url.join("/api/v1/client/auth/code/ws")?;
let response = reqwest::Client::default()
.get(ws_url)
.header("Authorization", header_code)
.upgrade()
.send()
.await?;
let mut websocket = response.into_websocket().await?;
while let Some(token) = websocket.try_next().await? {
if let Message::Text(response) = token {
let response = serde_json::from_str::<CodeWebsocketResponse>(&response)
.map_err(|e| RemoteAccessError::UnparseableResponse(e.to_string()))?;
match response.response_type.as_str() {
"token" => {
let recieve_app = app.clone();
tauri::async_runtime::spawn_blocking(move || {
manual_recieve_handshake(recieve_app, response.value);
});
return Ok(());
}
_ => return Err(RemoteAccessError::HandshakeFailed(response.value)),
}
}
}
Err(RemoteAccessError::HandshakeFailed(
"Failed to connect to websocket".to_string(),
))
};
let result = load().await;
if let Err(err) = result {
warn!("{err}");
app.emit("auth/failed", err.to_string()).unwrap();
}
});
Ok(code)
}
#[tauri::command]
pub fn manual_recieve_handshake(app: AppHandle, token: String) {
recieve_handshake(app, format!("handshake/{token}"));
pub async fn manual_recieve_handshake(app: AppHandle, token: String) {
recieve_handshake(app, format!("handshake/{token}")).await;
}

View File

@ -2,18 +2,17 @@ use http::{header::CONTENT_TYPE, response::Builder as ResponseBuilder};
use log::warn;
use tauri::UriSchemeResponder;
use crate::{DB, database::db::DatabaseImpls};
use super::{
auth::generate_authorization_header,
cache::{ObjectCache, cache_object, get_cached_object},
requests::make_request,
};
pub async fn fetch_object(request: http::Request<Vec<u8>>, responder: UriSchemeResponder) {
// Drop leading /
let object_id = &request.uri().path()[1..];
let cache_result = get_cached_object::<ObjectCache>(object_id);
let cache_result = get_cached_object::<&str, ObjectCache>(object_id).await;
if let Ok(cache_result) = &cache_result
&& !cache_result.has_expired()
{
@ -21,16 +20,23 @@ pub async fn fetch_object(request: http::Request<Vec<u8>>, responder: UriSchemeR
return;
}
let header = generate_authorization_header();
let header = generate_authorization_header().await;
let client = reqwest::Client::new();
let url = format!("{}api/v1/client/object/{object_id}", DB.fetch_base_url());
let response = client.get(url).header("Authorization", header).send().await;
let response = make_request(
&client,
&["/api/v1/client/object/", object_id],
&[],
async |f| f.header("Authorization", header),
)
.await
.unwrap()
.send()
.await;
if response.is_err() {
match cache_result {
Ok(cache_result) => responder.respond(cache_result.into()),
Err(e) => {
warn!("{e}");
warn!("{e}")
}
}
return;
@ -44,8 +50,19 @@ pub async fn fetch_object(request: http::Request<Vec<u8>>, responder: UriSchemeR
let data = Vec::from(response.bytes().await.unwrap());
let resp = resp_builder.body(data).unwrap();
if cache_result.is_err() || cache_result.unwrap().has_expired() {
cache_object::<ObjectCache>(object_id, &resp.clone().into()).unwrap();
cache_object::<&str, ObjectCache>(object_id, &resp.clone().into())
.await
.unwrap();
}
responder.respond(resp);
}
pub async fn fetch_object_offline(request: http::Request<Vec<u8>>, responder: UriSchemeResponder) {
let object_id = &request.uri().path()[1..];
let data = get_cached_object::<&str, ObjectCache>(object_id).await;
match data {
Ok(data) => responder.respond(data.into()),
Err(e) => warn!("{e}"),
}
}

View File

@ -1,14 +1,14 @@
use reqwest::blocking::{Client, RequestBuilder};
use reqwest::{Client, RequestBuilder};
use crate::{database::db::DatabaseImpls, error::remote_access_error::RemoteAccessError, DB};
pub fn make_request<T: AsRef<str>, F: FnOnce(RequestBuilder) -> RequestBuilder>(
pub async fn make_request<T: AsRef<str>, F: AsyncFnOnce(RequestBuilder) -> RequestBuilder>(
client: &Client,
path_components: &[T],
query: &[(T, T)],
f: F,
) -> Result<RequestBuilder, RemoteAccessError> {
let mut base_url = DB.fetch_base_url();
let mut base_url = DB.fetch_base_url().await;
for endpoint in path_components {
base_url = base_url.join(endpoint.as_ref())?;
}
@ -19,5 +19,5 @@ pub fn make_request<T: AsRef<str>, F: FnOnce(RequestBuilder) -> RequestBuilder>(
}
}
let response = client.get(base_url);
Ok(f(response))
Ok(f(response).await)
}

View File

@ -1,12 +1,12 @@
use std::str::FromStr;
use http::{uri::PathAndQuery, Request, Response, StatusCode, Uri};
use reqwest::blocking::Client;
use reqwest::Client;
use tauri::UriSchemeResponder;
use crate::database::db::borrow_db_checked;
pub fn handle_server_proto_offline(_request: Request<Vec<u8>>, responder: UriSchemeResponder) {
pub async fn handle_server_proto_offline(_request: Request<Vec<u8>>, responder: UriSchemeResponder) {
let four_oh_four = Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Vec::new())
@ -14,8 +14,8 @@ pub fn handle_server_proto_offline(_request: Request<Vec<u8>>, responder: UriSch
responder.respond(four_oh_four);
}
pub fn handle_server_proto(request: Request<Vec<u8>>, responder: UriSchemeResponder) {
let db_handle = borrow_db_checked();
pub async fn handle_server_proto(request: Request<Vec<u8>>, responder: UriSchemeResponder) {
let db_handle = borrow_db_checked().await;
let web_token = match &db_handle.auth.as_ref().unwrap().web_token {
Some(e) => e,
None => return,
@ -44,10 +44,11 @@ pub fn handle_server_proto(request: Request<Vec<u8>>, responder: UriSchemeRespon
.header("Authorization", format!("Bearer {web_token}"))
.headers(request.headers().clone())
.send()
.await
.unwrap();
let response_status = response.status();
let response_body = response.bytes().unwrap();
let response_body = response.bytes().await.unwrap();
let http_response = Response::builder()
.status(response_status)

View File

@ -1,12 +1,9 @@
use std::sync::Mutex;
use log::{debug, warn};
use log::{debug, info, warn};
use serde::Deserialize;
use url::Url;
use crate::{
database::db::borrow_db_mut_checked, error::remote_access_error::RemoteAccessError, AppState,
AppStatus,
database::db::borrow_db_mut_checked, error::remote_access_error::RemoteAccessError, AppStatus, DropFunctionState
};
#[derive(Deserialize)]
@ -15,29 +12,30 @@ struct DropHealthcheck {
app_name: String,
}
pub fn use_remote_logic(
pub async fn use_remote_logic(
url: String,
state: tauri::State<'_, Mutex<AppState<'_>>>,
state: tauri::State<'_, DropFunctionState<'_>>,
) -> Result<(), RemoteAccessError> {
debug!("connecting to url {url}");
info!("connecting to url {url}");
let base_url = Url::parse(&url)?;
// Test Drop url
let test_endpoint = base_url.join("/api/v1")?;
let response = reqwest::blocking::get(test_endpoint.to_string())?;
let response = reqwest::get(test_endpoint.to_string()).await?;
let result: DropHealthcheck = response.json()?;
let result: DropHealthcheck = response.json().await?;
if result.app_name != "Drop" {
warn!("user entered drop endpoint that connected, but wasn't identified as Drop");
return Err(RemoteAccessError::InvalidEndpoint);
}
let mut app_state = state.lock().unwrap();
app_state.status = AppStatus::SignedOut;
drop(app_state);
{
let mut app_state = state.lock().await;
app_state.status = AppStatus::SignedOut;
}
let mut db_state = borrow_db_mut_checked();
let mut db_state = borrow_db_mut_checked().await;
db_state.base_url = base_url.to_string();
Ok(())

View File

@ -1,7 +1,7 @@
{
"$schema": "https://schema.tauri.app/config/2.0.0",
"productName": "Drop Desktop Client",
"version": "0.3.1",
"version": "0.3.0-rc-8",
"identifier": "dev.drop.app",
"build": {
"beforeDevCommand": "yarn dev --port 1432",
@ -11,7 +11,7 @@
},
"app": {
"security": {
"csp": "",
"csp": null,
"assetProtocol": {
"enable": true,
"scope": {}