Initial commit

This commit is contained in:
2024-01-27 14:19:37 +01:00
commit ad731f1793
11 changed files with 1647 additions and 0 deletions

68
src/config.rs Normal file
View File

@@ -0,0 +1,68 @@
use graph_cycles::Cycles;
use serde::Deserialize;
#[derive(Debug, Deserialize)]
pub struct Config {
pub service: Vec<CService>,
#[serde(default)]
pub autostart: Vec<String>
}
#[derive(Debug, Deserialize)]
pub struct CService {
pub name: String,
pub command: String,
pub directory: Option<String>,
#[serde(default)]
pub depends_on: Vec<String>
}
impl Config {
pub fn load() -> Result<Config, String> {
let data = std::fs::read_to_string("minitd.toml")
.map_err(|err| format!("Failed to read config file: {err}"))?;
let config = toml::from_str::<Config>(&data)
.map_err(|err| format!("Failed to parse config: {err}"))?;
let mut graph = petgraph::Graph::new();
let mut node_map = std::collections::HashMap::new();
for service in &config.service {
let node = graph.add_node(&service.name);
node_map.insert(service.name.clone(), node);
}
for service in &config.service {
let my_id = node_map.get(&service.name).unwrap();
for dep in &service.depends_on {
let other_id = match node_map.get(dep) {
Some(v) => v,
None => return Err(format!("Service '{}' depends on unknown service '{}'", service.name, dep))
};
graph.add_edge(*my_id, *other_id, ());
}
}
let cycles = graph.cycles();
if !cycles.is_empty() {
let mut msg = String::new();
for cycle in cycles {
msg += "Dependency cycle: ";
for node in &cycle {
msg += graph.node_weight(*node).unwrap();
msg += " -> ";
}
msg += graph.node_weight(cycle[0]).unwrap();
msg += "\n";
}
return Err(msg);
}
for auto in &config.autostart {
if config.service.iter().find(|item| item.name == *auto).is_none() {
return Err(format!("autostart: unkown service '{auto}'"));
}
}
Ok(config)
}
}

130
src/html.rs Normal file
View File

@@ -0,0 +1,130 @@
use std::io::Write;
use std::net::TcpStream;
use std::os::fd::AsRawFd;
use base64::Engine;
use hhmmss::Hhmmss;
use crate::{Data, service};
fn index_write_service(stream: &mut impl Write, service: &crate::service::Service) -> Option<()> {
let (state, desc, can_start, can_stop) = match &service.status {
crate::service::ServiceStatus::STOPPED => ("stopped", "".to_string(), true, false),
crate::service::ServiceStatus::STARTED(started_at) =>
("started", format!("Pid: {}, uptime: {}", service.pid.as_ref().map_or(-1, |v| v.0.as_raw()), started_at.elapsed().hhmmss()), false, true),
crate::service::ServiceStatus::STOPPING => ("stopping", "".to_string(), false, false),
crate::service::ServiceStatus::FAILED(err) => ("failed", err.clone(), true, false)
};
write!(stream, "<tr><td data-status=\"{}\"></td><td>{}</td><td>{}</td><td>", state, &service.config.name, desc).ok()?;
if can_start { write!(stream, "<a href=\"/start/{}\">Start</a>", &service.config.name).ok()?; }
stream.write_all(b"</td><td>").ok()?;
if can_stop { write!(stream, "<a href=\"/stop/{}\">Stop</a>", &service.config.name).ok()?; }
write!(stream, "</td><td><a href=\"/tail#{}\">Tail log</a></td></tr>", &service.config.name).ok()
}
pub fn send_404(mut stream: TcpStream) -> Option<()> {
stream.write_all(b"HTTP/1.0 404 Not found\r\nContent-Type: text/plain;charset=utf-8\r\n\r\n404 Not found").ok()
}
pub fn send_301(mut stream: TcpStream) -> Option<()> {
stream.write_all(b"HTTP/1.0 302 Found\r\nLocation: /\r\n\r\n").ok()
}
pub fn send_index(stream: TcpStream) -> Option<()> {
let mut stream = std::io::BufWriter::new(stream);
stream.write_all(b"HTTP/1.0 200 OK\r\nContent-Type: text/html;charset=utf-8\r\n\r\n").ok()?;
stream.write_all(b"<!DOCTYPE html><html lang=\"en\"><head><title>Minitd</title><style>
.center-child { display: flex; flex-direction: column; align-items: center; row-gap: 2em; background-color: #aaffaa44; }
.space-child > * { margin: 0 0.75em; }
a, a:link, a:visited { color: #444; }
a:hover { color: #000; }
table { border: 1px solid #999; }
thead { color: #fff; background-color: #999; }
td, th { padding: 0.25em 0.5em; border-bottom: 1px solid #fff; }
tbody > tr:nth-child(even) { background-color: #eee; }
tbody > tr:nth-child(odd) { background-color: #ddd; }
[data-status] { padding: 0.125em 0.25em; }
[data-status]::after { padding: 0.125em 0.25em; }
[data-status='stopped']::after { content: 'Stopped'; background-color: #ef8; }
[data-status='started']::after { content: 'Started'; background-color: #8f8; }
[data-status='stopping']::after { content: 'Stopping'; background-color: #fc8; }
[data-status='failed']::after { content: 'Failed'; background-color: #faa; }
</style></head><body class=\"center-child\">").ok()?;
stream.write_all(b"<div class=\"space-child\"><a href=\".\">Refresh</a><a href=\"/tail#minitd\">Tail log</a><a href=\"/reload\">Reload</a><a href=\"/stop_all\">Stop all</a><a href=\"/shutdown\">Shutdown</a></div>").ok()?;
stream.write_all(b"<table cellspacing=\"0\"><thead><th>State</th><th>Name</th><th>Description</th><th colspan=\"3\">Action</th></thead><tbody>").ok()?;
for (_, service) in &crate::Data::get().services {
index_write_service(&mut stream, service)?;
}
stream.write_all(b"</tbody></table></body></html>").ok()
}
pub fn send_tail_html(stream: TcpStream) -> Option<()> {
let mut stream = std::io::BufWriter::new(stream);
stream.write_all(b"HTTP/1.0 200 OK\r\nContent-Type: text/html;charset=utf-8\r\n\r\n").ok()?;
stream.write_all(b"<!DOCTYPE html><html lang=\"en\"><head><title>Minitd</title></head><body><pre id=\"c\"></pre><script>").ok()?;
stream.write_all(b"const c = document.getElementById('c'); const e = new EventSource('/_tail/' + location.hash.slice(1)); e.onerror = _ => e.close(); e.addEventListener('line', msg => c.innerText += atob(msg.data));").ok()?;
stream.write_all(b"</script></body></html>").ok()
}
pub fn send_tail(mut stream: TcpStream, file: String) -> Option<()> {
let mut b64_buf = [0_u8; 2048];
stream.write_all(b"HTTP/1.0 200 OK\r\ncontent-type: text/event-stream\r\n\r\n").ok()?;
let file = match std::fs::File::open(file) {
Ok(v) => v,
Err(err) => {
let msg = format!("Failed to open file: {err}");
let size = base64::prelude::BASE64_STANDARD.encode_slice(&msg, &mut b64_buf).ok()?;
stream.write_all(b"event:line\ndata:").ok()?;
stream.write_all(&b64_buf[..size]).ok()?;
return stream.write_all(b"\n\n").ok();
}
};
let mut buf = [0_u8; 1024];
let mut fds = [nix::poll::PollFd::new(&file, nix::poll::PollFlags::POLLIN | nix::poll::PollFlags::POLLHUP | nix::poll::PollFlags::POLLERR)];
loop {
let ready = nix::poll::poll(&mut fds, 15000).ok()?;
if ready == 0 { // timeout
stream.write_all(b"event:ka\ndata:\n\n").ok()?;
} else if fds[0].revents().unwrap().contains(nix::poll::PollFlags::POLLIN) {
let size = nix::unistd::read(file.as_raw_fd(), &mut buf).ok()?;
if size > 0 {
let size = base64::prelude::BASE64_STANDARD.encode_slice(&buf[..size], &mut b64_buf).ok()?;
stream.write_all(b"event:line\ndata:").ok()?;
stream.write_all(&b64_buf[..size]).ok()?;
stream.write_all(b"\n\n").ok()?;
}
} else {
break
}
}
None
}
pub fn reload_config() -> Result<(), String> {
let data = Data::get();
let new_config = crate::config::Config::load()?;
let mut removed_services = data.services.keys().cloned().collect::<std::collections::HashSet<_>>();
for service in new_config.service {
match data.services.iter_mut().find(|s| s.1.config.name == service.name) {
Some((id, s)) => { // Replace old service
s.config = service;
removed_services.remove(id);
}
None => { // New service
let s = service::Service::new(service);
data.services.insert(s.id, s);
}
}
}
let queue = data.work_queue.as_ref().unwrap();
for work in removed_services.iter().map(|id| crate::WorkItem::StopService(*id, false)) { queue.send(work).unwrap(); }
for id in removed_services {
let s = data.services.get(&id).unwrap();
while !s.stopped() { std::thread::sleep(std::time::Duration::from_millis(250)); }
data.services.remove(&id);
}
Ok(())
}

132
src/main.rs Normal file
View File

@@ -0,0 +1,132 @@
use std::os::unix::prelude::CommandExt;
use std::process::Command;
use crate::web::setup_webserver;
mod config;
mod service;
mod watch;
mod web;
mod html;
#[derive(Debug)]
enum WorkItem {
StartService(u64),
StopService(u64, bool)
}
#[derive(Debug)]
struct Data {
pub epoll: nix::sys::epoll::Epoll,
pub sigfd: nix::sys::signalfd::SignalFd,
pub work_queue: Option<std::sync::mpsc::Sender<WorkItem>>,
pub services: std::collections::HashMap<u64, service::Service>
}
impl Data {
fn new() -> Self {
let epoll = nix::sys::epoll::Epoll::new(nix::sys::epoll::EpollCreateFlags::EPOLL_CLOEXEC).expect("Failed to create epoll");
let mut sigset = nix::sys::signal::SigSet::empty();
sigset.add(nix::sys::signal::SIGTERM);
sigset.add(nix::sys::signal::SIGINT);
sigset.add(nix::sys::signal::SIGCHLD);
nix::sys::signal::sigprocmask(nix::sys::signal::SigmaskHow::SIG_BLOCK, Some(&sigset), None).expect("Failed to set signal mask");
let sigfd = nix::sys::signalfd::SignalFd::with_flags(&sigset, nix::sys::signalfd::SfdFlags::SFD_CLOEXEC).expect("Failed to create signalfd");
epoll.add(&sigfd, nix::sys::epoll::EpollEvent::new(nix::sys::epoll::EpollFlags::EPOLLIN, watch::EPOLL_SIGFD_ID)).expect("Failed to add signalfd to epoll");
Self {
epoll,
sigfd,
work_queue: None,
services: std::collections::HashMap::new()
}
}
pub fn get() -> &'static mut Self {
static mut INSTANCE: Option<Data> = None;
unsafe {
if INSTANCE.is_none() { INSTANCE.replace(Data::new()); }
INSTANCE.as_mut().unwrap()
}
}
pub fn any_running(&self) -> bool {
return self.services.iter().any(|s| !s.1.stopped());
}
pub fn service_update_dep(&mut self) {
let mut id_map = std::collections::HashMap::new();
for (_, service) in &self.services {
id_map.insert(service.config.name.clone(), service.id);
}
let id_map = id_map;
let mut dep_map: std::collections::HashMap<u64, Vec<u64>> = std::collections::HashMap::new();
for (_, service) in &mut self.services {
service.dependencies.clear();
for dep in &service.config.depends_on {
let did = *id_map.get(dep).unwrap();
service.dependencies.push(did);
dep_map.entry(did).or_default().push(service.id);
}
}
for (_, service) in &mut self.services {
service.dependents = dep_map.entry(service.id).or_default().clone();
}
}
}
fn main() {
let (start_as_daemon, config) = {
let mut start_as_daemon = true;
let mut arg_iter = std::env::args().skip(1);
while let Some(arg) = arg_iter.next() {
if arg == "-n" {
start_as_daemon = false;
} else if arg == "-s" {
let new_args = std::env::args().filter(|s| s != "-s");
Command::new("strace").arg("-f").arg("-e").arg("trace=%process,%ipc,/epoll.*").args(new_args).exec();
} else if arg.starts_with('-') {
eprintln!("Invalid switch '{arg}'");
std::process::exit(1);
} else {
eprintln!("Invalid arg '{arg}'");
std::process::exit(1);
}
}
let config = match config::Config::load() {
Ok(v) => v,
Err(e) => { eprintln!("{e}"); std::process::exit(1); }
};
(start_as_daemon, config)
};
if start_as_daemon {
println!("Launching as daemon!");
daemonize::Daemonize::new()
.working_directory(std::env::current_dir().unwrap())
.start()
.expect("Failed to daemonize");
let log_file = nix::fcntl::open(
"minitd.log",
nix::fcntl::OFlag::O_WRONLY | nix::fcntl::OFlag::O_CLOEXEC | nix::fcntl::OFlag::O_CREAT | nix::fcntl::OFlag::O_APPEND,
nix::sys::stat::Mode::S_IWUSR | nix::sys::stat::Mode::S_IRUSR
).expect("Failed to open log file");
nix::unistd::dup2(log_file, 1).expect("Failed to redirect stdout");
nix::unistd::dup2(log_file, 2).expect("Failed to redirect stderr");
nix::unistd::close(log_file).expect("Failed to close log file");
}
{
let data = Data::get();
for service in config.service {
let service = service::Service::new(service);
data.services.insert(service.id, service);
}
data.service_update_dep();
}
setup_webserver();
watch::watch_services(config.autostart);
}

177
src/service.rs Normal file
View File

@@ -0,0 +1,177 @@
use std::ffi::CString;
use std::os::fd::{AsRawFd, FromRawFd};
use crate::Data;
#[derive(Debug, PartialEq)]
pub enum ServiceStatus {
STOPPED,
STARTED(std::time::Instant), // Started at
STOPPING,
FAILED(String)
}
#[derive(Debug)]
pub struct Service {
pub id: u64, // > 255
pub pid: Option<(nix::unistd::Pid, std::os::fd::OwnedFd)>,
pub status: ServiceStatus,
pub reaped: std::sync::atomic::AtomicBool,
pub dependencies: Vec<u64>,
pub dependents: Vec<u64>,
pub restart_count: u8,
pub config: crate::config::CService
}
impl Service {
fn get_args(&self) -> Result<Vec<CString>, String> {
let args = match shlex::split(&self.config.command) {
Some(v) => v,
None => return Err("Invalid command".into())
};
if args.is_empty() {
return Err("Command is empty".into());
}
Ok(args.into_iter().map(|s| CString::new(s).unwrap()).collect::<Vec<_>>())
}
pub fn started(&self) -> bool {
match &self.status {
ServiceStatus::STARTED(_) => !self.reaped.load(std::sync::atomic::Ordering::Relaxed),
_ => false
}
}
pub fn stopped(&self) -> bool {
match &self.status {
ServiceStatus::STOPPED | ServiceStatus::FAILED(_) => true,
_ => false
}
}
pub fn can_autostart(&self) -> bool {
return self.restart_count < 3;
}
pub fn new(config: crate::config::CService) -> Self {
static NEXT_SERVICE_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(256);
let id = NEXT_SERVICE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self {
id,
pid: None,
status: ServiceStatus::STOPPED,
reaped: std::sync::atomic::AtomicBool::new(false),
dependents: Vec::new(),
dependencies: Vec::new(),
restart_count: 0,
config
}
}
pub fn start(&mut self) {
if self.started() { return; }
println!("[U...] {}", &self.config.name);
self.restart_count += 1;
self.reaped.store(false, std::sync::atomic::Ordering::Relaxed);
let args = match self.get_args() {
Ok(v) => v,
Err(err) => {
self.status = ServiceStatus::FAILED(err);
println!("[FAIL] {}", &self.config.name);
return;
}
};
let (r, s) = nix::unistd::pipe().unwrap();
let (r, s) = unsafe { (std::os::fd::OwnedFd::from_raw_fd(r), std::os::fd::OwnedFd::from_raw_fd(s)) };
let mut pipe_buf = [0_u8];
match unsafe {nix::unistd::fork()} {
Ok(nix::unistd::ForkResult::Child) => {
drop(r);
let mut sigset = nix::sys::signal::SigSet::empty();
sigset.add(nix::sys::signal::SIGTERM);
sigset.add(nix::sys::signal::SIGINT);
sigset.add(nix::sys::signal::SIGCHLD);
nix::sys::signal::sigprocmask(nix::sys::signal::SigmaskHow::SIG_UNBLOCK, Some(&sigset), None).expect("Failed to set signal mask");
let log_name = self.config.name.clone() + ".log";
let log_file = nix::fcntl::open(
log_name.as_str(),
nix::fcntl::OFlag::O_WRONLY | nix::fcntl::OFlag::O_CLOEXEC | nix::fcntl::OFlag::O_CREAT | nix::fcntl::OFlag::O_APPEND,
nix::sys::stat::Mode::S_IWUSR | nix::sys::stat::Mode::S_IRUSR
).expect("Failed to open log file");
nix::unistd::dup2(log_file, 1).expect("Failed to redirect stdout");
nix::unistd::dup2(log_file, 2).expect("Failed to redirect stderr");
nix::unistd::close(log_file).expect("Failed to close log file");
nix::unistd::close(0).expect("Failed to close stdin");
nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0)).expect("Failed to set pgid");
if let Some(dir) = &self.config.directory { nix::unistd::chdir(dir.as_str()).expect(&format!("Failed to change dir to {dir}")); }
nix::unistd::write(s.as_raw_fd(), &pipe_buf).unwrap();
drop(s);
nix::unistd::execvp(&args[0], args.as_slice()).unwrap();
std::process::exit(0);
}
Ok(nix::unistd::ForkResult::Parent { child }) => {
drop(s);
let pidfd = nix::errno::Errno::result(unsafe { libc::syscall(libc::SYS_pidfd_open, child.as_raw(), 0 as libc::c_int) as libc::c_int }).expect("Failed to open pidfd");
let pidfd = unsafe { std::os::fd::OwnedFd::from_raw_fd(pidfd) };
self.pid.replace((child, pidfd));
let mut fds = [
nix::poll::PollFd::new(&r, nix::poll::PollFlags::POLLIN | nix::poll::PollFlags::POLLHUP | nix::poll::PollFlags::POLLPRI),
nix::poll::PollFd::new(&r, nix::poll::PollFlags::POLLIN | nix::poll::PollFlags::POLLPRI)
];
nix::poll::poll(&mut fds, -1).expect("Failed to poll fds");
if fds[0].any().unwrap_or(false) {
let read = nix::unistd::read(r.as_raw_fd(), &mut pipe_buf).unwrap();
if read == 0 {
self.status = ServiceStatus::FAILED("Check log".into());
println!("[FAIL] {}", &self.config.name);
return;
}
} else {
self.status = ServiceStatus::FAILED("Check log".into());
println!("[FAIL] {}", &self.config.name);
return;
}
self.status = ServiceStatus::STARTED(std::time::Instant::now());
Data::get().epoll.add(&self.pid.as_ref().unwrap().1, nix::sys::epoll::EpollEvent::new(nix::sys::epoll::EpollFlags::EPOLLIN, self.id)).expect("Failed to add epoll");
println!("[ OK ] {}", &self.config.name);
}
Err(err) => panic!("Failed to fork for service: {}", err)
}
}
pub fn stop(&mut self, manual: bool) {
const TERM_WAIT_MS: u128 = 10000;
if self.stopped() { return; }
if let Some((pid, _)) = self.pid.as_ref() {
let pid = *pid;
println!("[D...] {}", &self.config.name);
self.status = ServiceStatus::STOPPING;
let stop_started = std::time::Instant::now();
nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM).expect("Failed to send SIGTERM");
while !self.reaped.load(std::sync::atomic::Ordering::Relaxed) && stop_started.elapsed().as_millis() < TERM_WAIT_MS {
std::thread::sleep(std::time::Duration::from_millis(250));
}
if !self.reaped.load(std::sync::atomic::Ordering::Relaxed) {
nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL).expect("Failed to send SIGKILL");
}
}
println!("[STOP] {}", &self.config.name);
self.status = ServiceStatus::STOPPED;
if manual { self.restart_count = 0; }
}
}

152
src/watch.rs Normal file
View File

@@ -0,0 +1,152 @@
use std::collections::VecDeque;
use crate::{Data, WorkItem};
use crate::service::ServiceStatus;
pub const EPOLL_SIGFD_ID: u64 = 0;
pub const EPOLL_WEBSERVER_ID: u64 = 1;
const EPOLL_MAX_EVENTS: usize = 16;
static SHUTDOWN_STARTED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
pub fn watch_services(autostart: Vec<String>) {
let starter = std::thread::spawn(move || starter_thread(autostart));
let data = Data::get();
let mut events = [nix::sys::epoll::EpollEvent::empty(); EPOLL_MAX_EVENTS];
loop {
let ready = match data.epoll.wait(&mut events, 15000) {
Ok(v) => v,
Err(nix::errno::Errno::EINTR) => 0,
Err(e) => panic!("Epoll wait failed: {:?}", e)
};
for i in 0..ready {
let event = &events[i];
match event.data() {
EPOLL_SIGFD_ID => {
let sig = data.sigfd.read_signal().unwrap().unwrap();
if sig.ssi_signo == libc::SIGCHLD as u32 {
//println!("Got SIGCHLD, ignoring");
} else {
let s = nix::sys::signal::Signal::try_from(sig.ssi_signo as i32).unwrap();
println!("sigfd event, treated as exit command. Signal: {s:#?}");
SHUTDOWN_STARTED.store(true, std::sync::atomic::Ordering::Relaxed);
for (id, _) in &data.services {
data.work_queue.as_ref().unwrap().send(WorkItem::StopService(*id, false)).unwrap();
}
}
}
EPOLL_WEBSERVER_ID => {
crate::web::webserver_accept();
}
id => {
let service = match data.services.get_mut(&id) {
None => { panic!("Got epoll event for id '{}' but no service found\n{:#?}", id, data.services); }
Some(v) => v,
};
let name = &service.config.name;
let pid = service.pid.take().unwrap();
data.epoll.delete(pid.1).expect("Failed to delete from epoll");
let status = nix::sys::wait::waitpid(pid.0, None).expect("waitpid failed");
match status {
nix::sys::wait::WaitStatus::Exited(_, code) => println!("{name} exited with status {code}"),
nix::sys::wait::WaitStatus::Signaled(_, sig, _) => println!("{name} exited with signal {sig}"),
_ => eprintln!("Unknown status {status:#?}")
}
service.reaped.store(true, std::sync::atomic::Ordering::Relaxed);
data.work_queue.as_ref().unwrap().send(WorkItem::StopService(id, false)).unwrap();
}
}
}
if SHUTDOWN_STARTED.load(std::sync::atomic::Ordering::Relaxed) && !data.any_running() {
break;
}
}
Data::get().work_queue.as_ref().unwrap().send(WorkItem::StopService(0, true)).unwrap();
starter.join().unwrap();
}
fn starter_thread(autostart: Vec<String>) {
let data = Data::get();
let (recv, mut todo) = {
let (s,r) = std::sync::mpsc::channel();
data.work_queue.replace(s);
let mut id_map = std::collections::HashMap::new();
for (_, service) in &data.services {
id_map.insert(service.config.name.clone(), service.id);
}
let todo = autostart.into_iter()
.map(|s| WorkItem::StartService(*id_map.get(&s).unwrap()))
.collect::<VecDeque<_>>();
(r, todo)
};
'main_loop: loop {
while let Some(work) = todo.pop_front() {
match work {
WorkItem::StartService(id) => {
if SHUTDOWN_STARTED.load(std::sync::atomic::Ordering::Relaxed) { continue; }
let (missing_deps, dep_fail) = {
let mut missing_deps = Vec::new();
let mut dep_fail = false;
let service = data.services.get(&id).unwrap();
for dep in &service.dependencies {
let dep = data.services.get(dep).unwrap();
if !dep.started() {
if dep.can_autostart() {
missing_deps.push(WorkItem::StartService(dep.id));
} else {
dep_fail = true;
break;
}
}
}
(missing_deps, dep_fail)
};
let service = data.services.get_mut(&id).unwrap();
if dep_fail {
service.status = ServiceStatus::FAILED("Failed to start dependency".into());
service.restart_count = 3;
continue;
} else if !missing_deps.is_empty() {
for dep in missing_deps { todo.push_front(dep); }
todo.push_back(WorkItem::StartService(id));
continue;
}
service.start();
}
WorkItem::StopService(id, manual) => {
let can_stop = {
let mut can_stop = true;
let service = match data.services.get(&id) {
Some(v) => v,
None => continue
};
for dep in &service.dependents {
let dep = data.services.get(dep).unwrap();
if !dep.stopped() {
todo.push_front(WorkItem::StopService(dep.id, manual));
can_stop = false;
}
}
can_stop
};
let service = data.services.get_mut(&id).unwrap();
if !can_stop {
todo.push_back(WorkItem::StopService(id, manual));
continue;
}
service.stop(manual);
}
}
}
let work = recv.recv().unwrap();
match work { WorkItem::StopService(0, _) => break 'main_loop, _ => {} }
todo.push_back(work);
}
}

89
src/web.rs Normal file
View File

@@ -0,0 +1,89 @@
use std::io::{Write, BufRead};
use std::net::TcpStream;
use crate::{Data, html, watch, WorkItem};
static mut WEBSERVER_FD: Option<std::net::TcpListener> = None;
pub fn setup_webserver() {
let data = Data::get();
let listener = std::net::TcpListener::bind("0.0.0.0:9001").expect("Failed to bind listener");
listener.set_nonblocking(true).expect("Failed to set non-blocking");
data.epoll.add(&listener, nix::sys::epoll::EpollEvent::new(nix::sys::epoll::EpollFlags::EPOLLIN, watch::EPOLL_WEBSERVER_ID)).expect("Failed to add socket to epoll");
unsafe { WEBSERVER_FD = Some(listener) };
}
pub fn webserver_accept() {
let listener = unsafe { WEBSERVER_FD.as_ref().unwrap() };
if let Ok((stream, _)) = listener.accept() {
std::thread::spawn(move || handle_stream(stream));
}
}
fn handle_stream(mut stream: TcpStream) -> Option<()> {
stream.set_read_timeout(Some(std::time::Duration::new(5, 0))).ok()?;
stream.set_write_timeout(Some(std::time::Duration::new(5, 0))).ok()?;
let reader = std::io::BufReader::new(&mut stream);
let mut lines = reader.lines();
let (method, path) = {
let first = lines.next()?.ok()?;
let (method, first) = first.split_once(' ')?;
let path = first.split_once(' ')?.0;
(method.to_string(), path.to_string())
};
for line in lines {
if line.ok()?.is_empty() { break; }
}
if method != "GET" { return html::send_404(stream); }
if path == "/" {
html::send_index(stream)
} else if path == "/tail" {
html::send_tail_html(stream)
} else if path == "/reload" {
match html::reload_config() {
Ok(()) => html::send_301(stream),
Err(e) => write!(stream, "HTTP/1.0 400 Bad request\r\nContent-Type: text/plain;charset=utf-8\r\n\r\nFailed to reload config:\n{e}").ok()
}
} else if path == "/stop_all" {
let data = Data::get();
for (id, _) in &data.services {
data.work_queue.as_ref().unwrap().send(WorkItem::StopService(*id, false)).unwrap();
}
html::send_301(stream)
} else if path == "/shutdown" {
nix::sys::signal::kill(nix::unistd::getpid(), nix::sys::signal::SIGTERM).ok()?;
html::send_301(stream)
} else if path.starts_with("/_tail/") {
let mut path = path;
drop(path.drain(..7));
if path.contains('.') || path.contains('/') {
html::send_404(stream)
} else {
let path = path + ".log";
html::send_tail(stream, path)
}
} else if path.starts_with("/start/") {
let service = &path[7..];
let data = Data::get();
match data.services.iter().find(|(_, s)| s.config.name == service) {
Some((id, _)) => {
data.work_queue.as_ref().unwrap().send(WorkItem::StartService(*id)).unwrap();
html::send_301(stream)
},
None => html::send_404(stream)
}
} else if path.starts_with("/stop/") {
let service = &path[6..];
let data = Data::get();
match data.services.iter().find(|(_, s)| s.config.name == service) {
Some((id, _)) => {
data.work_queue.as_ref().unwrap().send(WorkItem::StopService(*id, true)).unwrap();
html::send_301(stream)
},
None => html::send_404(stream)
}
} else {
html::send_404(stream)
}
}