From c10964fdc7ee0d2c6f15a7b228b74c4c5ac68c04 Mon Sep 17 00:00:00 2001 From: Matthias Date: Thu, 13 Oct 2022 20:30:31 +0200 Subject: [PATCH] Replaced http server library, added tracing and metrics --- backend/.cargo/config.toml | 3 - backend/.idea/runConfigurations/Run.xml | 22 + backend/.idea/runConfigurations/clippy.xml | 19 + backend/.idea/runConfigurations/fmt.xml | 19 + backend/Cargo.toml | 27 +- backend/rustfmt.toml | 76 +++ backend/src/config.rs | 22 +- backend/src/db/connection.rs | 101 ++-- backend/src/db/manager.rs | 105 +++- backend/src/db/mod.rs | 28 +- backend/src/db/user.rs | 14 +- backend/src/main.rs | 225 +++++++- backend/src/metrics.rs | 83 +++ backend/src/routes/admin.rs | 165 +++--- backend/src/routes/auth/basic.rs | 116 ++-- backend/src/routes/auth/gitlab.rs | 154 +++--- backend/src/routes/auth/mod.rs | 43 +- backend/src/routes/auth/tfa.rs | 140 ++--- backend/src/routes/filters.rs | 85 +-- backend/src/routes/fs/mod.rs | 195 ++++--- backend/src/routes/fs/routes.rs | 605 +++++++++++---------- backend/src/routes/mod.rs | 117 +--- backend/src/routes/user.rs | 48 +- backend/src/schema.rs | 6 +- tokio-top.cmd | 1 - 25 files changed, 1403 insertions(+), 1016 deletions(-) delete mode 100644 backend/.cargo/config.toml create mode 100644 backend/.idea/runConfigurations/Run.xml create mode 100644 backend/.idea/runConfigurations/clippy.xml create mode 100644 backend/.idea/runConfigurations/fmt.xml create mode 100644 backend/rustfmt.toml create mode 100644 backend/src/metrics.rs delete mode 100644 tokio-top.cmd diff --git a/backend/.cargo/config.toml b/backend/.cargo/config.toml deleted file mode 100644 index 33306ab..0000000 --- a/backend/.cargo/config.toml +++ /dev/null @@ -1,3 +0,0 @@ -[build] -rustflags = ["--cfg", "tokio_unstable"] -incremental = true diff --git a/backend/.idea/runConfigurations/Run.xml b/backend/.idea/runConfigurations/Run.xml new file mode 100644 index 0000000..1bd5c63 --- /dev/null +++ b/backend/.idea/runConfigurations/Run.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/backend/.idea/runConfigurations/clippy.xml b/backend/.idea/runConfigurations/clippy.xml new file mode 100644 index 0000000..aaa54f0 --- /dev/null +++ b/backend/.idea/runConfigurations/clippy.xml @@ -0,0 +1,19 @@ + + + + \ No newline at end of file diff --git a/backend/.idea/runConfigurations/fmt.xml b/backend/.idea/runConfigurations/fmt.xml new file mode 100644 index 0000000..6972a0a --- /dev/null +++ b/backend/.idea/runConfigurations/fmt.xml @@ -0,0 +1,19 @@ + + + + \ No newline at end of file diff --git a/backend/Cargo.toml b/backend/Cargo.toml index a55cde2..57f97f1 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -11,22 +11,17 @@ diesel = { version = "2.0.1", features = ["sqlite", "r2d2", "returning_clauses_f diesel_migrations = "2.0.0" r2d2_sqlite = "0.21.0" -warp = { version = "0.3.3", features = ["compression", "compression-brotli", "compression-gzip"] } -headers = "0.3" -tokio = { version = "1.21.2", features = ["full", "tracing"] } -tokio-util = { version = "0.7.4", features = ["codec"] } -console-subscriber = "0.1.8" -futures = "0.3.24" -bytes = "1.2.1" -tracing = { version = "0.1.37", features = ["log-always"] } -log = "0.4.17" +tiny_http = "0.12.0" +rayon-core = "1.9.3" +parking_lot = "0.12.1" +ctrlc = { version = "3.2.3", features = ["termination"] } serde = { version = "1.0.145", features = ["derive"] } +serde_json = "1.0.86" serde_repr = "0.1.9" +serde_urlencoded = "0.7.1" -pretty_env_logger = "0.4" -lazy_static = "1.4.0" -json = "0.12.4" +once_cell = "1.15.0" jsonwebtoken = "8.1.1" thiserror = "1.0.37" @@ -37,8 +32,12 @@ ureq = { version = "2.5.0", features = ["json"] } totp-rs = { version = "3.0.1", features = ["qr"] } ring = { version = "0.16.20", default-features = false } mime_guess = "2.0.4" -zip = "0.6.2" +zip = { version = "0.6.2", default-features = false } base64 = "0.13.0" image = "0.24.4" -cached = "0.39.0" stretto = "0.7.1" + +rustracing = "0.6.0" +rustracing_jaeger = "0.8.1" +prometheus = { version = "0.13.2", features = ["process"] } +prometheus-static-metric = "0.5.1" diff --git a/backend/rustfmt.toml b/backend/rustfmt.toml new file mode 100644 index 0000000..762dd70 --- /dev/null +++ b/backend/rustfmt.toml @@ -0,0 +1,76 @@ +max_width = 120 +hard_tabs = false +tab_spaces = 4 +newline_style = "Unix" +indent_style = "Block" +use_small_heuristics = "Default" +fn_call_width = 60 +attr_fn_like_width = 70 +struct_lit_width = 18 +struct_variant_width = 35 +array_width = 60 +chain_width = 100 +single_line_if_else_max_width = 50 +wrap_comments = false +format_code_in_doc_comments = false +doc_comment_code_block_width = 100 +comment_width = 80 +normalize_comments = false +normalize_doc_attributes = false +format_strings = false +format_macro_matchers = false +format_macro_bodies = true +hex_literal_case = "Preserve" +empty_item_single_line = true +struct_lit_single_line = true +fn_single_line = true +where_single_line = true +imports_indent = "Block" +imports_layout = "HorizontalVertical" +imports_granularity = "Crate" +group_imports = "StdExternalCrate" +reorder_imports = true +reorder_modules = true +reorder_impl_items = true +type_punctuation_density = "Wide" +space_before_colon = false +space_after_colon = true +spaces_around_ranges = false +binop_separator = "Front" +remove_nested_parens = true +combine_control_expr = true +short_array_element_width_threshold = 10 +overflow_delimited_expr = true +struct_field_align_threshold = 0 +enum_discrim_align_threshold = 0 +match_arm_blocks = false +match_arm_leading_pipes = "Never" +force_multiline_blocks = false +fn_args_layout = "Tall" +brace_style = "PreferSameLine" +control_brace_style = "AlwaysSameLine" +trailing_semicolon = true +trailing_comma = "Never" +match_block_trailing_comma = false +blank_lines_upper_bound = 1 +blank_lines_lower_bound = 0 +edition = "2021" +version = "Two" +inline_attribute_width = 0 +format_generated_files = true +merge_derives = true +use_try_shorthand = true +use_field_init_shorthand = true +force_explicit_abi = true +condense_wildcard_suffixes = false +color = "Auto" +required_version = "1.5.1" +unstable_features = false +disable_all_formatting = false +skip_children = false +hide_parse_errors = false +error_on_line_overflow = false +error_on_unformatted = false +ignore = [] +emit_mode = "Files" +make_backup = false diff --git a/backend/src/config.rs b/backend/src/config.rs index 1aef38b..b12ad2e 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -1,9 +1,8 @@ -use lazy_static::lazy_static; +use once_cell::sync::Lazy; -lazy_static! { - pub static ref CONFIG: Config = Config::read(); -} +pub static CONFIG: Lazy = Lazy::new(Config::read); +#[derive(serde::Deserialize)] pub struct Config { pub gitlab_id: String, pub gitlab_secret: String, @@ -19,17 +18,6 @@ pub struct Config { impl Config { fn read() -> Self { let config = std::fs::read_to_string("config.json").expect("Failed to read config.json"); - let config = json::parse(config.as_str()).expect("Failed to parse config.json"); - Self { - gitlab_id: config["gitlab_id"].as_str().expect("Config is missing 'gitlab_id'").to_string(), - gitlab_secret: config["gitlab_secret"].as_str().expect("Config is missing 'gitlab_secret'").to_string(), - gitlab_url: config["gitlab_url"].as_str().expect("Config is missing 'gitlab_url'").to_string(), - gitlab_api_url: config["gitlab_api_url"].as_str().expect("Config is missing 'gitlab_api_url'").to_string(), - gitlab_redirect_url: config["gitlab_redirect_url"].as_str().expect("Config is missing 'gitlab_redirect_url'").to_string(), - smtp_server: config["smtp_server"].as_str().expect("Config is missing 'smtp_server'").to_string(), - smtp_port: config["smtp_port"].as_u16().expect("Config is missing 'smtp_port'"), - smtp_user: config["smtp_user"].as_str().expect("Config is missing 'smtp_user'").to_string(), - smtp_password: config["smtp_password"].as_str().expect("Config is missing 'smtp_password'").to_string() - } + serde_json::from_str(config.as_str()).expect("Failed to parse config.json") } -} \ No newline at end of file +} diff --git a/backend/src/db/connection.rs b/backend/src/db/connection.rs index d4069f8..339fb03 100644 --- a/backend/src/db/connection.rs +++ b/backend/src/db/connection.rs @@ -1,25 +1,20 @@ use diesel::prelude::*; -use crate::db::manager::DB_MANAGER; +use rustracing_jaeger::Span; + +use crate::{db::manager::DB_MANAGER, metrics}; pub struct DBConnection { db: super::RawDBConnection } impl From for DBConnection { - fn from(conn: super::RawDBConnection) -> Self { - Self { - db: conn - } - } + fn from(conn: super::RawDBConnection) -> Self { Self { db: conn } } } impl DBConnection { // Users - pub fn create_user_password( - &mut self, - name: String, - password: String - ) -> super::User { + pub fn create_user_password(&mut self, span: &Span, name: String, password: String) -> super::User { + let _span = metrics::span("create_user_password_in_db", span); let mut new_user: super::User = diesel::insert_into(crate::schema::user::table) .values(super::user::NewUser { name, @@ -35,19 +30,22 @@ impl DBConnection { .get_result(&mut self.db) .expect("Failed to insert new user"); - let root_node = crate::routes::fs::create_node("".to_owned(), &new_user, false, None, true, self).expect("Couldn't create root node"); + let root_node = crate::routes::fs::create_node(span, "".to_owned(), &new_user, false, None, true, self) + .expect("Couldn't create root node"); new_user.root_id = root_node.id; - self.save_user(&new_user); + self.save_user(span, &new_user); new_user } pub fn create_user_gitlab( &mut self, + span: &Span, name: String, role: super::UserRole, gitlab_at: String, gitlab_rt: String ) -> super::User { + let _span = metrics::span("create_user_gitlab_in_db", span); let mut new_user: super::User = diesel::insert_into(crate::schema::user::table) .values(super::user::NewUser { name, @@ -63,19 +61,22 @@ impl DBConnection { .get_result(&mut self.db) .expect("Failed to insert new user"); - let root_node = crate::routes::fs::create_node("".to_owned(), &new_user, false, None, true, self).expect("Couldn't create root node"); + let root_node = crate::routes::fs::create_node(span, "".to_owned(), &new_user, false, None, true, self) + .expect("Couldn't create root node"); new_user.root_id = root_node.id; - self.save_user(&new_user); + self.save_user(span, &new_user); new_user } - pub fn get_user(&mut self, _id: i32) -> Option { + pub fn get_user(&mut self, span: &Span, _id: i32) -> Option { use crate::schema::user::dsl::*; + let _span = metrics::span("get_user_from_db", span); user.find(_id).first(&mut self.db).ok() } - pub fn find_user(&mut self, _name: &str, _gitlab: bool) -> Option { + pub fn find_user(&mut self, span: &Span, _name: &str, _gitlab: bool) -> Option { use crate::schema::user::dsl::*; + let _span = metrics::span("find_user_in_db", span); user.filter(name.eq(name)).filter(gitlab.eq(_gitlab)).first(&mut self.db).ok() } @@ -83,11 +84,9 @@ impl DBConnection { crate::schema::user::table.load(&mut self.db).expect("Could not load users") } - pub fn save_user(&mut self, user: &super::User) { - diesel::update(user) - .set(user.clone()) - .execute(&mut self.db) - .expect("Failed to save user"); + pub fn save_user(&mut self, span: &Span, user: &super::User) { + let _span = metrics::span("save_user_to_db", span); + diesel::update(user).set(user.clone()).execute(&mut self.db).expect("Failed to save user"); } pub fn delete_user(&mut self, user: &super::User) { @@ -95,7 +94,8 @@ impl DBConnection { } // Tokens - pub fn create_token(&mut self, _owner: i32, _exp: i64) -> super::Token { + pub fn create_token(&mut self, span: &Span, _owner: i32, _exp: i64) -> super::Token { + let _span = metrics::span("create_token_in_db", span); diesel::insert_into(crate::schema::tokens::table) .values(&super::token::NewToken { owner_id: _owner, @@ -105,53 +105,64 @@ impl DBConnection { .expect("Failed to save new token to database") } - pub fn get_token(&mut self, _id: i32) -> Option { + pub fn get_token(&mut self, span: &Span, _id: i32) -> Option { use crate::schema::tokens::dsl::*; + let _span = metrics::span("get_token_from_db", span); tokens.find(_id).first(&mut self.db).ok() } - pub fn delete_token(&mut self, _id: i32) { + pub fn delete_token(&mut self, span: &Span, _id: i32) { use crate::schema::tokens::dsl::*; - diesel::delete(tokens.find(_id)) - .execute(&mut self.db) - .expect("Failed to delete token"); + let _span = metrics::span("delete_token_from_db", span); + diesel::delete(tokens.find(_id)).execute(&mut self.db).expect("Failed to delete token"); } - pub fn delete_all_tokens(&mut self, _owner: i32) { + pub fn delete_all_tokens(&mut self, span: &Span, _owner: i32) { use crate::schema::tokens::dsl::*; + let _span = metrics::span("delete_user_tokens_from_db", span); diesel::delete(tokens.filter(owner_id.eq(_owner))) .execute(&mut self.db) .expect("Failed to delete token"); } - pub fn cleanup_tokens(&mut self) { + pub fn cleanup_tokens(&mut self, span: &Span) { use crate::schema::tokens::dsl::*; + let _span = metrics::span("cleanup_tokens", span); let current_time = chrono::Utc::now().timestamp(); - diesel::delete(tokens.filter(exp.le(current_time))).execute(&mut self.db).expect("Failed to cleanup tokens"); + diesel::delete(tokens.filter(exp.le(current_time))) + .execute(&mut self.db) + .expect("Failed to cleanup tokens"); } // Nodes - pub async fn get_lock(user: i32) -> std::sync::Arc> { - DB_MANAGER.get_lock(user).await + pub fn get_lock(user: i32) -> super::manager::TracingLock { DB_MANAGER.get_lock(user) } + + pub fn create_node( + &mut self, + span: &Span, + file: bool, + name: String, + parent: Option, + owner: i32 + ) -> super::Inode { + DB_MANAGER.create_node(span, &mut self.db, file, name, parent, owner) } - pub fn create_node(&mut self, file: bool, name: String, parent: Option, owner: i32) -> super::Inode { - DB_MANAGER.create_node(&mut self.db, file, name, parent, owner) + pub fn get_all_nodes(&mut self) -> Vec { + crate::schema::inode::table.load(&mut self.db).expect("Could not load nodes") } - pub fn get_node(&mut self, id: i32) -> Option { - DB_MANAGER.get_node(&mut self.db, id) + pub fn get_node(&mut self, span: &Span, id: i32) -> Option { + DB_MANAGER.get_node(span, &mut self.db, id) } - pub fn get_children(&mut self, id: i32) -> Vec { - DB_MANAGER.get_children(&mut self.db, id) + pub fn get_children(&mut self, span: &Span, id: i32) -> Vec { + DB_MANAGER.get_children(span, &mut self.db, id) } - pub fn save_node(&mut self, node: &super::Inode) { - DB_MANAGER.save_node(&mut self.db, node); - } + pub fn save_node(&mut self, span: &Span, node: &super::Inode) { DB_MANAGER.save_node(span, &mut self.db, node); } - pub fn delete_node(&mut self, node: &super::Inode) { - DB_MANAGER.delete_node(&mut self.db, node); + pub fn delete_node(&mut self, span: &Span, node: &super::Inode) { + DB_MANAGER.delete_node(span, &mut self.db, node); } -} \ No newline at end of file +} diff --git a/backend/src/db/manager.rs b/backend/src/db/manager.rs index ba4f0a6..e25863a 100644 --- a/backend/src/db/manager.rs +++ b/backend/src/db/manager.rs @@ -1,16 +1,30 @@ -use std::collections::HashMap; -use std::sync::Arc; -use lazy_static::lazy_static; -use stretto::Cache; -use tokio::sync::{Mutex, RwLock}; -use diesel::prelude::*; -use crate::db::Inode; +use std::{collections::HashMap, sync::Arc}; -lazy_static! { - pub(super) static ref DB_MANAGER: DBManager = DBManager::new(); +use diesel::prelude::*; +use once_cell::sync::Lazy; +use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use rustracing_jaeger::Span; +use stretto::Cache; + +use crate::{db::Inode, metrics}; + +#[derive(Clone)] +pub struct TracingLock(Arc>); +impl TracingLock { + pub fn read(&self, span: &Span) -> RwLockReadGuard<'_, ()> { + let _span = metrics::span("get_read_lock", span); + self.0.read() + } + + pub fn write(&self, span: &Span) -> RwLockWriteGuard<'_, ()> { + let _span = metrics::span("get_write_lock", span); + self.0.write() + } } -pub(super) struct DBManager { +pub static DB_MANAGER: Lazy = Lazy::new(DBManager::new); + +pub struct DBManager { locks: Mutex>>>, node_cache: Cache, children_cache: Cache> @@ -25,8 +39,17 @@ impl DBManager { } } - pub fn create_node(&self, db: &mut super::RawDBConnection, file: bool, _name: String, parent: Option, owner: i32) -> Inode { + pub fn create_node( + &self, + span: &Span, + db: &mut super::RawDBConnection, + file: bool, + _name: String, + parent: Option, + owner: i32 + ) -> Inode { use crate::schema::inode::dsl::*; + let _span = metrics::span("insert_node_into_db", span); let node: Inode = diesel::insert_into(inode) .values(crate::db::inode::NewInode { is_file: file, @@ -44,15 +67,28 @@ impl DBManager { self.children_cache.remove(&parent); } + let owner = node.owner_id.to_string(); + if file { + metrics::NODES.with_label_values(&["file", owner.as_str()]).inc(); + } else { + metrics::NODES.with_label_values(&["folder", owner.as_str()]).inc(); + } + node } - pub fn get_node(&self, db: &mut super::RawDBConnection, node_id: i32) -> Option { + pub fn get_node(&self, span: &Span, db: &mut super::RawDBConnection, node_id: i32) -> Option { use crate::schema::inode::dsl::*; + let inner_span = metrics::span("get_node", span); let node = self.node_cache.get(&node_id); match node { - Some(v) => Some(v.value().clone()), + Some(v) => { + metrics::CACHE.node.hit.inc(); + Some(v.value().clone()) + } None => { + let _span = metrics::span("get_node_from_db", &inner_span); + metrics::CACHE.node.miss.inc(); let v: Inode = inode.find(node_id).first(db).ok()?; self.node_cache.insert(node_id, v.clone(), 1); Some(v) @@ -60,12 +96,18 @@ impl DBManager { } } - pub fn get_children(&self, db: &mut super::RawDBConnection, node_id: i32) -> Vec { + pub fn get_children(&self, span: &Span, db: &mut super::RawDBConnection, node_id: i32) -> Vec { use crate::schema::inode::dsl::*; + let inner_span = metrics::span("get_children", span); let children = self.children_cache.get(&node_id); match children { - Some(v) => v.value().clone(), + Some(v) => { + metrics::CACHE.children.hit.inc(); + v.value().clone() + } None => { + let _span = metrics::span("get_children_from_db", &inner_span); + metrics::CACHE.children.miss.inc(); let v = inode.filter(parent_id.eq(node_id)).load(db).expect("Failed to get children of node"); self.children_cache.insert(node_id, v.clone(), 1); v @@ -73,16 +115,20 @@ impl DBManager { } } - pub fn save_node(&self, db: &mut super::RawDBConnection, node: &Inode) { + pub fn save_node(&self, span: &Span, db: &mut super::RawDBConnection, node: &Inode) { + let _span = metrics::span("save_node_to_db", span); self.node_cache.insert(node.id, node.clone(), 1); - diesel::update(node) - .set(node.clone()) - .execute(db) - .expect("Failed to save node"); + if let Some(p) = node.parent_id { + self.children_cache.remove(&p); + } + diesel::update(node).set(node.clone()).execute(db).expect("Failed to save node"); } - pub fn delete_node(&self, db: &mut super::RawDBConnection, node: &Inode) { + pub fn delete_node(&self, span: &Span, db: &mut super::RawDBConnection, node: &Inode) { + let inner_span = metrics::span("delete_node", span); + let owner = node.owner_id.to_string(); if node.is_file { + let _span = metrics::span("delete_node_files", &inner_span); let file_name = format!("./files/{}", node.id); let file = std::path::Path::new(&file_name); let preview_name = format!("./files/{}_preview.jpg", node.id); @@ -93,17 +139,22 @@ impl DBManager { if preview.exists() { std::fs::remove_file(preview).expect("Failed to delete preview"); } + metrics::NODES.with_label_values(&["file", owner.as_str()]).dec(); + metrics::DISK_USAGE.with_label_values(&[owner.as_str()]).sub(node.size.unwrap_or(0)); + } else { + metrics::NODES.with_label_values(&["folder", owner.as_str()]).dec(); } + + let _span = metrics::span("delete_node_from_db", &inner_span); diesel::delete(node).execute(db).expect("Failed to delete node"); self.node_cache.remove(&node.id); self.children_cache.remove(&node.id); - if let Some(p) = node.parent_id { self.children_cache.remove(&p); } + if let Some(p) = node.parent_id { + self.children_cache.remove(&p); + } } - pub async fn get_lock(&self, user: i32) -> Arc> { - self.locks.lock().await - .entry(user) - .or_insert_with(|| Arc::new(RwLock::new(()))) - .clone() + pub fn get_lock(&self, user: i32) -> TracingLock { + TracingLock(self.locks.lock().entry(user).or_insert_with(|| Arc::new(RwLock::new(()))).clone()) } } diff --git a/backend/src/db/mod.rs b/backend/src/db/mod.rs index 3e80cba..382fcd3 100644 --- a/backend/src/db/mod.rs +++ b/backend/src/db/mod.rs @@ -1,19 +1,18 @@ +mod connection; mod inode; +pub mod manager; mod token; mod user; -pub mod manager; -mod connection; -use diesel::connection::SimpleConnection; -use diesel::sqlite::SqliteConnection; -use diesel::r2d2::{ConnectionManager, Pool, PooledConnection}; +use diesel::{ + connection::SimpleConnection, + r2d2::{ConnectionManager, Pool, PooledConnection}, + sqlite::SqliteConnection +}; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; -use warp::Filter; - pub use inode::Inode; pub use token::Token; -pub use user::{User, TfaTypes, UserRole}; -use crate::routes::AppError; +pub use user::{TfaTypes, User, UserRole}; type RawDBConnection = PooledConnection>; pub type DBPool = Pool>; @@ -41,14 +40,3 @@ pub fn build_pool() -> Pool> { pub fn run_migrations(db: &mut RawDBConnection) { db.run_pending_migrations(MIGRATIONS).expect("Failed to run migrations"); } - -pub fn with_db(pool: DBPool) -> impl Filter + Clone { - warp::any() - .map(move || pool.clone()) - .and_then(|pool: DBPool| async move { - match pool.get() { - Ok(v) => Ok(DBConnection::from(v)), - Err(_) => AppError::InternalError("Failed to get a database connection").err() - } - }) -} diff --git a/backend/src/db/user.rs b/backend/src/db/user.rs index 2dea9b5..e6eab8f 100644 --- a/backend/src/db/user.rs +++ b/backend/src/db/user.rs @@ -1,9 +1,11 @@ -use diesel::backend::RawValue; -use diesel::deserialize::{FromSql, FromSqlRow}; -use diesel::prelude::*; -use diesel::serialize::{IsNull, Output, ToSql}; -use diesel::sql_types::SmallInt; -use diesel::sqlite::Sqlite; +use diesel::{ + backend::RawValue, + deserialize::{FromSql, FromSqlRow}, + prelude::*, + serialize::{IsNull, Output, ToSql}, + sql_types::SmallInt, + sqlite::Sqlite +}; use serde_repr::{Deserialize_repr, Serialize_repr}; #[repr(i16)] diff --git a/backend/src/main.rs b/backend/src/main.rs index d069cdc..40da2c8 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,34 +1,225 @@ -mod db; -mod schema; -mod dto; -mod routes; mod config; +mod db; +mod dto; +mod metrics; +mod routes; +mod schema; -#[tokio::main] -async fn main() { - console_subscriber::init(); +use std::fs::File; - pretty_env_logger::formatted_builder().filter_level(log::LevelFilter::Info).init(); +use rayon_core::ThreadPoolBuilder; +use rustracing::tag::Tag; +use rustracing_jaeger::Span; +use tiny_http::{Method, Request, Response, ResponseBox, Server, StatusCode}; +use crate::{ + db::DBConnection, + metrics::TRACER, + routes::{get_reply, header, AppError} +}; + +static THREAD_COUNT: usize = 10; + +fn index_resp() -> Response { + Response::from_file(File::open(std::path::Path::new("./static/index.html")).unwrap()) + .with_header(header("content-type", "text/html; charset=utf-8")) +} + +fn parse_body(span: &mut Span, req: &mut Request) -> Result +where S: for<'a> serde::Deserialize<'a> { + let _span = TRACER.span("parse_body").child_of(span.context().unwrap()).start(); + serde_json::from_reader(req.as_reader()).map_err(|_| AppError::BadRequest("Invalid query data")) +} + +fn handle_request(mut req: Request, db: db::DBPool) { + let path = req.url().to_string(); + let resp = if !path.starts_with("/api") { + match req.method() { + &Method::Get => + if !(path.contains('\\') || path.contains("..") || path.contains(':')) { + let path_str = "./static".to_owned() + &path; + let path = std::path::Path::new(&path_str); + if path.exists() { + let resp = Response::from_file(File::open(path).unwrap()); + match path.extension().map(|s| s.to_str()).unwrap_or(None) { + Some("html") => resp.with_header(header("content-type", "text/html; charset=utf-8")), + Some("css") => resp.with_header(header("content-type", "text/css; charset=utf-8")), + Some("js") => resp.with_header(header( + "content-type", + "application/x-javascript; charset=utf-8" + )), + Some("svg") => resp.with_header(header("content-type", "image/svg+xml")), + _ => resp + } + .boxed() + } else { + index_resp().boxed() + } + } else { + index_resp().boxed() + }, + _ => Response::empty(StatusCode::from(405)).boxed() + } + } else { + let meth = req.method().clone(); + let mut span = TRACER + .span("handle_api_request") + .tag(Tag::new("http.target", path)) + .tag(Tag::new("http.method", meth.to_string())) + .start(); + let resp = match handle_api_request(&mut span, &mut req, db) { + Ok(v) => v, + Err(v) => { + let code = match v { + AppError::BadRequest(_) => 400, + AppError::Unauthorized(_) => 401, + AppError::Forbidden(_) => 403, + AppError::NotFound => 404, + AppError::InternalError(_) => 500 + }; + Response::from_data( + serde_json::to_vec(&dto::responses::Error { + statusCode: code, + message: match v { + AppError::BadRequest(v) => v.to_string(), + AppError::Unauthorized(v) => v.to_string(), + AppError::Forbidden(v) => v.to_string(), + AppError::NotFound => "Not found".to_owned(), + AppError::InternalError(v) => v.to_string() + } + }) + .unwrap() + ) + .with_header(header("content-type", "application/json; charset=utf-8")) + .with_status_code(code) + .boxed() + } + }; + span.set_tag(|| Tag::new("http.status_code", resp.status_code().0 as i64)); + resp + }; + req.respond(resp).expect("Failed to send response"); +} + +#[rustfmt::skip] +fn handle_api_request(span: &mut Span, req: &mut Request, pool: db::DBPool) -> Result { + metrics::REQUEST.inc(); + let db = &mut db::DBConnection::from(pool.get().unwrap()); + let (path, query) = { + let url = req.url().to_string(); + let mut splits = url.splitn(2, '?'); + ( + splits.next().unwrap().to_string(), + splits.next().unwrap_or("").to_string() + ) + }; + match (path.as_str(), req.method()) { + ("/api/metrics", Method::Get) => metrics::get_metrics(), + ("/api/auth/login", Method::Post) => parse_body(span, req).and_then(|v|routes::auth::basic::login(span, req, db, v)), + ("/api/auth/signup", Method::Post) => parse_body(span, req).and_then(|v| routes::auth::basic::signup(span, req, db, v)), + ("/api/auth/gitlab", Method::Get) => routes::auth::gitlab::gitlab(span, req, db), + ("/api/auth/gitlab_callback", Method::Get) => routes::auth::gitlab::gitlab_callback(span, req, db, &query), + ("/api/fs/download", Method::Post) => routes::fs::routes::download(span, req, db), + ("/api/fs/download_multi", Method::Post) => routes::fs::routes::download_multi(span, req, db), + _ => { + let span_auth = TRACER.span("parse_auth_and_path").child_of(span.context().unwrap()).start(); + let header = req.headers().iter().find(|h| h.field.as_str().as_str().eq_ignore_ascii_case("Authorization")) + .ok_or(AppError::Unauthorized("Unauthorized"))?; + let auth = header.value.as_str(); + let token = auth.starts_with("Bearer ").then(|| auth.trim_start_matches("Bearer ")) + .ok_or(AppError::Unauthorized("Invalid auth header"))?; + let info = routes::filters::authorize_jwt(span, token, db)?; + let (path, last_id) = path.to_string().rsplit_once('/') + .map(|(short_path, last)| + last.parse::() + .map_or((path.clone(), None), |i| (short_path.to_string(), Some(i))) + ) + .unwrap_or((path.to_string(), None)); + drop(span_auth); + let span = &mut TRACER.span("handle_auth_request").child_of(span.context().unwrap()).start(); + match (path.as_str(), req.method(), last_id) { + ("/api/admin/users", Method::Get, None) => routes::admin::users(span, req, db, info), + ("/api/admin/set_role", Method::Post, None) => parse_body(span, req).and_then(|v| routes::admin::set_role(span, req, db, info, v)), + ("/api/admin/logout", Method::Post, None) => parse_body(span, req).and_then(|v| routes::admin::logout(span, req, db, info, v)), + ("/api/admin/delete", Method::Post, None) => parse_body(span, req).and_then(|v| routes::admin::delete_user(span, req, db, info, v)), + ("/api/admin/disable_2fa", Method::Post, None) => parse_body(span, req).and_then(|v| routes::admin::disable_2fa(span, req, db, info, v)), + ("/api/admin/is_admin", Method::Post, None) => get_reply(&dto::responses::Success { statusCode: 200 }), + ("/api/admin/get_token", Method::Get, Some(v)) => routes::admin::get_token(span, req, db, info, v), + ("/api/auth/refresh", Method::Post, None) => routes::auth::basic::refresh(span, req, db, info), + ("/api/auth/logout_all", Method::Post, None) => routes::auth::basic::logout_all(span, req, db, info), + ("/api/auth/change_password", Method::Post, None) => parse_body(span, req).and_then(|v| routes::auth::basic::change_password(span, req, db, info, v)), + ("/api/auth/2fa/setup", Method::Post, None) => parse_body(span, req).and_then(|v| routes::auth::tfa::tfa_setup(span, req, db, info, v)), + ("/api/auth/2fa/complete", Method::Post, None) => parse_body(span, req).and_then(|v| routes::auth::tfa::tfa_complete(span, req, db, info, v)), + ("/api/auth/2fa/disable", Method::Post, None) => routes::auth::tfa::tfa_disable(span, req, db, info), + ("/api/user/info", Method::Get, None) => routes::user::info(span, req, db, info), + ("/api/user/delete", Method::Post, None) => routes::user::delete_user(span, req, db, info), + ("/api/fs/root", Method::Get, None) => routes::fs::routes::root(span, req, db, info), + ("/api/fs/node", Method::Get, Some(v)) => routes::fs::routes::node(span, req, db, info, v), + ("/api/fs/path", Method::Get, Some(v)) => routes::fs::routes::path(span, req, db, info, v), + ("/api/fs/create_folder", Method::Post, None) => parse_body(span, req).and_then(|v| routes::fs::routes::create_node(span, req, db, info, v, false)), + ("/api/fs/create_file", Method::Post, None) => parse_body(span, req).and_then(|v| routes::fs::routes::create_node(span, req, db, info, v, true)), + ("/api/fs/delete", Method::Post, Some(v)) => routes::fs::routes::delete_node(span, req, db, info, v, &pool), + ("/api/fs/upload", Method::Post, Some(v)) => routes::fs::routes::upload(span, req, db, info, v), + ("/api/fs/create_zip", Method::Post, None) => parse_body(span, req).and_then(|v| routes::fs::routes::create_zip(span, req, db, info, v, &pool)), + ("/api/fs/download_preview", Method::Get, Some(v)) => routes::fs::routes::download_preview(span, req, db, info, v), + ("/api/fs/get_type", Method::Get, Some(v)) => routes::fs::routes::get_type(span, req, db, info, v), + _ => AppError::NotFound.err() + } + } + } +} + +fn main() { let _ = config::CONFIG; - let pool: db::DBPool = db::build_pool(); + let db_pool: db::DBPool = db::build_pool(); - db::run_migrations(&mut pool.get().unwrap()); + db::run_migrations(&mut db_pool.get().unwrap()); if !std::path::Path::new("files").exists() { std::fs::create_dir("files").expect("Failed to create files directory"); } - let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); - - let (_addr, server) = warp::serve(routes::build_routes(pool.clone())).bind_with_graceful_shutdown(([0, 0, 0, 0], 2345), async { - shutdown_rx.await.ok(); + if !std::path::Path::new("temp").is_dir() { + std::fs::create_dir("temp").expect("Failed to create temp dir"); + } + std::fs::read_dir("temp").expect("Failed to iter temp dir").for_each(|dir| { + std::fs::remove_file(dir.expect("Failed to retrieve temp dir entry").path()) + .expect("Failed to delete file in temp dir"); }); - tokio::task::spawn(server); + metrics::init(DBConnection::from(db_pool.get().unwrap())); + + let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let ctrlc_shutdown = shutdown.clone(); + + let server = std::sync::Arc::new(Server::http("0.0.0.0:2345").unwrap()); + let ctrlc_server = server.clone(); + + ctrlc::set_handler(move || { + ctrlc_shutdown.store(true, std::sync::atomic::Ordering::Relaxed); + ctrlc_server.unblock(); + }) + .expect("Could not set ctrl-c handler"); + + let pool = ThreadPoolBuilder::new() + .num_threads(THREAD_COUNT) + .thread_name(|i| format!("Http listener {}", i)) + .build() + .unwrap(); + + 'server: loop { + match server.recv() { + Ok(req) => { + let inner_pool = db_pool.clone(); + pool.spawn(move || handle_request(req, inner_pool)) + } + Err(_) => + if shutdown.load(std::sync::atomic::Ordering::Relaxed) { + break 'server; + }, + } + } - tokio::signal::ctrl_c().await.expect("Failed to wait for ctrl-c"); println!("Quitting"); - shutdown_tx.send(()).expect("Failed to shutdown server"); } diff --git a/backend/src/metrics.rs b/backend/src/metrics.rs new file mode 100644 index 0000000..6fe9b52 --- /dev/null +++ b/backend/src/metrics.rs @@ -0,0 +1,83 @@ +use std::{borrow::Cow, clone::Clone, net::ToSocketAddrs}; + +use once_cell::sync::{Lazy, OnceCell}; +use prometheus::{register_int_counter, register_int_counter_vec, register_int_gauge_vec, IntCounter, IntGaugeVec}; +use prometheus_static_metric::make_static_metric; +use rustracing_jaeger::{span::SpanReceiver, Span, Tracer}; +use tiny_http::{Response, ResponseBox}; + +use crate::AppError; + +#[cfg(debug_assertions)] +static SERVICE_NAME: &str = "fileserver-testing"; + +#[cfg(not(debug_assertions))] +static SERVICE_NAME: &str = "fileserver"; + +pub static TRACER_QUIT: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); +static TRACER_INNER: Lazy<(Tracer, SpanReceiver)> = Lazy::new(|| Tracer::new(rustracing::sampler::AllSampler)); +pub static TRACER: Lazy = Lazy::new(|| TRACER_INNER.0.clone()); +pub static TRACER_THREAD: OnceCell> = OnceCell::new(); + +make_static_metric! { + struct CacheMetrics: IntCounter { + "cache" => { + node, + children + }, + "type" => { + hit, + miss + } + } +} + +pub static NODES: Lazy = + Lazy::new(|| register_int_gauge_vec!("nodes", "All nodes by user, type", &["type", "user"]).unwrap()); +pub static DISK_USAGE: Lazy = + Lazy::new(|| register_int_gauge_vec!("disk_usage", "Disk usage by user", &["user"]).unwrap()); +pub static REQUEST: Lazy = Lazy::new(|| register_int_counter!("request", "Count of requests").unwrap()); +pub static CACHE: Lazy = Lazy::new(|| { + CacheMetrics::from(®ister_int_counter_vec!("cache", "Node cache hits/misses", &["cache", "type"]).unwrap()) +}); + +pub fn init(mut db: crate::db::DBConnection) { + TRACER_THREAD + .set(std::thread::spawn(|| { + let recv = TRACER_INNER.1.clone(); + let mut reporter = rustracing_jaeger::reporter::JaegerCompactReporter::new(SERVICE_NAME).unwrap(); + reporter.set_agent_addr("grafana.lan:6831".to_socket_addrs().unwrap().next().unwrap()); + reporter.set_reporter_addr("0.0.0.0:0".parse().unwrap()).unwrap(); + while let Ok(span) = recv.recv() { + if TRACER_QUIT.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + reporter.report(&[span]).unwrap(); + } + })) + .unwrap(); + + let nodes = Lazy::force(&NODES); + let disk_usage = Lazy::force(&DISK_USAGE); + Lazy::force(&REQUEST); + Lazy::force(&CACHE); + db.get_all_nodes().iter().for_each(|n| { + let owner = n.owner_id.to_string(); + if n.is_file { + nodes.with_label_values(&["file", owner.as_str()]).inc(); + disk_usage.with_label_values(&[owner.as_str()]).add(n.size.unwrap_or(0)); + } else { + nodes.with_label_values(&["folder", owner.as_str()]).inc(); + } + }); +} + +pub fn get_metrics() -> Result { + let metric = prometheus::gather(); + Ok(Response::from_string(prometheus::TextEncoder::new().encode_to_string(&metric).unwrap()).boxed()) +} + +pub fn span(name: N, span: &Span) -> Span +where N: Into> { + TRACER.span(name).child_of(span.context().unwrap()).start() +} diff --git a/backend/src/routes/admin.rs b/backend/src/routes/admin.rs index c6baac5..a24d425 100644 --- a/backend/src/routes/admin.rs +++ b/backend/src/routes/admin.rs @@ -1,55 +1,17 @@ -use warp::{Filter, Reply}; -use crate::db::{DBConnection, DBPool, with_db}; -use crate::dto; -use crate::routes::{AppError, get_reply}; -use crate::routes::filters::{admin, UserInfo}; +use rustracing_jaeger::Span; +use tiny_http::{Request, ResponseBox}; -pub fn build_routes(db: DBPool) -> impl Filter + Clone { - let users = warp::path!("admin" / "users") - .and(warp::get()) - .and(admin(db.clone())) - .and(with_db(db.clone())) - .and_then(users); - let set_role = warp::path!("admin" / "set_role") - .and(warp::post()) - .and(warp::body::json()) - .and(admin(db.clone())) - .and(with_db(db.clone())) - .and_then(set_role); - let logout = warp::path!("admin" / "logout") - .and(warp::post()) - .and(warp::body::json()) - .and(admin(db.clone())) - .and(with_db(db.clone())) - .and_then(logout); - let delete_user = warp::path!("admin" / "delete") - .and(warp::post()) - .and(warp::body::json()) - .and(admin(db.clone())) - .and(with_db(db.clone())) - .and_then(delete_user); - let disable_2fa = warp::path!("admin" / "disable_2fa") - .and(warp::post()) - .and(warp::body::json()) - .and(admin(db.clone())) - .and(with_db(db.clone())) - .and_then(disable_2fa); - let is_admin = warp::path!("admin" / "is_admin") - .and(warp::get()) - .and(admin(db.clone())) - .and_then(|_| async { get_reply(&dto::responses::Success { - statusCode: 200 - }) }); - let get_token = warp::path!("admin" / "get_token" / i32) - .and(warp::get()) - .and(admin(db.clone())) - .and(with_db(db)) - .and_then(get_token); +use crate::{ + db::{DBConnection, UserRole}, + dto, + routes::{filters::UserInfo, get_reply, AppError} +}; - users.or(set_role).or(logout).or(delete_user).or(disable_2fa).or(is_admin).or(get_token) -} +pub fn users(_: &Span, _: &mut Request, db: &mut DBConnection, info: UserInfo) -> Result { + if info.0.role != UserRole::Admin { + return AppError::Forbidden("Forbidden").err(); + } -async fn users(_: UserInfo, mut db: DBConnection) -> Result { let users = db.get_users(); let mut res = dto::responses::AdminUsers { @@ -70,60 +32,99 @@ async fn users(_: UserInfo, mut db: DBConnection) -> Result Result { - let mut user = db.get_user(data.user) - .ok_or(AppError::Forbidden("Invalid user"))?; +pub fn set_role( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + data: dto::requests::AdminSetRole +) -> Result { + if info.0.role != UserRole::Admin { + return AppError::Forbidden("Forbidden").err(); + } + + let mut user = db.get_user(span, data.user).ok_or(AppError::Forbidden("Invalid user"))?; user.role = data.role; - db.save_user(&user); + db.save_user(span, &user); - get_reply(&dto::responses::Success { - statusCode: 200 - }) + get_reply(&dto::responses::Success { statusCode: 200 }) } -async fn logout(data: dto::requests::Admin, _: UserInfo, mut db: DBConnection) -> Result { - db.delete_all_tokens(data.user); +pub fn logout( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + data: dto::requests::Admin +) -> Result { + if info.0.role != UserRole::Admin { + return AppError::Forbidden("Forbidden").err(); + } - get_reply(&dto::responses::Success { - statusCode: 200 - }) + db.delete_all_tokens(span, data.user); + + get_reply(&dto::responses::Success { statusCode: 200 }) } -async fn delete_user(data: dto::requests::Admin, _: UserInfo, mut db: DBConnection) -> Result { - let user = db.get_user(data.user) - .ok_or(AppError::Forbidden("Invalid user"))?; - - db.delete_all_tokens(data.user); +pub fn delete_user( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + data: dto::requests::Admin +) -> Result { + if info.0.role != UserRole::Admin { + return AppError::Forbidden("Forbidden").err(); + } - let root_node = super::fs::get_node_and_validate(&user, user.root_id, &mut db).expect("Failed to get root node for deleting"); + let user = db.get_user(span, data.user).ok_or(AppError::Forbidden("Invalid user"))?; - super::fs::delete_node_root(&root_node, &mut db); + db.delete_all_tokens(span, data.user); + + let root_node = + super::fs::get_node_and_validate(span, &user, user.root_id, db).expect("Failed to get root node for deleting"); + + super::fs::delete_node_root(span, &root_node, db); db.delete_user(&user); - get_reply(&dto::responses::Success { - statusCode: 200 - }) + get_reply(&dto::responses::Success { statusCode: 200 }) } -async fn disable_2fa(data: dto::requests::Admin, _: UserInfo, mut db: DBConnection) -> Result { - let mut user = db.get_user(data.user) - .ok_or(AppError::Forbidden("Invalid user"))?; +pub fn disable_2fa( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + data: dto::requests::Admin +) -> Result { + if info.0.role != UserRole::Admin { + return AppError::Forbidden("Forbidden").err(); + } + + let mut user = db.get_user(span, data.user).ok_or(AppError::Forbidden("Invalid user"))?; user.tfa_type = crate::db::TfaTypes::None; - db.save_user(&user); + db.save_user(span, &user); - get_reply(&dto::responses::Success { - statusCode: 200 - }) + get_reply(&dto::responses::Success { statusCode: 200 }) } -async fn get_token(user: i32, _: UserInfo, mut db: DBConnection) -> Result { - let user = db.get_user(user) - .ok_or(AppError::Forbidden("Invalid user"))?; +pub fn get_token( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + user: i32 +) -> Result { + if info.0.role != UserRole::Admin { + return AppError::Forbidden("Forbidden").err(); + } + + let user = db.get_user(span, user).ok_or(AppError::Forbidden("Invalid user"))?; get_reply(&dto::responses::Login { statusCode: 200, - jwt: super::auth::get_token(&user, &mut db) + jwt: super::auth::get_token(span, &user, db) }) } diff --git a/backend/src/routes/auth/basic.rs b/backend/src/routes/auth/basic.rs index c04441b..a966c85 100644 --- a/backend/src/routes/auth/basic.rs +++ b/backend/src/routes/auth/basic.rs @@ -1,45 +1,20 @@ -use warp::Filter; -use crate::db::{DBConnection, DBPool, with_db}; -use crate::db::{TfaTypes, UserRole}; -use crate::dto; -use crate::dto::requests::ChangePassword; -use crate::routes::{AppError, get_reply}; -use crate::routes::filters::{authenticated, UserInfo}; +use rustracing_jaeger::Span; +use tiny_http::{Request, ResponseBox}; -pub fn build_routes(db: DBPool) -> impl Filter + Clone { - let login = warp::path!("auth" / "login") - .and(warp::post()) - .and(warp::body::json()) - .and(with_db(db.clone())) - .and_then(login); - let signup = warp::path!("auth" / "signup") - .and(warp::post()) - .and(warp::body::json()) - .and(with_db(db.clone())) - .and_then(signup); - let refresh = warp::path!("auth" / "refresh") - .and(warp::post()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(refresh); - let logout_all = warp::path!("auth" / "logout_all") - .and(warp::post()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(logout_all); - let change_password = warp::path!("auth" / "change_password") - .and(warp::post()) - .and(warp::body::json()) - .and(authenticated(db.clone())) - .and(with_db(db)) - .and_then(change_password); +use crate::{ + db::{DBConnection, TfaTypes, UserRole}, + dto, + routes::{filters::UserInfo, get_reply, AppError} +}; - login.or(signup).or(refresh).or(logout_all).or(change_password) -} - -async fn login(data: dto::requests::Login, mut db: DBConnection) - -> Result { - let user = db.find_user(&data.username, false) +pub fn login( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + data: dto::requests::Login +) -> Result { + let user = db + .find_user(span, &data.username, false) .ok_or(AppError::Unauthorized("Invalid username or password"))?; if !argon2::verify_encoded(user.password.as_str(), data.password.as_bytes()).unwrap_or(false) { @@ -56,60 +31,69 @@ async fn login(data: dto::requests::Login, mut db: DBConnection) return AppError::Unauthorized("Incorrect 2fa").err(); } } else { - if user.tfa_type == TfaTypes::Email { super::tfa::send_2fa_mail(&user); } + if user.tfa_type == TfaTypes::Email { + super::tfa::send_2fa_mail(span, &user); + } - return get_reply(&dto::responses::Success { - statusCode: 200 - }); + return get_reply(&dto::responses::Success { statusCode: 200 }); } } get_reply(&dto::responses::Login { statusCode: 200, - jwt: super::get_token(&user, &mut db) + jwt: super::get_token(span, &user, db) }) } -async fn signup(data: dto::requests::SignUp, mut db: DBConnection) - -> Result { - if db.find_user(&data.username, false).is_some() { +pub fn signup( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + data: dto::requests::SignUp +) -> Result { + if db.find_user(span, &data.username, false).is_some() { return AppError::BadRequest("Username is already taken").err(); } - db.create_user_password(data.username, super::hash_password(&data.password)); + db.create_user_password(span, data.username, super::hash_password(&data.password)); - get_reply(&dto::responses::Success { - statusCode: 200 - }) + get_reply(&dto::responses::Success { statusCode: 200 }) } -async fn refresh(info: UserInfo, mut db: DBConnection) -> Result { - db.delete_token(info.1.id); +pub fn refresh(span: &Span, _: &mut Request, db: &mut DBConnection, info: UserInfo) -> Result { + db.delete_token(span, info.1.id); get_reply(&dto::responses::Login { statusCode: 200, - jwt: super::get_token(&info.0, &mut db) + jwt: super::get_token(span, &info.0, db) }) } -async fn logout_all(info: UserInfo, mut db: DBConnection) -> Result { - db.delete_all_tokens(info.0.id); +pub fn logout_all( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo +) -> Result { + db.delete_all_tokens(span, info.0.id); - get_reply(&dto::responses::Success { - statusCode: 200 - }) + get_reply(&dto::responses::Success { statusCode: 200 }) } -async fn change_password(data: ChangePassword, mut info: UserInfo, mut db: DBConnection) -> Result { +pub fn change_password( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + mut info: UserInfo, + data: dto::requests::ChangePassword +) -> Result { if !argon2::verify_encoded(info.0.password.as_str(), data.oldPassword.as_bytes()).unwrap_or(false) { return AppError::Unauthorized("Old password is wrong").err(); } info.0.password = super::hash_password(&data.newPassword); - db.save_user(&info.0); - db.delete_all_tokens(info.0.id); + db.save_user(span, &info.0); + db.delete_all_tokens(span, info.0.id); - get_reply(&dto::responses::Success { - statusCode: 200 - }) + get_reply(&dto::responses::Success { statusCode: 200 }) } diff --git a/backend/src/routes/auth/gitlab.rs b/backend/src/routes/auth/gitlab.rs index 470eca1..4707e82 100644 --- a/backend/src/routes/auth/gitlab.rs +++ b/backend/src/routes/auth/gitlab.rs @@ -1,9 +1,15 @@ -use cached::proc_macro::cached; -use lazy_static::lazy_static; -use warp::{Filter, Reply}; -use crate::config::CONFIG; -use crate::db::{DBConnection, DBPool, with_db}; -use crate::routes::AppError; +use std::time::Duration; + +use once_cell::sync::Lazy; +use rustracing_jaeger::Span; +use tiny_http::{Request, Response, ResponseBox}; + +use crate::{ + config::CONFIG, + db::DBConnection, + metrics, + routes::{header, AppError} +}; #[derive(serde::Deserialize, Clone, Debug)] pub struct GitlabTokens { @@ -17,92 +23,112 @@ pub struct GitlabUser { pub is_admin: bool } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] -pub struct GitlabCallbackQuery { - pub code: String -} +pub static REDIRECT_URL: Lazy = Lazy::new(|| CONFIG.gitlab_redirect_url.clone() + "/api/auth/gitlab_callback"); +pub static TOKEN_URL: Lazy = Lazy::new(|| format!("{}/oauth/token", CONFIG.gitlab_api_url.clone())); +pub static USER_URL: Lazy = Lazy::new(|| format!("{}/api/v4/user", CONFIG.gitlab_api_url.clone())); +pub static AUTHORIZE_URL: Lazy = Lazy::new(|| format!("{}/oauth/authorize", CONFIG.gitlab_url.clone())); -lazy_static! { - static ref REDIRECT_URL: String = CONFIG.gitlab_redirect_url.clone() + "/api/auth/gitlab_callback"; - static ref TOKEN_URL: String = format!("{}/oauth/token", CONFIG.gitlab_api_url.clone()); - static ref USER_URL: String = format!("{}/api/v4/user", CONFIG.gitlab_api_url.clone()); - static ref AUTHORIZE_URL: String = format!("{}/oauth/authorize", CONFIG.gitlab_url.clone()); -} - -pub fn get_gitlab_token(code_or_token: String, token: bool) -> Option { +pub fn get_gitlab_token(span: &Span, code_or_token: String, token: bool) -> Option { + let _span = metrics::span("get_gitlab_token", span); let mut req = ureq::post(&TOKEN_URL) .query("redirect_uri", &REDIRECT_URL) .query("client_id", &CONFIG.gitlab_id) .query("client_secret", &CONFIG.gitlab_secret); if token { - req = req - .query("refresh_token", &code_or_token) - .query("grant_type", "refresh_token"); + req = req.query("refresh_token", &code_or_token).query("grant_type", "refresh_token"); } else { - req = req - .query("code", &code_or_token) - .query("grant_type", "authorization_code"); + req = req.query("code", &code_or_token).query("grant_type", "authorization_code"); } req.call().ok()?.into_json().ok() } -#[cached(time=300, time_refresh=false, option=true)] -pub fn get_gitlab_user(token: String) -> Option { - ureq::get(&USER_URL) - .set("Authorization", &format!("Bearer {}", token)) - .call() - .ok()? - .into_json().ok() +pub fn get_gitlab_user(span: &Span, token: String) -> Option { + static CACHE: Lazy> = + Lazy::new(|| stretto::CacheBuilder::new(1000, 100).finalize().expect("Failed to create gitlab cache")); + match CACHE.get(&token) { + None => { + let _span = metrics::span("get_gitlab_user", span); + ureq::get(&USER_URL) + .set("Authorization", &format!("Bearer {}", token)) + .call() + .ok()? + .into_json::() + .ok() + .map(|v| { + CACHE.insert_with_ttl(token, v.clone(), 1, Duration::from_secs(500)); + v + }) + } + Some(v) => Some(v.value().clone()) + } } -pub fn build_routes(db: DBPool) -> impl Filter + Clone { - let gitlab = warp::path!("auth" / "gitlab") - .and(warp::get()) - .and_then(gitlab); - let gitlab_callback = warp::path!("auth" / "gitlab_callback") - .and(warp::get()) - .and(warp::query::query::()) - .and(with_db(db)) - .and_then(gitlab_callback); - - gitlab.or(gitlab_callback) +pub fn gitlab(_: &Span, _: &mut Request, _: &mut DBConnection) -> Result { + let uri = format!( + "{}?redirect_uri={}&client_id={}&scope=read_user&response_type=code", + AUTHORIZE_URL.as_str(), + REDIRECT_URL.as_str(), + CONFIG.gitlab_id + ); + Ok(Response::empty(302).with_header(header("location", &uri)).boxed()) } -async fn gitlab() -> Result { - let uri = format!("{}?redirect_uri={}&client_id={}&scope=read_user&response_type=code", AUTHORIZE_URL.as_str(), REDIRECT_URL.as_str(), CONFIG.gitlab_id); - Ok(warp::redirect::found(uri.parse::().expect("Failed to parse gitlab auth uri"))) -} - -async fn gitlab_callback(code: GitlabCallbackQuery, mut db: DBConnection) -> Result { +pub fn gitlab_callback( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + query_string: &str +) -> Result { use crate::db::UserRole; - let tokens = get_gitlab_token(code.code, false).ok_or(AppError::Unauthorized("Invalid code"))?; - let gitlab_user = get_gitlab_user(tokens.access_token.clone()).ok_or(AppError::Unauthorized("Invalid code"))?; + let code: &str = query_string + // a=b&code=c&d + .split('&') + // ['a=b', 'code=c', 'd'] + .find_map(|prop| + prop.split_once('=') + // [Some(('a', 'b')), Some(('code', 'c')), None] + .and_then(|v| + v.0.eq("code") + .then_some(v.1) + ) + ) + .ok_or(AppError::BadRequest("Query code missing"))?; - let user = db.find_user(&gitlab_user.username, true); + let tokens = get_gitlab_token(span, code.to_string(), false).ok_or(AppError::Unauthorized("Invalid code"))?; + let gitlab_user = + get_gitlab_user(span, tokens.access_token.clone()).ok_or(AppError::Unauthorized("Invalid code"))?; + + let user = db.find_user(span, &gitlab_user.username, true); let user = match user { Some(mut v) => { v.gitlab_at = Some(tokens.access_token); v.gitlab_rt = Some(tokens.refresh_token); - db.save_user(&v); + db.save_user(span, &v); v - }, - None => { - db.create_user_gitlab( - gitlab_user.username, - if gitlab_user.is_admin { UserRole::Admin } else { UserRole::Disabled }, - tokens.access_token, - tokens.refresh_token - ) } + None => db.create_user_gitlab( + span, + gitlab_user.username, + if gitlab_user.is_admin { + UserRole::Admin + } else { + UserRole::Disabled + }, + tokens.access_token, + tokens.refresh_token + ) }; if user.role == UserRole::Disabled { - Ok(warp::reply::html("

Your account is disabled, please contact an admin.
Go to login page

").into_response()) + Ok( + Response::from_data("

Your account is disabled, please contact an admin.
Go to login page

") + .with_header(header("content-type", "text/html; charset=utf-8")) + .boxed() + ) } else { - let uri = format!("/set_token?token={}", super::get_token(&user, &mut db)); - Ok(warp::redirect::found(uri.parse::().expect("Failed to parse set_token uri")).into_response()) + let uri = format!("/set_token?token={}", super::get_token(span, &user, db)); + Ok(Response::empty(302).with_header(header("location", &uri)).boxed()) } } - diff --git a/backend/src/routes/auth/mod.rs b/backend/src/routes/auth/mod.rs index aeb6446..bb242fc 100644 --- a/backend/src/routes/auth/mod.rs +++ b/backend/src/routes/auth/mod.rs @@ -1,20 +1,12 @@ -mod basic; -mod tfa; +pub mod basic; pub mod gitlab; +pub mod tfa; use std::ops::Add; -use lazy_static::lazy_static; -use ring::rand; -use ring::rand::SecureRandom; -use warp::Filter; -use crate::db::DBPool; -pub fn build_routes(db: DBPool) -> impl Filter + Clone { - SEC_RANDOM.fill(&mut [0; 1]).expect("Failed to init secure random"); - basic::build_routes(db.clone()) - .or(tfa::build_routes(db.clone())) - .or(gitlab::build_routes(db)) -} +use once_cell::sync::Lazy; +use ring::{rand, rand::SecureRandom}; +use rustracing_jaeger::Span; #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct JWTClaims { @@ -25,13 +17,12 @@ pub struct JWTClaims { } pub static JWT_ALGORITHM: jsonwebtoken::Algorithm = jsonwebtoken::Algorithm::HS512; - -lazy_static! { - pub static ref SEC_RANDOM: rand::SystemRandom = rand::SystemRandom::new(); - pub static ref JWT_SECRET: Vec = get_jwt_secret(); - pub static ref JWT_DECODE_KEY: jsonwebtoken::DecodingKey = jsonwebtoken::DecodingKey::from_secret(JWT_SECRET.as_slice()); - pub static ref JWT_ENCODE_KEY: jsonwebtoken::EncodingKey = jsonwebtoken::EncodingKey::from_secret(JWT_SECRET.as_slice()); -} +pub static SEC_RANDOM: Lazy = Lazy::new(rand::SystemRandom::new); +pub static JWT_SECRET: Lazy> = Lazy::new(get_jwt_secret); +pub static JWT_DECODE_KEY: Lazy = + Lazy::new(|| jsonwebtoken::DecodingKey::from_secret(JWT_SECRET.as_slice())); +pub static JWT_ENCODE_KEY: Lazy = + Lazy::new(|| jsonwebtoken::EncodingKey::from_secret(JWT_SECRET.as_slice())); fn get_jwt_secret() -> Vec { let secret = std::fs::read("jwt.secret"); @@ -45,12 +36,12 @@ fn get_jwt_secret() -> Vec { } } -pub fn get_token(user: &crate::db::User, db: &mut crate::db::DBConnection) -> String { +pub fn get_token(span: &Span, user: &crate::db::User, db: &mut crate::db::DBConnection) -> String { let iat = chrono::Utc::now(); let exp = iat.add(chrono::Duration::hours(24)).timestamp(); let iat = iat.timestamp(); - let token = db.create_token(user.id, exp); + let token = db.create_token(span, user.id, exp); let claims = JWTClaims { exp, @@ -59,8 +50,12 @@ pub fn get_token(user: &crate::db::User, db: &mut crate::db::DBConnection) -> St sub: user.id }; - jsonwebtoken::encode(&jsonwebtoken::Header::new(JWT_ALGORITHM), &claims, &JWT_ENCODE_KEY) - .expect("Failed to create JWT token") + jsonwebtoken::encode( + &jsonwebtoken::Header::new(JWT_ALGORITHM), + &claims, + &JWT_ENCODE_KEY + ) + .expect("Failed to create JWT token") } pub fn hash_password(password: &String) -> String { diff --git a/backend/src/routes/auth/tfa.rs b/backend/src/routes/auth/tfa.rs index 02d9f56..a23e313 100644 --- a/backend/src/routes/auth/tfa.rs +++ b/backend/src/routes/auth/tfa.rs @@ -1,44 +1,52 @@ -use lazy_static::lazy_static; use lettre::Transport; +use once_cell::sync::Lazy; use ring::rand::SecureRandom; -use warp::Filter; -use crate::config::CONFIG; -use crate::db::{DBConnection, DBPool, with_db, TfaTypes}; -use crate::dto; -use crate::routes::{AppError, get_reply}; -use crate::routes::filters::{authenticated, UserInfo}; +use rustracing_jaeger::Span; +use tiny_http::{Request, ResponseBox}; + +use crate::{ + config::CONFIG, + db, + db::{DBConnection, TfaTypes}, + dto, + metrics, + routes::{filters::UserInfo, get_reply, AppError} +}; fn build_mail_sender() -> lettre::SmtpTransport { lettre::SmtpTransport::builder_dangerous(CONFIG.smtp_server.clone()) .port(CONFIG.smtp_port) - .tls( - lettre::transport::smtp::client::Tls::Required( - lettre::transport::smtp::client::TlsParameters::new( - CONFIG.smtp_server.clone() - ).unwrap() - ) - ) - .credentials(lettre::transport::smtp::authentication::Credentials::new(CONFIG.smtp_user.clone(), CONFIG.smtp_password.clone())) + .tls(lettre::transport::smtp::client::Tls::Required( + lettre::transport::smtp::client::TlsParameters::new(CONFIG.smtp_server.clone()).unwrap() + )) + .credentials(lettre::transport::smtp::authentication::Credentials::new( + CONFIG.smtp_user.clone(), + CONFIG.smtp_password.clone() + )) .build() } -lazy_static! { - static ref MAIL_SENDER: lettre::SmtpTransport = build_mail_sender(); -} +static MAIL_SENDER: Lazy = Lazy::new(build_mail_sender); -fn get_totp(user: &crate::db::User) -> totp_rs::TOTP { +fn get_totp(user: &db::User) -> totp_rs::TOTP { totp_rs::TOTP::from_rfc6238( totp_rs::Rfc6238::new( 6, user.tfa_secret.clone().unwrap(), Some("MFileserver".to_owned()), user.name.clone() - ).unwrap() - ).unwrap() + ) + .unwrap() + ) + .unwrap() } -pub fn verify2fa(user: &crate::db::User, code: String) -> bool { - let allowed_skew = if user.tfa_type == TfaTypes::Totp {0} else {10}; +pub fn verify2fa(user: &db::User, code: String) -> bool { + let allowed_skew = if user.tfa_type == TfaTypes::Totp { + 0 + } else { + 10 + }; let totp = get_totp(user); let time = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(); let base_step = time / totp.step - allowed_skew; @@ -51,54 +59,39 @@ pub fn verify2fa(user: &crate::db::User, code: String) -> bool { false } -pub fn send_2fa_mail(user: &crate::db::User) { +pub fn send_2fa_mail(span: &Span, user: &db::User) { + let _span = metrics::span("send_2fa_mail", span); let totp = get_totp(user); let code = totp.generate_current().unwrap(); let mail = lettre::Message::builder() .from("fileserver@mattv.de".parse().unwrap()) .to(user.name.parse().unwrap()) .subject("MFileserver - Email 2fa code") - .body(format!("Your code is: {}\r\nIt is valid for 5 minutes", code)) + .body(format!( + "Your code is: {}\r\nIt is valid for 5 minutes", + code + )) .unwrap(); MAIL_SENDER.send(&mail).expect("Failed to send mail"); } -pub fn build_routes(db: DBPool) -> impl Filter + Clone { - let tfa_setup = warp::path!("auth" / "2fa" / "setup") - .and(warp::post()) - .and(warp::body::json()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(tfa_setup); - let tfa_complete = warp::path!("auth" / "2fa" / "complete") - .and(warp::post()) - .and(warp::body::json()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(tfa_complete); - let tfa_disable = warp::path!("auth" / "2fa" / "disable") - .and(warp::post()) - .and(authenticated(db.clone())) - .and(with_db(db)) - .and_then(tfa_disable); - - tfa_setup.or(tfa_complete).or(tfa_disable) -} - -async fn tfa_setup(data: dto::requests::TfaSetup, mut info: UserInfo, mut db: DBConnection) - -> Result { +pub fn tfa_setup( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + mut info: UserInfo, + data: dto::requests::TfaSetup +) -> Result { let mut secret: [u8; 32] = [0; 32]; super::SEC_RANDOM.fill(&mut secret).expect("Failed to generate secret"); let secret = Vec::from(secret); info.0.tfa_secret = Some(secret); - db.save_user(&info.0); + db.save_user(span, &info.0); if data.mail { - send_2fa_mail(&info.0); - get_reply(&dto::responses::Success { - statusCode: 200 - }) + send_2fa_mail(span, &info.0); + get_reply(&dto::responses::Success { statusCode: 200 }) } else { let totp = get_totp(&info.0); get_reply(&dto::responses::TfaSetup { @@ -109,28 +102,37 @@ async fn tfa_setup(data: dto::requests::TfaSetup, mut info: UserInfo, mut db: DB } } -async fn tfa_complete(data: dto::requests::TfaComplete, mut info: UserInfo, mut db: DBConnection) - -> Result { - info.0.tfa_type = if data.mail { TfaTypes::Email } else { TfaTypes::Totp }; +pub fn tfa_complete( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + mut info: UserInfo, + data: dto::requests::TfaComplete +) -> Result { + info.0.tfa_type = if data.mail { + TfaTypes::Email + } else { + TfaTypes::Totp + }; if verify2fa(&info.0, data.code) { - db.save_user(&info.0); - db.delete_all_tokens(info.0.id); - get_reply(&dto::responses::Success { - statusCode: 200 - }) + db.save_user(span, &info.0); + db.delete_all_tokens(span, info.0.id); + get_reply(&dto::responses::Success { statusCode: 200 }) } else { AppError::BadRequest("Incorrect 2fa code").err() } } -async fn tfa_disable(mut info: UserInfo, mut db: DBConnection) - -> Result { +pub fn tfa_disable( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + mut info: UserInfo +) -> Result { info.0.tfa_secret = None; info.0.tfa_type = TfaTypes::None; - db.save_user(&info.0); - db.delete_all_tokens(info.0.id); - get_reply(&dto::responses::Success { - statusCode: 200 - }) + db.save_user(span, &info.0); + db.delete_all_tokens(span, info.0.id); + get_reply(&dto::responses::Success { statusCode: 200 }) } diff --git a/backend/src/routes/filters.rs b/backend/src/routes/filters.rs index ff01a49..4a81de6 100644 --- a/backend/src/routes/filters.rs +++ b/backend/src/routes/filters.rs @@ -1,50 +1,26 @@ -use warp::Filter; -use warp::http::{HeaderMap, HeaderValue}; -use crate::db::UserRole; -use crate::db::{DBConnection, DBPool, with_db}; -use crate::routes::AppError; -use crate::routes::auth; +use rustracing_jaeger::Span; + +use crate::{ + db::{DBConnection, UserRole}, + metrics, + routes::{auth, AppError} +}; pub type UserInfo = (crate::db::User, crate::db::Token); -pub fn authenticated(db: DBPool) -> impl Filter + Clone { - warp::header::headers_cloned() - .map(move |_headers: HeaderMap| _headers) - .and(with_db(db)) - .and_then(authorize) -} - -pub fn admin(db: DBPool) -> impl Filter + Clone { - warp::header::headers_cloned() - .map(move |_headers: HeaderMap| _headers) - .and(with_db(db)) - .and_then(|_headers, db| async { - let info = authorize(_headers, db).await?; - if info.0.role == UserRole::Admin { - Ok(info) - } else { - AppError::Forbidden("Forbidden").err() - } - }) -} - -async fn authorize(_headers: HeaderMap, mut db: DBConnection) -> Result { - authorize_jwt(extract_jwt(&_headers).map_err(|e| e.reject())?, &mut db).await -} - -pub async fn authorize_jwt(jwt: String, db: &mut DBConnection) -> Result { +pub fn authorize_jwt(span: &Span, jwt: &str, db: &mut DBConnection) -> Result { + let inner_span = metrics::span("authorize_jwt", span); let decoded = jsonwebtoken::decode::( - &jwt, - &crate::routes::auth::JWT_DECODE_KEY, + jwt, + &auth::JWT_DECODE_KEY, &jsonwebtoken::Validation::new(auth::JWT_ALGORITHM) - ).map_err(|_| AppError::Forbidden("Invalid token"))?; + ) + .map_err(|_| AppError::Forbidden("Invalid token"))?; - db.cleanup_tokens(); + db.cleanup_tokens(span); - let mut user = db.get_user(decoded.claims.sub) - .ok_or(AppError::Forbidden("Invalid token"))?; - let token = db.get_token(decoded.claims.jti) - .ok_or(AppError::Forbidden("Invalid token"))?; + let mut user = db.get_user(&inner_span, decoded.claims.sub).ok_or(AppError::Forbidden("Invalid token"))?; + let token = db.get_token(&inner_span, decoded.claims.jti).ok_or(AppError::Forbidden("Invalid token"))?; if user.id != token.owner_id { return AppError::Forbidden("Invalid token").err(); @@ -53,38 +29,27 @@ pub async fn authorize_jwt(jwt: String, db: &mut DBConnection) -> Result Some(v), None => { - let tokens = auth::gitlab::get_gitlab_token(user.gitlab_rt.clone().unwrap(), true); + let tokens = auth::gitlab::get_gitlab_token(&inner_span, user.gitlab_rt.clone().unwrap(), true); if let Some(tokens) = tokens { user.gitlab_at = Some(tokens.access_token.clone()); user.gitlab_rt = Some(tokens.refresh_token); - db.save_user(&user); - auth::gitlab::get_gitlab_user(tokens.access_token) - } else { None } + db.save_user(&inner_span, &user); + auth::gitlab::get_gitlab_user(&inner_span, tokens.access_token) + } else { + None + } } }; if info.is_none() || info.unwrap().username != user.name { - db.delete_all_tokens(token.owner_id); - db.delete_all_tokens(user.id); + db.delete_all_tokens(&inner_span, token.owner_id); + db.delete_all_tokens(&inner_span, user.id); return AppError::Forbidden("Invalid gitlab user").err(); } } Ok((user, token)) } - -fn extract_jwt(_headers: &HeaderMap) -> Result { - let header = match _headers.get(warp::http::header::AUTHORIZATION) { - Some(v) => v, - None => return Err(AppError::Unauthorized("Missing token")) - }; - let header = header.to_str().map_err(|_| AppError::Unauthorized("Missing token"))?; - if !header.starts_with("Bearer ") { - Err(AppError::Unauthorized("Missing token")) - } else { - Ok(header.trim_start_matches("Bearer ").to_owned()) - } -} \ No newline at end of file diff --git a/backend/src/routes/fs/mod.rs b/backend/src/routes/fs/mod.rs index 2e622f1..a401d62 100644 --- a/backend/src/routes/fs/mod.rs +++ b/backend/src/routes/fs/mod.rs @@ -1,30 +1,19 @@ -use std::collections::VecDeque; -use std::iter::Iterator; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; -use lazy_static::lazy_static; -use warp::Filter; -use futures::TryFutureExt; -use futures::TryStreamExt; -use crate::db::DBPool; +pub mod routes; -mod routes; - -pub fn build_routes(db: DBPool) -> impl Filter + Clone { - { - if !std::path::Path::new("temp").is_dir() { - std::fs::create_dir("temp").expect("Failed to create temp dir"); - } - std::fs::read_dir("temp") - .expect("Failed to iter temp dir") - .for_each(|dir| { - std::fs::remove_file(dir.expect("Failed to retrieve temp dir entry").path()).expect("Failed to delete file in temp dir"); - }); - DELETE_RT.spawn(async {}); - ZIP_RT.spawn(async {}); +use std::{ + collections::VecDeque, + iter::Iterator, + sync::{ + atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}, + Arc } - routes::build_routes(db) -} +}; + +use once_cell::sync::Lazy; +use rayon_core::ThreadPoolBuilder; +use rustracing_jaeger::Span; + +use crate::metrics; pub static WINDOWS_INVALID_CHARS: &str = "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B\x1C\x1D\x1E\x1F<>:\"/\\|"; @@ -43,19 +32,36 @@ pub enum CreateNodeResult { Exists(bool, i32) } -lazy_static! { - static ref DELETE_RT: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread().worker_threads(1).enable_time().build().expect("Failed to create delete runtime"); - static ref ZIP_RT: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread().worker_threads(3).enable_time().build().expect("Failed to create zip runtime"); - pub static ref ZIP_TO_PROGRESS: tokio::sync::RwLock, Arc>> = tokio::sync::RwLock::new(std::collections::HashMap::new()); -} +pub static DELETE_POOL: Lazy = Lazy::new(|| { + ThreadPoolBuilder::new() + .num_threads(1) + .thread_name(|i| format!("Delete thread {}", i)) + .build() + .unwrap() +}); +pub static ZIP_POOL: Lazy = Lazy::new(|| { + ThreadPoolBuilder::new() + .num_threads(3) + .thread_name(|i| format!("Zip thread {}", i)) + .build() + .unwrap() +}); +pub static ZIP_TO_PROGRESS: Lazy< + parking_lot::RwLock, Arc>> +> = Lazy::new(|| parking_lot::RwLock::new(std::collections::HashMap::new())); static NEXT_TEMP_ID: AtomicU64 = AtomicU64::new(0); -async fn cleanup_temp_zips() { - let mut existing = ZIP_TO_PROGRESS.write().await; +fn cleanup_temp_zips(span: &Span) { + let _span = metrics::span("zipping - cleanup", span); + let mut existing = ZIP_TO_PROGRESS.write(); existing.retain(|_, v| { - if Arc::strong_count(v) == 1 && v.done.load(Ordering::Relaxed) && v.delete_after.load(Ordering::Relaxed) <= chrono::Utc::now().timestamp() { - std::fs::remove_file(std::path::Path::new(&format!("./temp/{}", v.temp_id))).expect("Failed to delete temp file"); + if Arc::strong_count(v) == 1 + && v.done.load(Ordering::Relaxed) + && v.delete_after.load(Ordering::Relaxed) <= chrono::Utc::now().timestamp() + { + std::fs::remove_file(std::path::Path::new(&format!("./temp/{}", v.temp_id))) + .expect("Failed to delete temp file"); false } else { true @@ -63,37 +69,54 @@ async fn cleanup_temp_zips() { }); } -fn get_nodes_recursive(root: crate::db::Inode, db: &mut crate::db::DBConnection) -> VecDeque { +fn get_nodes_recursive( + span: &Span, + root: crate::db::Inode, + db: &mut crate::db::DBConnection +) -> VecDeque { + let inner_span = metrics::span("get_nodes_recursive", span); let mut nodes = VecDeque::from(vec![root.clone()]); - if root.is_file { return nodes; } + if root.is_file { + return nodes; + } let mut nodes_to_check = VecDeque::from(vec![root]); while !nodes_to_check.is_empty() { let node = nodes_to_check.pop_front().unwrap(); - db.get_children(node.id).iter().for_each(|node| { + db.get_children(&inner_span, node.id).iter().for_each(|node| { nodes.push_back(node.clone()); - if !node.is_file { nodes_to_check.push_front(node.clone()); } + if !node.is_file { + nodes_to_check.push_front(node.clone()); + } }); } nodes } -fn get_node_path(node: crate::db::Inode, db: &mut crate::db::DBConnection) -> VecDeque { +fn get_node_path(span: &Span, node: crate::db::Inode, db: &mut crate::db::DBConnection) -> VecDeque { + let inner_span = metrics::span("get_node_path", span); let mut path = VecDeque::from(vec![node.clone()]); let mut node = node; while let Some(parent) = node.parent_id { - node = db.get_node(parent).expect("Failed to get node parent"); + node = db.get_node(&inner_span, parent).expect("Failed to get node parent"); path.push_front(node.clone()); } path } -fn get_total_size(node: crate::db::Inode, db: &mut crate::db::DBConnection) -> u64 { - let nodes = get_nodes_recursive(node, db); - nodes.iter().fold(0_u64, |acc, node| acc + node.size.unwrap_or(0) as u64) +fn get_total_size(span: &Span, node: crate::db::Inode, db: &mut crate::db::DBConnection) -> u64 { + let inner_span = metrics::span("get_total_size", span); + get_nodes_recursive(&inner_span, node, db) + .iter() + .fold(0_u64, |acc, node| acc + node.size.unwrap_or(0) as u64) } -pub fn get_node_and_validate(user: &crate::db::User, node: i32, db: &mut crate::db::DBConnection) -> Option { - let node = db.get_node(node)?; +pub fn get_node_and_validate( + span: &Span, + user: &crate::db::User, + node: i32, + db: &mut crate::db::DBConnection +) -> Option { + let node = db.get_node(span, node)?; if node.owner_id != user.id { None } else { @@ -101,21 +124,33 @@ pub fn get_node_and_validate(user: &crate::db::User, node: i32, db: &mut crate:: } } -pub fn create_node(name: String, owner: &crate::db::User, file: bool, parent: Option, force: bool, db: &mut crate::db::DBConnection) - -> Result { +pub fn create_node( + span: &Span, + name: String, + owner: &crate::db::User, + file: bool, + parent: Option, + force: bool, + db: &mut crate::db::DBConnection +) -> Result { + let inner_span = metrics::span("create_node", span); if !force && (name.is_empty() || name.starts_with(' ') || name.contains(|c| { - WINDOWS_INVALID_CHARS.contains(c) - } || name.ends_with(' ') || name.ends_with('.') || name == "." || name == "..")) { - return Err(CreateNodeResult::InvalidName); - } + WINDOWS_INVALID_CHARS.contains(c) + } || name.ends_with(' ') || name.ends_with('.') || name == "." || name == "..")) { + return Err(CreateNodeResult::InvalidName); + } if let Some(parent) = parent { - let parent = match get_node_and_validate(owner, parent, db) { - None => { return Err(CreateNodeResult::InvalidParent); } + let parent = match get_node_and_validate(&inner_span, owner, parent, db) { + None => { + return Err(CreateNodeResult::InvalidParent); + } Some(v) => v }; - if parent.is_file { return Err(CreateNodeResult::InvalidParent); } - let children = db.get_children(parent.id); + if parent.is_file { + return Err(CreateNodeResult::InvalidParent); + } + let children = db.get_children(&inner_span, parent.id); for child in children { if child.name == name { return Err(CreateNodeResult::Exists(child.is_file, child.id)); @@ -123,29 +158,36 @@ pub fn create_node(name: String, owner: &crate::db::User, file: bool, parent: Op } } - Ok(db.create_node(file, name, parent, owner.id)) + Ok(db.create_node(&inner_span, file, name, parent, owner.id)) } -pub fn delete_node_root(node: &crate::db::Inode, db: &mut crate::db::DBConnection) { - get_nodes_recursive(node.clone(), db).iter().rev().for_each(|node| { - db.delete_node(node); +pub fn delete_node_root(span: &Span, node: &crate::db::Inode, db: &mut crate::db::DBConnection) { + get_nodes_recursive(span, node.clone(), db).iter().rev().for_each(|node| { + db.delete_node(span, node); }); } -pub async fn delete_node(node: &crate::db::Inode, sender: &mut warp::hyper::body::Sender, db: &mut crate::db::DBConnection) { - if node.parent_id.is_none() { return; } +pub fn delete_node( + span: &Span, + node: &crate::db::Inode, + sender: &std::sync::mpsc::Sender, + db: &mut crate::db::DBConnection +) { + if node.parent_id.is_none() { + return; + } - for node in get_nodes_recursive(node.clone(), db).iter().rev() { - sender.send_data(warp::hyper::body::Bytes::from(format!("Deleting {}...", generate_path(node, db)))).await.unwrap(); - db.delete_node(node); - sender.send_data(warp::hyper::body::Bytes::from(" Done \n")).await.unwrap(); + for node in get_nodes_recursive(span, node.clone(), db).iter().rev() { + sender.send(format!("Deleting {}...", generate_path(span, node, db))).unwrap(); + db.delete_node(span, node); + sender.send(" Done \n".to_owned()).unwrap(); } } -pub fn generate_path(node: &crate::db::Inode, db: &mut crate::db::DBConnection) -> String { +pub fn generate_path(span: &Span, node: &crate::db::Inode, db: &mut crate::db::DBConnection) -> String { let mut path = String::new(); - get_node_path(node.clone(), db).iter().for_each(|node| { + get_node_path(span, node.clone(), db).iter().for_each(|node| { if node.parent_id.is_none() { path += "/"; } else { @@ -159,12 +201,16 @@ pub fn generate_path(node: &crate::db::Inode, db: &mut crate::db::DBConnection) path } -pub fn generate_path_dto(node: &crate::db::Inode, db: &mut crate::db::DBConnection) -> crate::dto::responses::GetPath { +pub fn generate_path_dto( + span: &Span, + node: &crate::db::Inode, + db: &mut crate::db::DBConnection +) -> crate::dto::responses::GetPath { let mut get_path = crate::dto::responses::GetPath { segments: Vec::new() }; - get_node_path(node.clone(), db).iter().for_each(|node| { + get_node_path(span, node.clone(), db).iter().for_each(|node| { if node.parent_id.is_none() { get_path.segments.push(crate::dto::responses::GetPathSegment { path: "/".to_owned(), @@ -186,14 +232,3 @@ pub fn generate_path_dto(node: &crate::db::Inode, db: &mut crate::db::DBConnecti get_path } - -pub fn get_file_stream_body(path: String) -> warp::hyper::Body { - warp::hyper::Body::wrap_stream( - tokio::fs::File::open(path) - .map_ok(|file| - tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) - .map_ok(bytes::BytesMut::freeze) - ) - .try_flatten_stream() - ) -} diff --git a/backend/src/routes/fs/routes.rs b/backend/src/routes/fs/routes.rs index f6d8919..3298cb3 100644 --- a/backend/src/routes/fs/routes.rs +++ b/backend/src/routes/fs/routes.rs @@ -1,95 +1,40 @@ -use std::collections::{BTreeSet, HashMap}; -use std::io::{Read, Write}; -use std::sync::atomic::Ordering; -use futures::{Stream, StreamExt}; -use headers::HeaderMapExt; -use warp::{Filter, Reply}; -use crate::db::{DBConnection, DBPool, with_db}; -use crate::dto; -use crate::routes::{AppError, get_reply}; -use crate::routes::filters::{authenticated, UserInfo}; +use std::{ + collections::{BTreeSet, HashMap}, + fs::File, + io::{Read, Write}, + ops::DerefMut, + sync::atomic::Ordering +}; -pub fn build_routes(db: DBPool) -> impl Filter + Clone { - let root = warp::path!("fs" / "root") - .and(warp::get()) - .and(authenticated(db.clone())) - .and_then(root); - let node = warp::path!("fs" / "node" / i32) - .and(warp::get()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(node) - .with(warp::compression::brotli()); - let path = warp::path!("fs" / "path" / i32) - .and(warp::get()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(path); - let create_folder = warp::path!("fs" / "create_folder") - .and(warp::post()) - .and(warp::body::json()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(|data, info, db| create_node(data, info, db, false)); - let create_file = warp::path!("fs" / "create_file") - .and(warp::post()) - .and(warp::body::json()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(|data, info, db| create_node(data, info, db, true)); - let delete_node = warp::path!("fs" / "delete" / i32) - .and(warp::post()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(delete_node); - let upload = warp::path!("fs" / "upload" / i32) - .and(warp::post()) - .and(warp::body::stream()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(upload); - let create_zip = warp::path!("fs" / "create_zip") - .and(warp::post()) - .and(warp::body::json()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(create_zip); - let download = warp::path!("fs" / "download") - .and(warp::post()) - .and(warp::body::form()) - .and(with_db(db.clone())) - .and_then(download); - let download_multi = warp::path!("fs" / "download_multi") - .and(warp::post()) - .and(warp::body::form()) - .and(with_db(db.clone())) - .and_then(download_multi); - let download_preview = warp::path!("fs" / "download_preview" / i32) - .and(warp::get()) - .and(authenticated(db.clone())) - .and(with_db(db.clone())) - .and_then(download_preview); - let get_type = warp::path!("fs" / "get_type" / i32) - .and(warp::get()) - .and(authenticated(db.clone())) - .and(with_db(db)) - .and_then(get_type); +use rustracing_jaeger::Span; +use tiny_http::{Request, Response, ResponseBox, StatusCode}; - root.or(node).or(path).or(create_folder).or(create_file).or(delete_node).or(upload).or(create_zip).or(download).or(download_multi).or(download_preview).or(get_type) -} +use crate::{ + db, + db::DBConnection, + dto, + header, + metrics, + routes::{filters::UserInfo, get_reply, AppError, ChannelReader} +}; -async fn root(info: UserInfo) -> Result { +pub fn root(_: &Span, _: &mut Request, _: &mut DBConnection, info: UserInfo) -> Result { get_reply(&dto::responses::Root { statusCode: 200, rootId: info.0.root_id }) } -async fn node(node: i32, info: UserInfo, mut db: DBConnection) -> Result { - let guard_lock = DBConnection::get_lock(info.0.id).await; - let _guard = guard_lock.read().await; - let node = super::get_node_and_validate(&info.0, node, &mut db) - .ok_or(AppError::BadRequest("Unknown node"))?; +pub fn node( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + node: i32 +) -> Result { + let guard_lock = DBConnection::get_lock(info.0.id); + let _guard = guard_lock.read(span); + let node = super::get_node_and_validate(span, &info.0, node, db).ok_or(AppError::BadRequest("Unknown node"))?; get_reply(&dto::responses::GetNode { statusCode: 200, @@ -100,176 +45,227 @@ async fn node(node: i32, info: UserInfo, mut db: DBConnection) -> Result Result { - let guard_lock = DBConnection::get_lock(info.0.id).await; - let _guard = guard_lock.read().await; - let node = super::get_node_and_validate(&info.0, node, &mut db) - .ok_or(AppError::BadRequest("Unknown node"))?; +pub fn path( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + node: i32 +) -> Result { + let guard_lock = DBConnection::get_lock(info.0.id); + let _guard = guard_lock.read(span); + let node = super::get_node_and_validate(span, &info.0, node, db).ok_or(AppError::BadRequest("Unknown node"))?; - get_reply(&super::generate_path_dto(&node, &mut db)) + get_reply(&super::generate_path_dto(span, &node, db)) } -async fn create_node(data: dto::requests::CreateNode, info: UserInfo, mut db: DBConnection, file: bool) -> Result { - let guard_lock = DBConnection::get_lock(info.0.id).await; - let _guard = guard_lock.read().await; - let node = super::create_node(data.name, &info.0, file, Some(data.parent), false, &mut db); +pub fn create_node( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + data: dto::requests::CreateNode, + file: bool +) -> Result { + let guard_lock = DBConnection::get_lock(info.0.id); + let _guard = guard_lock.read(span); + let node = super::create_node(span, data.name, &info.0, file, Some(data.parent), false, db); match node { Ok(v) => get_reply(&dto::responses::NewNode { + statusCode: 200, + id: v.id + }), + Err(v) => match v { + super::CreateNodeResult::InvalidName => AppError::BadRequest("Invalid name").err(), + super::CreateNodeResult::InvalidParent => AppError::BadRequest("Invalid parent").err(), + super::CreateNodeResult::Exists(file, id) => get_reply(&dto::responses::NodeExists { statusCode: 200, - id: v.id - }), - Err(v) => { - match v { - super::CreateNodeResult::InvalidName => AppError::BadRequest("Invalid name").err(), - super::CreateNodeResult::InvalidParent => AppError::BadRequest("Invalid parent").err(), - super::CreateNodeResult::Exists(file, id) => get_reply(&dto::responses::NodeExists { - statusCode: 200, - id, - exists: true, - isFile: file - }) - } + id, + exists: true, + isFile: file + }) } } } -async fn delete_node(node: i32, info: UserInfo, mut db: DBConnection) -> Result { - let guard_lock = DBConnection::get_lock(info.0.id).await; +pub fn delete_node( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + node: i32, + pool: &db::DBPool +) -> Result { + let guard_lock = DBConnection::get_lock(info.0.id); let inner_guard_lock = guard_lock.clone(); - let _guard = guard_lock.read().await; - let node = super::get_node_and_validate(&info.0, node, &mut db) - .ok_or(AppError::BadRequest("Unknown node"))?; + let _guard = guard_lock.read(span); + + let node = super::get_node_and_validate(span, &info.0, node, db).ok_or(AppError::BadRequest("Unknown node"))?; if node.parent_id.is_none() { return AppError::BadRequest("Can't delete root").err(); } - let (mut sender, body) = warp::hyper::Body::channel(); + let (tx, rx) = std::sync::mpsc::channel::(); + let inner_pool = pool.clone(); - sender.send_data(warp::hyper::body::Bytes::from("Waiting in queue\n")).await.unwrap(); - super::DELETE_RT.spawn(async move { + tx.send("Waiting in queue\n".to_owned()).unwrap(); + + let inner_span = metrics::span("DELETE_POOL - queueing", span); + super::DELETE_POOL.spawn(move || { + let del_span = metrics::span("DELETE_POOL - deleting", &inner_span); + drop(inner_span); + let mut db = &mut DBConnection::from(inner_pool.get().unwrap()); let guard_lock = inner_guard_lock.clone(); - let _guard = guard_lock.write().await; - super::delete_node(&node, &mut sender, &mut db).await; + let _guard = guard_lock.write(&del_span); + super::delete_node(&del_span, &node, &tx, db.deref_mut()); }); - let mut resp = warp::reply::Response::new(body); - *resp.status_mut() = warp::http::StatusCode::OK; - resp.headers_mut().typed_insert( - headers::ContentType::text_utf8() - ); - - Ok(resp) + Ok(Response::new( + StatusCode::from(200), + vec![header("content-type", "text/plain; charset=utf-8")], + ChannelReader(rx), + None, + None + ) + .boxed()) } -async fn upload(node: i32, stream: S, info: UserInfo, mut db: DBConnection) -> Result - where - S: Stream>, - S: StreamExt, - B: warp::Buf -{ - let guard_lock = DBConnection::get_lock(info.0.id).await; - let _guard = guard_lock.read().await; - let mut node = super::get_node_and_validate(&info.0, node, &mut db) - .ok_or(AppError::BadRequest("Unknown node"))?; +pub fn upload( + span: &Span, + req: &mut Request, + db: &mut DBConnection, + info: UserInfo, + node: i32 +) -> Result { + let guard_lock = DBConnection::get_lock(info.0.id); + let _guard = guard_lock.read(span); + let mut node = super::get_node_and_validate(span, &info.0, node, db).ok_or(AppError::BadRequest("Unknown node"))?; if !node.is_file { - return AppError::BadRequest("Can't upload to a directory").err(); + return AppError::BadRequest("Can't upload to a directory node").err(); } let mut file_size = 0_i64; let file_name = format!("./files/{}", node.id); { - let mut file = std::fs::File::create(file_name.clone()).unwrap(); + let _span = metrics::span("receive_file", span); + let mut buf = vec![0_u8; 8 * 1024 * 1024]; + let mut file = File::create(file_name.clone()).unwrap(); - stream.for_each(|f| { - let mut buffer = f.unwrap(); - file_size += buffer.remaining() as i64; - while buffer.remaining() != 0 { - let chunk = buffer.chunk(); - buffer.advance(file.write(chunk).expect("Failed to write file")); + let reader = req.as_reader(); + + loop { + let r = reader.read(&mut buf).unwrap(); + if r == 0 { + break; } - futures::future::ready(()) - }).await; + file.write_all(&buf[..r]).unwrap(); + file_size += r as i64; + } } - let generate_preview = || -> Option<()> { - if file_size > 20 * 1024 * 1024 { return None; } - let mime = mime_guess::from_path(std::path::Path::new(&node.name)).first()?.to_string(); - let img = image::load( - std::io::BufReader::new(std::fs::File::open(file_name.clone()).unwrap()), - image::ImageFormat::from_mime_type(mime)? - ).ok()?; - let img = img.resize(300, 300, image::imageops::FilterType::Triangle); - img.save(std::path::Path::new(&(file_name + "_preview.jpg"))).expect("Failed to save preview image"); - Some(()) - }; + metrics::DISK_USAGE + .with_label_values(&[node.owner_id.to_string().as_str()]) + .add(file_size - node.size.unwrap_or(0)); + { + let _span = metrics::span("generate_preview", span); + node.has_preview = (|| { + if file_size > 20 * 1024 * 1024 { + return None; + } + let mime = mime_guess::from_path(std::path::Path::new(&node.name)).first()?.to_string(); + let img = image::load( + std::io::BufReader::new(File::open(file_name.clone()).unwrap()), + image::ImageFormat::from_mime_type(mime)? + ) + .ok()?; + let img = img.resize(300, 300, image::imageops::FilterType::Triangle); + img.save(std::path::Path::new(&(file_name + "_preview.jpg"))).expect("Failed to save preview image"); + Some(()) + })() + .is_some(); + } - node.has_preview = generate_preview().is_some(); node.size = Some(file_size); - db.save_node(&node); + db.save_node(span, &node); - get_reply(&dto::responses::Success { - statusCode: 200 - }) + get_reply(&dto::responses::Success { statusCode: 200 }) } -async fn create_zip(data: dto::requests::CreateZip, info: UserInfo, mut db: DBConnection) -> Result { - let guard_lock = DBConnection::get_lock(info.0.id).await; +pub fn create_zip( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + data: dto::requests::CreateZip, + pool: &db::DBPool +) -> Result { + let guard_lock = DBConnection::get_lock(info.0.id); let inner_guard_lock = guard_lock.clone(); - let _guard = guard_lock.read().await; - let mut nodes: Vec = Vec::new(); + let _guard = guard_lock.read(span); + let mut nodes: Vec = Vec::new(); for node in data.nodes.clone() { - nodes.push( - super::get_node_and_validate(&info.0, node, &mut db) - .ok_or(AppError::BadRequest("Unknown node"))? - ); + nodes.push(super::get_node_and_validate(span, &info.0, node, db).ok_or(AppError::BadRequest("Unknown node"))?); } let zip_nodes = BTreeSet::from_iter(data.nodes.iter().copied()); { - let guard = super::ZIP_TO_PROGRESS.read().await; + let guard = super::ZIP_TO_PROGRESS.read(); if let Some(entry) = guard.get(&zip_nodes) { return get_reply(&dto::responses::CreateZipDone { statusCode: 200, done: entry.done.load(Ordering::Relaxed), progress: Some(entry.progress.load(Ordering::Relaxed)), total: Some(entry.total.load(Ordering::Relaxed)) - }) + }); } } let entry = { - let mut guard = super::ZIP_TO_PROGRESS.write().await; - guard.insert(zip_nodes.clone(), std::sync::Arc::from(super::ZipProgressEntry { - temp_id: super::NEXT_TEMP_ID.fetch_add(1, Ordering::Relaxed), - done: std::sync::atomic::AtomicBool::new(false), - progress: std::sync::atomic::AtomicU64::new(0), - total: std::sync::atomic::AtomicU64::new(1), - delete_after: std::sync::atomic::AtomicI64::new(0) - })); + let mut guard = super::ZIP_TO_PROGRESS.write(); + guard.insert( + zip_nodes.clone(), + std::sync::Arc::from(super::ZipProgressEntry { + temp_id: super::NEXT_TEMP_ID.fetch_add(1, Ordering::Relaxed), + done: std::sync::atomic::AtomicBool::new(false), + progress: std::sync::atomic::AtomicU64::new(0), + total: std::sync::atomic::AtomicU64::new(1), + delete_after: std::sync::atomic::AtomicI64::new(0) + }) + ); guard.get(&zip_nodes).unwrap().clone() }; - super::ZIP_RT.spawn(async move { - type NodeMap = HashMap; - super::cleanup_temp_zips().await; + let inner_pool = pool.clone(); - let _guard = inner_guard_lock.read().await; + let inner_span = metrics::span("ZIP_POOL - queueing", span); + super::ZIP_POOL.spawn(move || { + let mut zip_span = metrics::span("ZIP_POOL - zipping", &inner_span); + drop(inner_span); + let db = &mut DBConnection::from(inner_pool.get().unwrap()); + type NodeMap = HashMap; - fn get_path(node: &crate::db::Inode, dirs: &NodeMap) -> String { + super::cleanup_temp_zips(&zip_span); + + let _guard = inner_guard_lock.read(&zip_span); + + fn get_path(node: &db::Inode, dirs: &NodeMap) -> String { let mut path = node.name.clone(); let mut _node = dirs.get(&node.parent_id.unwrap_or(-1)); while let Some(node) = _node { @@ -278,35 +274,58 @@ async fn create_zip(data: dto::requests::CreateZip, info: UserInfo, mut db: DBCo } path } - - nodes.iter().for_each(|node| { - entry.total.fetch_add(super::get_total_size(node.clone(), &mut db), Ordering::Relaxed); - }); - entry.total.fetch_sub(1, Ordering::Relaxed); { - let mut buf = vec![0_u8; 1024 * 1024 * 4]; - let file = std::fs::File::create(format!("./temp/{}", entry.temp_id)).expect("Failed to create temp file"); + let i_span = metrics::span("ZIP_POOL - calc total", &zip_span); + nodes.iter().for_each(|node| { + entry.total.fetch_add( + super::get_total_size(&i_span, node.clone(), db), + Ordering::Relaxed + ); + }); + entry.total.fetch_sub(1, Ordering::Relaxed); + } + { + let comp_span = metrics::span("ZIP_POOL - compressing total", &zip_span); + let mut buf = vec![0_u8; 16 * 1024 * 1024]; + let file = File::create(format!("./temp/{}", entry.temp_id)).expect("Failed to create temp file"); let mut zip = zip::ZipWriter::new(file); let zip_options = zip::write::FileOptions::default().large_file(true); - let (files, dirs): (NodeMap, NodeMap) = - nodes.iter() - .flat_map(|node| super::get_nodes_recursive(node.clone(), &mut db)) + let (files, dirs): (NodeMap, NodeMap) = { + let _span = metrics::span("ZIP_POOL - gathering nodes", &comp_span); + nodes + .iter() + .flat_map(|node| super::get_nodes_recursive(&comp_span, node.clone(), db)) .map(|node| (node.id, node)) - .partition(|v| v.1.is_file); + .partition(|v| v.1.is_file) + }; + zip_span.set_tags(|| { + vec![ + rustracing::tag::Tag::new("files", files.len() as i64), + rustracing::tag::Tag::new("dirs", dirs.len() as i64), + ] + }); + { + let _span = metrics::span("ZIP_POOL - dirs", &comp_span); + dirs.values().for_each(|dir| { + zip.add_directory(get_path(dir, &dirs), zip_options).expect("Failed to add dir to zip"); + }); + } + { + let _span = metrics::span("ZIP_POOL - files", &comp_span); + files.values().for_each(|node| { + zip.start_file(get_path(node, &dirs), zip_options).expect("Failed to start zip file"); + let mut file = File::open(format!("./files/{}", node.id)).expect("Failed to open file for zip"); - dirs.values().for_each(|dir| { - zip.add_directory(get_path(dir, &dirs), zip_options).expect("Failed to add dir to zip"); - }); - files.values().for_each(|node| { - zip.start_file(get_path(node, &dirs), zip_options).expect("Failed to start zip file"); - let mut file = std::fs::File::open(format!("./files/{}", node.id)).expect("Failed to open file for zip"); - loop { - let count = file.read(&mut buf).expect("Failed to read file for zip"); - if count == 0 { break; } - zip.write_all(&buf[..count]).expect("Failed to write zip"); - entry.progress.fetch_add(count as u64, Ordering::Relaxed); - } - }); + loop { + let count = file.read(&mut buf).expect("Failed to read file for zip"); + if count == 0 { + break; + } + zip.write_all(&buf[..count]).expect("Failed to write zip"); + entry.progress.fetch_add(count as u64, Ordering::Relaxed); + } + }); + } zip.finish().expect("Failed to finish zip"); } @@ -321,121 +340,115 @@ async fn create_zip(data: dto::requests::CreateZip, info: UserInfo, mut db: DBCo }) } -async fn download(data: dto::requests::Download, mut db: DBConnection) -> Result { - let info = crate::routes::filters::authorize_jwt(data.jwtToken, &mut db).await?; - let guard_lock = DBConnection::get_lock(info.0.id).await; - let _guard = guard_lock.read().await; - - let node: crate::db::Inode = super::get_node_and_validate(&info.0, data.id, &mut db) - .ok_or(AppError::BadRequest("Unknown node"))?; +pub fn download(span: &Span, req: &mut Request, db: &mut DBConnection) -> Result { + let data: dto::requests::Download = + serde_urlencoded::from_reader(req.as_reader()).map_err(|_| AppError::BadRequest("Invalid form data"))?; + let info = crate::routes::filters::authorize_jwt(span, &data.jwtToken, db)?; + let guard_lock = DBConnection::get_lock(info.0.id); + let _guard = guard_lock.read(span); + let node: db::Inode = + super::get_node_and_validate(span, &info.0, data.id, db).ok_or(AppError::BadRequest("Unknown node"))?; if node.is_file { - let mut resp = warp::reply::Response::new(super::get_file_stream_body( - format!("./files/{}", node.id) - )); - *resp.status_mut() = warp::http::StatusCode::OK; - resp.headers_mut().typed_insert( - headers::ContentLength(node.size.unwrap() as u64) - ); - resp.headers_mut().typed_insert( - headers::ContentType::from( - mime_guess::from_path(std::path::Path::new(&node.name)).first_or_octet_stream() - ) - ); - resp.headers_mut().insert( - "Content-Disposition", - ("attachment; filename=".to_owned() + &node.name).parse().unwrap() - ); + let file_name = format!("./files/{}", node.id); + let resp = Response::from_file(File::open(std::path::Path::new(&file_name)).unwrap()) + .with_header(header("content-type", "application/octet-stream")) + .with_header(header( + "content-disposition", + &("attachment; filename=".to_owned() + &node.name) + )) + .boxed(); Ok(resp) } else { let nodes_key = BTreeSet::from([node.id]); - let guard = super::ZIP_TO_PROGRESS.read().await; - let entry = guard.get(&nodes_key) - .ok_or(AppError::BadRequest("Unknown node"))?; + let guard = super::ZIP_TO_PROGRESS.read(); + let entry = guard.get(&nodes_key).ok_or(AppError::BadRequest("Unknown node"))?; if !entry.done.load(Ordering::Relaxed) { AppError::BadRequest("Unknown node").err() } else { - let file = format!("./temp/{}", entry.temp_id); - let mut resp = warp::reply::Response::new(super::get_file_stream_body(file.clone())); - *resp.status_mut() = warp::http::StatusCode::OK; - resp.headers_mut().typed_insert( - headers::ContentLength(std::fs::metadata(std::path::Path::new(&file)).unwrap().len()) - ); - resp.headers_mut().typed_insert( - headers::ContentType::from( - mime_guess::from_ext("zip").first().unwrap() - ) - ); - resp.headers_mut().insert( - "Content-Disposition", - ("attachment; filename=".to_owned() + &node.name + ".zip").parse().unwrap() - ); + let file_name = format!("./temp/{}", entry.temp_id); + let resp = Response::from_file(File::open(std::path::Path::new(&file_name)).unwrap()) + .with_header(header("content-type", "application/zip")) + .with_header(header( + "content-disposition", + &("attachment; filename=".to_owned() + &node.name + ".zip") + )) + .boxed(); Ok(resp) } } } -async fn download_multi(data: dto::requests::DownloadMulti, mut db: DBConnection) -> Result { - let info = crate::routes::filters::authorize_jwt(data.jwtToken, &mut db).await?; - let guard_lock = DBConnection::get_lock(info.0.id).await; - let _guard = guard_lock.read().await; - - let mut nodes: Vec = Vec::new(); - for node in data.id.split(',').map(|v| v.parse::() - .map_err(|_| AppError::BadRequest("Failed to parse").reject()) - ) { - nodes.push( - super::get_node_and_validate(&info.0, node?, &mut db) - .ok_or(AppError::BadRequest("Unknown node"))? - ); - } +pub fn download_multi(span: &Span, req: &mut Request, db: &mut DBConnection) -> Result { + let data: dto::requests::DownloadMulti = + serde_urlencoded::from_reader(req.as_reader()).map_err(|_| AppError::BadRequest("Invalid form data"))?; + let info = crate::routes::filters::authorize_jwt(span, &data.jwtToken, db)?; + let guard_lock = DBConnection::get_lock(info.0.id); + let _guard = guard_lock.read(span); + let nodes: Vec = { + let _span = metrics::span("parsing_nodes", span); + data.id + .split(',') + .map(|v| { + v.parse::().map_err(|_| AppError::BadRequest("Failed to parse")).map(|n| { + super::get_node_and_validate(span, &info.0, n, db).ok_or(AppError::BadRequest("Unknown node")) + }) + }) + .into_iter() + .collect::, AppError>, AppError>>()?? + }; let nodes_key = BTreeSet::from_iter(nodes.iter().map(|node| node.id)); - let guard = super::ZIP_TO_PROGRESS.read().await; - let entry = guard.get(&nodes_key) - .ok_or(AppError::BadRequest("Unknown zip"))?; + let guard = super::ZIP_TO_PROGRESS.read(); + let entry = guard.get(&nodes_key).ok_or(AppError::BadRequest("Unknown zip"))?; if !entry.done.load(Ordering::Relaxed) { AppError::BadRequest("Unfinished zip").err() } else { - let file = format!("./temp/{}", entry.temp_id); - let mut resp = warp::reply::Response::new(super::get_file_stream_body(file.clone())); - *resp.status_mut() = warp::http::StatusCode::OK; - resp.headers_mut().typed_insert( - headers::ContentLength(std::fs::metadata(std::path::Path::new(&file)).unwrap().len()) - ); - resp.headers_mut().typed_insert( - headers::ContentType::from( - mime_guess::from_ext("zip").first().unwrap() - ) - ); - resp.headers_mut().insert( - "Content-Disposition", - "attachment; filename=files.zip".parse().unwrap() - ); + let file_name = format!("./temp/{}", entry.temp_id); + let resp = Response::from_file(File::open(std::path::Path::new(&file_name)).unwrap()) + .with_header(header("content-type", "application/zip")) + .with_header(header( + "content-disposition", + "attachment; filename=files.zip" + )) + .boxed(); Ok(resp) } } -async fn download_preview(node: i32, info: UserInfo, mut db: DBConnection) -> Result { - let guard_lock = DBConnection::get_lock(info.0.id).await; - let _guard = guard_lock.read().await; - let node: crate::db::Inode = super::get_node_and_validate(&info.0, node, &mut db) - .ok_or(AppError::BadRequest("Unknown node"))?; +pub fn download_preview( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + node: i32 +) -> Result { + let guard_lock = DBConnection::get_lock(info.0.id); + let _guard = guard_lock.read(span); + let node: db::Inode = + super::get_node_and_validate(span, &info.0, node, db).ok_or(AppError::BadRequest("Unknown node"))?; if node.has_preview { let file = format!("./files/{}_preview.jpg", node.id); get_reply(&dto::responses::DownloadBase64 { statusCode: 200, - data: "data:image/png;base64,".to_owned() + &base64::encode(std::fs::read(std::path::Path::new(&file)).unwrap()) + data: "data:image/png;base64,".to_owned() + + &base64::encode(std::fs::read(std::path::Path::new(&file)).unwrap()) }) } else { AppError::BadRequest("No preview").err() } } -async fn get_type(node: i32, info: UserInfo, mut db: DBConnection) -> Result { - let node: crate::db::Inode = super::get_node_and_validate(&info.0, node, &mut db) - .ok_or(AppError::BadRequest("Unknown node"))?; +pub fn get_type( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo, + node: i32 +) -> Result { + let node: db::Inode = + super::get_node_and_validate(span, &info.0, node, db).ok_or(AppError::BadRequest("Unknown node"))?; get_reply(&dto::responses::Type { statusCode: 200, diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs index a9da831..7cea577 100644 --- a/backend/src/routes/mod.rs +++ b/backend/src/routes/mod.rs @@ -1,28 +1,28 @@ -mod filters; -mod auth; -mod admin; -mod user; +pub mod admin; +pub mod auth; +pub mod filters; pub mod fs; +pub mod user; -use warp::{Filter, Reply}; -use crate::db::DBPool; -use crate::dto; +use std::io::Write; -pub fn build_routes(db: DBPool) -> impl Filter + Clone { - warp::path::path("api") - .and( - auth::build_routes(db.clone()) - .or(admin::build_routes(db.clone())) - .or(user::build_routes(db.clone())) - .or(fs::build_routes(db)) - .recover(error_handler) - ) - .or(warp::fs::dir("./static/")) - .or(warp::fs::file("./static/index.html")) +use tiny_http::{Response, ResponseBox}; + +struct ChannelReader(std::sync::mpsc::Receiver); + +impl std::io::Read for ChannelReader { + fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result { + self.0.recv().map(|s| buf.write(s.as_bytes())).unwrap_or(Ok(0)) + } } -pub fn get_reply(data: &T) -> Result where T: serde::Serialize { - Ok(warp::reply::with_status(warp::reply::json(data), warp::http::StatusCode::OK).into_response()) +pub fn header(name: &str, data: &str) -> tiny_http::Header { tiny_http::Header::from_bytes(name, data).unwrap() } + +pub fn get_reply(data: &T) -> Result +where T: serde::Serialize { + Ok(Response::from_data(serde_json::to_vec(data).unwrap()) + .with_header(header("content-type", "application/json; charset=utf-8")) + .boxed()) } #[derive(thiserror::Error, Debug, Clone)] @@ -33,82 +33,13 @@ pub enum AppError { Forbidden(&'static str), #[error("bad request")] BadRequest(&'static str), + #[error("not found")] + NotFound, + #[allow(dead_code)] #[error("internal error")] InternalError(&'static str) } -impl warp::reject::Reject for AppError {} impl AppError { - pub fn reject(&self) -> warp::reject::Rejection { - warp::reject::custom(self.clone()) - } - - pub fn err(&self) -> Result { - Err(self.reject()) - } + pub fn err(&self) -> Result { Err(self.clone()) } } - -pub async fn error_handler(err: warp::reject::Rejection) -> Result { - if err.is_not_found() { - return Ok(warp::reply::with_status( - warp::reply::json(&dto::responses::Error { - statusCode: 404, - message: "bruh".to_owned() - }), - warp::http::StatusCode::NOT_FOUND - )); - } - if let Some(e) = err.find::() { - return Ok(warp::reply::with_status( - warp::reply::json(&dto::responses::Error { - statusCode: match e { - AppError::BadRequest(_) => 400, - AppError::Unauthorized(_) => 401, - AppError::Forbidden(_) => 403, - AppError::InternalError(_) => 500 - }, - message: match e { - AppError::BadRequest(v) => v.to_string(), - AppError::Unauthorized(v) => v.to_string(), - AppError::Forbidden(v) => v.to_string(), - AppError::InternalError(v) => v.to_string() - }, - }), - match e { - AppError::BadRequest(_) => warp::http::StatusCode::BAD_REQUEST, - AppError::Unauthorized(_) => warp::http::StatusCode::UNAUTHORIZED, - AppError::Forbidden(_) => warp::http::StatusCode::FORBIDDEN, - AppError::InternalError(_) => warp::http::StatusCode::INTERNAL_SERVER_ERROR - } - )); - } - if let Some(e) = err.find::() { - return Ok(warp::reply::with_status( - warp::reply::json(&dto::responses::Error { - statusCode: 400, - message: e.to_string(), - }), - warp::http::StatusCode::BAD_REQUEST - )) - } - if let Some(e) = err.find::() { - return Ok(warp::reply::with_status( - warp::reply::json(&dto::responses::Error { - statusCode: 400, - message: e.to_string(), - }), - warp::http::StatusCode::BAD_REQUEST - )) - } - if let Some(e) = err.find::() { - return Ok(warp::reply::with_status( - warp::reply::json(&dto::responses::Error { - statusCode: 405, - message: e.to_string(), - }), - warp::http::StatusCode::METHOD_NOT_ALLOWED - )) - } - - Err(err).expect("Can't handle error") -} \ No newline at end of file diff --git a/backend/src/routes/user.rs b/backend/src/routes/user.rs index 9cb2fdf..fdc8769 100644 --- a/backend/src/routes/user.rs +++ b/backend/src/routes/user.rs @@ -1,24 +1,14 @@ -use warp::{Filter, Reply}; -use crate::db::{DBConnection, DBPool, with_db}; -use crate::dto; -use crate::routes::get_reply; -use crate::routes::filters::{authenticated, UserInfo}; +use rustracing_jaeger::Span; +use tiny_http::{Request, ResponseBox}; -pub fn build_routes(db: DBPool) -> impl Filter + Clone { - let info = warp::path!("user" / "info") - .and(warp::get()) - .and(authenticated(db.clone())) - .and_then(info); - let delete_user = warp::path!("user" / "delete") - .and(warp::post()) - .and(authenticated(db.clone())) - .and(with_db(db)) - .and_then(delete_user); +use crate::{ + db::DBConnection, + dto, + routes::{filters::UserInfo, get_reply}, + AppError +}; - info.or(delete_user) -} - -async fn info(info: UserInfo) -> Result { +pub fn info(_: &Span, _: &mut Request, _: &mut DBConnection, info: UserInfo) -> Result { get_reply(&dto::responses::UserInfo { statusCode: info.0.id, name: info.0.name, @@ -27,16 +17,20 @@ async fn info(info: UserInfo) -> Result { }) } -async fn delete_user(info: UserInfo, mut db: DBConnection) -> Result { - db.delete_all_tokens(info.0.id); +pub fn delete_user( + span: &Span, + _: &mut Request, + db: &mut DBConnection, + info: UserInfo +) -> Result { + db.delete_all_tokens(span, info.0.id); - let root_node = super::fs::get_node_and_validate(&info.0, info.0.root_id, &mut db).expect("Failed to get root node for deleting"); + let root_node = super::fs::get_node_and_validate(span, &info.0, info.0.root_id, db) + .expect("Failed to get root node for deleting"); - super::fs::delete_node_root(&root_node, &mut db); + super::fs::delete_node_root(span, &root_node, db); db.delete_user(&info.0); - get_reply(&dto::responses::Success { - statusCode: 200 - }) -} \ No newline at end of file + get_reply(&dto::responses::Success { statusCode: 200 }) +} diff --git a/backend/src/schema.rs b/backend/src/schema.rs index 563fb80..b090e81 100644 --- a/backend/src/schema.rs +++ b/backend/src/schema.rs @@ -35,8 +35,4 @@ diesel::table! { } } -diesel::allow_tables_to_appear_in_same_query!( - inode, - tokens, - user, -); +diesel::allow_tables_to_appear_in_same_query!(inode, tokens, user,); diff --git a/tokio-top.cmd b/tokio-top.cmd deleted file mode 100644 index 0cfea6a..0000000 --- a/tokio-top.cmd +++ /dev/null @@ -1 +0,0 @@ -tokio-console http://127.0.0.1:9999/ --colorterm 24bit --retain-for "2s" \ No newline at end of file