445 lines
17 KiB
Rust
Raw Normal View History

2022-10-10 23:07:40 +02:00
use std::collections::{BTreeSet, HashMap};
use std::io::{Read, Write};
use std::sync::atomic::Ordering;
use futures::{Stream, StreamExt};
use headers::HeaderMapExt;
use warp::{Filter, Reply};
use crate::db::{DBConnection, DBPool, with_db};
use crate::dto;
use crate::routes::{AppError, get_reply};
use crate::routes::filters::{authenticated, UserInfo};
pub fn build_routes(db: DBPool) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
let root = warp::path!("fs" / "root")
.and(warp::get())
.and(authenticated(db.clone()))
.and_then(root);
let node = warp::path!("fs" / "node" / i32)
.and(warp::get())
.and(authenticated(db.clone()))
.and(with_db(db.clone()))
.and_then(node)
.with(warp::compression::brotli());
let path = warp::path!("fs" / "path" / i32)
.and(warp::get())
.and(authenticated(db.clone()))
.and(with_db(db.clone()))
.and_then(path);
let create_folder = warp::path!("fs" / "create_folder")
.and(warp::post())
.and(warp::body::json())
.and(authenticated(db.clone()))
.and(with_db(db.clone()))
.and_then(|data, info, db| create_node(data, info, db, false));
let create_file = warp::path!("fs" / "create_file")
.and(warp::post())
.and(warp::body::json())
.and(authenticated(db.clone()))
.and(with_db(db.clone()))
.and_then(|data, info, db| create_node(data, info, db, true));
let delete_node = warp::path!("fs" / "delete" / i32)
.and(warp::post())
.and(authenticated(db.clone()))
.and(with_db(db.clone()))
.and_then(delete_node);
let upload = warp::path!("fs" / "upload" / i32)
.and(warp::post())
.and(warp::body::stream())
.and(authenticated(db.clone()))
.and(with_db(db.clone()))
.and_then(upload);
let create_zip = warp::path!("fs" / "create_zip")
.and(warp::post())
.and(warp::body::json())
.and(authenticated(db.clone()))
.and(with_db(db.clone()))
.and_then(create_zip);
let download = warp::path!("fs" / "download")
.and(warp::post())
.and(warp::body::form())
.and(with_db(db.clone()))
.and_then(download);
let download_multi = warp::path!("fs" / "download_multi")
.and(warp::post())
.and(warp::body::form())
.and(with_db(db.clone()))
.and_then(download_multi);
let download_preview = warp::path!("fs" / "download_preview" / i32)
.and(warp::get())
.and(authenticated(db.clone()))
.and(with_db(db.clone()))
.and_then(download_preview);
let get_type = warp::path!("fs" / "get_type" / i32)
.and(warp::get())
.and(authenticated(db.clone()))
.and(with_db(db))
.and_then(get_type);
root.or(node).or(path).or(create_folder).or(create_file).or(delete_node).or(upload).or(create_zip).or(download).or(download_multi).or(download_preview).or(get_type)
}
async fn root(info: UserInfo) -> Result<impl Reply, warp::Rejection> {
get_reply(&dto::responses::Root {
statusCode: 200,
rootId: info.0.root_id
})
}
async fn node(node: i32, info: UserInfo, mut db: DBConnection) -> Result<impl Reply, warp::Rejection> {
let guard_lock = DBConnection::get_lock(info.0.id).await;
let _guard = guard_lock.read().await;
let node = super::get_node_and_validate(&info.0, node, &mut db)
.ok_or(AppError::BadRequest("Unknown node"))?;
get_reply(&dto::responses::GetNode {
statusCode: 200,
id: node.id,
name: node.name,
isFile: node.is_file,
preview: node.has_preview,
parent: node.parent_id,
size: node.size,
children: (!node.is_file).then(|| {
db.get_children(node.id).iter().map(|child| dto::responses::GetNodeEntry {
id: child.id,
name: child.name.clone(),
isFile: child.is_file,
preview: child.has_preview,
parent: child.parent_id,
size: child.size
}).collect()
})
})
}
async fn path(node: i32, info: UserInfo, mut db: DBConnection) -> Result<impl Reply, warp::Rejection> {
let guard_lock = DBConnection::get_lock(info.0.id).await;
let _guard = guard_lock.read().await;
let node = super::get_node_and_validate(&info.0, node, &mut db)
.ok_or(AppError::BadRequest("Unknown node"))?;
get_reply(&super::generate_path_dto(&node, &mut db))
}
async fn create_node(data: dto::requests::CreateNode, info: UserInfo, mut db: DBConnection, file: bool) -> Result<impl Reply, warp::Rejection> {
let guard_lock = DBConnection::get_lock(info.0.id).await;
let _guard = guard_lock.read().await;
let node = super::create_node(data.name, &info.0, file, Some(data.parent), false, &mut db);
match node {
Ok(v) => get_reply(&dto::responses::NewNode {
statusCode: 200,
id: v.id
}),
Err(v) => {
match v {
super::CreateNodeResult::InvalidName => AppError::BadRequest("Invalid name").err(),
super::CreateNodeResult::InvalidParent => AppError::BadRequest("Invalid parent").err(),
super::CreateNodeResult::Exists(file, id) => get_reply(&dto::responses::NodeExists {
statusCode: 200,
id,
exists: true,
isFile: file
})
}
}
}
}
async fn delete_node(node: i32, info: UserInfo, mut db: DBConnection) -> Result<impl Reply, warp::Rejection> {
let guard_lock = DBConnection::get_lock(info.0.id).await;
let inner_guard_lock = guard_lock.clone();
let _guard = guard_lock.read().await;
let node = super::get_node_and_validate(&info.0, node, &mut db)
.ok_or(AppError::BadRequest("Unknown node"))?;
if node.parent_id.is_none() {
return AppError::BadRequest("Can't delete root").err();
}
let (mut sender, body) = warp::hyper::Body::channel();
sender.send_data(warp::hyper::body::Bytes::from("Waiting in queue\n")).await.unwrap();
super::DELETE_RT.spawn(async move {
let guard_lock = inner_guard_lock.clone();
let _guard = guard_lock.write().await;
super::delete_node(&node, &mut sender, &mut db).await;
});
let mut resp = warp::reply::Response::new(body);
*resp.status_mut() = warp::http::StatusCode::OK;
resp.headers_mut().typed_insert(
headers::ContentType::text_utf8()
);
Ok(resp)
}
async fn upload<S, B>(node: i32, stream: S, info: UserInfo, mut db: DBConnection) -> Result<impl Reply, warp::Rejection>
where
S: Stream<Item = Result<B, warp::Error>>,
S: StreamExt,
B: warp::Buf
{
let guard_lock = DBConnection::get_lock(info.0.id).await;
let _guard = guard_lock.read().await;
let mut node = super::get_node_and_validate(&info.0, node, &mut db)
.ok_or(AppError::BadRequest("Unknown node"))?;
if !node.is_file {
return AppError::BadRequest("Can't upload to a directory").err();
}
let mut file_size = 0_i64;
let file_name = format!("./files/{}", node.id);
{
let mut file = std::fs::File::create(file_name.clone()).unwrap();
stream.for_each(|f| {
let mut buffer = f.unwrap();
file_size += buffer.remaining() as i64;
while buffer.remaining() != 0 {
let chunk = buffer.chunk();
buffer.advance(file.write(chunk).expect("Failed to write file"));
}
futures::future::ready(())
}).await;
}
let generate_preview = || -> Option<()> {
if file_size > 20 * 1024 * 1024 { return None; }
let mime = mime_guess::from_path(std::path::Path::new(&node.name)).first()?.to_string();
let img = image::load(
std::io::BufReader::new(std::fs::File::open(file_name.clone()).unwrap()),
image::ImageFormat::from_mime_type(mime)?
).ok()?;
let img = img.resize(300, 300, image::imageops::FilterType::Triangle);
img.save(std::path::Path::new(&(file_name + "_preview.jpg"))).expect("Failed to save preview image");
Some(())
};
node.has_preview = generate_preview().is_some();
node.size = Some(file_size);
db.save_node(&node);
get_reply(&dto::responses::Success {
statusCode: 200
})
}
async fn create_zip(data: dto::requests::CreateZip, info: UserInfo, mut db: DBConnection) -> Result<impl Reply, warp::Rejection> {
let guard_lock = DBConnection::get_lock(info.0.id).await;
let inner_guard_lock = guard_lock.clone();
let _guard = guard_lock.read().await;
let mut nodes: Vec<crate::db::Inode> = Vec::new();
for node in data.nodes.clone() {
nodes.push(
super::get_node_and_validate(&info.0, node, &mut db)
.ok_or(AppError::BadRequest("Unknown node"))?
);
}
let zip_nodes = BTreeSet::from_iter(data.nodes.iter().copied());
{
let guard = super::ZIP_TO_PROGRESS.read().await;
if let Some(entry) = guard.get(&zip_nodes) {
return get_reply(&dto::responses::CreateZipDone {
statusCode: 200,
done: entry.done.load(Ordering::Relaxed),
progress: Some(entry.progress.load(Ordering::Relaxed)),
total: Some(entry.total.load(Ordering::Relaxed))
})
}
}
let entry = {
let mut guard = super::ZIP_TO_PROGRESS.write().await;
guard.insert(zip_nodes.clone(), std::sync::Arc::from(super::ZipProgressEntry {
temp_id: super::NEXT_TEMP_ID.fetch_add(1, Ordering::Relaxed),
done: std::sync::atomic::AtomicBool::new(false),
progress: std::sync::atomic::AtomicU64::new(0),
total: std::sync::atomic::AtomicU64::new(1),
delete_after: std::sync::atomic::AtomicI64::new(0)
}));
guard.get(&zip_nodes).unwrap().clone()
};
super::ZIP_RT.spawn(async move {
type NodeMap = HashMap<i32, crate::db::Inode>;
super::cleanup_temp_zips().await;
let _guard = inner_guard_lock.read().await;
fn get_path(node: &crate::db::Inode, dirs: &NodeMap) -> String {
let mut path = node.name.clone();
let mut _node = dirs.get(&node.parent_id.unwrap_or(-1));
while let Some(node) = _node {
path.insert_str(0, &(node.name.clone() + "/"));
_node = dirs.get(&node.parent_id.unwrap_or(-1));
}
path
}
nodes.iter().for_each(|node| {
entry.total.fetch_add(super::get_total_size(node.clone(), &mut db), Ordering::Relaxed);
});
entry.total.fetch_sub(1, Ordering::Relaxed);
{
let mut buf = vec![0_u8; 1024 * 1024 * 4];
let file = std::fs::File::create(format!("./temp/{}", entry.temp_id)).expect("Failed to create temp file");
let mut zip = zip::ZipWriter::new(file);
let zip_options = zip::write::FileOptions::default().large_file(true);
let (files, dirs): (NodeMap, NodeMap) =
nodes.iter()
.flat_map(|node| super::get_nodes_recursive(node.clone(), &mut db))
.map(|node| (node.id, node))
.partition(|v| v.1.is_file);
dirs.values().for_each(|dir| {
zip.add_directory(get_path(dir, &dirs), zip_options).expect("Failed to add dir to zip");
});
files.values().for_each(|node| {
zip.start_file(get_path(node, &dirs), zip_options).expect("Failed to start zip file");
let mut file = std::fs::File::open(format!("./files/{}", node.id)).expect("Failed to open file for zip");
loop {
let count = file.read(&mut buf).expect("Failed to read file for zip");
if count == 0 { break; }
zip.write_all(&buf[..count]).expect("Failed to write zip");
entry.progress.fetch_add(count as u64, Ordering::Relaxed);
}
});
zip.finish().expect("Failed to finish zip");
}
entry.done.store(true, Ordering::Relaxed);
entry.delete_after.store(chrono::Utc::now().timestamp() + 10 * 60, Ordering::Relaxed);
});
get_reply(&dto::responses::CreateZipDone {
statusCode: 200,
done: false,
progress: Some(0),
total: Some(1)
})
}
async fn download(data: dto::requests::Download, mut db: DBConnection) -> Result<impl Reply, warp::Rejection> {
let info = crate::routes::filters::authorize_jwt(data.jwtToken, &mut db).await?;
let guard_lock = DBConnection::get_lock(info.0.id).await;
let _guard = guard_lock.read().await;
let node: crate::db::Inode = super::get_node_and_validate(&info.0, data.id, &mut db)
.ok_or(AppError::BadRequest("Unknown node"))?;
if node.is_file {
let mut resp = warp::reply::Response::new(super::get_file_stream_body(
format!("./files/{}", node.id)
));
*resp.status_mut() = warp::http::StatusCode::OK;
resp.headers_mut().typed_insert(
headers::ContentLength(node.size.unwrap() as u64)
);
resp.headers_mut().typed_insert(
headers::ContentType::from(
mime_guess::from_path(std::path::Path::new(&node.name)).first_or_octet_stream()
)
);
resp.headers_mut().insert(
"Content-Disposition",
("attachment; filename=".to_owned() + &node.name).parse().unwrap()
);
Ok(resp)
} else {
let nodes_key = BTreeSet::from([node.id]);
let guard = super::ZIP_TO_PROGRESS.read().await;
let entry = guard.get(&nodes_key)
.ok_or(AppError::BadRequest("Unknown node"))?;
if !entry.done.load(Ordering::Relaxed) {
AppError::BadRequest("Unknown node").err()
} else {
let file = format!("./temp/{}", entry.temp_id);
let mut resp = warp::reply::Response::new(super::get_file_stream_body(file.clone()));
*resp.status_mut() = warp::http::StatusCode::OK;
resp.headers_mut().typed_insert(
headers::ContentLength(std::fs::metadata(std::path::Path::new(&file)).unwrap().len())
);
resp.headers_mut().typed_insert(
headers::ContentType::from(
mime_guess::from_ext("zip").first().unwrap()
)
);
resp.headers_mut().insert(
"Content-Disposition",
("attachment; filename=".to_owned() + &node.name + ".zip").parse().unwrap()
);
Ok(resp)
}
}
}
async fn download_multi(data: dto::requests::DownloadMulti, mut db: DBConnection) -> Result<impl Reply, warp::Rejection> {
let info = crate::routes::filters::authorize_jwt(data.jwtToken, &mut db).await?;
let guard_lock = DBConnection::get_lock(info.0.id).await;
let _guard = guard_lock.read().await;
let mut nodes: Vec<crate::db::Inode> = Vec::new();
for node in data.id.split(',').map(|v| v.parse::<i32>()
.map_err(|_| AppError::BadRequest("Failed to parse").reject())
) {
nodes.push(
super::get_node_and_validate(&info.0, node?, &mut db)
.ok_or(AppError::BadRequest("Unknown node"))?
);
}
let nodes_key = BTreeSet::from_iter(nodes.iter().map(|node| node.id));
let guard = super::ZIP_TO_PROGRESS.read().await;
let entry = guard.get(&nodes_key)
.ok_or(AppError::BadRequest("Unknown zip"))?;
if !entry.done.load(Ordering::Relaxed) {
AppError::BadRequest("Unfinished zip").err()
} else {
let file = format!("./temp/{}", entry.temp_id);
let mut resp = warp::reply::Response::new(super::get_file_stream_body(file.clone()));
*resp.status_mut() = warp::http::StatusCode::OK;
resp.headers_mut().typed_insert(
headers::ContentLength(std::fs::metadata(std::path::Path::new(&file)).unwrap().len())
);
resp.headers_mut().typed_insert(
headers::ContentType::from(
mime_guess::from_ext("zip").first().unwrap()
)
);
resp.headers_mut().insert(
"Content-Disposition",
"attachment; filename=files.zip".parse().unwrap()
);
Ok(resp)
}
}
async fn download_preview(node: i32, info: UserInfo, mut db: DBConnection) -> Result<impl Reply, warp::Rejection> {
let guard_lock = DBConnection::get_lock(info.0.id).await;
let _guard = guard_lock.read().await;
let node: crate::db::Inode = super::get_node_and_validate(&info.0, node, &mut db)
.ok_or(AppError::BadRequest("Unknown node"))?;
if node.has_preview {
let file = format!("./files/{}_preview.jpg", node.id);
get_reply(&dto::responses::DownloadBase64 {
statusCode: 200,
data: "data:image/png;base64,".to_owned() + &base64::encode(std::fs::read(std::path::Path::new(&file)).unwrap())
})
} else {
AppError::BadRequest("No preview").err()
}
}
async fn get_type(node: i32, info: UserInfo, mut db: DBConnection) -> Result<impl Reply, warp::Rejection> {
let node: crate::db::Inode = super::get_node_and_validate(&info.0, node, &mut db)
.ok_or(AppError::BadRequest("Unknown node"))?;
get_reply(&dto::responses::Type {
statusCode: 200,
_type: mime_guess::from_path(std::path::Path::new(&node.name)).first_or_octet_stream().to_string()
})
}