Switched from Websockets to POST requests and SSE for streams

This commit is contained in:
Mutzi 2023-10-02 15:41:00 +02:00
parent b52b0fcc82
commit 9caeb447c3
Signed by: root
GPG Key ID: 2437494E09F13876
6 changed files with 148 additions and 212 deletions

View File

@ -1,6 +1,21 @@
use itertools::Itertools; use itertools::Itertools;
use crate::data::RPC; use crate::data::RPC;
pub const JSON_INNER_IMPLS: &[(&str, &str)] = &[
("std::string", "String"),
("std::int8_t", "Int"),
("std::int16_t", "Int"),
("std::int32_t", "Int"),
("std::int64_t", "Int64"),
("std::uint8_t", "Uint"),
("std::uint16_t", "Uint"),
("std::uint32_t", "Uint"),
("std::uint64_t", "Uint64"),
("bool", "Bool"),
("std::float_t", "Double"),
("std::double_t", "Double")
];
pub fn ty_to_str(ty: &crate::data::Types) -> String { pub fn ty_to_str(ty: &crate::data::Types) -> String {
use crate::data::Types; use crate::data::Types;
match &ty { match &ty {
@ -22,11 +37,10 @@ pub fn ty_to_str(ty: &crate::data::Types) -> String {
} }
} }
pub fn method_args(method: &crate::data::MethodTy) -> String { pub fn method_args(method: &crate::data::MethodTy) -> String {
method.args.iter() method.args.iter()
.map(|arg| format!("{} &&{}", ty_to_str(&arg.ty), arg.name)) .map(|arg| format!("{} &&{}", ty_to_str(&arg.ty), arg.name))
.chain(method.ret_stream.then(|| format!("std::shared_ptr<MRPCStream<{}>>&&", ty_to_str(method.ret.as_ref().unwrap())))) .chain(method.ret_stream.then(|| format!("MRPCStream<{}>&&", ty_to_str(method.ret.as_ref().unwrap()))))
.join(", ") .join(", ")
} }
@ -76,6 +90,18 @@ pub fn json_write(ty: &crate::data::FieldTy) -> String {
} }
} }
pub fn streams_required(rpc: &RPC) -> Vec<String> {
let mut streams = std::collections::HashSet::new();
for s in &rpc.services {
for m in &s.methods {
if m.ret_stream {
streams.insert(ty_to_str(m.ret.as_ref().unwrap()));
}
}
}
streams.into_iter().collect()
}
pub fn gen(file_base_name: &std::path::PathBuf, rpc: &RPC) { pub fn gen(file_base_name: &std::path::PathBuf, rpc: &RPC) {
let header_name = file_base_name.with_extension("h"); let header_name = file_base_name.with_extension("h");
let header_name = header_name.file_name().unwrap().to_str().unwrap(); let header_name = header_name.file_name().unwrap().to_str().unwrap();

View File

@ -19,7 +19,7 @@ pub fn ty_to_str(ty: &crate::data::Types) -> String {
pub fn method_args(method: &crate::data::MethodTy) -> String { pub fn method_args(method: &crate::data::MethodTy) -> String {
method.args.iter() method.args.iter()
.map(|arg| format!("{}: {}", arg.name, ty_to_str(&arg.ty))) .map(|arg| format!("{}: {}", arg.name, ty_to_str(&arg.ty)))
.chain(method.ret_stream.then(|| format!("__cbk: (v: {}) => void", ty_to_str(method.ret.as_ref().unwrap())))) .chain(method.ret_stream.then(|| format!("__cbk: (v: {}|null) => void", ty_to_str(method.ret.as_ref().unwrap()))))
.join(", ") .join(", ")
} }

View File

@ -4,80 +4,93 @@
@(header_name: &str, rpc: &RPC) @(header_name: &str, rpc: &RPC)
#include "@header_name" #include "@header_name"
#include <corvusoft/restbed/session.hpp>
#include <corvusoft/restbed/resource.hpp>
#include <corvusoft/restbed/request.hpp>
@:cpp_server_json_cpp(rpc) @:cpp_server_json_cpp(rpc)
template<class T> template<typename T>
void send_msg(crow::websocket::connection &c, std::uint64_t id, const T &v) @{ void send_msg(const std::shared_ptr<restbed::Session> &c, const T &v) @{
if (c->is_closed())
return;
rapidjson::StringBuffer s; rapidjson::StringBuffer s;
mrpc::MRPCJWriter writer@{s@}; mrpc::MRPCJWriter writer@{s@};
writer.StartObject(); v >> writer;
writer.Key("id"); const auto body_ptr = s.GetString();
writer.Uint64(id); const auto body = restbed::Bytes@{body_ptr, body_ptr+s.GetLength()@};
writer.Key("data"); c->yield(
if constexpr (std::is_same_v<T, std::nullptr_t>) 200,
writer.Null(); body,
else std::multimap<std::string, std::string>@{
v >> writer; @{"Content-Type", "application/json"@},
writer.EndObject(); @{"Content-Length", std::to_string(body.size())@}
c.send_text(s.GetString()); @}
);
@} @}
void mrpc::MRPCStreamImpl::close() noexcept @{ template<typename T>
if (conn != nullptr) @{ void send_sse_msg(const std::shared_ptr<restbed::Session> &c, const T &v) @{
send_msg(*conn, id, nullptr); if (c->is_closed())
conn = nullptr; return;
@} rapidjson::StringBuffer s;
std::memcpy(s.Push(5), "data:", 5);
mrpc::MRPCJWriter writer@{s@};
v >> writer;
std::memcpy(s.Push(2), "\n\n", 2);
const auto body_ptr = s.GetString();
const auto body = restbed::Bytes@{body_ptr, body_ptr+s.GetLength()@};
c->yield(body);
@} @}
void mrpc::MRPCStreamImpl::abort() noexcept @{ conn = nullptr; @}
bool mrpc::MRPCStreamImpl::is_open() noexcept @{ return conn != nullptr; @}
void mrpc::MRPCServer::install(crow::SimpleApp &app, std::string &&route) @{ mrpc::MRPCStreamImpl::MRPCStreamImpl(const std::shared_ptr<restbed::Session> &conn) : conn(conn) @{
app.route_dynamic(std::move(route)) conn->yield(
.websocket() 200,
.onclose([&](crow::websocket::connection &c, const std::string&)@{ std::multimap<std::string, std::string>@{
std::lock_guard guard@{__streams_mutex@}; @{"Cache-Control", "no-cache"@},
auto range = __streams.equal_range(&c); @{"Content-Type", "text/event-stream"@}
for (auto it = range.first; it != range.second; ++it) @}
it->second->abort(); );
__streams.erase(&c); @}
@})
.onmessage([this](auto &&a, auto &&b, auto &&c) @{ void mrpc::MRPCStreamImpl::close() const noexcept @{ conn->close("data:null\n\n"); @}
try @{ msg_handler(a, b, c); @} bool mrpc::MRPCStreamImpl::is_open() const noexcept @{ return conn->is_open(); @}
catch (const std::exception &_) @{@} @for s in streams_required(rpc) {template<> void MRPCStream<mrpc::@s>::send(const mrpc::@s &v) const noexcept @{ send_sse_msg(conn, v); @}
}
mrpc::MRPCServer::MRPCServer(std::shared_ptr<restbed::Resource> &r) @{
r->set_method_handler("POST", [this](const std::shared_ptr<restbed::Session>& s) @{
const auto req = s->get_request();
const auto body_len = req->get_header("Content-Length", 0);
s->fetch(body_len, [this](const std::shared_ptr<restbed::Session> &s, auto &&body) @{
try @{ msg_handler(s, body); @}
catch (const std::exception &_) @{ s->close(400); @}
@}); @});
@});
@} @}
void mrpc::MRPCServer::msg_handler(crow::websocket::connection &__c, std::string __msg, bool) @{
void mrpc::MRPCServer::msg_handler(const std::shared_ptr<restbed::Session> __c, const restbed::Bytes &__msg) @{
rapidjson::Document __j; rapidjson::Document __j;
__j.ParseInsitu(__msg.data()); __j.Parse((const char*)__msg.data(), __msg.size());
if (__j.HasParseError()) if (__j.HasParseError())
throw std::exception@{@}; throw std::exception@{@};
std::uint64_t __id; std::string __service, __method; std::string __service, __method;
json_get(__j, "id", __id); json_get(__j, "service", __service); json_get(__j, "method", __method); json_get(__j, "service", __service); json_get(__j, "method", __method);
try @{ auto __data_member = __j.FindMember("data");
auto __data_member = __j.FindMember("data"); if (__data_member == __j.MemberEnd() || !__data_member->value.IsObject())
if (__data_member == __j.MemberEnd() || !__data_member->value.IsObject()) throw std::exception@{@};
throw std::exception@{@}; auto &__data = __data_member->value;
auto &__data = __data_member->value; @for (si, s) in rpc.services.iter().enumerate() {@if si > 0 { else }else{ }if (__service == "@s.name") @{
@for (si, s) in rpc.services.iter().enumerate() { @for (mi, m) in s.methods.iter().enumerate() {@if mi > 0 { else }else{ }if (__method == "@m.name") @{
@if si > 0 {else }if (__service == "@s.name") @{ @if m.ret_stream {auto __stream = MRPCStream<@ty_to_str(m.ret.as_ref().unwrap())>@{__c@};
@for (mi, m) in s.methods.iter().enumerate() { }
@if mi > 0 {else }if (__method == "@m.name") @{ @for (name, ty) in m.args.iter().map(|a| (&a.name, ty_to_str(&a.ty))) { @ty @name; json_get<@ty>(__data, "@name", @name);
@if m.ret_stream { }
auto __stream = std::make_shared<MRPCStream<@ty_to_str(m.ret.as_ref().unwrap())>>(&__c, __id); @if m.ret_stream || m.ret.is_none() {@(s.name)_@(m.name)(@call_args(m));}
@{ std::lock_guard guard@{__streams_mutex@}; __streams.emplace(&__c, __stream); @} else {send_msg(__c, @(s.name)_@(m.name)(@call_args(m)));}
} @}}
@for (name, ty) in m.args.iter().map(|a| (&a.name, ty_to_str(&a.ty))) { @ty @name; json_get<@ty>(__data, "@name", @name);
}
@if m.ret_stream || m.ret.is_none() {@(s.name)_@(m.name)(@call_args(m));}
else {send_msg(__c, __id, @(s.name)_@(m.name)(@call_args(m)));}
@}
}
else @{ throw std::exception@{@}; @}
@}
}
else @{ throw std::exception@{@}; @} else @{ throw std::exception@{@}; @}
@} catch (const std::exception &_) @{ @}}
std::cerr << "Got invalid request " << __id << " for " << __service << "::" << __method << std::endl; else @{ throw std::exception@{@}; @}
@}
@} @}
@} @}

View File

@ -7,27 +7,31 @@
#ifndef MRPC_GEN_H #ifndef MRPC_GEN_H
#define MRPC_GEN_H #define MRPC_GEN_H
#include <unordered_map>
#include <memory> #include <memory>
#include <mutex>
#include <iosfwd>
#include <string> #include <string>
#include <vector>
#include <optional>
#include <cstdint> #include <cstdint>
#include <crow.h> #include <cmath>
#include <corvusoft/restbed/byte.hpp>
#define RAPIDJSON_HAS_STDSTRING 1 #define RAPIDJSON_HAS_STDSTRING 1
#include <rapidjson/stringbuffer.h> #include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h> #include <rapidjson/writer.h>
#include <rapidjson/document.h> #include <rapidjson/document.h>
namespace restbed @{
class Resource;
class Session;
@}
namespace mrpc @{ namespace mrpc @{
using MRPCJWriter = rapidjson::Writer<rapidjson::StringBuffer>; using MRPCJWriter = rapidjson::Writer<rapidjson::StringBuffer>;
@for e in &rpc.enums { @for e in &rpc.enums {
enum struct @e.name : std::uint64_t @{ enum struct @e.name : std::uint64_t @{
@e.values.iter().map(|(k,v)| format!("{k} = {v}")).join(",\n ") @e.values.iter().map(|(k,v)| format!("{k} = {v}")).join(",\n ")
@}; @};
} }
@for s in &rpc.structs { @for s in &rpc.structs {struct @s.name;
struct @s.name;
} }
@for s in &rpc.structs { @for s in &rpc.structs {
struct @s.name @{ struct @s.name @{
@ -35,51 +39,30 @@ struct @s.name @{
} }
MRPCJWriter& operator >>(MRPCJWriter&) const; MRPCJWriter& operator >>(MRPCJWriter&) const;
@(s.name)& operator <<(const rapidjson::Value&); @(s.name)& operator <<(const rapidjson::Value&);
@}; @};}
} @if streams_required(rpc).len() > 0 {
struct MRPCStreamImpl @{ struct MRPCStreamImpl @{
virtual void close() noexcept final; void close() const noexcept;
virtual void abort() noexcept final; bool is_open() const noexcept;
virtual bool is_open() noexcept final;
protected: protected:
MRPCStreamImpl(crow::websocket::connection *conn, uint64_t id) : conn(conn), id(id) @{@} explicit MRPCStreamImpl(const std::shared_ptr<restbed::Session> &conn);
crow::websocket::connection* conn; std::shared_ptr<restbed::Session> conn;
std::uint64_t id;
@}; @};
template<class T> template<typename T>
struct MRPCStream final : MRPCStreamImpl @{ struct MRPCStream final : MRPCStreamImpl @{
MRPCStream(crow::websocket::connection *conn, uint64_t id) : MRPCStreamImpl(conn, id) @{@} explicit MRPCStream(const std::shared_ptr<restbed::Session> &conn) : MRPCStreamImpl(conn) @{@}
bool send(const T &v) noexcept @{ void send(const T &v) const noexcept;
if (!conn) return false;
try @{
rapidjson::StringBuffer s;
mrpc::MRPCJWriter writer@{s@};
writer.StartObject();
writer.Key("id");
writer.Uint64(id);
writer.Key("data");
v >> writer;
writer.EndObject();
conn->send_text(s.GetString());
@} catch (const std::exception &_) @{
abort();
return false;
@}
return true;
@}
@}; @};
@for s in streams_required(rpc) {template struct MRPCStream<@(s)>;
}}
struct MRPCServer @{ struct MRPCServer @{
virtual void install(crow::SimpleApp &app, std::string &&route) final; explicit MRPCServer(std::shared_ptr<restbed::Resource>&);
private: private:
@for s in &rpc.services {@for m in &s.methods { virtual @method_ret(m) @(s.name)_@(m.name)(@method_args(m)) = 0; @for s in &rpc.services {@for m in &s.methods { virtual @method_ret(m) @(s.name)_@(m.name)(@method_args(m)) = 0;
}} }}
virtual void msg_handler(crow::websocket::connection&, std::string, bool) final; virtual void msg_handler(std::shared_ptr<restbed::Session>, const restbed::Bytes&) final;
std::mutex __streams_mutex;
std::unordered_multimap<crow::websocket::connection*, std::shared_ptr<MRPCStreamImpl>> __streams;
@}; @};
@} @}

View File

@ -6,57 +6,12 @@ template<typename T>
void json_get(const rapidjson::Value &j, const char *key, T &v); void json_get(const rapidjson::Value &j, const char *key, T &v);
template<typename T> template<typename T>
void json_get_inner(const rapidjson::Value&, T &v) = delete; void json_get_inner(const rapidjson::Value&, T &v) = delete;
@for (ty, jty) in JSON_INNER_IMPLS {
template<> template<> inline void json_get_inner(const rapidjson::Value &member, @ty &v) @{
inline void json_get_inner(const rapidjson::Value &member, std::string &v) @{ if (!member.Is@(jty)())
if (!member.IsString())
throw std::exception@{@}; throw std::exception@{@};
v = member.GetString(); v = member.Get@(jty)();
@} @}}
@for i in [8, 16, 32] {
template<>
inline void json_get_inner(const rapidjson::Value &member, std::int@(i)_t &v) @{
if (!member.IsInt())
throw std::exception@{@};
v = member.GetInt();
@}
template<>
inline void json_get_inner(const rapidjson::Value &member, std::uint@(i)_t& v) @{
if (!member.IsUint())
throw std::exception@{@};
v = member.GetUint();
@}
}
template<>
inline void json_get_inner(const rapidjson::Value &member, std::int64_t &v) @{
if (!member.IsInt64())
throw std::exception@{@};
v = member.GetInt64();
@}
template<>
inline void json_get_inner(const rapidjson::Value &member, std::uint64_t& v) @{
if (!member.IsUint64())
throw std::exception@{@};
v = member.GetUint64();
@}
template<>
inline void json_get_inner(const rapidjson::Value &member, bool &v) @{
if (!member.IsBool())
throw std::exception@{@};
v = member.GetBool();
@}
template<>
inline void json_get_inner(const rapidjson::Value &member, double &v) @{
if (!member.IsDouble())
throw std::exception@{@};
v = member.GetDouble();
@}
template<typename T> template<typename T>
inline void json_get_inner(const rapidjson::Value &member, std::optional<T> &v) @{ inline void json_get_inner(const rapidjson::Value &member, std::optional<T> &v) @{
@ -80,18 +35,15 @@ inline void json_get_inner(const rapidjson::Value &member, std::vector<T> &v) @{
@} @}
@} @}
@for s in &rpc.structs { @for s in &rpc.structs {
template<> template<> inline void json_get_inner(const rapidjson::Value &__j, mrpc::@s.name &v) @{
inline void json_get_inner(const rapidjson::Value &__j, mrpc::@s.name &v) @{
using namespace mrpc; using namespace mrpc;
@for f in &s.fields { json_get<@ty_to_str(&f.ty)>(__j, "@f.name", v.@f.name); @for f in &s.fields { json_get<@ty_to_str(&f.ty)>(__j, "@f.name", v.@f.name);
}@} }@}
} }
@for e in &rpc.enums { @for e in &rpc.enums {
template<> template<> inline void json_get_inner(const rapidjson::Value &j, mrpc::@e.name &v) @{
inline void json_get_inner(const rapidjson::Value &j, mrpc::@e.name &v) @{
json_get_inner<std::uint64_t>(j, (std::uint64_t&)v); json_get_inner<std::uint64_t>(j, (std::uint64_t&)v);
@} @}
mrpc::MRPCJWriter& operator >>(const mrpc::@e.name &v, mrpc::MRPCJWriter &w) @{ mrpc::MRPCJWriter& operator >>(const mrpc::@e.name &v, mrpc::MRPCJWriter &w) @{
@ -112,10 +64,10 @@ namespace mrpc @{
@for s in &rpc.structs { @for s in &rpc.structs {
MRPCJWriter& @(s.name)::operator >>(MRPCJWriter &__w) const @{ MRPCJWriter& @(s.name)::operator >>(MRPCJWriter &__w) const @{
__w.StartObject(); __w.StartObject();
@for f in &s.fields { __w.Key("@f.name"); @for f in &s.fields { __w.Key("@f.name", @f.name.len());
@json_write(&f) @json_write(&f)
} __w.EndObject(); } __w.EndObject();
return __w; return __w;
@} @}
@(s.name)& @(s.name)::operator <<(const rapidjson::Value &__j) @{ json_get_inner<@(s.name)>(__j, *this); return *this; @} @(s.name)& @(s.name)::operator <<(const rapidjson::Value &__j) @{ json_get_inner<@(s.name)>(__j, *this); return *this; @}
} }

View File

@ -3,79 +3,41 @@
@use crate::generators::ts_c::*; @use crate::generators::ts_c::*;
@(rpc: &RPC) @(rpc: &RPC)
import @{ fetchEventSource @} from '@@microsoft/fetch-event-source';
@for e in &rpc.enums { @for e in &rpc.enums {
export enum @e.name @{ export enum @e.name @{
@for (k,v) in &e.values { @k = @v, @for (k,v) in &e.values { @k = @v,
} }@}
@}
} }
@for s in &rpc.structs { @for s in &rpc.structs {
export interface @s.name @{ export interface @s.name @{
@for f in &s.fields { @f.name: @ty_to_str(&f.ty); @for f in &s.fields { @f.name: @ty_to_str(&f.ty);
}@}
} }
@}
}
interface _WSResponse @{
id: number;
data: any;
@}
interface _WSWaitingEntry @{
ok: (v: any) => void;
err: (reason?: any) => void;
@}
export class MRPCConnector @{ export class MRPCConnector @{
url: string; url: string;
socket: WebSocket;
nmi: number;
waiting: @{ [id: number]: _WSWaitingEntry @};
streams: @{ [id: number]: (v: any) => void @};
private open() @{
this.socket = new WebSocket(this.url);
this.socket.onmessage = ev => @{
const data = JSON.parse(ev.data) as _WSResponse;
if (data.id in this.streams) @{
this.streams[data.id](data.data);
if (data.data == null)
delete this.streams[data.id];
@} else if (data.id in this.waiting) @{
this.waiting[data.id].ok(data.data);
delete this.waiting[data.id];
@} else @{
console.log(`Got unexpected message: $@{data@}`);
@}
@}
this.socket.onerror = () => setTimeout(() => @{this.open();@}, 2500);
this.socket.onclose = () => setTimeout(() => @{this.open();@}, 2500);
@}
private get_prom<T>(id: number): Promise<T> @{
return new Promise<T>((ok, err) => @{ this.waiting[id] = @{ok, err@}; @});
@}
public constructor(url: string) @{ public constructor(url: string) @{
this.url = url; this.url = url;
this.nmi = 0;
this.waiting = @{@};
this.streams = @{@};
this.open();
@} @}
@for s in &rpc.services { @for m in &s.methods { @for s in &rpc.services { @for m in &s.methods {
public @(s.name)_@(m.name)(@method_args(m))@method_ret(m) @{ public @(s.name)_@(m.name)(@method_args(m))@method_ret(m) @{
const __msg = @{ const __msg = @{
id: this.nmi++,
service: '@s.name', service: '@s.name',
method: '@m.name', method: '@m.name',
data: @{@m.args.iter().map(|a| &a.name).join(",")@} data: @{@m.args.iter().map(|a| &a.name).join(",")@}
@}; @};
@if m.ret.is_some() && !m.ret_stream {const __p = this.get_prom<@ty_to_str(m.ret.as_ref().unwrap())>(__msg.id);} @if m.ret.is_some() && !m.ret_stream {return fetch(this.url, @{
else if m.ret_stream {this.streams[__msg.id] = __cbk;} method: 'POST',
this.socket.send(JSON.stringify(__msg)); body: JSON.stringify(__msg)
@if m.ret.is_some() && !m.ret_stream {return __p;} @}).then((__r) => __r.json());}
else if m.ret_stream {fetchEventSource(this.url, @{
method: 'POST',
body: JSON.stringify(__msg),
onmessage: __e => __cbk(JSON.parse(__e.data))
@});} else {fetch(this.url, @{method: 'POST', body: JSON.stringify(__msg)@});}
@} @}
}} }}
@} @}