From 2af323cf46dcd1096aa277d45eaef54a1611c28c Mon Sep 17 00:00:00 2001 From: kaetemi Date: Sun, 29 Jul 2012 18:30:35 +0200 Subject: [PATCH] Changed: Run tasks for slave on master on seperate thread to avoid blocking --HG-- branch : build_pipeline_v3 --- .../service/module_pipeline_master.cpp | 157 +++++++++++++++++- 1 file changed, 153 insertions(+), 4 deletions(-) diff --git a/code/nel/tools/pipeline/service/module_pipeline_master.cpp b/code/nel/tools/pipeline/service/module_pipeline_master.cpp index 7d8850da0..427ad7b05 100644 --- a/code/nel/tools/pipeline/service/module_pipeline_master.cpp +++ b/code/nel/tools/pipeline/service/module_pipeline_master.cpp @@ -35,6 +35,7 @@ // NeL includes #include #include +#include // Project includes #include "info_flags.h" @@ -99,12 +100,14 @@ class CModulePipelineMaster : } } + /* // TODO: THIS WILL CRASH IF CALLED WHEN THE SLAVE ALREADY DISCONNECTED!!! USE A SELF DELETING CLASS INSTEAD (and check g_IsExiting there too)!!! void cbUpdateDatabaseStatus() { Proxy.masterUpdatedDatabaseStatus(Master); CInfoFlags::getInstance()->removeFlag(PIPELINE_INFO_MASTER_UPDATE_DATABASE_FOR_SLAVE); } + */ bool canAcceptTask() { @@ -119,14 +122,48 @@ protected: CBuildTaskQueue m_BuildTaskQueue; bool m_BuildWorking; + NLMISC::CTaskManager *m_TaskManager; // Manages tasks requested by a slave. + NLMISC::CSynchronized> m_WaitingCallbacks; + + // A runnable that's executed at next update state used as a callback. + // Override runnable as normal as the callback function. + class CDelayedCallback : public IRunnable + { + public: + CDelayedCallback(CModulePipelineMaster *master) : Master(master) + { + NLMISC::CSynchronized>::CAccessor waitingCallbacks(&Master->m_WaitingCallbacks); + waitingCallbacks.value().push_back(this); + } + inline CCallback getCallback() { return CCallback(this, &CDelayedCallback::callback); } + CModulePipelineMaster *Master; + + private: + void callback() + { + { + Master->addUpdateTask(this); + } + { + NLMISC::CSynchronized>::CAccessor waitingCallbacks(&Master->m_WaitingCallbacks); + waitingCallbacks.value().remove(this); + } + } + }; + + // Runnable tasks that are ran on the next update cycle. + // The IRunnable instance is deleted by the update cycle. + NLMISC::CSynchronized> m_UpdateTasks; + // build command bool m_BypassErrors; bool m_VerifyOnly; public: - CModulePipelineMaster() : m_BuildWorking(false) + CModulePipelineMaster() : m_BuildWorking(false), m_TaskManager(NULL), m_WaitingCallbacks("WaitingCallbacks"), m_UpdateTasks("UpdateTasks") { g_IsMaster = true; + m_TaskManager = new NLMISC::CTaskManager(); } virtual ~CModulePipelineMaster() @@ -136,6 +173,25 @@ public: g_IsMaster = false; + delete m_TaskManager; + m_TaskManager = NULL; + + // Wait for waiting callbacks to be called + for (; ; ) + { + { + NLMISC::CSynchronized>::CAccessor waitingCallbacks(&m_WaitingCallbacks); + if (waitingCallbacks.value().size() == 0) + break; + } + + nlwarning("Waiting for callbacks on the master to be called..."); + nlSleep(1000); + } + + // Ensure the remaining update tasks are handled + handleUpdateTasks(); + m_SlavesMutex.lock(); for (TSlaveMap::iterator it = m_Slaves.begin(), end = m_Slaves.end(); it != end; ++it) @@ -145,6 +201,10 @@ public: m_SlavesMutex.unlock(); } + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + virtual bool initModule(const TParsedCommandLine &initInfo) { CModuleBase::initModule(initInfo); @@ -219,8 +279,43 @@ public: } } + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + + // Add a task to be run at the next update + // The IRunnable will be deleted + void addUpdateTask(IRunnable *runnable) + { + NLMISC::CSynchronized>::CAccessor updateTasks(&m_UpdateTasks); + updateTasks.value().push_back(runnable); + } + + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + + void handleUpdateTasks() + { + for (; ; ) + { + IRunnable *currentRunnable; + { + NLMISC::CSynchronized>::CAccessor updateTasks(&m_UpdateTasks); + if (updateTasks.value().size() == 0) + break; + currentRunnable = updateTasks.value().front(); + updateTasks.value().pop_front(); + } + currentRunnable->run(); + } + } + virtual void onModuleUpdate() { + // handle update tasks (like network responses from callbacks) + handleUpdateTasks(); + // if state build, iterate trough all slaves to see if any is free, and check if there's any waiting tasks if (m_BuildWorking) { @@ -286,6 +381,10 @@ public: } } + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + virtual void slaveFinishedBuildTask(NLNET::IModuleProxy *sender, uint8 errorLevel) { // TODO @@ -351,6 +450,50 @@ public: //m_SlavesMutex.unlock(); slave->Vector.push_back(str); } + + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + + class CUpdateDatabaseStatusSlaveCallback : public CDelayedCallback + { + public: + CUpdateDatabaseStatusSlaveCallback(CModulePipelineMaster *master, NLNET::IModuleProxy *slaveProxy) : CDelayedCallback(master), m_SlaveProxy(slaveProxy) { } + + virtual void run() // this is sanely run from the update thread + { + CSlave *slave = Master->m_Slaves[m_SlaveProxy]; + if (slave == NULL) { nlwarning("Slave disconnected before callback could be delivered"); return; } + + slave->Proxy.masterUpdatedDatabaseStatus(Master); + CInfoFlags::getInstance()->removeFlag(PIPELINE_INFO_MASTER_UPDATE_DATABASE_FOR_SLAVE); + + delete this; + } + + private: + NLNET::IModuleProxy *m_SlaveProxy; + }; + + class CUpdateDatabaseStatusByVectorTask : public IRunnable + { + public: + CUpdateDatabaseStatusByVectorTask(CModulePipelineMaster *master, NLNET::IModuleProxy *sender, std::vector &slaveVector) : m_Master(master), m_Sender(sender) + { + std::swap(slaveVector, m_Vector); + } + virtual void run() // run from the master process task manager + { + CUpdateDatabaseStatusSlaveCallback *cb = new CUpdateDatabaseStatusSlaveCallback(m_Master, m_Sender); // deleted by update + g_DatabaseStatus->updateDatabaseStatus(cb->getCallback(), m_Vector, false, false); + + delete this; + } + private: + CModulePipelineMaster *m_Master; + NLNET::IModuleProxy *m_Sender; + std::vector m_Vector; + }; virtual void updateDatabaseStatusByVector(NLNET::IModuleProxy *sender) { @@ -379,11 +522,17 @@ public: } } - if (ok) g_DatabaseStatus->updateDatabaseStatus(CCallback(slave, &CSlave::cbUpdateDatabaseStatus), slave->Vector, false, false); - - slave->Vector.clear(); + if (ok) + { + CUpdateDatabaseStatusByVectorTask *slaveTask = new CUpdateDatabaseStatusByVectorTask(this, sender, slave->Vector); // slave->Vector is wiped due to swap + m_TaskManager->addTask(slaveTask); + } } + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////// + void setAvailablePlugins(NLNET::IModuleProxy *sender, const std::vector &pluginsAvailable) { //m_SlavesMutex.lock();