WhisperCom/include/WhisperCom/Service.hpp

90 lines
3.1 KiB
C++

#pragma once
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include "RouterMessage.pb.h"
#include "WhisperCom/Router.hpp"
#include "zmq.hpp"
#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <queue>
/**
* @brief main namespace of WhisperCom
*
*/
namespace WhisperCom
{
class Service
{
private:
/// the local url to connect to
std::string localUrl_;
/// the radio url to use within the router
std::string radioUrl_;
/// the dish url to use within the router
std::string dishUrl_;
/// a WhisperComm router for relaying messages off the computer
std::unique_ptr<WhisperCom::Router> router_;
/// context for zeromq
zmq::context_t zmqContext_;
/// socket for communication with the router
zmq::socket_t socket_;
/// time of last keepalive
std::atomic<std::chrono::steady_clock::time_point> lastKeepAlive_;
/// number of seconds without message to assume router dead
std::uint32_t routerDeadTimeOut_;
/// number of seconds to transmit and request q keepalive
std::uint32_t keepaliveTime_;
void keepalive_();
std::unique_ptr<std::thread> keepaliveThread_;
std::atomic<bool> stopKeepAlive_;
std::atomic<bool> isKeepAliveRunning_;
void receive_();
std::unique_ptr<std::thread> receiveThread_;
std::atomic<bool> stopReceive_;
std::atomic<bool> isReceiveRunning_;
std::queue<std::shared_ptr<WhisperCom::Protobuf::Message>> messageQueue_;
std::mutex mutexMessageQueue_;
std::condition_variable condWaitMessageQueue_;
void processMessage_(const zmq::message_t &msg);
void verifyRouter_();
public:
Service(const std::string localUrl, const std::string radioUrl,const std::string dishUrl, std::uint32_t routerDeadTimeout=10,std::uint32_t keepaliveTime=2);
Service()
: Service("ipc:///tmp/whispercom", "udp://239.0.0.1:40000", "udp://239.0.0.1:40000") {}
~Service();
void subscribe(const std::string &topic);
void unsubscribe(const std::string &topic);
void unsubscribeAll();
bool hasMessages() {std::lock_guard<std::mutex> guard{mutexMessageQueue_}; return messageQueue_.size();}
bool waitForMessage(std::chrono::milliseconds timeout);
bool waitForMessage(std::chrono::milliseconds timeout, std::shared_ptr<WhisperCom::Protobuf::Message> message);
std::shared_ptr<WhisperCom::Protobuf::Message> getMessage();
bool sendMessage(const std::string &topic, WhisperCom::Protobuf::Message &data);
void stop();
}; // class Service
}; // namespace WhisperCom