Changed: Run tasks for slave on master on seperate thread to avoid blocking

--HG--
branch : build_pipeline_v3
hg/feature/build_pipeline_v3
kaetemi 13 years ago
parent 9bc5ee2399
commit 2af323cf46

@ -35,6 +35,7 @@
// NeL includes
#include <nel/misc/debug.h>
#include <nel/net/service.h>
#include <nel/misc/task_manager.h>
// Project includes
#include "info_flags.h"
@ -99,12 +100,14 @@ class CModulePipelineMaster :
}
}
/*
// TODO: THIS WILL CRASH IF CALLED WHEN THE SLAVE ALREADY DISCONNECTED!!! USE A SELF DELETING CLASS INSTEAD (and check g_IsExiting there too)!!!
void cbUpdateDatabaseStatus()
{
Proxy.masterUpdatedDatabaseStatus(Master);
CInfoFlags::getInstance()->removeFlag(PIPELINE_INFO_MASTER_UPDATE_DATABASE_FOR_SLAVE);
}
*/
bool canAcceptTask()
{
@ -119,14 +122,48 @@ protected:
CBuildTaskQueue m_BuildTaskQueue;
bool m_BuildWorking;
NLMISC::CTaskManager *m_TaskManager; // Manages tasks requested by a slave.
NLMISC::CSynchronized<std::list<IRunnable *>> m_WaitingCallbacks;
// A runnable that's executed at next update state used as a callback.
// Override runnable as normal as the callback function.
class CDelayedCallback : public IRunnable
{
public:
CDelayedCallback(CModulePipelineMaster *master) : Master(master)
{
NLMISC::CSynchronized<std::list<IRunnable *>>::CAccessor waitingCallbacks(&Master->m_WaitingCallbacks);
waitingCallbacks.value().push_back(this);
}
inline CCallback<void> getCallback() { return CCallback<void>(this, &CDelayedCallback::callback); }
CModulePipelineMaster *Master;
private:
void callback()
{
{
Master->addUpdateTask(this);
}
{
NLMISC::CSynchronized<std::list<IRunnable *>>::CAccessor waitingCallbacks(&Master->m_WaitingCallbacks);
waitingCallbacks.value().remove(this);
}
}
};
// Runnable tasks that are ran on the next update cycle.
// The IRunnable instance is deleted by the update cycle.
NLMISC::CSynchronized<std::deque<IRunnable *>> m_UpdateTasks;
// build command
bool m_BypassErrors;
bool m_VerifyOnly;
public:
CModulePipelineMaster() : m_BuildWorking(false)
CModulePipelineMaster() : m_BuildWorking(false), m_TaskManager(NULL), m_WaitingCallbacks("WaitingCallbacks"), m_UpdateTasks("UpdateTasks")
{
g_IsMaster = true;
m_TaskManager = new NLMISC::CTaskManager();
}
virtual ~CModulePipelineMaster()
@ -136,6 +173,25 @@ public:
g_IsMaster = false;
delete m_TaskManager;
m_TaskManager = NULL;
// Wait for waiting callbacks to be called
for (; ; )
{
{
NLMISC::CSynchronized<std::list<IRunnable *>>::CAccessor waitingCallbacks(&m_WaitingCallbacks);
if (waitingCallbacks.value().size() == 0)
break;
}
nlwarning("Waiting for callbacks on the master to be called...");
nlSleep(1000);
}
// Ensure the remaining update tasks are handled
handleUpdateTasks();
m_SlavesMutex.lock();
for (TSlaveMap::iterator it = m_Slaves.begin(), end = m_Slaves.end(); it != end; ++it)
@ -145,6 +201,10 @@ public:
m_SlavesMutex.unlock();
}
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
virtual bool initModule(const TParsedCommandLine &initInfo)
{
CModuleBase::initModule(initInfo);
@ -219,8 +279,43 @@ public:
}
}
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
// Add a task to be run at the next update
// The IRunnable will be deleted
void addUpdateTask(IRunnable *runnable)
{
NLMISC::CSynchronized<std::deque<IRunnable *>>::CAccessor updateTasks(&m_UpdateTasks);
updateTasks.value().push_back(runnable);
}
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
void handleUpdateTasks()
{
for (; ; )
{
IRunnable *currentRunnable;
{
NLMISC::CSynchronized<std::deque<IRunnable *>>::CAccessor updateTasks(&m_UpdateTasks);
if (updateTasks.value().size() == 0)
break;
currentRunnable = updateTasks.value().front();
updateTasks.value().pop_front();
}
currentRunnable->run();
}
}
virtual void onModuleUpdate()
{
// handle update tasks (like network responses from callbacks)
handleUpdateTasks();
// if state build, iterate trough all slaves to see if any is free, and check if there's any waiting tasks
if (m_BuildWorking)
{
@ -286,6 +381,10 @@ public:
}
}
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
virtual void slaveFinishedBuildTask(NLNET::IModuleProxy *sender, uint8 errorLevel)
{
// TODO
@ -351,6 +450,50 @@ public:
//m_SlavesMutex.unlock();
slave->Vector.push_back(str);
}
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
class CUpdateDatabaseStatusSlaveCallback : public CDelayedCallback
{
public:
CUpdateDatabaseStatusSlaveCallback(CModulePipelineMaster *master, NLNET::IModuleProxy *slaveProxy) : CDelayedCallback(master), m_SlaveProxy(slaveProxy) { }
virtual void run() // this is sanely run from the update thread
{
CSlave *slave = Master->m_Slaves[m_SlaveProxy];
if (slave == NULL) { nlwarning("Slave disconnected before callback could be delivered"); return; }
slave->Proxy.masterUpdatedDatabaseStatus(Master);
CInfoFlags::getInstance()->removeFlag(PIPELINE_INFO_MASTER_UPDATE_DATABASE_FOR_SLAVE);
delete this;
}
private:
NLNET::IModuleProxy *m_SlaveProxy;
};
class CUpdateDatabaseStatusByVectorTask : public IRunnable
{
public:
CUpdateDatabaseStatusByVectorTask(CModulePipelineMaster *master, NLNET::IModuleProxy *sender, std::vector<std::string> &slaveVector) : m_Master(master), m_Sender(sender)
{
std::swap(slaveVector, m_Vector);
}
virtual void run() // run from the master process task manager
{
CUpdateDatabaseStatusSlaveCallback *cb = new CUpdateDatabaseStatusSlaveCallback(m_Master, m_Sender); // deleted by update
g_DatabaseStatus->updateDatabaseStatus(cb->getCallback(), m_Vector, false, false);
delete this;
}
private:
CModulePipelineMaster *m_Master;
NLNET::IModuleProxy *m_Sender;
std::vector<std::string> m_Vector;
};
virtual void updateDatabaseStatusByVector(NLNET::IModuleProxy *sender)
{
@ -379,11 +522,17 @@ public:
}
}
if (ok) g_DatabaseStatus->updateDatabaseStatus(CCallback<void>(slave, &CSlave::cbUpdateDatabaseStatus), slave->Vector, false, false);
slave->Vector.clear();
if (ok)
{
CUpdateDatabaseStatusByVectorTask *slaveTask = new CUpdateDatabaseStatusByVectorTask(this, sender, slave->Vector); // slave->Vector is wiped due to swap
m_TaskManager->addTask(slaveTask);
}
}
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
void setAvailablePlugins(NLNET::IModuleProxy *sender, const std::vector<uint32> &pluginsAvailable)
{
//m_SlavesMutex.lock();

Loading…
Cancel
Save