From 3dc92bde5fba6c61bbf76ba128e1283ebb3207e0 Mon Sep 17 00:00:00 2001 From: Christina Sander Date: Thu, 22 Sep 2022 18:06:51 +0200 Subject: [PATCH] MOD: Replaces the queues connectionQueue_ and disableSchedulingQueue with a command queue where different commands can be queued for later processing. --- include/EventManager/Manager.hpp | 55 +++++++--- src/EventManager/Manager.cpp | 175 +++++++++++++++++-------------- 2 files changed, 142 insertions(+), 88 deletions(-) diff --git a/include/EventManager/Manager.hpp b/include/EventManager/Manager.hpp index b059344..06b9d58 100644 --- a/include/EventManager/Manager.hpp +++ b/include/EventManager/Manager.hpp @@ -25,6 +25,21 @@ namespace EventManager { + /** + * @brief command types + */ + enum class commandType :uint16_t + { + // command to connect a participant to the manager + CONNECT, + // command to disconnect a participant from the manager + DISCONNECT, + // command to schedule start scheduling a participant + ENABLE_SCHEDULING, + // command to disable scheduling for a participant (not schedule it anymore) + DISABLE_SCHEDULING + }; // commandType + // forward declaration of EventManager::Participant class Participant; @@ -95,17 +110,11 @@ /// the id for the next participant connecting std::uint32_t nextParticipantID_; - /// queue for connection request so connecting is done in manager context - std::queue> connectionQueue_; + /// queue for different command requests like e.g. connecting a participant to the manager + std::queue>> commandQueue_; - /// mutex to protect connectionQueue_ - std::mutex mutexConnectionQueue_; - - /// queue for disable scheduling request so the participants are not scheduled by the manager any longer - std::queue> disableScheduleQueue_; - - /// mutex to protect disableScheduleQueue_ - std::mutex mutexDisableScheduleQueue_; + /// mutex to protect the commandQueue_ + std::mutex mutexCommandQueue_; /* * all private methods @@ -126,6 +135,11 @@ */ void processEvent(const std::shared_ptr event); + /** + * @brief processes the commands in the commandQueue depending on their type + */ + void processCommands_(); + /** * @brief adds the queued participants to the list of connected participants * @@ -134,7 +148,24 @@ * This class function adds the queued participants to the list participants_ * and removes them from the queue. */ - void processConnections_(); + void processConnect_( std::shared_ptr participant ); + + /** + * @brief removes the given participant form the list of connected participants + * + * The participants_ list contains the participants that are connected to + * the manager. This class function removes the queued participants from the list + * participants_. + */ + void processDisconnect_( std::shared_ptr participant ); + + /** + * @brief processes the command to enable scheduling for the given participant + * + * This class function adds the given participant to the schedulingParticipants_ list + * where the participants are scheduled by calling their schedule class function. + */ + void processEnableScheduling_( std::shared_ptr participant ); /** * @brief disables the scheduling of the requested participants @@ -142,7 +173,7 @@ * Removes the participants from the schedulingParticipants_ list that * should not be scheduled anymore. */ - void processDisableScheduling_(); + void processDisableScheduling_( std::shared_ptr participant ); /** * @brief start the main thread diff --git a/src/EventManager/Manager.cpp b/src/EventManager/Manager.cpp index 1499951..b85d051 100644 --- a/src/EventManager/Manager.cpp +++ b/src/EventManager/Manager.cpp @@ -190,18 +190,17 @@ while(!stopSchedulingThread_) { mutexSchedulingParticipants_.lock(); - if (!schedulingParticipants_.empty()) + if( !schedulingParticipants_.empty() ) { - for (auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it) + for( auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it ) { (*it)->schedule(); } } mutexSchedulingParticipants_.unlock(); - processConnections_(); - processDisableScheduling_(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + processCommands_(); + std::this_thread::sleep_for( std::chrono::milliseconds(100) ); } isSchedulingThreadRunning_=false; @@ -320,89 +319,113 @@ } - void EventManager::Manager::schedule(std::shared_ptr participant) - { - std::lock_guard guard(mutexSchedulingParticipants_); - - auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant); - - if (it == schedulingParticipants_.end()) - { - schedulingParticipants_.push_back(participant); - } - } +void EventManager::Manager::schedule(std::shared_ptr participant) +{ + std::lock_guard guard( mutexCommandQueue_ ); + commandQueue_.push( std::make_pair( EventManager::commandType::ENABLE_SCHEDULING, participant) ); +} - void EventManager::Manager::unschedule(std::shared_ptr participant ) - { - std::lock_guard guard( mutexDisableScheduleQueue_ ); - disableScheduleQueue_.push( participant ); - } +void EventManager::Manager::unschedule(std::shared_ptr participant ) +{ + std::lock_guard guard( mutexCommandQueue_ ); + commandQueue_.push( std::make_pair( EventManager::commandType::DISABLE_SCHEDULING, participant) ); +} - void EventManager::Manager::processConnections_() - { - std::lock_guard guard(mutexParticipants_); - std::lock_guard lockGuard(mutexConnectionQueue_); - - while(!connectionQueue_.empty()) - { - std::shared_ptr participant = connectionQueue_.front(); - connectionQueue_.pop(); - participants_.push_back(participant); - - participant->init(); - } - } +void EventManager::Manager::processCommands_() +{ + std::unique_lock lk( mutexCommandQueue_ ); + while( commandQueue_.empty() == false ) + { + auto pair = commandQueue_.front(); + commandQueue_.pop(); + lk.unlock(); + switch( pair.first ) + { + case EventManager::commandType::CONNECT: + processConnect_( pair.second ); + break; + case EventManager::commandType::DISCONNECT: + processDisconnect_( pair.second ); + break; + case EventManager::commandType::ENABLE_SCHEDULING: + processEnableScheduling_( pair.second ); + break; + case EventManager::commandType::DISABLE_SCHEDULING: + processDisableScheduling_( pair.second ); + break; + } + lk.lock(); + } + lk.unlock(); +} - void EventManager::Manager::processDisableScheduling_() - { - std::lock_guard guard( mutexDisableScheduleQueue_ ); - while( disableScheduleQueue_.empty() != true ) - { - std::shared_ptr participant = disableScheduleQueue_.front(); - disableScheduleQueue_.pop(); - - std::unique_lock lk(mutexSchedulingParticipants_); - auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant); - - if (it != schedulingParticipants_.end()) - { - schedulingParticipants_.erase(it); - } - lk.unlock(); - } - } +void EventManager::Manager::processConnect_( std::shared_ptr participant ) +{ + std::lock_guard guard(mutexParticipants_); + auto it = std::find( participants_.begin(), participants_.end(), participant ); + if( it == participants_.end() ) + { + participant->setManager(shared_from_this()); + participant->setID(nextParticipantID_); + // we can set and increment here because this critical section is secured by a mutex + nextParticipantID_++; + participants_.push_back(participant); + participant->init(); + } +} - void EventManager::Manager::connect(std::shared_ptr participant) - { - std::lock_guard guard(mutexParticipants_); - participant->setManager(shared_from_this()); +void EventManager::Manager::processDisconnect_( std::shared_ptr participant ) +{ + // before the participant gets disconnected it has to be unscheduled + processDisableScheduling_( participant ); - // we can set and increment here because only one participant is in this - // critical section in any moment - participant->setID(nextParticipantID_); - nextParticipantID_++; + std::lock_guard guard(mutexParticipants_); + auto it = std::find( participants_.begin(), participants_.end(), participant ); + if( it != participants_.end() ) + { + participant->setManager(nullptr); + participants_.erase( it ); + } +} - connectionQueue_.push(participant); - } +void EventManager::Manager::processEnableScheduling_( std::shared_ptr participant ) +{ + std::lock_guard guard(mutexSchedulingParticipants_); + auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant ); - void EventManager::Manager::disconnect(std::shared_ptr participant) - { - disconnect(participant); - - std::lock_guard guard(mutexParticipants_); - std::list>::iterator it; + if( it == schedulingParticipants_.end() ) + { + schedulingParticipants_.push_back( participant ); + } +} - it = std::find(participants_.begin(), participants_.end(),participant); - if (it != participants_.end()) - { - participants_.erase(it); - } +void EventManager::Manager::processDisableScheduling_( std::shared_ptr participant ) +{ + std::lock_guard guard( mutexSchedulingParticipants_ ); + auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant ); - participant->setManager(nullptr); - } + if( it != schedulingParticipants_.end() ) + { + schedulingParticipants_.erase(it); + } +} + + +void EventManager::Manager::connect( std::shared_ptr participant ) +{ + std::lock_guard guard( mutexCommandQueue_ ); + commandQueue_.push( std::make_pair( EventManager::commandType::CONNECT, participant) ); +} + + +void EventManager::Manager::disconnect( std::shared_ptr participant ) +{ + std::lock_guard guard( mutexCommandQueue_ ); + commandQueue_.push( std::make_pair( EventManager::commandType::DISCONNECT, participant) ); +}