Compare commits

..

28 Commits

Author SHA1 Message Date
91ef7ad770
FIX: fixed empty bug
All checks were successful
continuous-integration/drone Build is passing
2024-01-16 12:41:45 +01:00
0198266526
FIX: removed unnecessary atomic 2024-01-16 12:27:12 +01:00
22ff983817
FIX: test if thread is joinable before join 2024-01-16 12:25:53 +01:00
caa13b0159
ADD: add CI/CD flow
Some checks reported errors
continuous-integration/drone Build encountered an error
2023-09-04 21:50:56 +02:00
7a1f5b4ae1
FIX: unsubscribe participant from all events on disconnect 2023-07-03 12:35:53 +02:00
38c69d8da1
FIX: subscribing when manager is unset
One feature added to EventManager has been disconnecting
a participant at runtime. Unfortunatly, then the
manager of this participant is set to nullptr using
the setManager method. But this method also subscribes to the
shutdown event afterwords. This commit checks if the manager
is nullptr and ignore subscribing in that case.
2023-07-03 12:23:04 +02:00
4f7f58a60c
ADD: Adds class function to participant to disconnect itself from the event manager. 2022-11-01 20:06:38 +01:00
3dc92bde5f
MOD: Replaces the queues connectionQueue_ and disableSchedulingQueue with a command queue where different commands can be queued for later processing. 2022-09-22 18:06:51 +02:00
01b2d3cd0d
ADD: Adds class comments for Event, Manager and Participant. 2022-09-22 13:39:33 +02:00
d268273e3c
FIX: fixes deadlock bug of class function to disable scheduling
When calling _diableScheduling of the participant to enable the
own participant for scheduling by the manager the process runned in a
deadlock.

Summary:
1. manager locks mutex mutexSchedulingParticipants_
2. calls schedule class function of participants in the
   schedulingParticipants_ list
3. there the _disableScheduling function is called by the participant
4. which calls the unschedule function of the manager
5. manager tries to lock mutexSchedulingParticipants_ which is already
   locked

Solution:
- this commit adds the following elements to the Manager class:
	1. disableScheduleQueue_
	2. mutexDisableScheduleQueue_
	3. processDisableScheduling_() ( like processConnections() )
- this commit changes the following functions of Manager class:
	1. unschedule( ... ) -> now adds given participant to
	   disableScheduleQueue
	2. scheduleProcess_() -> now also calls the class function
	   processDisableSchedulung_() after processConections_()
2022-08-30 15:12:51 +02:00
5c2fc37748
ADD: Adds comment to processConnections_ class function of the manager 2022-08-30 15:12:31 +02:00
1ed68d302d
FIX: fixes spelling error in variablename participants_ (list of connected participants in the manager). 2022-08-30 12:43:42 +02:00
92975e8349
FIX: fixed deadlock when participant connects another one 2022-02-19 16:06:49 +01:00
c2a609c913
ADD: support disable scheduling by manager 2022-02-19 12:32:25 +01:00
2141eb0c61
FIX: fixed missing doxygen comment 2022-02-19 12:28:19 +01:00
b79a67b596
ADD: assign a unique id to each connecting participant 2022-02-19 12:26:19 +01:00
89f13445ba
ADD: protect eventMap with mutex 2022-02-18 22:49:09 +01:00
e7d90d0c87
ADD: added method to not schedule a participant anymore 2022-02-18 22:02:31 +01:00
03d9f5537d
FIX: fixed missing algorithm include 2021-11-30 14:04:44 +01:00
9074fe3631
FIX: use unique object lib name 2021-10-25 23:29:48 +02:00
9808646bac
FIX: only add tests if tests are enabled 2021-10-25 23:28:33 +02:00
a0d174df08
ADD: add participant to a list on connect 2021-08-16 20:20:01 +02:00
e7b1eeeaa1
ADD: check for timeout when waiting for event 2021-08-16 20:12:05 +02:00
789ce929a4
ADD: method to wait for new event with timeout 2021-08-16 20:07:07 +02:00
8f78070770
ADD: support connecting additonal participants from participant 2021-08-15 20:53:45 +02:00
437feb597d
FIX: added missing method disconnect 2021-08-15 20:53:23 +02:00
9a0efd0b62
ADD: improved calling interfacce 2021-08-04 12:04:30 +02:00
e06a5b9be5
FIX: fixed license in README 2021-08-04 09:56:32 +02:00
9 changed files with 469 additions and 53 deletions

48
.drone.yml Normal file
View File

@ -0,0 +1,48 @@
kind: pipeline
type: kubernetes
name: build-amd64
platform:
arch: amd64
node_selector:
kubernetes.io/arch: amd64
steps:
- name: submodules
image: alpine/git
commands:
- git submodule update --init --recursive
- name: build-amd64
image: debian:bookworm-slim
commands:
- apt-get update
- apt-get -qy install gcc-12 cmake make build-essential
- mkdir build
- cd build; cmake ..
- make -j 4
- make test
---
kind: pipeline
type: kubernetes
name: build-arm64
platform:
arch: arm64
node_selector:
kubernetes.io/arch: arm64
steps:
- name: submodules
image: alpine/git
commands:
- git submodule update --init --recursive
- name: build-arm64
image: debian:bookworm-slim
commands:
- apt-get update
- apt-get -qy install gcc-12 cmake make build-essential
- mkdir build
- cd build; cmake ..
- make -j 4
- make test

View File

@ -42,17 +42,17 @@ SET(EVENTMANAGER_SOURCES
src/EventManager/Manager.cpp src/EventManager/Manager.cpp
) )
add_library(objlib OBJECT ${EVENTMANAGER_SOURCES}) add_library(em-objlib OBJECT ${EVENTMANAGER_SOURCES})
set_property(TARGET objlib PROPERTY POSITION_INDEPENDENT_CODE 1) set_property(TARGET em-objlib PROPERTY POSITION_INDEPENDENT_CODE 1)
target_include_directories(objlib target_include_directories(em-objlib
PUBLIC PUBLIC
include include
PRIVATE PRIVATE
src src
) )
target_link_libraries(objlib PUBLIC Threads::Threads) target_link_libraries(em-objlib PUBLIC Threads::Threads)
add_library(eventmanager SHARED $<TARGET_OBJECTS:objlib>) add_library(eventmanager SHARED $<TARGET_OBJECTS:em-objlib>)
target_include_directories(eventmanager target_include_directories(eventmanager
PUBLIC PUBLIC
include include
@ -61,7 +61,7 @@ target_include_directories(eventmanager
) )
target_link_libraries(eventmanager PUBLIC Threads::Threads) target_link_libraries(eventmanager PUBLIC Threads::Threads)
add_library(eventmanager-static STATIC $<TARGET_OBJECTS:objlib>) add_library(eventmanager-static STATIC $<TARGET_OBJECTS:em-objlib>)
target_include_directories(eventmanager-static target_include_directories(eventmanager-static
PUBLIC PUBLIC
include include
@ -70,6 +70,7 @@ target_include_directories(eventmanager-static
) )
target_link_libraries(eventmanager-static PUBLIC Threads::Threads) target_link_libraries(eventmanager-static PUBLIC Threads::Threads)
IF(${EM_TESTS})
# #
# add tests as executable # add tests as executable
# #
@ -80,3 +81,5 @@ catch_discover_tests(test_event)
add_executable(test_basic tests/test_basic.cpp) add_executable(test_basic tests/test_basic.cpp)
target_link_libraries(test_basic Catch2::Catch2 eventmanager-static) target_link_libraries(test_basic Catch2::Catch2 eventmanager-static)
catch_discover_tests(test_basic) catch_discover_tests(test_basic)
ENDIF()

View File

@ -10,7 +10,7 @@ A small Event System written as C++20 library.
## License ## License
GPLv3 MPLv2
## Rest ## Rest
T.b.d. T.b.d.

View File

@ -22,7 +22,13 @@
/// Eventtype to notify all participants that a shutdown is immanent /// Eventtype to notify all participants that a shutdown is immanent
const static std::uint32_t EVENT_TYPE_SHUTDOWN = 0; const static std::uint32_t EVENT_TYPE_SHUTDOWN = 0;
/**
* @class Event
* @brief An Event is the element in the system that triggers actions from participants
*
* Derive own events from this class to sned e.g. also a payload to subscribing
* participants.
*/
class Event class Event
{ {
private: private:
@ -36,7 +42,7 @@
std::uint64_t responseId_; std::uint64_t responseId_;
/// identifies if this event is a response to another event /// identifies if this event is a response to another event
std::atomic<bool> isResponse_; bool isResponse_;
/// emitter of the event /// emitter of the event
std::shared_ptr<EventManager::Participant> emitter_; std::shared_ptr<EventManager::Participant> emitter_;
@ -45,17 +51,17 @@
/** /**
* @brief constructor for creating a simple event * @brief constructor for creating a simple event
* *
* @param type - what kinf of event is this * @param type - what kind of event is this
*/ */
Event(std::uint32_t type); Event(std::uint32_t type);
/** /**
* @brief Constructor to create a response Event * @brief Constructor to create a response event
*/ */
Event(std::uint32_t type, const EventManager::Event &event); Event(std::uint32_t type, const EventManager::Event &event);
/** /**
* @brief Constructor to create a response Event * @brief Constructor to create a response event
*/ */
Event(std::uint32_t type, const std::shared_ptr<EventManager::Event> event); Event(std::uint32_t type, const std::shared_ptr<EventManager::Event> event);

View File

@ -25,10 +25,42 @@
namespace EventManager namespace EventManager
{ {
/**
* @brief command types
*/
enum class commandType :uint16_t
{
// command to connect a participant to the manager
CONNECT,
// command to disconnect a participant from the manager
DISCONNECT,
// command to schedule start scheduling a participant
ENABLE_SCHEDULING,
// command to disable scheduling for a participant (not schedule it anymore)
DISABLE_SCHEDULING
}; // commandType
// forward declaration of EventManager::Participant // forward declaration of EventManager::Participant
class Participant; class Participant;
class Manager /**
* @class Manager
*
* If you use the manager it has to be a shared pointer. Otherwise you will
* get a mem error.
*
* To add participants to the manager call class function connect.
* Calling start method will start the manager.
*
* Depending on your concept you can first connect all participants to
* the manager and then start it. Or you can start in first and then
* connect the participants. In the first example all participant will
* be started at the same time (when calling start from the manager.)
* In the second example they will be started when they are connected.
* That means if you have one starting event that all participants need
* to receive you would choose example 1.
*/
class Manager : public std::enable_shared_from_this<Manager>
{ {
/// the thread the event manager is transmitting events in /// the thread the event manager is transmitting events in
std::unique_ptr<std::thread> mainThread_; std::unique_ptr<std::thread> mainThread_;
@ -51,6 +83,9 @@
/// map holding all the event type and plugin combinations /// map holding all the event type and plugin combinations
std::map<std::uint32_t, std::list<std::shared_ptr<EventManager::Participant>>> eventMap_; std::map<std::uint32_t, std::list<std::shared_ptr<EventManager::Participant>>> eventMap_;
/// mutex to protect the eventMap
std::mutex mutexEventMap_;
/// queue for incomng events /// queue for incomng events
std::queue<std::shared_ptr<EventManager::Event>> eventQueue_; std::queue<std::shared_ptr<EventManager::Event>> eventQueue_;
@ -63,17 +98,32 @@
/// list of all plugins requiring scheduling /// list of all plugins requiring scheduling
std::list<std::shared_ptr<EventManager::Participant>> schedulingParticipants_; std::list<std::shared_ptr<EventManager::Participant>> schedulingParticipants_;
/// mutex to protect schedulingPlugins_ /// mutex to protect list schedulingParticipants_
std::mutex mutexSchedulingParticipants_; std::mutex mutexSchedulingParticipants_;
/* /// list of all participants connected
* all private methods std::list<std::shared_ptr<EventManager::Participant>> participants_;
*/
/** /// mutex to protect participants_
* @brief the method running in the main thread std::mutex mutexParticipants_;
*/
void mainProcess_(); /// the id for the next participant connecting
std::uint32_t nextParticipantID_;
/// queue for different command requests like e.g. connecting a participant to the manager
std::queue<std::pair<EventManager::commandType,std::shared_ptr<EventManager::Participant>>> commandQueue_;
/// mutex to protect the commandQueue_
std::mutex mutexCommandQueue_;
/*
* all private methods
*/
/**
* @brief the method running in the main thread
*/
void mainProcess_();
/** /**
* @brief the method running in the scheduling thread * @brief the method running in the scheduling thread
@ -85,6 +135,46 @@
*/ */
void processEvent(const std::shared_ptr<EventManager::Event> event); void processEvent(const std::shared_ptr<EventManager::Event> event);
/**
* @brief processes the commands in the commandQueue depending on their type
*/
void processCommands_();
/**
* @brief adds the queued participants to the list of connected participants
*
* The connectionQueue_ contains the participants that should be connected to
* the manager. All connected participants are stored in the list participants_.
* This class function adds the queued participants to the list participants_
* and removes them from the queue.
*/
void processConnect_( std::shared_ptr<EventManager::Participant> participant );
/**
* @brief removes the given participant form the list of connected participants
*
* The participants_ list contains the participants that are connected to
* the manager. This class function removes the queued participants from the list
* participants_.
*/
void processDisconnect_( std::shared_ptr<EventManager::Participant> participant );
/**
* @brief processes the command to enable scheduling for the given participant
*
* This class function adds the given participant to the schedulingParticipants_ list
* where the participants are scheduled by calling their schedule class function.
*/
void processEnableScheduling_( std::shared_ptr<EventManager::Participant> participant );
/**
* @brief disables the scheduling of the requested participants
*
* Removes the participants from the schedulingParticipants_ list that
* should not be scheduled anymore.
*/
void processDisableScheduling_( std::shared_ptr<EventManager::Participant> participant );
/** /**
* @brief start the main thread * @brief start the main thread
*/ */
@ -110,14 +200,16 @@
/** /**
* @brief The constructor for the event manager * @brief The constructor for the event manager
* *
* Just initializes all attributes to its starting value * Just initializes all attributes to their starting values
*/ */
Manager() : mainThread_(nullptr), isMainThreadRunning_(false), Manager() : mainThread_(nullptr), isMainThreadRunning_(false),
stopMainThread_(false), schedulingThread_(nullptr), stopMainThread_(false), schedulingThread_(nullptr),
isSchedulingThreadRunning_(false), stopSchedulingThread_(false){} isSchedulingThreadRunning_(false), stopSchedulingThread_(false),
nextParticipantID_(1){}
~Manager(); ~Manager();
/** /**
* @brief start the event manager * @brief start the event manager
*/ */
@ -185,6 +277,23 @@
*/ */
void schedule(std::shared_ptr<EventManager::Participant> plugin); void schedule(std::shared_ptr<EventManager::Participant> plugin);
/**
* @brief remove a participant from scheduling
*
* @param participant - the participant to remove
*/
void unschedule(std::shared_ptr<EventManager::Participant> participant);
/**
* @brief method to connect a particpant to the manager
*/
void connect(std::shared_ptr<EventManager::Participant> participant);
/**
* @brief method to disconnect a particpant from the manager
*/
void disconnect(std::shared_ptr<EventManager::Participant> participant);
}; // class Manager }; // class Manager
}; //namespace EventManager }; //namespace EventManager

View File

@ -27,10 +27,15 @@
/** /**
* @brief The entity participating in the event system. * @brief The entity participating in the event system.
*
* If you want the participant to be scheduled from the manager call
* _enableScheduling class function.
*/ */
class Participant : public std::enable_shared_from_this<Participant> class Participant : public std::enable_shared_from_this<Participant>
{ {
private: private:
/// a unique id for this participant, helpful for debugging
std::uint32_t id_;
/// pointer to the event manager /// pointer to the event manager
std::shared_ptr<EventManager::Manager> manager_; std::shared_ptr<EventManager::Manager> manager_;
@ -64,6 +69,14 @@
*/ */
virtual void schedule_() { throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + " not implemented");} virtual void schedule_() { throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + " not implemented");}
/**
* @brief method called by the EventManager::Manager on connecting this particpant
*
* This method need to be implemented by each child class of EventManager::Participant
* Its the best location for subscribing to events and enable scheduling if required.
*/
virtual void init_() {throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + " not implemented");}
protected: protected:
@ -92,6 +105,13 @@
*/ */
void _waitForEvent(); void _waitForEvent();
/**
* @brief wait for a new event with timeout
*
* @return true - new event available and queue locked
* @return false - no new event, queue not locked, timeout reached
*/
bool _waitForEvent(std::uint32_t timeoutMS);
/** /**
* @brief This method subscribes the participant to an event type * @brief This method subscribes the participant to an event type
@ -122,18 +142,69 @@
*/ */
void _enableScheduling(); void _enableScheduling();
/**
* @brief disable scheduling of this particpant through the EventManager::Manager
*/
void _disableScheduling();
/** /**
* @brief check if the participant is scheduled by event manager * @brief check if the participant is scheduled by event manager
*/ */
bool isScheduledByManager() const {return isScheduledByManager_;} bool isScheduledByManager() const {return isScheduledByManager_;}
/**
* @brief connect a new participant through another participant
*/
void connect(std::shared_ptr<EventManager::Participant> participant);
/**
* @brief disconnect a participant through another participant
*/
void disconnect(std::shared_ptr<EventManager::Participant> participant);
/**
* @brief disconnect this participant from the event manager
*/
void disconnect();
public: public:
/** /**
* @brief Constructor setting the participant up for use * @brief Constructor setting the participant up for use
*/ */
Participant(); Participant();
void setManager(std::shared_ptr<EventManager::Manager> manager) { manager_=manager;_subscribe(EVENT_TYPE_SHUTDOWN);} /**
* @brief Method to set the Manager object
*
* This method is in general only used by the EventManager::Manager!
* Only use this method if you really know what you are doing!
*
* @param manager - the manager to set
*/
void setManager(std::shared_ptr<EventManager::Manager> manager)
{ manager_=manager;
if (manager_!=nullptr)
{
_subscribe(EVENT_TYPE_SHUTDOWN);
}
}
/**
* @brief Method to set the unique id of the participant
*
* This method is in general only used by the EventManager::Manager!
* Only use this method if you really know what you are doing!
*
* @param id - the id to set
*/
void setID(const std::uint32_t id) { id_=id;}
/**
* @brief Method to return the unique id of the participant
*
* @return std::uint32_t
*/
std::uint32_t getID() const { return id_;}
/** /**
* @brief Method called by the EventManager::Manager to schedule the particpant * @brief Method called by the EventManager::Manager to schedule the particpant
@ -141,6 +212,12 @@
*/ */
void schedule() {schedule_();}; void schedule() {schedule_();};
/**
* @brief method called by the EventManager::Manager when connecting the particpant
*
* method calls virtual _init method which has to be implemented by each particpants child class
*/
void init() {init_();}
/** /**
* @brief emit an event to this participant * @brief emit an event to this participant

View File

@ -11,6 +11,8 @@
#include <EventManager/Manager.hpp> #include <EventManager/Manager.hpp>
#include <EventManager/Participant.hpp> #include <EventManager/Participant.hpp>
#include <iostream> #include <iostream>
#include <algorithm>
#include <mutex>
void EventManager::Manager::startMain_() void EventManager::Manager::startMain_()
{ {
@ -86,7 +88,10 @@
throw std::runtime_error("can not stop main thread"); throw std::runtime_error("can not stop main thread");
} }
mainThread_->join(); if (mainThread_->joinable())
{
mainThread_->join();
}
} }
void EventManager::Manager::stopScheduling_() void EventManager::Manager::stopScheduling_()
@ -105,8 +110,11 @@
throw std::runtime_error("can not stop scheduling thread"); throw std::runtime_error("can not stop scheduling thread");
} }
schedulingThread_->join(); if (schedulingThread_->joinable())
} {
schedulingThread_->join();
}
}
void EventManager::Manager::start() void EventManager::Manager::start()
{ {
@ -188,15 +196,17 @@
while(!stopSchedulingThread_) while(!stopSchedulingThread_)
{ {
mutexSchedulingParticipants_.lock(); mutexSchedulingParticipants_.lock();
if (!schedulingParticipants_.empty()) if( !schedulingParticipants_.empty() )
{ {
for (auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it) for( auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it )
{ {
(*it)->schedule(); (*it)->schedule();
} }
} }
mutexSchedulingParticipants_.unlock(); mutexSchedulingParticipants_.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
processCommands_();
std::this_thread::sleep_for( std::chrono::milliseconds(100) );
} }
isSchedulingThreadRunning_=false; isSchedulingThreadRunning_=false;
@ -206,6 +216,7 @@
void EventManager::Manager::subscribe(std::uint32_t type, std::shared_ptr<EventManager::Participant> participant) void EventManager::Manager::subscribe(std::uint32_t type, std::shared_ptr<EventManager::Participant> participant)
{ {
std::lock_guard<std::mutex> lockGuard(mutexEventMap_);
// check if participant is already registered // check if participant is already registered
auto it = eventMap_.find(type); auto it = eventMap_.find(type);
@ -230,6 +241,7 @@
void EventManager::Manager::unsubscribe(std::uint32_t type, std::shared_ptr<EventManager::Participant> participant) void EventManager::Manager::unsubscribe(std::uint32_t type, std::shared_ptr<EventManager::Participant> participant)
{ {
std::lock_guard<std::mutex> lockGuard(mutexEventMap_);
auto it = eventMap_.find(type); auto it = eventMap_.find(type);
@ -283,6 +295,11 @@
{ {
bool isEmpty=true; bool isEmpty=true;
while(!commandQueue_.empty())
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
for (auto it = eventMap_.begin(); it != eventMap_.end(); ++it) for (auto it = eventMap_.begin(); it != eventMap_.end(); ++it)
{ {
if ( !(*it).second.empty()) if ( !(*it).second.empty())
@ -313,14 +330,116 @@
} }
void EventManager::Manager::schedule(std::shared_ptr<EventManager::Participant> participant) void EventManager::Manager::schedule(std::shared_ptr<EventManager::Participant> participant)
{ {
std::lock_guard<std::mutex> guard(mutexSchedulingParticipants_); std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
commandQueue_.push( std::make_pair( EventManager::commandType::ENABLE_SCHEDULING, participant) );
}
auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant);
if (it == schedulingParticipants_.end()) void EventManager::Manager::unschedule(std::shared_ptr<EventManager::Participant> participant )
{ {
schedulingParticipants_.push_back(participant); std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
} commandQueue_.push( std::make_pair( EventManager::commandType::DISABLE_SCHEDULING, participant) );
} }
void EventManager::Manager::processCommands_()
{
std::unique_lock<std::mutex> lk( mutexCommandQueue_ );
while( commandQueue_.empty() == false )
{
auto pair = commandQueue_.front();
commandQueue_.pop();
lk.unlock();
switch( pair.first )
{
case EventManager::commandType::CONNECT:
processConnect_( pair.second );
break;
case EventManager::commandType::DISCONNECT:
processDisconnect_( pair.second );
break;
case EventManager::commandType::ENABLE_SCHEDULING:
processEnableScheduling_( pair.second );
break;
case EventManager::commandType::DISABLE_SCHEDULING:
processDisableScheduling_( pair.second );
break;
}
lk.lock();
}
lk.unlock();
}
void EventManager::Manager::processConnect_( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard(mutexParticipants_);
auto it = std::find( participants_.begin(), participants_.end(), participant );
if( it == participants_.end() )
{
participant->setManager(shared_from_this());
participant->setID(nextParticipantID_);
// we can set and increment here because this critical section is secured by a mutex
nextParticipantID_++;
participants_.push_back(participant);
participant->init();
}
}
void EventManager::Manager::processDisconnect_( std::shared_ptr<EventManager::Participant> participant )
{
// before the participant gets disconnected it has to be unscheduled
processDisableScheduling_( participant );
// unsubscribe plugin from all events
unsubscribe(participant);
std::lock_guard<std::mutex> guard(mutexParticipants_);
auto it = std::find( participants_.begin(), participants_.end(), participant );
if( it != participants_.end() )
{
participant->setManager(nullptr);
participants_.erase( it );
}
}
void EventManager::Manager::processEnableScheduling_( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard(mutexSchedulingParticipants_);
auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant );
if( it == schedulingParticipants_.end() )
{
schedulingParticipants_.push_back( participant );
}
}
void EventManager::Manager::processDisableScheduling_( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard( mutexSchedulingParticipants_ );
auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant );
if( it != schedulingParticipants_.end() )
{
schedulingParticipants_.erase(it);
}
}
void EventManager::Manager::connect( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
commandQueue_.push( std::make_pair( EventManager::commandType::CONNECT, participant) );
}
void EventManager::Manager::disconnect( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
commandQueue_.push( std::make_pair( EventManager::commandType::DISCONNECT, participant) );
}

View File

@ -11,6 +11,9 @@
#include <EventManager/Participant.hpp> #include <EventManager/Participant.hpp>
#include <EventManager/Manager.hpp> #include <EventManager/Manager.hpp>
#include <iostream> #include <iostream>
#include <chrono>
using namespace std::chrono_literals;
EventManager::Participant::Participant() : manager_(nullptr), EventManager::Participant::Participant() : manager_(nullptr),
isScheduledByManager_(false), isQueueLocked_(false) isScheduledByManager_(false), isQueueLocked_(false)
@ -18,6 +21,35 @@ EventManager::Participant::Participant() : manager_(nullptr),
} }
void EventManager::Participant::connect(std::shared_ptr<EventManager::Participant> participant)
{
if (manager_ == nullptr)
{
throw std::runtime_error("no event manager set yet");
}
manager_->connect(participant);
}
void EventManager::Participant::disconnect(std::shared_ptr<EventManager::Participant> participant)
{
if (manager_ == nullptr)
{
throw std::runtime_error("no event manager set yet");
}
manager_->disconnect(participant);
}
void EventManager::Participant::disconnect()
{
if (manager_ == nullptr)
{
throw std::runtime_error("no event manager set yet");
}
manager_->disconnect(shared_from_this());
}
void EventManager::Participant::emit(std::shared_ptr<EventManager::Event> event) void EventManager::Participant::emit(std::shared_ptr<EventManager::Event> event)
{ {
{ {
@ -72,6 +104,20 @@ void EventManager::Participant::_waitForEvent()
isQueueLocked_=true; isQueueLocked_=true;
} }
bool EventManager::Participant::_waitForEvent(std::uint32_t timeoutMS)
{
std::unique_lock<std::mutex> lock(mutexEventQueue_);
if (newEventInQueue_.wait_for(lock,timeoutMS*1ms)==std::cv_status::no_timeout)
{
isQueueLocked_=true;
return true;
}
return false;
}
void EventManager::Participant::_enableScheduling() void EventManager::Participant::_enableScheduling()
{ {
if (manager_ == nullptr) if (manager_ == nullptr)
@ -82,6 +128,14 @@ void EventManager::Participant::_enableScheduling()
isScheduledByManager_=true; isScheduledByManager_=true;
} }
void EventManager::Participant::_disableScheduling() {
if (manager_ == nullptr) {
throw std::runtime_error("no event manager set yet");
}
manager_->unschedule(shared_from_this());
isScheduledByManager_ = false;
}
void EventManager::Participant::_subscribe(std::uint32_t type) void EventManager::Participant::_subscribe(std::uint32_t type)
{ {
if (manager_ == nullptr) if (manager_ == nullptr)

View File

@ -33,6 +33,11 @@ class myParticipant : public EventManager::Participant
_unsubscribe(); _unsubscribe();
} }
void init_() {
_subscribe(eventType_);
_enableScheduling();
}
void schedule_() { void schedule_() {
std::shared_ptr<EventManager::Event> event = nullptr; std::shared_ptr<EventManager::Event> event = nullptr;
@ -67,10 +72,7 @@ class myParticipant : public EventManager::Participant
bool eventReceived() const { return receivedEvent_;} bool eventReceived() const { return receivedEvent_;}
void init() {
_subscribe(eventType_);
_enableScheduling();
}
}; };
@ -85,14 +87,18 @@ SCENARIO("Basic Usage of EventManager", "[Manager]")
manager = std::make_shared<EventManager::Manager>(); manager = std::make_shared<EventManager::Manager>();
}()); }());
REQUIRE_NOTHROW([&]()
{
manager->start();
}());
REQUIRE(manager->empty() == true); REQUIRE(manager->empty() == true);
std::shared_ptr<myParticipant> participant0; std::shared_ptr<myParticipant> participant0;
REQUIRE_NOTHROW([&]() REQUIRE_NOTHROW([&]()
{ {
participant0 = std::make_shared<myParticipant>(0,TEST_EVENT0); participant0 = std::make_shared<myParticipant>(0,TEST_EVENT0);
participant0->setManager(manager); manager->connect(participant0);
participant0->init();
}()); }());
REQUIRE(manager->empty() == false); REQUIRE(manager->empty() == false);
@ -101,17 +107,11 @@ SCENARIO("Basic Usage of EventManager", "[Manager]")
REQUIRE_NOTHROW([&]() REQUIRE_NOTHROW([&]()
{ {
participant1 = std::make_shared<myParticipant>(1,TEST_EVENT1); participant1 = std::make_shared<myParticipant>(1,TEST_EVENT1);
participant1->setManager(manager); manager->connect(participant1);
participant1->init();
}()); }());
REQUIRE(manager->empty() == false); REQUIRE(manager->empty() == false);
REQUIRE_NOTHROW([&]()
{
manager->start();
}());
REQUIRE(manager->isRunning() == true); REQUIRE(manager->isRunning() == true);
WHEN("emitting shutdown event") WHEN("emitting shutdown event")