mrpc/templates/cpp_server.rs.cpp

84 lines
3.2 KiB
C++

@use crate::data::RPC;
@use crate::generators::cpp_s::*;
@use super::cpp_server_json_cpp;
@(header_name: &str, rpc: &RPC)
#include "@header_name"
@:cpp_server_json_cpp(rpc)
// Builds one reply frame and sends it over the connection as a text message.
// The frame is a JSON object with two members: id, then data.
//
// c   connection the frame is written to
// id  written as the unsigned 64 bit id member
// v   payload; serialised through its operator>> into the writer, or written
//     as JSON null when T is std::nullptr_t
template<class T>
void send_msg(crow::websocket::connection &c, std::uint64_t id, const T &v) @{
    rapidjson::StringBuffer buffer;
    mrpc::MRPCJWriter out@{buffer@};

    out.StartObject();
    out.Key("id");
    out.Uint64(id);
    out.Key("data");
    // The branch is resolved at compile time, so operator>> is only required
    // for payload types other than std::nullptr_t.
    if constexpr (!std::is_same_v<T, std::nullptr_t>) @{
        v >> out;
    @} else @{
        out.Null();
    @}
    out.EndObject();

    c.send_text(buffer.GetString());
@}
// Closes the stream: sends a frame carrying this stream's id with null data,
// then forgets the connection. Does nothing when the connection pointer is
// already null.
// NOTE(review): send_msg is not declared noexcept while this function is, so an
// exception escaping send_msg would terminate the process - confirm this is intended.
void mrpc::MRPCStreamImpl::close() noexcept @{
    if (conn == nullptr)
        return;

    send_msg(*conn, id, nullptr);
    conn = nullptr;
@}
// Drops the connection pointer without sending anything to the peer.
// The server's close callback calls this for every stream of a connection
// that has gone away.
void mrpc::MRPCStreamImpl::abort() noexcept @{
    conn = nullptr;
@}
// Reports whether the stream still holds a connection, that is, whether
// neither close nor abort has cleared the pointer yet.
bool mrpc::MRPCStreamImpl::is_open() noexcept @{
    const bool has_connection = !(conn == nullptr);
    return has_connection;
@}
// Registers the RPC websocket endpoint on a Crow application.
//
// app    application the dynamic route is added to
// route  URL pattern of the endpoint; moved into the route
//
// Both callbacks capture this and are handed to Crow, so the server object is
// presumably required to outlive the application's event dispatch - verify
// against the code that owns both objects.
void mrpc::MRPCServer::install(crow::SimpleApp &app, std::string &&route) @{
    app.route_dynamic(std::move(route))
        .websocket()
        // Connection gone: abort every stream bound to it and drop the entries.
        // Only this is captured; a blanket by-reference capture has no place in
        // a callback that is stored beyond this call.
        .onclose([this](crow::websocket::connection &c, const std::string &) @{
            std::lock_guard guard@{__streams_mutex@};
            const auto range = __streams.equal_range(&c);
            for (auto it = range.first; it != range.second; ++it)
                it->second->abort();
            // Erase by range so the key is not looked up a second time.
            __streams.erase(range.first, range.second);
        @})
        // Incoming frame: msg_handler throws for frames it cannot decode at all.
        // Those must not reach Crow, but they are reported instead of vanishing.
        .onmessage([this](crow::websocket::connection &c, const std::string &msg, bool is_binary) @{
            try @{
                msg_handler(c, msg, is_binary);
            @} catch (const std::exception &e) @{
                std::cerr << "Dropped malformed request: " << e.what() << '\n';
            @}
        @});
@}
// Decodes one request frame and dispatches it to the generated service method
// named by its service and method members.
//
// __c    connection the frame arrived on; replies and streams are bound to it
// __msg  JSON text of the frame; taken by value because it is parsed in place
//
// Plain methods with a return value answer through send_msg using the request
// id. Streaming methods get a new stream object registered in __streams.
// Methods without a return value send nothing.
//
// Throws std::exception when the text is not valid JSON. The envelope fields
// id, service and method are read outside the try block, so anything json_get
// raises for them (presumably on a missing or mistyped member) also escapes to
// the caller. Every later failure, including an unknown service or method and
// exceptions from the service method itself, is caught and logged to std::cerr.
//
// Arguments are decoded before a stream is created, so a request with bad
// arguments never leaves an unused stream registered. Each match returns
// directly and a trailing throw covers the no-match case, which keeps the
// generated code valid even for an empty service or method list.
void mrpc::MRPCServer::msg_handler(crow::websocket::connection &__c, std::string __msg, bool) @{
    rapidjson::Document __j;
    __j.ParseInsitu(__msg.data());
    if (__j.HasParseError())
        throw std::exception@{@};

    std::uint64_t __id = 0;
    std::string __service, __method;
    json_get(__j, "id", __id);
    json_get(__j, "service", __service);
    json_get(__j, "method", __method);

    try @{
        auto __data_member = __j.FindMember("data");
        if (__data_member == __j.MemberEnd() || !__data_member->value.IsObject())
            throw std::exception@{@};
        auto &__data = __data_member->value;
@for s in rpc.services.iter() {
        if (__service == "@s.name") @{
@for m in s.methods.iter() {
            if (__method == "@m.name") @{
@for (name, ty) in m.args.iter().map(|a| (&a.name, ty_to_str(&a.ty))) { @ty @name; json_get<@ty>(__data, "@name", @name);
}
@if m.ret_stream {
auto __stream = std::make_shared<MRPCStream<@ty_to_str(m.ret.as_ref().unwrap())>>(&__c, __id);
@{ std::lock_guard guard@{__streams_mutex@}; __streams.emplace(&__c, __stream); @}
}
@if m.ret_stream || m.ret.is_none() {@(s.name)_@(m.name)(@call_args(m));}
else {send_msg(__c, __id, @(s.name)_@(m.name)(@call_args(m)));}
                return;
            @}
}
            throw std::exception@{@};
        @}
}
        throw std::exception@{@};
    @} catch (const std::exception &_) @{
        std::cerr << "Got invalid request " << __id << " for " << __service << "::" << __method << std::endl;
    @}
@}