Adaption to command queue instead of single use queues. #3
@ -22,7 +22,13 @@
|
|||||||
|
|
||||||
/// Eventtype to notify all participants that a shutdown is immanent
|
/// Eventtype to notify all participants that a shutdown is immanent
|
||||||
const static std::uint32_t EVENT_TYPE_SHUTDOWN = 0;
|
const static std::uint32_t EVENT_TYPE_SHUTDOWN = 0;
|
||||||
|
/**
|
||||||
|
* @class Event
|
||||||
|
* @brief An Event is the element in the system that triggers actions from participants
|
||||||
|
*
|
||||||
|
* Derive own events from this class to sned e.g. also a payload to subscribing
|
||||||
|
* participants.
|
||||||
|
*/
|
||||||
class Event
|
class Event
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
@ -45,17 +51,17 @@
|
|||||||
/**
|
/**
|
||||||
* @brief constructor for creating a simple event
|
* @brief constructor for creating a simple event
|
||||||
*
|
*
|
||||||
* @param type - what kinf of event is this
|
* @param type - what kind of event is this
|
||||||
*/
|
*/
|
||||||
Event(std::uint32_t type);
|
Event(std::uint32_t type);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Constructor to create a response Event
|
* @brief Constructor to create a response event
|
||||||
*/
|
*/
|
||||||
Event(std::uint32_t type, const EventManager::Event &event);
|
Event(std::uint32_t type, const EventManager::Event &event);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Constructor to create a response Event
|
* @brief Constructor to create a response event
|
||||||
*/
|
*/
|
||||||
Event(std::uint32_t type, const std::shared_ptr<EventManager::Event> event);
|
Event(std::uint32_t type, const std::shared_ptr<EventManager::Event> event);
|
||||||
|
|
||||||
|
@ -25,9 +25,41 @@
|
|||||||
namespace EventManager
|
namespace EventManager
|
||||||
{
|
{
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief command types
|
||||||
|
*/
|
||||||
|
enum class commandType :uint16_t
|
||||||
|
{
|
||||||
|
// command to connect a participant to the manager
|
||||||
|
CONNECT,
|
||||||
|
// command to disconnect a participant from the manager
|
||||||
|
DISCONNECT,
|
||||||
|
// command to schedule start scheduling a participant
|
||||||
|
ENABLE_SCHEDULING,
|
||||||
|
// command to disable scheduling for a participant (not schedule it anymore)
|
||||||
|
DISABLE_SCHEDULING
|
||||||
|
}; // commandType
|
||||||
|
|
||||||
// forward declaration of EventManager::Participant
|
// forward declaration of EventManager::Participant
|
||||||
class Participant;
|
class Participant;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @class Manager
|
||||||
|
*
|
||||||
|
* If you use the manager it has to be a shared pointer. Otherwise you will
|
||||||
|
* get a mem error.
|
||||||
|
*
|
||||||
|
* To add participants to the manager call class function connect.
|
||||||
|
* Calling start method will start the manager.
|
||||||
|
*
|
||||||
|
* Depending on your concept you can first connect all participants to
|
||||||
|
* the manager and then start it. Or you can start in first and then
|
||||||
|
* connect the participants. In the first example all participant will
|
||||||
|
* be started at the same time (when calling start from the manager.)
|
||||||
|
* In the second example they will be started when they are connected.
|
||||||
|
* That means if you have one starting event that all participants need
|
||||||
|
* to receive you would choose example 1.
|
||||||
|
*/
|
||||||
class Manager : public std::enable_shared_from_this<Manager>
|
class Manager : public std::enable_shared_from_this<Manager>
|
||||||
{
|
{
|
||||||
/// the thread the event manager is transmitting events in
|
/// the thread the event manager is transmitting events in
|
||||||
@ -66,7 +98,7 @@
|
|||||||
/// list of all plugins requiring scheduling
|
/// list of all plugins requiring scheduling
|
||||||
std::list<std::shared_ptr<EventManager::Participant>> schedulingParticipants_;
|
std::list<std::shared_ptr<EventManager::Participant>> schedulingParticipants_;
|
||||||
|
|
||||||
/// mutex to protect schedulingPlugins_
|
/// mutex to protect list schedulingParticipants_
|
||||||
std::mutex mutexSchedulingParticipants_;
|
std::mutex mutexSchedulingParticipants_;
|
||||||
|
|
||||||
/// list of all participants connected
|
/// list of all participants connected
|
||||||
@ -78,17 +110,11 @@
|
|||||||
/// the id for the next participant connecting
|
/// the id for the next participant connecting
|
||||||
std::uint32_t nextParticipantID_;
|
std::uint32_t nextParticipantID_;
|
||||||
|
|
||||||
/// queue for connection request so connecting is done in manager context
|
/// queue for different command requests like e.g. connecting a participant to the manager
|
||||||
std::queue<std::shared_ptr<EventManager::Participant>> connectionQueue_;
|
std::queue<std::pair<EventManager::commandType,std::shared_ptr<EventManager::Participant>>> commandQueue_;
|
||||||
|
|
||||||
/// mutex to protect connectionQueue_
|
/// mutex to protect the commandQueue_
|
||||||
std::mutex mutexConnectionQueue_;
|
std::mutex mutexCommandQueue_;
|
||||||
|
|
||||||
/// queue for disable scheduling request so the participants are not scheduled by the manager any longer
|
|
||||||
std::queue<std::shared_ptr<EventManager::Participant>> disableScheduleQueue_;
|
|
||||||
|
|
||||||
/// mutex to protect disableScheduleQueue_
|
|
||||||
std::mutex mutexDisableScheduleQueue_;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* all private methods
|
* all private methods
|
||||||
@ -109,6 +135,11 @@
|
|||||||
*/
|
*/
|
||||||
void processEvent(const std::shared_ptr<EventManager::Event> event);
|
void processEvent(const std::shared_ptr<EventManager::Event> event);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief processes the commands in the commandQueue depending on their type
|
||||||
|
*/
|
||||||
|
void processCommands_();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief adds the queued participants to the list of connected participants
|
* @brief adds the queued participants to the list of connected participants
|
||||||
*
|
*
|
||||||
@ -117,7 +148,24 @@
|
|||||||
* This class function adds the queued participants to the list participants_
|
* This class function adds the queued participants to the list participants_
|
||||||
* and removes them from the queue.
|
* and removes them from the queue.
|
||||||
*/
|
*/
|
||||||
void processConnections_();
|
void processConnect_( std::shared_ptr<EventManager::Participant> participant );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief removes the given participant form the list of connected participants
|
||||||
|
*
|
||||||
|
* The participants_ list contains the participants that are connected to
|
||||||
|
* the manager. This class function removes the queued participants from the list
|
||||||
|
* participants_.
|
||||||
|
*/
|
||||||
|
void processDisconnect_( std::shared_ptr<EventManager::Participant> participant );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief processes the command to enable scheduling for the given participant
|
||||||
|
*
|
||||||
|
* This class function adds the given participant to the schedulingParticipants_ list
|
||||||
|
* where the participants are scheduled by calling their schedule class function.
|
||||||
|
*/
|
||||||
|
void processEnableScheduling_( std::shared_ptr<EventManager::Participant> participant );
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief disables the scheduling of the requested participants
|
* @brief disables the scheduling of the requested participants
|
||||||
@ -125,7 +173,7 @@
|
|||||||
* Removes the participants from the schedulingParticipants_ list that
|
* Removes the participants from the schedulingParticipants_ list that
|
||||||
* should not be scheduled anymore.
|
* should not be scheduled anymore.
|
||||||
*/
|
*/
|
||||||
void processDisableScheduling_();
|
void processDisableScheduling_( std::shared_ptr<EventManager::Participant> participant );
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief start the main thread
|
* @brief start the main thread
|
||||||
@ -152,7 +200,7 @@
|
|||||||
/**
|
/**
|
||||||
* @brief The constructor for the event manager
|
* @brief The constructor for the event manager
|
||||||
*
|
*
|
||||||
* Just initializes all attributes to its starting value
|
* Just initializes all attributes to their starting values
|
||||||
*/
|
*/
|
||||||
Manager() : mainThread_(nullptr), isMainThreadRunning_(false),
|
Manager() : mainThread_(nullptr), isMainThreadRunning_(false),
|
||||||
stopMainThread_(false), schedulingThread_(nullptr),
|
stopMainThread_(false), schedulingThread_(nullptr),
|
||||||
@ -161,6 +209,7 @@
|
|||||||
|
|
||||||
|
|
||||||
~Manager();
|
~Manager();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief start the event manager
|
* @brief start the event manager
|
||||||
*/
|
*/
|
||||||
|
@ -27,6 +27,9 @@
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief The entity participating in the event system.
|
* @brief The entity participating in the event system.
|
||||||
|
*
|
||||||
|
* If you want the participant to be scheduled from the manager call
|
||||||
|
* _enableScheduling class function.
|
||||||
*/
|
*/
|
||||||
class Participant : public std::enable_shared_from_this<Participant>
|
class Participant : public std::enable_shared_from_this<Participant>
|
||||||
{
|
{
|
||||||
|
@ -190,18 +190,17 @@
|
|||||||
while(!stopSchedulingThread_)
|
while(!stopSchedulingThread_)
|
||||||
{
|
{
|
||||||
mutexSchedulingParticipants_.lock();
|
mutexSchedulingParticipants_.lock();
|
||||||
if (!schedulingParticipants_.empty())
|
if( !schedulingParticipants_.empty() )
|
||||||
{
|
{
|
||||||
for (auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it)
|
for( auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it )
|
||||||
{
|
{
|
||||||
(*it)->schedule();
|
(*it)->schedule();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mutexSchedulingParticipants_.unlock();
|
mutexSchedulingParticipants_.unlock();
|
||||||
|
|
||||||
processConnections_();
|
processCommands_();
|
||||||
processDisableScheduling_();
|
std::this_thread::sleep_for( std::chrono::milliseconds(100) );
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
isSchedulingThreadRunning_=false;
|
isSchedulingThreadRunning_=false;
|
||||||
@ -320,89 +319,113 @@
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void EventManager::Manager::schedule(std::shared_ptr<EventManager::Participant> participant)
|
void EventManager::Manager::schedule(std::shared_ptr<EventManager::Participant> participant)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> guard(mutexSchedulingParticipants_);
|
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
|
||||||
|
commandQueue_.push( std::make_pair( EventManager::commandType::ENABLE_SCHEDULING, participant) );
|
||||||
auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant);
|
}
|
||||||
|
|
||||||
if (it == schedulingParticipants_.end())
|
|
||||||
{
|
|
||||||
schedulingParticipants_.push_back(participant);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void EventManager::Manager::unschedule(std::shared_ptr<EventManager::Participant> participant )
|
void EventManager::Manager::unschedule(std::shared_ptr<EventManager::Participant> participant )
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> guard( mutexDisableScheduleQueue_ );
|
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
|
||||||
disableScheduleQueue_.push( participant );
|
commandQueue_.push( std::make_pair( EventManager::commandType::DISABLE_SCHEDULING, participant) );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void EventManager::Manager::processConnections_()
|
void EventManager::Manager::processCommands_()
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> guard(mutexParticipants_);
|
std::unique_lock<std::mutex> lk( mutexCommandQueue_ );
|
||||||
std::lock_guard<std::mutex> lockGuard(mutexConnectionQueue_);
|
while( commandQueue_.empty() == false )
|
||||||
|
{
|
||||||
while(!connectionQueue_.empty())
|
auto pair = commandQueue_.front();
|
||||||
{
|
commandQueue_.pop();
|
||||||
std::shared_ptr<EventManager::Participant> participant = connectionQueue_.front();
|
lk.unlock();
|
||||||
connectionQueue_.pop();
|
switch( pair.first )
|
||||||
participants_.push_back(participant);
|
{
|
||||||
|
case EventManager::commandType::CONNECT:
|
||||||
participant->init();
|
processConnect_( pair.second );
|
||||||
}
|
break;
|
||||||
}
|
case EventManager::commandType::DISCONNECT:
|
||||||
|
processDisconnect_( pair.second );
|
||||||
|
break;
|
||||||
|
case EventManager::commandType::ENABLE_SCHEDULING:
|
||||||
|
processEnableScheduling_( pair.second );
|
||||||
|
break;
|
||||||
|
case EventManager::commandType::DISABLE_SCHEDULING:
|
||||||
|
processDisableScheduling_( pair.second );
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
lk.lock();
|
||||||
|
}
|
||||||
|
lk.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void EventManager::Manager::processDisableScheduling_()
|
void EventManager::Manager::processConnect_( std::shared_ptr<EventManager::Participant> participant )
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> guard( mutexDisableScheduleQueue_ );
|
std::lock_guard<std::mutex> guard(mutexParticipants_);
|
||||||
while( disableScheduleQueue_.empty() != true )
|
auto it = std::find( participants_.begin(), participants_.end(), participant );
|
||||||
{
|
if( it == participants_.end() )
|
||||||
std::shared_ptr<EventManager::Participant> participant = disableScheduleQueue_.front();
|
{
|
||||||
disableScheduleQueue_.pop();
|
participant->setManager(shared_from_this());
|
||||||
|
participant->setID(nextParticipantID_);
|
||||||
std::unique_lock<std::mutex> lk(mutexSchedulingParticipants_);
|
// we can set and increment here because this critical section is secured by a mutex
|
||||||
auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant);
|
nextParticipantID_++;
|
||||||
|
participants_.push_back(participant);
|
||||||
if (it != schedulingParticipants_.end())
|
participant->init();
|
||||||
{
|
}
|
||||||
schedulingParticipants_.erase(it);
|
}
|
||||||
}
|
|
||||||
lk.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void EventManager::Manager::connect(std::shared_ptr<EventManager::Participant> participant)
|
void EventManager::Manager::processDisconnect_( std::shared_ptr<EventManager::Participant> participant )
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> guard(mutexParticipants_);
|
// before the participant gets disconnected it has to be unscheduled
|
||||||
participant->setManager(shared_from_this());
|
processDisableScheduling_( participant );
|
||||||
|
|
||||||
// we can set and increment here because only one participant is in this
|
std::lock_guard<std::mutex> guard(mutexParticipants_);
|
||||||
// critical section in any moment
|
auto it = std::find( participants_.begin(), participants_.end(), participant );
|
||||||
participant->setID(nextParticipantID_);
|
if( it != participants_.end() )
|
||||||
nextParticipantID_++;
|
{
|
||||||
|
participant->setManager(nullptr);
|
||||||
|
participants_.erase( it );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
connectionQueue_.push(participant);
|
|
||||||
|
|
||||||
}
|
void EventManager::Manager::processEnableScheduling_( std::shared_ptr<EventManager::Participant> participant )
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> guard(mutexSchedulingParticipants_);
|
||||||
|
auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant );
|
||||||
|
|
||||||
void EventManager::Manager::disconnect(std::shared_ptr<EventManager::Participant> participant)
|
if( it == schedulingParticipants_.end() )
|
||||||
{
|
{
|
||||||
disconnect(participant);
|
schedulingParticipants_.push_back( participant );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
std::lock_guard<std::mutex> guard(mutexParticipants_);
|
|
||||||
std::list<std::shared_ptr<EventManager::Participant>>::iterator it;
|
|
||||||
|
|
||||||
it = std::find(participants_.begin(), participants_.end(),participant);
|
void EventManager::Manager::processDisableScheduling_( std::shared_ptr<EventManager::Participant> participant )
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> guard( mutexSchedulingParticipants_ );
|
||||||
|
auto it = std::find( schedulingParticipants_.begin(), schedulingParticipants_.end(), participant );
|
||||||
|
|
||||||
if (it != participants_.end())
|
if( it != schedulingParticipants_.end() )
|
||||||
{
|
{
|
||||||
participants_.erase(it);
|
schedulingParticipants_.erase(it);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
participant->setManager(nullptr);
|
|
||||||
}
|
void EventManager::Manager::connect( std::shared_ptr<EventManager::Participant> participant )
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
|
||||||
|
commandQueue_.push( std::make_pair( EventManager::commandType::CONNECT, participant) );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void EventManager::Manager::disconnect( std::shared_ptr<EventManager::Participant> participant )
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> guard( mutexCommandQueue_ );
|
||||||
|
commandQueue_.push( std::make_pair( EventManager::commandType::DISCONNECT, participant) );
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user