WhisperCom/src/WhisperCom/Router.cpp

330 lines
9.2 KiB
C++

/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include "RouterMessage.pb.h"
#include "google/protobuf/any.pb.h"
#include "zmq.hpp"
#include <WhisperCom/Router.hpp>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <exception>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#define LOGURU_WITH_STREAMS 1
#include <loguru.hpp>
/**
 * Shuts the router down.
 *
 * Stops the processing thread if it is still running and makes sure the
 * thread object has been joined before the members are destroyed.
 *
 * Destructors are implicitly noexcept, so an exception escaping from stop()
 * (it throws std::runtime_error on timeout) would end in std::terminate.
 * Failures are therefore logged instead of propagated.
 */
WhisperCom::Router::~Router()
{
    try
    {
        if (isLocalProcessingRunning_)
        {
            stop();
        }
    }
    catch (const std::exception &e)
    {
        LOG_S(ERROR) << "error while stopping router: " << e.what();
    }

    try
    {
        // Destroying a std::thread that is still joinable terminates the
        // process, so join the worker whatever stop() managed to do. The
        // worker re-checks stopLocalProcessing_ on every loop iteration.
        if (localProcesing_ && localProcesing_->joinable())
        {
            stopLocalProcessing_ = true;
            localProcesing_->join();
        }
    }
    catch (const std::exception &e)
    {
        LOG_S(ERROR) << "error while joining processing thread: " << e.what();
    }

    DLOG_S(INFO) << "router terminated";
}
/**
 * Creates the router and opens all of its sockets.
 *
 * @param localUrl  endpoint the local server socket is bound to
 * @param radioUrl  endpoint the radio (transmitting) socket connects to
 * @param dishUrl   endpoint the dish (listening) socket is bound to
 *
 * If binding the dummy socket or the local socket fails, the ZeroMQ context
 * is shut down and the router is left in the "not ready" state instead of
 * throwing.
 *
 * @throws std::runtime_error if the radio or dish socket has no valid handle
 *         after connect/bind.
 */
WhisperCom::Router::Router(const std::string localUrl, const std::string radioUrl, const std::string dishUrl)
    : isReady_(false),
      localUrl_(localUrl),
      radiolUrl_(radioUrl),
      dishUrl_(dishUrl),
      zmqContext_(zmq::context_t{}),
      localSocket_(zmqContext_, zmq::socket_type::server),
      listening_(zmqContext_, zmq::socket_type::dish),
      transmitting_(zmqContext_, zmq::socket_type::radio),
      dummySocket_(zmqContext_, zmq::socket_type::server),
      localProcesing_(nullptr),
      isLocalProcessingRunning_(false),
      stopLocalProcessing_(false)
{
    isReady_ = true;

    // Step 1: claim the fixed loopback port.
    // NOTE(review): presumably this acts as a "only one router per host"
    // guard - confirm.
    try
    {
        dummySocket_.bind("tcp://127.0.0.1:23543");
    }
    catch (std::exception &)
    {
        DLOG_S(WARNING) << "dummy socket port already in use";
        zmqContext_.shutdown();
        isReady_ = false;
        return;
    }

    // Step 2: bind the socket the local clients talk to.
    try
    {
        localSocket_.bind(localUrl_);
    }
    catch (std::exception &)
    {
        DLOG_S(WARNING) << "local port already in use";
        zmqContext_.shutdown();
        isReady_ = false;
    }
    if (nullptr == localSocket_.handle())
    {
        LOG_S(ERROR) << "socket bind failed";
        isReady_ = false;
    }
    if (!isReady_)
    {
        return;
    }

    // Step 3: open the external radio/dish pair.
    transmitting_.connect(radiolUrl_);
    if (nullptr == transmitting_.handle())
    {
        throw std::runtime_error("Failed connecting to radio url");
    }

    listening_.bind(dishUrl_);
    if (nullptr == listening_.handle())
    {
        throw std::runtime_error("Failed binding to dish url");
    }
}
/**
 * Starts the background thread that processes local and external messages.
 *
 * Waits up to one second (10 polls of 100 ms) for the thread to report that
 * it is running.
 *
 * @throws std::runtime_error if the router is not ready, is already running,
 *         or the processing thread did not come up in time.
 */
void WhisperCom::Router::start()
{
    using namespace std::chrono_literals;

    if (!isReady_)
    {
        LOG_S(ERROR) << "router not ready, can not start";
        throw std::runtime_error("Can not start, router is not ready");
    }
    if (isLocalProcessingRunning_)
    {
        LOG_S(ERROR) << "router already running, can not start twice";
        throw std::runtime_error("router already running, can not start twice");
    }

    // A worker left over from an earlier start()/stop() that timed out may
    // still be joinable. Replacing a joinable std::thread would call
    // std::terminate, so let it finish and join it first.
    if (localProcesing_ && localProcesing_->joinable())
    {
        stopLocalProcessing_ = true;
        localProcesing_->join();
    }

    stopLocalProcessing_ = false;
    localProcesing_ = std::make_unique<std::thread>(&WhisperCom::Router::processLocalMessages_, this);

    constexpr int maxPolls = 10;
    for (int poll = 0; poll < maxPolls && !isLocalProcessingRunning_; ++poll)
    {
        std::this_thread::sleep_for(100ms);
    }

    // Decide on the actual thread state, not on the exhausted poll budget:
    // the worker may have come up during the very last poll.
    if (!isLocalProcessingRunning_)
    {
        LOG_S(ERROR) << "could not start processing thread, router could not be started";
        throw std::runtime_error("could not start router");
    }
}
/**
 * Asks the processing thread to stop and joins it.
 *
 * Returns immediately when the thread is not running or a stop has already
 * been requested. Otherwise waits up to one second (10 polls of 100 ms) for
 * the thread to leave its loop.
 *
 * @throws std::runtime_error if the thread is still running after the wait.
 *         The stop request stays set in that case, so the thread will still
 *         leave its loop once it gets to check the flag.
 */
void WhisperCom::Router::stop()
{
    using namespace std::chrono_literals;

    if (!isLocalProcessingRunning_)
    {
        return;
    }
    if (stopLocalProcessing_)
    {
        return;
    }

    stopLocalProcessing_ = true;

    constexpr int maxPolls = 10;
    for (int poll = 0; poll < maxPolls && isLocalProcessingRunning_; ++poll)
    {
        std::this_thread::sleep_for(100ms);
    }

    // Decide on the actual thread state, not on the exhausted poll budget:
    // the worker may have finished during the very last poll, in which case
    // it must be joined rather than reported as stuck.
    if (isLocalProcessingRunning_)
    {
        throw std::runtime_error("could not stop local processing thread");
    }

    if (localProcesing_ && localProcesing_->joinable())
    {
        localProcesing_->join();
    }
}
void WhisperCom::Router::processLocalMessages_()
{
using namespace std::chrono_literals;
isLocalProcessingRunning_=true;
while(!stopLocalProcessing_)
{
std::this_thread::sleep_for(300ms);
bool finished = false;
// process messages on internal socket
while(!finished)
{
zmq::message_t msg{};
zmq::recv_result_t result = localSocket_.recv(msg, zmq::recv_flags::dontwait);
if (msg.empty())
{
finished=true;
}
else
{
processLocalMessage_(msg);
}
}
finished = false;
// process messages on external socket
while(!finished)
{
zmq::message_t msg{};
zmq::recv_result_t result = listening_.recv(msg, zmq::recv_flags::dontwait);
if (msg.empty())
{
finished=true;
}
else
{
processExternalMessage_(msg);
}
}
}
isLocalProcessingRunning_=false;
}
/**
 * Forwards a message received on the listening socket to all local clients.
 *
 * The raw bytes are parsed as a Protobuf::Message, wrapped in a
 * RouterMessage of type DATA and sent to every entry of localClients_.
 * Messages that cannot be parsed are logged and dropped.
 *
 * @param msg message as received from the listening socket
 */
void WhisperCom::Router::processExternalMessage_(const zmq::message_t &msg)
{
    WhisperCom::Protobuf::RouterMessage rmsg;
    rmsg.set_type(WhisperCom::Protobuf::RouterMessageType::DATA);

    // Parse straight from the zmq buffer into the embedded message; no
    // intermediate stack copy (the size comes from the network) and no
    // manually allocated object are needed.
    if (!rmsg.mutable_msg()->ParseFromArray(msg.data(), static_cast<int>(msg.size())))
    {
        LOG_S(WARNING) << "dropping external message that could not be parsed";
        return;
    }

    const std::string serialized = rmsg.SerializeAsString();

    std::lock_guard<std::mutex> guard{mutexLocalClients_};
    for (const auto &client : localClients_)
    {
        DLOG_S(INFO) << "send message to " << client.first;
        // A zmq message is emptied by a successful send, so every recipient
        // needs its own copy of the payload.
        zmq::message_t outgoing{serialized.data(), serialized.size()};
        outgoing.set_routing_id(client.first);
        localSocket_.send(outgoing, zmq::send_flags::dontwait);
    }
}
/**
 * Handles one message received from a local client.
 *
 * The sender (its routing id) is recorded in localClients_, then the message
 * is dispatched on its RouterMessageType:
 *  - JOIN:            nothing beyond recording the sender
 *  - LEAVE:           removes the sender from localClients_
 *  - KEEPALIVE:       sends a reply to the sender
 *  - DATA:            transmits the payload on the radio socket (group =
 *                     topic) and forwards the message to all other local
 *                     clients
 *  - SUBSCRIBE:       joins the topic group when it gets its first
 *                     subscriber and records the sender for the topic
 *  - UNSUBSCRIBE:     removes the sender from the topic and leaves the group
 *                     when that was its last subscriber
 *  - UNSUBSCRIBE_ALL: the same for every known topic
 *
 * Messages that cannot be parsed are logged and dropped.
 *
 * @param msg message as received from the local socket; it is only read
 */
void WhisperCom::Router::processLocalMessage_(zmq::message_t &msg)
{
    WhisperCom::Protobuf::RouterMessage rmsg;
    if (!rmsg.ParseFromArray(msg.data(), static_cast<int>(msg.size())))
    {
        LOG_S(WARNING) << "dropping local message that could not be parsed";
        return;
    }

    const std::uint32_t sender = msg.routing_id();
    {
        std::lock_guard<std::mutex> guard{mutexLocalClients_};
        // NOTE(review): emplace keeps the value stored for an already known
        // client, so the timestamp is the first-seen time, not last-seen -
        // confirm that this is intended.
        localClients_.emplace(sender, std::chrono::steady_clock::now().time_since_epoch().count());
    }

    // Removes `sender` from one topic's subscriber list. The group is left
    // only if the sender really was subscribed and nobody else remains;
    // otherwise other clients would lose the topic, or a group that was never
    // joined would be left. The caller must hold mutexSubscription_.
    const auto dropSubscriber = [this, sender](const auto &topic, auto &subscribers)
    {
        const auto firstRemoved = std::remove(subscribers.begin(), subscribers.end(), sender);
        if (firstRemoved == subscribers.end())
        {
            return;
        }
        subscribers.erase(firstRemoved, subscribers.end());
        if (subscribers.empty())
        {
            listening_.leave(topic.c_str());
        }
    };

    std::string typeName;
    if (rmsg.type() == WhisperCom::Protobuf::RouterMessageType::JOIN)
    {
        typeName = "JOIN";
    }
    else if (rmsg.type() == WhisperCom::Protobuf::RouterMessageType::LEAVE)
    {
        typeName = "LEAVE";
        std::lock_guard<std::mutex> guard{mutexLocalClients_};
        localClients_.erase(sender);
    }
    else if (rmsg.type() == WhisperCom::Protobuf::RouterMessageType::KEEPALIVE)
    {
        typeName = "KEEPALIVE";
        // NOTE(review): the reply is typed LEAVE, which looks like a
        // copy/paste slip (KEEPALIVE expected?). Left unchanged because the
        // client side of the protocol is not visible here - confirm.
        WhisperCom::Protobuf::RouterMessage reply;
        reply.set_type(WhisperCom::Protobuf::RouterMessageType::LEAVE);
        const std::string replyData = reply.SerializeAsString();
        zmq::message_t replyMsg{replyData.data(), replyData.size()};
        replyMsg.set_routing_id(sender);
        localSocket_.send(replyMsg, zmq::send_flags::dontwait);
    }
    else if (rmsg.type() == WhisperCom::Protobuf::RouterMessageType::DATA)
    {
        typeName = "DATA";

        // external distribution
        const WhisperCom::Protobuf::Message &externalMsg = rmsg.msg();
        const std::string rawPayload = externalMsg.payload().SerializePartialAsString();
        zmq::message_t emsg{rawPayload.data(), rawPayload.size()};
        emsg.set_group(rmsg.topic().c_str());
        transmitting_.send(emsg, zmq::send_flags::dontwait);

        // local distribution to everybody but the sender
        std::lock_guard<std::mutex> guard{mutexLocalClients_};
        for (const auto &client : localClients_)
        {
            if (client.first == sender)
            {
                continue;
            }
            // A zmq message is emptied by a successful send, so every
            // recipient needs its own copy of the original bytes.
            zmq::message_t forwarded{msg.data(), msg.size()};
            forwarded.set_routing_id(client.first);
            localSocket_.send(forwarded, zmq::send_flags::dontwait);
        }
    }
    else if (rmsg.type() == WhisperCom::Protobuf::RouterMessageType::SUBSCRIBE)
    {
        DLOG_S(INFO) << "subscription request from " << sender << " topic: " << rmsg.topic();
        typeName = "Subscribe";
        std::lock_guard<std::mutex> guard{mutexSubscription_};
        auto &subscribers = subscription_[rmsg.topic()];
        if (subscribers.empty())
        {
            // first subscriber: start receiving this group
            listening_.join(rmsg.topic().c_str());
        }
        if (std::find(subscribers.begin(), subscribers.end(), sender) == subscribers.end())
        {
            subscribers.push_back(sender);
        }
    }
    else if (rmsg.type() == WhisperCom::Protobuf::RouterMessageType::UNSUBSCRIBE)
    {
        typeName = "UnSubscribe";
        std::lock_guard<std::mutex> guard{mutexSubscription_};
        // find() instead of operator[] so unknown topics are not inserted
        const auto entry = subscription_.find(rmsg.topic());
        if (entry != subscription_.end())
        {
            dropSubscriber(entry->first, entry->second);
        }
    }
    else if (rmsg.type() == WhisperCom::Protobuf::RouterMessageType::UNSUBSCRIBE_ALL)
    {
        typeName = "UnSubscribeAll";
        std::lock_guard<std::mutex> guard{mutexSubscription_};
        for (auto &entry : subscription_)
        {
            dropSubscriber(entry.first, entry.second);
        }
    }

    DLOG_S(INFO) << "received " << typeName << " message from " << sender;
}