/* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. * * Copyright 2021 Dominik Meyer * This file is part of the EventManager distribution hosted at https://gitea.federationhq.de/byterazor/EventManager.git */ /** @file */ #include #include #include #include #include void EventManager::Manager::startMain_() { if (isMainThreadRunning_) { throw std::runtime_error("Main thread already running"); } stopMainThread_=false; mainThread_ = std::make_unique(&EventManager::Manager::mainProcess_,this); std::int32_t timeout = 6000; while(!isMainThreadRunning_ && timeout > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); timeout-=100; } if (timeout <= 0) { stopMainThread_=true; throw std::runtime_error("EventManager: can not start main thread"); } } void EventManager::Manager::startScheduling_() { if (isSchedulingThreadRunning_) { throw std::runtime_error("Scheduling thread already running"); } stopSchedulingThread_=false; schedulingThread_ = std::make_unique(&EventManager::Manager::schedulingProcess_,this); std::int32_t timeout = 6000; while(!isSchedulingThreadRunning_ && timeout > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); timeout-=100; } if (timeout <= 0) { stopSchedulingThread_=true; throw std::runtime_error("EventManager: can not start scheduling thread"); } } void EventManager::Manager::stopMain_() { std::int32_t timeout = 6000; stopMainThread_=true; newEventInQueue_.notify_one(); while(isMainThreadRunning_ && timeout > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); timeout-=100; } if (timeout <= 0) { throw std::runtime_error("can not stop main thread"); } mainThread_->join(); } void EventManager::Manager::stopScheduling_() { std::int32_t timeout = 6000; stopSchedulingThread_=true; while(isSchedulingThreadRunning_ && timeout > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); timeout-=100; } if (timeout <= 0) { throw std::runtime_error("can not stop scheduling thread"); } schedulingThread_->join(); } void EventManager::Manager::start() { startMain_(); try { startScheduling_(); } catch (std::exception &e) { stopMain_(); throw e; } } void EventManager::Manager::stop() { stopMain_(); stopScheduling_(); } EventManager::Manager::~Manager() { if (isMainThreadRunning_) { stopMain_(); } if (isSchedulingThreadRunning_) { stopScheduling_(); } } void EventManager::Manager::processEvent(const std::shared_ptr event) { auto it = eventMap_.find(event->type()); if (it != eventMap_.end()) { for (auto it2 = it->second.begin(); it2 != it->second.end(); ++it2) { if (event->emitter() != *it2) { (*it2)->emit(event); } } } } void EventManager::Manager::mainProcess_() { isMainThreadRunning_=true; while(!stopMainThread_) { std::unique_lock lock(mutexEventQueue_); newEventInQueue_.wait(lock); while(!eventQueue_.empty()) { std::shared_ptr event = eventQueue_.front(); eventQueue_.pop(); processEvent(event); } lock.unlock(); } isMainThreadRunning_=false; } void EventManager::Manager::schedulingProcess_() { isSchedulingThreadRunning_=true; while(!stopSchedulingThread_) { mutexSchedulingParticipants_.lock(); if (!schedulingParticipants_.empty()) { for (auto it = schedulingParticipants_.begin(); it != schedulingParticipants_.end(); ++it) { (*it)->schedule(); } } mutexSchedulingParticipants_.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } isSchedulingThreadRunning_=false; } void EventManager::Manager::subscribe(std::uint32_t type, std::shared_ptr participant) { std::lock_guard lockGuard(mutexEventMap_); // check if participant is already registered auto it = eventMap_.find(type); if (it != eventMap_.end()) { auto it2 = std::find(it->second.begin(), it->second.end(),participant); if (it2 == it->second.end()) { it->second.push_back(participant); } } else { eventMap_[type].push_back(participant); } } void EventManager::Manager::unsubscribe(std::uint32_t type, std::shared_ptr participant) { std::lock_guard lockGuard(mutexEventMap_); auto it = eventMap_.find(type); if (it == eventMap_.end()) { return; } auto it2 = std::find(it->second.begin(), it->second.end(),participant); if (it2 != it->second.end()) { it->second.erase(it2); } } void EventManager::Manager::unsubscribe(std::shared_ptr participant) { for (auto it = eventMap_.begin(); it != eventMap_.end(); ++it) { unsubscribe(it->first,participant); } } void EventManager::Manager::emit(const std::shared_ptr event) { { std::lock_guard lock(mutexEventQueue_); eventQueue_.push(event); } newEventInQueue_.notify_one(); } bool EventManager::Manager::isRunning() { if (isMainThreadRunning_ && isSchedulingThreadRunning_) { return true; } return false; } bool EventManager::Manager::empty() const { bool isEmpty=true; for (auto it = eventMap_.begin(); it != eventMap_.end(); ++it) { if ( !(*it).second.empty()) { isEmpty=false; } } return isEmpty; } bool EventManager::Manager::waitEmpty(std::uint32_t timeoutMS) const { std::uint32_t timeout=timeoutMS; while(!empty() && timeout > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); timeout-=100; } if (timeout == 0) { return false; } return true; } void EventManager::Manager::schedule(std::shared_ptr participant) { std::lock_guard guard(mutexSchedulingParticipants_); auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant); if (it == schedulingParticipants_.end()) { schedulingParticipants_.push_back(participant); } } void EventManager::Manager::unschedule(std::shared_ptr participant ) { std::lock_guard guard(mutexSchedulingParticipants_); auto it = std::find(schedulingParticipants_.begin(), schedulingParticipants_.end(), participant); if (it != schedulingParticipants_.end()) { schedulingParticipants_.erase(it); } } void EventManager::Manager::connect(std::shared_ptr participant) { std::lock_guard guard(mutexParticipants_); particpants_.push_back(participant); participant->setManager(shared_from_this()); participant->init(); } void EventManager::Manager::disconnect(std::shared_ptr participant) { disconnect(participant); std::lock_guard guard(mutexParticipants_); std::list>::iterator it; it = std::find(particpants_.begin(), particpants_.end(),participant); if (it != particpants_.end()) { particpants_.erase(it); } participant->setManager(nullptr); }