Adaption to command queue instead of single use queues. #3
@ -22,7 +22,13 @@
|
||||
|
||||
/// Eventtype to notify all participants that a shutdown is immanent
|
||||
const static std::uint32_t EVENT_TYPE_SHUTDOWN = 0;
|
||||
|
||||
/**
|
||||
* @class Event
|
||||
* @brief An Event is the element in the system that triggers actions from participants
|
||||
*
|
||||
* Derive own events from this class to sned e.g. also a payload to subscribing
|
||||
* participants.
|
||||
*/
|
||||
class Event
|
||||
{
|
||||
private:
|
||||
@ -45,17 +51,17 @@
|
||||
/**
|
||||
* @brief constructor for creating a simple event
|
||||
*
|
||||
* @param type - what kinf of event is this
|
||||
* @param type - what kind of event is this
|
||||
*/
|
||||
Event(std::uint32_t type);
|
||||
|
||||
/**
|
||||
* @brief Constructor to create a response Event
|
||||
* @brief Constructor to create a response event
|
||||
*/
|
||||
Event(std::uint32_t type, const EventManager::Event &event);
|
||||
|
||||
/**
|
||||
* @brief Constructor to create a response Event
|
||||
* @brief Constructor to create a response event
|
||||
*/
|
||||
Event(std::uint32_t type, const std::shared_ptr<EventManager::Event> event);
|
||||
|
||||
|
@ -25,9 +25,41 @@
|
||||
namespace EventManager
|
||||
{
|
||||
|
||||
/**
|
||||
* @brief command types
|
||||
*/
|
||||
enum class commandType :uint16_t
|
||||
{
|
||||
// command to connect a participant to the manager
|
||||
CONNECT,
|
||||
// command to disconnect a participant from the manager
|
||||
DISCONNECT,
|
||||
// command to schedule start scheduling a participant
|
||||
ENABLE_SCHEDULING,
|
||||
// command to disable scheduling for a participant (not schedule it anymore)
|
||||
DISABLE_SCHEDULING
|
||||
}; // commandType
|
||||
|
||||
// forward declaration of EventManager::Participant
|
||||
class Participant;
|
||||
|
||||
/**
|
||||
* @class Manager
|
||||
*
|
||||
* If you use the manager it has to be a shared pointer. Otherwise you will
|
||||
* get a mem error.
|
||||
*
|
||||
* To add participants to the manager call class function connect.
|
||||
* Calling start method will start the manager.
|
||||
*
|
||||
* Depending on your concept you can first connect all participants to
|
||||
* the manager and then start it. Or you can start in first and then
|
||||
* connect the participants. In the first example all participant will
|
||||
* be started at the same time (when calling start from the manager.)
|
||||
* In the second example they will be started when they are connected.
|
||||
* That means if you have one starting event that all participants need
|
||||
* to receive you would choose example 1.
|
||||
*/
|
||||
class Manager : public std::enable_shared_from_this<Manager>
|
||||
{
|
||||
/// the thread the event manager is transmitting events in
|
||||
@ -66,7 +98,7 @@
|
||||
/// list of all plugins requiring scheduling
|
||||
std::list<std::shared_ptr<EventManager::Participant>> schedulingParticipants_;
|
||||
|
||||
/// mutex to protect schedulingPlugins_
|
||||
/// mutex to protect list schedulingParticipants_
|
||||
std::mutex mutexSchedulingParticipants_;
|
||||
|
||||
/// list of all participants connected
|
||||
@ -78,17 +110,11 @@
|
||||
/// the id for the next participant connecting
|
||||
std::uint32_t nextParticipantID_;
|
||||
|
||||
/// queue for connection request so connecting is done in manager context
|
||||
std::queue<std::shared_ptr<EventManager::Participant>> connectionQueue_;
|
||||
/// queue for different command requests like e.g. connecting a participant to the manager
|
||||
std::queue<std::pair<EventManager::commandType,std::shared_ptr<EventManager::Participant>>> commandQueue_;
|
||||
|
||||
/// mutex to protect connectionQueue_
|
||||
std::mutex mutexConnectionQueue_;
|
||||
|
||||
/// queue for disable scheduling request so the participants are not scheduled by the manager any longer
|
||||
std::queue<std::shared_ptr<EventManager::Participant>> disableScheduleQueue_;
|
||||
|
||||
/// mutex to protect disableScheduleQueue_
|
||||
std::mutex mutexDisableScheduleQueue_;
|
||||
/// mutex to protect the commandQueue_
|
||||
std::mutex mutexCommandQueue_;
|
||||
|
||||
/*
|
||||
* all private methods
|
||||
@ -109,6 +135,11 @@
|
||||
*/
|
||||
void processEvent(const std::shared_ptr<EventManager::Event> event);
|
||||
|
||||
/**
|
||||
* @brief processes the commands in the commandQueue depending on their type
|
||||
*/
|
||||
void processCommands_();
|
||||
|
||||
/**
|
||||
* @brief adds the queued participants to the list of connected participants
|
||||
*
|
||||
@ -117,7 +148,24 @@
|
||||
* This class function adds the queued participants to the list participants_
|
||||
* and removes them from the queue.
|
||||
*/
|
||||
void processConnections_();
|
||||
void processConnect_( std::shared_ptr<EventManager::Participant> participant );
|
||||
|
||||
/**
|
||||
* @brief removes the given participant form the list of connected participants
|
||||
*
|
||||
* The participants_ list contains the participants that are connected to
|
||||
* the manager. This class function removes the queued participants from the list
|
||||
* participants_.
|
||||
*/
|
||||
void processDisconnect_( std::shared_ptr<EventManager::Participant> participant );
|
||||
|
||||
/**
|
||||
* @brief processes the command to enable scheduling for the given participant
|
||||
*
|
||||
* This class function adds the given participant to the schedulingParticipants_ list
|
||||
* where the participants are scheduled by calling their schedule class function.
|
||||
*/
|
||||
void processEnableScheduling_( std::shared_ptr<EventManager::Participant> participant );
|
||||
|
||||
/**
|
||||
* @brief disables the scheduling of the requested participants
|
||||
@ -125,7 +173,7 @@
|
||||
* Removes the participants from the schedulingParticipants_ list that
|
||||
* should not be scheduled anymore.
|
||||
*/
|
||||
void processDisableScheduling_();
|
||||
void processDisableScheduling_( std::shared_ptr<EventManager::Participant> participant );
|
||||
|
||||
/**
|
||||
* @brief start the main thread
|
||||
@ -152,7 +200,7 @@
|
||||
/**
|
||||
* @brief The constructor for the event manager
|
||||
*
|
||||
* Just initializes all attributes to its starting value
|
||||
* Just initializes all attributes to their starting values
|
||||
*/
|
||||
Manager() : mainThread_(nullptr), isMainThreadRunning_(false),
|
||||
stopMainThread_(false), schedulingThread_(nullptr),
|
||||
@ -161,6 +209,7 @@
|
||||
|
||||
|
||||
~Manager();
|
||||
|
||||
/**
|
||||
* @brief start the event manager
|
||||
*/
|
||||
|
@ -27,6 +27,9 @@
|
||||
|
||||
/**
|
||||
* @brief The entity participating in the event system.
|
||||
*
|
||||
* If you want the participant to be scheduled from the manager call
|
||||
* _enableScheduling class function.
|
||||
*/
|
||||
class Participant : public std::enable_shared_from_this<Participant>
|
||||
{
|
||||
|
@ -190,18 +190,17 @@
|
||||
while(!stopSchedulingThread_)
|
||||
{
|
||||
mutexSchedulingParticipants_.lock();
|
||||
if (!schedulingParticipants_.empty())
|
||||
if( !schedulingParticipants_.empty() )
|
||||
{
|
||||
for (auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it)
|
||||
for( auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it )
|
||||
{
|
||||
(*it)->schedule();
|
||||
}
|
||||
}
|
||||
mutexSchedulingParticipants_.unlock();
|
||||
|
||||
processConnections_();
|
||||
processDisableScheduling_();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
processCommands_();
|
||||
std::this_thread::sleep_for( std::chrono::milliseconds(100) );
|
||||
}
|
||||
|
||||
isSchedulingThreadRunning_=false;
|
||||
@ -320,89 +319,113 @@
|
||||
|
||||
}
|
||||
|
||||
void EventManager::Manager::schedule(std::shared_ptr<EventManager::Participant> participant)
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutexSchedulingParticipants_);
|
||||
|
||||
auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant);
|
||||
|
||||
if (it == schedulingParticipants_.end())
|
||||
{
|
||||
schedulingParticipants_.push_back(participant);
|
||||
}
|
||||
}
|
||||
void EventManager::Manager::schedule(std::shared_ptr<EventManager::Participant> participant)
|
||||
{
|
||||
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
|
||||
commandQueue_.push( std::make_pair( EventManager::commandType::ENABLE_SCHEDULING, participant) );
|
||||
}
|
||||
|
||||
|
||||
void EventManager::Manager::unschedule(std::shared_ptr<EventManager::Participant> participant )
|
||||
{
|
||||
std::lock_guard<std::mutex> guard( mutexDisableScheduleQueue_ );
|
||||
disableScheduleQueue_.push( participant );
|
||||
}
|
||||
void EventManager::Manager::unschedule(std::shared_ptr<EventManager::Participant> participant )
|
||||
{
|
||||
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
|
||||
commandQueue_.push( std::make_pair( EventManager::commandType::DISABLE_SCHEDULING, participant) );
|
||||
}
|
||||
|
||||
|
||||
void EventManager::Manager::processConnections_()
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutexParticipants_);
|
||||
std::lock_guard<std::mutex> lockGuard(mutexConnectionQueue_);
|
||||
|
||||
while(!connectionQueue_.empty())
|
||||
{
|
||||
std::shared_ptr<EventManager::Participant> participant = connectionQueue_.front();
|
||||
connectionQueue_.pop();
|
||||
participants_.push_back(participant);
|
||||
|
||||
participant->init();
|
||||
}
|
||||
}
|
||||
void EventManager::Manager::processCommands_()
|
||||
{
|
||||
std::unique_lock<std::mutex> lk( mutexCommandQueue_ );
|
||||
while( commandQueue_.empty() == false )
|
||||
{
|
||||
auto pair = commandQueue_.front();
|
||||
commandQueue_.pop();
|
||||
lk.unlock();
|
||||
switch( pair.first )
|
||||
{
|
||||
case EventManager::commandType::CONNECT:
|
||||
processConnect_( pair.second );
|
||||
break;
|
||||
case EventManager::commandType::DISCONNECT:
|
||||
processDisconnect_( pair.second );
|
||||
break;
|
||||
case EventManager::commandType::ENABLE_SCHEDULING:
|
||||
processEnableScheduling_( pair.second );
|
||||
break;
|
||||
case EventManager::commandType::DISABLE_SCHEDULING:
|
||||
processDisableScheduling_( pair.second );
|
||||
break;
|
||||
}
|
||||
lk.lock();
|
||||
}
|
||||
lk.unlock();
|
||||
}
|
||||
|
||||
|
||||
void EventManager::Manager::processDisableScheduling_()
|
||||
{
|
||||
std::lock_guard<std::mutex> guard( mutexDisableScheduleQueue_ );
|
||||
while( disableScheduleQueue_.empty() != true )
|
||||
{
|
||||
std::shared_ptr<EventManager::Participant> participant = disableScheduleQueue_.front();
|
||||
disableScheduleQueue_.pop();
|
||||
|
||||
std::unique_lock<std::mutex> lk(mutexSchedulingParticipants_);
|
||||
auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant);
|
||||
|
||||
if (it != schedulingParticipants_.end())
|
||||
{
|
||||
schedulingParticipants_.erase(it);
|
||||
}
|
||||
lk.unlock();
|
||||
}
|
||||
}
|
||||
void EventManager::Manager::processConnect_( std::shared_ptr<EventManager::Participant> participant )
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutexParticipants_);
|
||||
auto it = std::find( participants_.begin(), participants_.end(), participant );
|
||||
if( it == participants_.end() )
|
||||
{
|
||||
participant->setManager(shared_from_this());
|
||||
participant->setID(nextParticipantID_);
|
||||
// we can set and increment here because this critical section is secured by a mutex
|
||||
nextParticipantID_++;
|
||||
participants_.push_back(participant);
|
||||
participant->init();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void EventManager::Manager::connect(std::shared_ptr<EventManager::Participant> participant)
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutexParticipants_);
|
||||
participant->setManager(shared_from_this());
|
||||
void EventManager::Manager::processDisconnect_( std::shared_ptr<EventManager::Participant> participant )
|
||||
{
|
||||
// before the participant gets disconnected it has to be unscheduled
|
||||
processDisableScheduling_( participant );
|
||||
|
||||
// we can set and increment here because only one participant is in this
|
||||
// critical section in any moment
|
||||
participant->setID(nextParticipantID_);
|
||||
nextParticipantID_++;
|
||||
std::lock_guard<std::mutex> guard(mutexParticipants_);
|
||||
auto it = std::find( participants_.begin(), participants_.end(), participant );
|
||||
if( it != participants_.end() )
|
||||
{
|
||||
participant->setManager(nullptr);
|
||||
participants_.erase( it );
|
||||
}
|
||||
}
|
||||
|
||||
connectionQueue_.push(participant);
|
||||
|
||||
}
|
||||
void EventManager::Manager::processEnableScheduling_( std::shared_ptr<EventManager::Participant> participant )
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutexSchedulingParticipants_);
|
||||
auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant );
|
||||
|
||||
void EventManager::Manager::disconnect(std::shared_ptr<EventManager::Participant> participant)
|
||||
{
|
||||
disconnect(participant);
|
||||
|
||||
std::lock_guard<std::mutex> guard(mutexParticipants_);
|
||||
std::list<std::shared_ptr<EventManager::Participant>>::iterator it;
|
||||
if( it == schedulingParticipants_.end() )
|
||||
{
|
||||
schedulingParticipants_.push_back( participant );
|
||||
}
|
||||
}
|
||||
|
||||
it = std::find(participants_.begin(), participants_.end(),participant);
|
||||
|
||||
if (it != participants_.end())
|
||||
{
|
||||
participants_.erase(it);
|
||||
}
|
||||
void EventManager::Manager::processDisableScheduling_( std::shared_ptr<EventManager::Participant> participant )
|
||||
{
|
||||
std::lock_guard<std::mutex> guard( mutexSchedulingParticipants_ );
|
||||
auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant );
|
||||
|
||||
participant->setManager(nullptr);
|
||||
}
|
||||
if( it != schedulingParticipants_.end() )
|
||||
{
|
||||
schedulingParticipants_.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void EventManager::Manager::connect( std::shared_ptr<EventManager::Participant> participant )
|
||||
{
|
||||
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
|
||||
commandQueue_.push( std::make_pair( EventManager::commandType::CONNECT, participant) );
|
||||
}
|
||||
|
||||
|
||||
void EventManager::Manager::disconnect( std::shared_ptr<EventManager::Participant> participant )
|
||||
{
|
||||
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
|
||||
commandQueue_.push( std::make_pair( EventManager::commandType::DISCONNECT, participant) );
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user