Adaption to command queue instead of single use queues. #3

Merged
byterazor merged 2 commits from csander/EventManagementSystem:main into main 2022-09-22 20:33:57 +02:00
2 changed files with 142 additions and 88 deletions
Showing only changes of commit 3dc92bde5f - Show all commits

View File

@ -25,6 +25,21 @@
namespace EventManager
{
/**
* @brief command types
*/
enum class commandType :uint16_t
{
// command to connect a participant to the manager
CONNECT,
// command to disconnect a participant from the manager
DISCONNECT,
// command to schedule start scheduling a participant
ENABLE_SCHEDULING,
// command to disable scheduling for a participant (not schedule it anymore)
DISABLE_SCHEDULING
}; // commandType
// forward declaration of EventManager::Participant
class Participant;
@ -95,17 +110,11 @@
/// the id for the next participant connecting
std::uint32_t nextParticipantID_;
/// queue for connection request so connecting is done in manager context
std::queue<std::shared_ptr<EventManager::Participant>> connectionQueue_;
/// queue for different command requests like e.g. connecting a participant to the manager
std::queue<std::pair<EventManager::commandType,std::shared_ptr<EventManager::Participant>>> commandQueue_;
/// mutex to protect connectionQueue_
std::mutex mutexConnectionQueue_;
/// queue for disable scheduling request so the participants are not scheduled by the manager any longer
std::queue<std::shared_ptr<EventManager::Participant>> disableScheduleQueue_;
/// mutex to protect disableScheduleQueue_
std::mutex mutexDisableScheduleQueue_;
/// mutex to protect the commandQueue_
std::mutex mutexCommandQueue_;
/*
* all private methods
@ -126,6 +135,11 @@
*/
void processEvent(const std::shared_ptr<EventManager::Event> event);
/**
* @brief processes the commands in the commandQueue depending on their type
*/
void processCommands_();
/**
* @brief adds the queued participants to the list of connected participants
*
@ -134,7 +148,24 @@
* This class function adds the queued participants to the list participants_
* and removes them from the queue.
*/
void processConnections_();
void processConnect_( std::shared_ptr<EventManager::Participant> participant );
/**
* @brief removes the given participant form the list of connected participants
*
* The participants_ list contains the participants that are connected to
* the manager. This class function removes the queued participants from the list
* participants_.
*/
void processDisconnect_( std::shared_ptr<EventManager::Participant> participant );
/**
* @brief processes the command to enable scheduling for the given participant
*
* This class function adds the given participant to the schedulingParticipants_ list
* where the participants are scheduled by calling their schedule class function.
*/
void processEnableScheduling_( std::shared_ptr<EventManager::Participant> participant );
/**
* @brief disables the scheduling of the requested participants
@ -142,7 +173,7 @@
* Removes the participants from the schedulingParticipants_ list that
* should not be scheduled anymore.
*/
void processDisableScheduling_();
void processDisableScheduling_( std::shared_ptr<EventManager::Participant> participant );
/**
* @brief start the main thread

View File

@ -190,18 +190,17 @@
while(!stopSchedulingThread_)
{
mutexSchedulingParticipants_.lock();
if (!schedulingParticipants_.empty())
if( !schedulingParticipants_.empty() )
{
for (auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it)
for( auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it )
{
(*it)->schedule();
}
}
mutexSchedulingParticipants_.unlock();
processConnections_();
processDisableScheduling_();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
processCommands_();
std::this_thread::sleep_for( std::chrono::milliseconds(100) );
}
isSchedulingThreadRunning_=false;
@ -320,89 +319,113 @@
}
void EventManager::Manager::schedule(std::shared_ptr<EventManager::Participant> participant)
{
std::lock_guard<std::mutex> guard(mutexSchedulingParticipants_);
auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant);
if (it == schedulingParticipants_.end())
{
schedulingParticipants_.push_back(participant);
}
}
void EventManager::Manager::schedule(std::shared_ptr<EventManager::Participant> participant)
{
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
commandQueue_.push( std::make_pair( EventManager::commandType::ENABLE_SCHEDULING, participant) );
}
void EventManager::Manager::unschedule(std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard( mutexDisableScheduleQueue_ );
disableScheduleQueue_.push( participant );
}
void EventManager::Manager::unschedule(std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
commandQueue_.push( std::make_pair( EventManager::commandType::DISABLE_SCHEDULING, participant) );
}
void EventManager::Manager::processConnections_()
{
std::lock_guard<std::mutex> guard(mutexParticipants_);
std::lock_guard<std::mutex> lockGuard(mutexConnectionQueue_);
while(!connectionQueue_.empty())
{
std::shared_ptr<EventManager::Participant> participant = connectionQueue_.front();
connectionQueue_.pop();
participants_.push_back(participant);
participant->init();
}
}
void EventManager::Manager::processCommands_()
{
std::unique_lock<std::mutex> lk( mutexCommandQueue_ );
while( commandQueue_.empty() == false )
{
auto pair = commandQueue_.front();
commandQueue_.pop();
lk.unlock();
switch( pair.first )
{
case EventManager::commandType::CONNECT:
processConnect_( pair.second );
break;
case EventManager::commandType::DISCONNECT:
processDisconnect_( pair.second );
break;
case EventManager::commandType::ENABLE_SCHEDULING:
processEnableScheduling_( pair.second );
break;
case EventManager::commandType::DISABLE_SCHEDULING:
processDisableScheduling_( pair.second );
break;
}
lk.lock();
}
lk.unlock();
}
void EventManager::Manager::processDisableScheduling_()
{
std::lock_guard<std::mutex> guard( mutexDisableScheduleQueue_ );
while( disableScheduleQueue_.empty() != true )
{
std::shared_ptr<EventManager::Participant> participant = disableScheduleQueue_.front();
disableScheduleQueue_.pop();
std::unique_lock<std::mutex> lk(mutexSchedulingParticipants_);
auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant);
if (it != schedulingParticipants_.end())
{
schedulingParticipants_.erase(it);
}
lk.unlock();
}
}
void EventManager::Manager::processConnect_( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard(mutexParticipants_);
auto it = std::find( participants_.begin(), participants_.end(), participant );
if( it == participants_.end() )
{
participant->setManager(shared_from_this());
participant->setID(nextParticipantID_);
// we can set and increment here because this critical section is secured by a mutex
nextParticipantID_++;
participants_.push_back(participant);
participant->init();
}
}
void EventManager::Manager::connect(std::shared_ptr<EventManager::Participant> participant)
{
std::lock_guard<std::mutex> guard(mutexParticipants_);
participant->setManager(shared_from_this());
void EventManager::Manager::processDisconnect_( std::shared_ptr<EventManager::Participant> participant )
{
// before the participant gets disconnected it has to be unscheduled
processDisableScheduling_( participant );
// we can set and increment here because only one participant is in this
// critical section in any moment
participant->setID(nextParticipantID_);
nextParticipantID_++;
std::lock_guard<std::mutex> guard(mutexParticipants_);
auto it = std::find( participants_.begin(), participants_.end(), participant );
if( it != participants_.end() )
{
participant->setManager(nullptr);
participants_.erase( it );
}
}
connectionQueue_.push(participant);
}
void EventManager::Manager::processEnableScheduling_( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard(mutexSchedulingParticipants_);
auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant );
void EventManager::Manager::disconnect(std::shared_ptr<EventManager::Participant> participant)
{
disconnect(participant);
if( it == schedulingParticipants_.end() )
{
schedulingParticipants_.push_back( participant );
}
}
std::lock_guard<std::mutex> guard(mutexParticipants_);
std::list<std::shared_ptr<EventManager::Participant>>::iterator it;
it = std::find(participants_.begin(), participants_.end(),participant);
void EventManager::Manager::processDisableScheduling_( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard( mutexSchedulingParticipants_ );
auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant );
if (it != participants_.end())
{
participants_.erase(it);
}
if( it != schedulingParticipants_.end() )
{
schedulingParticipants_.erase(it);
}
}
participant->setManager(nullptr);
}
void EventManager::Manager::connect( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
commandQueue_.push( std::make_pair( EventManager::commandType::CONNECT, participant) );
}
void EventManager::Manager::disconnect( std::shared_ptr<EventManager::Participant> participant )
{
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
commandQueue_.push( std::make_pair( EventManager::commandType::DISCONNECT, participant) );
}