Adaption to command queue instead of single use queues. #3

Merged
byterazor merged 2 commits from csander/EventManagementSystem:main into main 2022-09-22 20:33:57 +02:00
4 changed files with 175 additions and 94 deletions

View File

@ -22,7 +22,13 @@
/// Eventtype to notify all participants that a shutdown is immanent /// Eventtype to notify all participants that a shutdown is immanent
const static std::uint32_t EVENT_TYPE_SHUTDOWN = 0; const static std::uint32_t EVENT_TYPE_SHUTDOWN = 0;
/**
* @class Event
* @brief An Event is the element in the system that triggers actions from participants
*
* Derive own events from this class to sned e.g. also a payload to subscribing
* participants.
*/
class Event class Event
{ {
private: private:
@ -45,17 +51,17 @@
/** /**
* @brief constructor for creating a simple event * @brief constructor for creating a simple event
* *
* @param type - what kinf of event is this * @param type - what kind of event is this
*/ */
Event(std::uint32_t type); Event(std::uint32_t type);
/** /**
* @brief Constructor to create a response Event * @brief Constructor to create a response event
*/ */
Event(std::uint32_t type, const EventManager::Event &event); Event(std::uint32_t type, const EventManager::Event &event);
/** /**
* @brief Constructor to create a response Event * @brief Constructor to create a response event
*/ */
Event(std::uint32_t type, const std::shared_ptr<EventManager::Event> event); Event(std::uint32_t type, const std::shared_ptr<EventManager::Event> event);

View File

@ -25,9 +25,41 @@
namespace EventManager namespace EventManager
{ {
/**
* @brief command types
*/
enum class commandType :uint16_t
{
// command to connect a participant to the manager
CONNECT,
// command to disconnect a participant from the manager
DISCONNECT,
// command to schedule start scheduling a participant
ENABLE_SCHEDULING,
// command to disable scheduling for a participant (not schedule it anymore)
DISABLE_SCHEDULING
}; // commandType
// forward declaration of EventManager::Participant // forward declaration of EventManager::Participant
class Participant; class Participant;
/**
* @class Manager
*
* If you use the manager it has to be a shared pointer. Otherwise you will
* get a mem error.
*
* To add participants to the manager call class function connect.
* Calling start method will start the manager.
*
* Depending on your concept you can first connect all participants to
* the manager and then start it. Or you can start in first and then
* connect the participants. In the first example all participant will
* be started at the same time (when calling start from the manager.)
* In the second example they will be started when they are connected.
* That means if you have one starting event that all participants need
* to receive you would choose example 1.
*/
class Manager : public std::enable_shared_from_this<Manager> class Manager : public std::enable_shared_from_this<Manager>
{ {
/// the thread the event manager is transmitting events in /// the thread the event manager is transmitting events in
@ -66,7 +98,7 @@
/// list of all plugins requiring scheduling /// list of all plugins requiring scheduling
std::list<std::shared_ptr<EventManager::Participant>> schedulingParticipants_; std::list<std::shared_ptr<EventManager::Participant>> schedulingParticipants_;
/// mutex to protect schedulingPlugins_ /// mutex to protect list schedulingParticipants_
std::mutex mutexSchedulingParticipants_; std::mutex mutexSchedulingParticipants_;
/// list of all participants connected /// list of all participants connected
@ -78,17 +110,11 @@
/// the id for the next participant connecting /// the id for the next participant connecting
std::uint32_t nextParticipantID_; std::uint32_t nextParticipantID_;
/// queue for connection request so connecting is done in manager context /// queue for different command requests like e.g. connecting a participant to the manager
std::queue<std::shared_ptr<EventManager::Participant>> connectionQueue_; std::queue<std::pair<EventManager::commandType,std::shared_ptr<EventManager::Participant>>> commandQueue_;
/// mutex to protect connectionQueue_ /// mutex to protect the commandQueue_
std::mutex mutexConnectionQueue_; std::mutex mutexCommandQueue_;
/// queue for disable scheduling request so the participants are not scheduled by the manager any longer
std::queue<std::shared_ptr<EventManager::Participant>> disableScheduleQueue_;
/// mutex to protect disableScheduleQueue_
std::mutex mutexDisableScheduleQueue_;
/* /*
* all private methods * all private methods
@ -109,6 +135,11 @@
*/ */
void processEvent(const std::shared_ptr<EventManager::Event> event); void processEvent(const std::shared_ptr<EventManager::Event> event);
/**
* @brief processes the commands in the commandQueue depending on their type
*/
void processCommands_();
/** /**
* @brief adds the queued participants to the list of connected participants * @brief adds the queued participants to the list of connected participants
* *
@ -117,7 +148,24 @@
* This class function adds the queued participants to the list participants_ * This class function adds the queued participants to the list participants_
* and removes them from the queue. * and removes them from the queue.
*/ */
void processConnections_(); void processConnect_( std::shared_ptr<EventManager::Participant> participant );
/**
* @brief removes the given participant form the list of connected participants
*
* The participants_ list contains the participants that are connected to
* the manager. This class function removes the queued participants from the list
* participants_.
*/
void processDisconnect_( std::shared_ptr<EventManager::Participant> participant );
/**
* @brief processes the command to enable scheduling for the given participant
*
* This class function adds the given participant to the schedulingParticipants_ list
* where the participants are scheduled by calling their schedule class function.
*/
void processEnableScheduling_( std::shared_ptr<EventManager::Participant> participant );
/** /**
* @brief disables the scheduling of the requested participants * @brief disables the scheduling of the requested participants
@ -125,7 +173,7 @@
* Removes the participants from the schedulingParticipants_ list that * Removes the participants from the schedulingParticipants_ list that
* should not be scheduled anymore. * should not be scheduled anymore.
*/ */
void processDisableScheduling_(); void processDisableScheduling_( std::shared_ptr<EventManager::Participant> participant );
/** /**
* @brief start the main thread * @brief start the main thread
@ -152,7 +200,7 @@
/** /**
* @brief The constructor for the event manager * @brief The constructor for the event manager
* *
* Just initializes all attributes to its starting value * Just initializes all attributes to their starting values
*/ */
Manager() : mainThread_(nullptr), isMainThreadRunning_(false), Manager() : mainThread_(nullptr), isMainThreadRunning_(false),
stopMainThread_(false), schedulingThread_(nullptr), stopMainThread_(false), schedulingThread_(nullptr),
@ -161,6 +209,7 @@
~Manager(); ~Manager();
/** /**
* @brief start the event manager * @brief start the event manager
*/ */

View File

@ -27,6 +27,9 @@
/** /**
* @brief The entity participating in the event system. * @brief The entity participating in the event system.
*
* If you want the participant to be scheduled from the manager call
* _enableScheduling class function.
*/ */
class Participant : public std::enable_shared_from_this<Participant> class Participant : public std::enable_shared_from_this<Participant>
{ {

View File

@ -190,18 +190,17 @@
while(!stopSchedulingThread_) while(!stopSchedulingThread_)
{ {
mutexSchedulingParticipants_.lock(); mutexSchedulingParticipants_.lock();
if (!schedulingParticipants_.empty()) if( !schedulingParticipants_.empty() )
{ {
for (auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it) for( auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it )
{ {
(*it)->schedule(); (*it)->schedule();
} }
} }
mutexSchedulingParticipants_.unlock(); mutexSchedulingParticipants_.unlock();
processConnections_(); processCommands_();
processDisableScheduling_(); std::this_thread::sleep_for( std::chrono::milliseconds(100) );
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
isSchedulingThreadRunning_=false; isSchedulingThreadRunning_=false;
@ -320,89 +319,113 @@
} }
void EventManager::Manager::schedule(std::shared_ptr<EventManager::Participant> participant) void EventManager::Manager::schedule(std::shared_ptr<EventManager::Participant> participant)
{ {
std::lock_guard<std::mutex> guard(mutexSchedulingParticipants_); std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
commandQueue_.push( std::make_pair( EventManager::commandType::ENABLE_SCHEDULING, participant) );
auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant); }
if (it == schedulingParticipants_.end())
{
schedulingParticipants_.push_back(participant);
}
}
void EventManager::Manager::unschedule(std::shared_ptr<EventManager::Participant> participant ) void EventManager::Manager::unschedule(std::shared_ptr<EventManager::Participant> participant )
{ {
std::lock_guard<std::mutex> guard( mutexDisableScheduleQueue_ ); std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
disableScheduleQueue_.push( participant ); commandQueue_.push( std::make_pair( EventManager::commandType::DISABLE_SCHEDULING, participant) );
} }
void EventManager::Manager::processConnections_() void EventManager::Manager::processCommands_()
{ {
std::lock_guard<std::mutex> guard(mutexParticipants_); std::unique_lock<std::mutex> lk( mutexCommandQueue_ );
std::lock_guard<std::mutex> lockGuard(mutexConnectionQueue_); while( commandQueue_.empty() == false )
{
while(!connectionQueue_.empty()) auto pair = commandQueue_.front();
{ commandQueue_.pop();
std::shared_ptr<EventManager::Participant> participant = connectionQueue_.front(); lk.unlock();
connectionQueue_.pop(); switch( pair.first )
participants_.push_back(participant); {
case EventManager::commandType::CONNECT:
participant->init(); processConnect_( pair.second );
} break;
} case EventManager::commandType::DISCONNECT:
processDisconnect_( pair.second );
break;
case EventManager::commandType::ENABLE_SCHEDULING:
processEnableScheduling_( pair.second );
break;
case EventManager::commandType::DISABLE_SCHEDULING:
processDisableScheduling_( pair.second );
break;
}
lk.lock();
}
lk.unlock();
}
void EventManager::Manager::processDisableScheduling_() void EventManager::Manager::processConnect_( std::shared_ptr<EventManager::Participant> participant )
{ {
std::lock_guard<std::mutex> guard( mutexDisableScheduleQueue_ ); std::lock_guard<std::mutex> guard(mutexParticipants_);
while( disableScheduleQueue_.empty() != true ) auto it = std::find( participants_.begin(), participants_.end(), participant );
{ if( it == participants_.end() )
std::shared_ptr<EventManager::Participant> participant = disableScheduleQueue_.front(); {
disableScheduleQueue_.pop(); participant->setManager(shared_from_this());
participant->setID(nextParticipantID_);
std::unique_lock<std::mutex> lk(mutexSchedulingParticipants_); // we can set and increment here because this critical section is secured by a mutex
auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant); nextParticipantID_++;
participants_.push_back(participant);
if (it != schedulingParticipants_.end()) participant->init();
{ }
schedulingParticipants_.erase(it); }
}
lk.unlock();
}
}
void EventManager::Manager::connect(std::shared_ptr<EventManager::Participant> participant) void EventManager::Manager::processDisconnect_( std::shared_ptr<EventManager::Participant> participant )
{ {
std::lock_guard<std::mutex> guard(mutexParticipants_); // before the participant gets disconnected it has to be unscheduled
participant->setManager(shared_from_this()); processDisableScheduling_( participant );
// we can set and increment here because only one participant is in this std::lock_guard<std::mutex> guard(mutexParticipants_);
// critical section in any moment auto it = std::find( participants_.begin(), participants_.end(), participant );
participant->setID(nextParticipantID_); if( it != participants_.end() )
nextParticipantID_++; {
participant->setManager(nullptr);
participants_.erase( it );
}
}
connectionQueue_.push(participant);
} void EventManager::Manager::processEnableScheduling_( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard(mutexSchedulingParticipants_);
auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant );
void EventManager::Manager::disconnect(std::shared_ptr<EventManager::Participant> participant) if( it == schedulingParticipants_.end() )
{ {
disconnect(participant); schedulingParticipants_.push_back( participant );
}
std::lock_guard<std::mutex> guard(mutexParticipants_); }
std::list<std::shared_ptr<EventManager::Participant>>::iterator it;
it = std::find(participants_.begin(), participants_.end(),participant);
if (it != participants_.end()) void EventManager::Manager::processDisableScheduling_( std::shared_ptr<EventManager::Participant> participant )
{ {
participants_.erase(it); std::lock_guard<std::mutex> guard( mutexSchedulingParticipants_ );
} auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant );
participant->setManager(nullptr); if( it != schedulingParticipants_.end() )
} {
schedulingParticipants_.erase(it);
}
}
void EventManager::Manager::connect( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
commandQueue_.push( std::make_pair( EventManager::commandType::CONNECT, participant) );
}
void EventManager::Manager::disconnect( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
commandQueue_.push( std::make_pair( EventManager::commandType::DISCONNECT, participant) );
}