WhisperCom/src/WhisperCom/Service.cpp

311 lines
8.3 KiB
C++

/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include "RouterMessage.pb.h"
#include "WhisperCom/Router.hpp"
#include "google/protobuf/any.pb.h"
#include "zmq.hpp"
#include <WhisperCom/Service.hpp>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <exception>
#include <future>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#define LOGURU_WITH_STREAMS 1
#include <loguru.hpp>
/**
 * @brief Shuts the service down if its worker threads have not been stopped yet.
 *
 * stop() is invoked when either running flag is set, or when a worker thread
 * object is still joinable. The running flags are raised by the worker
 * threads themselves, so a thread that was launched but has not yet raised
 * its flag must still be joined before its std::thread object is destroyed.
 *
 * Exceptions from stop() are caught and logged here: a destructor must not
 * let an exception escape.
 */
WhisperCom::Service::~Service()
{
    const bool keepalivePending = keepaliveThread_ != nullptr && keepaliveThread_->joinable();
    const bool receivePending = receiveThread_ != nullptr && receiveThread_->joinable();
    if (isKeepAliveRunning_ || isReceiveRunning_ || keepalivePending || receivePending)
    {
        try
        {
            stop();
        }
        catch (const std::exception &e)
        {
            LOG_S(ERROR) << "stop failed during destruction: " << e.what();
        }
    }
    DLOG_S(INFO) << "service terminated";
}
/**
 * @brief Creates the service, joins the router and launches the worker threads.
 *
 * A Router is constructed from the three URLs; it is started if it reports
 * ready and discarded otherwise. The client socket then connects to
 * @p localUrl, a JOIN router message is sent, and the keepalive and receive
 * threads are started.
 *
 * @param localUrl          endpoint the client socket connects to; also handed to the Router.
 * @param radioUrl          handed to the Router.
 * @param dishUrl           handed to the Router.
 * @param routerDeadTimeout stored in routerDeadTimeOut_; verifyRouter_() compares it against
 *                          the seconds elapsed since the last received message.
 * @param keepaliveTime     stored in keepaliveTime_; seconds the keepalive thread sleeps
 *                          between KEEPALIVE messages.
 */
WhisperCom::Service::Service(const std::string localUrl, const std::string radioUrl, const std::string dishUrl, std::uint32_t routerDeadTimeout, std::uint32_t keepaliveTime)
    : routerDeadTimeOut_(routerDeadTimeout),
      keepaliveTime_(keepaliveTime),
      localUrl_(localUrl),
      radioUrl_(radioUrl),
      dishUrl_(dishUrl),
      router_(std::make_unique<WhisperCom::Router>(localUrl, radioUrl, dishUrl)),
      zmqContext_(),
      socket_(zmqContext_, zmq::socket_type::client),
      lastKeepAlive_(std::chrono::steady_clock::now()),
      keepaliveThread_(nullptr),
      stopKeepAlive_(false),
      isKeepAliveRunning_(false),
      receiveThread_(nullptr),
      stopReceive_(false),
      isReceiveRunning_(false)
{
    // Keep the router built in the initializer list only when it reports ready.
    if (!router_->isReady())
    {
        router_.reset();
    }
    else
    {
        router_->start();
    }

    socket_.connect(localUrl);

    // Announce this service to the router.
    WhisperCom::Protobuf::RouterMessage joinRequest{};
    joinRequest.set_type(WhisperCom::Protobuf::RouterMessageType::JOIN);
    const std::string serialized = joinRequest.SerializeAsString();
    DLOG_S(INFO) << "message size: " << serialized.size();
    zmq::message_t frame{serialized.data(), serialized.size()};
    socket_.send(frame, zmq::send_flags::none);

    // Worker threads: keepalive first, then the receiver.
    keepaliveThread_ = std::make_unique<std::thread>(&WhisperCom::Service::keepalive_, this);
    receiveThread_ = std::make_unique<std::thread>(&WhisperCom::Service::receive_, this);
}
/**
 * @brief Tries to bring up a local Router when none is owned and the last
 *        received message is older than routerDeadTimeOut_ seconds.
 *
 * The newly created Router is started if it reports ready; otherwise it is
 * discarded again and router_ stays null.
 */
void WhisperCom::Service::verifyRouter_()
{
    const std::chrono::steady_clock::time_point current = std::chrono::steady_clock::now();
    const std::chrono::steady_clock::time_point lastSeen = lastKeepAlive_;
    const auto silence = std::chrono::duration_cast<std::chrono::seconds>(current - lastSeen);

    // Nothing to do while a router is owned or the silence is still within the timeout.
    if (router_ != nullptr)
    {
        return;
    }
    if (silence.count() <= routerDeadTimeOut_)
    {
        return;
    }

    router_ = std::make_unique<WhisperCom::Router>(localUrl_, radioUrl_, dishUrl_);
    if (!router_->isReady())
    {
        router_.reset();
        return;
    }
    router_->start();
}
void WhisperCom::Service::keepalive_()
{
using namespace std::chrono_literals;
isKeepAliveRunning_=true;
WhisperCom::Protobuf::RouterMessage rmsg{};
rmsg.set_type(WhisperCom::Protobuf::RouterMessageType::KEEPALIVE);
while(!stopKeepAlive_)
{
std::string data = rmsg.SerializeAsString();
zmq::message_t msg{data.data(), data.size()};
socket_.send(msg, zmq::send_flags::dontwait);
std::this_thread::sleep_for(std::chrono::seconds(keepaliveTime_));
}
isKeepAliveRunning_=false;
}
void WhisperCom::Service::receive_()
{
using namespace std::chrono_literals;
isReceiveRunning_ = true;
while (!stopReceive_) {
std::this_thread::sleep_for(300ms);
bool finished = false;
while(!finished)
{
zmq::message_t msg{};
try
{
zmq::recv_result_t result = socket_.recv(msg, zmq::recv_flags::dontwait);
}
catch (std::exception &e)
{
finished=true;
stopReceive_=true;
}
if (!msg.empty()) {
DLOG_S(INFO) << "processing message";
processMessage_(msg);
}
else
{
finished=true;
}
};
if (router_ == nullptr)
{
verifyRouter_();
}
}
isReceiveRunning_=false;
}
/**
 * @brief Handles one frame received from the router.
 *
 * Every received frame refreshes lastKeepAlive_. Frames that cannot be parsed
 * as a RouterMessage are dropped with a warning. For DATA messages the
 * embedded Message is copied into messageQueue_ and one waiter on
 * condWaitMessageQueue_ is notified.
 *
 * @param msg raw frame as received from the socket.
 */
void WhisperCom::Service::processMessage_(const zmq::message_t &msg)
{
    lastKeepAlive_ = std::chrono::steady_clock::now();

    WhisperCom::Protobuf::RouterMessage rmsg;
    if (!rmsg.ParseFromArray(msg.data(), static_cast<int>(msg.size())))
    {
        LOG_S(WARNING) << "dropping message that could not be parsed";
        return;
    }
    if (rmsg.type() != WhisperCom::Protobuf::RouterMessageType::DATA)
    {
        return;
    }

    {
        // Lock exactly once: the guard releases the mutex at the end of this
        // scope, before the waiter is notified.
        std::lock_guard lk(mutexMessageQueue_);
        messageQueue_.push(std::make_shared<WhisperCom::Protobuf::Message>(rmsg.msg()));
    }
    condWaitMessageQueue_.notify_one();
    DLOG_S(INFO) << "received data message";
}
/**
 * @brief Wraps @p data in a DATA router message for @p topic and sends it
 *        without blocking.
 *
 * @param topic topic set on the router message.
 * @param data  payload; it is copied into the router message and not modified.
 * @return true if the socket accepted the message, false if the non-blocking
 *         send produced no result (message not queued).
 */
bool WhisperCom::Service::sendMessage(const std::string &topic,WhisperCom::Protobuf::Message &data)
{
    WhisperCom::Protobuf::RouterMessage rmsg{};
    rmsg.set_type(WhisperCom::Protobuf::RouterMessageType::DATA);
    rmsg.set_topic(topic);
    // Copy the payload straight into the embedded message field.
    *rmsg.mutable_msg() = data;

    const std::string msgData = rmsg.SerializeAsString();
    zmq::message_t msg{msgData.data(), msgData.size()};
    // Report the real outcome instead of an unconditional success.
    return socket_.send(msg, zmq::send_flags::dontwait).has_value();
}
/**
 * @brief Sends LEAVE, asks both worker threads to stop, waits up to about
 *        3 seconds for them to finish, joins them and shuts the zmq context down.
 *
 * Calling stop() when both threads have already been joined is a no-op.
 *
 * @throws std::runtime_error if a worker thread is still running after the wait.
 */
void WhisperCom::Service::stop()
{
    using namespace std::chrono_literals;

    const bool keepaliveJoinable = keepaliveThread_ != nullptr && keepaliveThread_->joinable();
    const bool receiveJoinable = receiveThread_ != nullptr && receiveThread_->joinable();
    if (!keepaliveJoinable && !receiveJoinable)
    {
        // Both workers were joined by an earlier call; nothing left to stop.
        return;
    }

    WhisperCom::Protobuf::RouterMessage rmsg{};
    rmsg.set_type(WhisperCom::Protobuf::RouterMessageType::LEAVE);
    std::string data = rmsg.SerializeAsString();
    zmq::message_t msg{data.data(), data.size()};
    socket_.send(msg, zmq::send_flags::dontwait);

    std::uint16_t timeout = 3000;
    stopKeepAlive_ = true;
    stopReceive_ = true;
    // NOTE(review): LEAVE is sent non-blocking and linger is set to 0 right
    // after, so its delivery is presumably not guaranteed - confirm this is intended.
    socket_.set(zmq::sockopt::linger, 0);

    // Poll the running flags in 100 ms steps until both clear or the budget is used up.
    while (timeout > 0 && (isKeepAliveRunning_ || isReceiveRunning_))
    {
        std::this_thread::sleep_for(100ms);
        timeout = timeout - 100;
        DLOG_S(INFO) << "wait termination";
    }

    // Fail only when the budget is exhausted AND a worker is really still
    // running; both workers may have finished during the very last poll.
    if (timeout == 0 && (isKeepAliveRunning_ || isReceiveRunning_))
    {
        if (isKeepAliveRunning_)
        {
            LOG_S(ERROR) << "could not stop keepalive thread";
            throw std::runtime_error("could not stop keepalive thread");
        }
        else
        {
            LOG_S(ERROR) << "could not stop receiver thread";
            throw std::runtime_error("could not stop receiver thread");
        }
    }

    if (keepaliveJoinable)
    {
        keepaliveThread_->join();
    }
    DLOG_S(INFO) << "joined keepalive";
    if (receiveJoinable)
    {
        receiveThread_->join();
    }
    DLOG_S(INFO) << "joined receiver";
    zmqContext_.shutdown();
    DLOG_S(INFO) << "stopped";
}
void WhisperCom::Service::subscribe(const std::string &topic) {
WhisperCom::Protobuf::RouterMessage rmsg{};
rmsg.set_type(WhisperCom::Protobuf::RouterMessageType::SUBSCRIBE);
rmsg.set_topic(topic);
std::string msgData = rmsg.SerializeAsString();
zmq::message_t msg{msgData.data(), msgData.size()};
socket_.send(msg, zmq::send_flags::dontwait);
}
void WhisperCom::Service::unsubscribe(const std::string &topic) {
WhisperCom::Protobuf::RouterMessage rmsg{};
rmsg.set_type(WhisperCom::Protobuf::RouterMessageType::UNSUBSCRIBE);
rmsg.set_topic(topic);
std::string msgData = rmsg.SerializeAsString();
zmq::message_t msg{msgData.data(), msgData.size()};
socket_.send(msg, zmq::send_flags::dontwait);
}
void WhisperCom::Service::unsubscribeAll() {
WhisperCom::Protobuf::RouterMessage rmsg{};
rmsg.set_type(WhisperCom::Protobuf::RouterMessageType::UNSUBSCRIBE_ALL);
std::string msgData = rmsg.SerializeAsString();
zmq::message_t msg{msgData.data(), msgData.size()};
socket_.send(msg, zmq::send_flags::dontwait);
}
bool WhisperCom::Service::waitForMessage(std::chrono::milliseconds timeout)
{
std::unique_lock lk(mutexMessageQueue_);
if (messageQueue_.empty())
{
condWaitMessageQueue_.wait_for(lk,timeout);
}
if (messageQueue_.empty())
{
return false;
}
return true;
}
bool WhisperCom::Service::waitForMessage(std::chrono::milliseconds timeout, std::shared_ptr<WhisperCom::Protobuf::Message> message) {
std::unique_lock lk(mutexMessageQueue_);
if (messageQueue_.empty()) {
condWaitMessageQueue_.wait_for(lk, timeout);
}
if (messageQueue_.empty()) {
return false;
}
message = messageQueue_.front();
messageQueue_.pop();
return true;
}