Changed: #1440 Safely handle slave connections during abort

--HG--
branch : build_pipeline_v3
hg/feature/build_pipeline_v3
kaetemi 12 years ago
parent 9f701b7957
commit 6e8a115eb3

@ -54,6 +54,7 @@ namespace PIPELINE {
// temporary flags // temporary flags
#define PIPELINE_INFO_MASTER_RELOAD_SHEETS "M_RELOAD_SHEETS" #define PIPELINE_INFO_MASTER_RELOAD_SHEETS "M_RELOAD_SHEETS"
#define PIPELINE_INFO_MASTER_UPDATE_DATABASE_FOR_SLAVE "M_UPD_DB_FOR_S" #define PIPELINE_INFO_MASTER_UPDATE_DATABASE_FOR_SLAVE "M_UPD_DB_FOR_S"
#define PIPELINE_INFO_ABORTING "M_ABORTING"
// permanent flags // permanent flags
#define PIPELINE_INFO_CODE_ERROR_UNMACRO "#CODE_ERROR_UNMACRO" #define PIPELINE_INFO_CODE_ERROR_UNMACRO "#CODE_ERROR_UNMACRO"
@ -101,15 +102,6 @@ class CModulePipelineMaster :
CInfoFlags::getInstance()->removeFlag(PIPELINE_INFO_MASTER_RELOAD_SHEETS); CInfoFlags::getInstance()->removeFlag(PIPELINE_INFO_MASTER_RELOAD_SHEETS);
} }
} }
/*
// TODO: THIS WILL CRASH IF CALLED WHEN THE SLAVE ALREADY DISCONNECTED!!! USE A SELF DELETING CLASS INSTEAD (and check g_IsExiting there too)!!!
void cbUpdateDatabaseStatus()
{
Proxy.masterUpdatedDatabaseStatus(Master);
CInfoFlags::getInstance()->removeFlag(PIPELINE_INFO_MASTER_UPDATE_DATABASE_FOR_SLAVE);
}
*/
bool canAcceptTask() bool canAcceptTask()
{ {
@ -124,6 +116,8 @@ protected:
CBuildTaskQueue m_BuildTaskQueue; CBuildTaskQueue m_BuildTaskQueue;
bool m_BuildWorking; bool m_BuildWorking;
bool m_AbortRequested;
NLMISC::CTaskManager *m_TaskManager; // Manages tasks requested by a slave. NLMISC::CTaskManager *m_TaskManager; // Manages tasks requested by a slave.
NLMISC::CSynchronized<std::list<IRunnable *>> m_WaitingCallbacks; NLMISC::CSynchronized<std::list<IRunnable *>> m_WaitingCallbacks;
@ -162,7 +156,7 @@ protected:
bool m_VerifyOnly; bool m_VerifyOnly;
public: public:
CModulePipelineMaster() : m_BuildWorking(false), m_TaskManager(NULL), m_WaitingCallbacks("WaitingCallbacks"), m_UpdateTasks("UpdateTasks") CModulePipelineMaster() : m_BuildWorking(false), m_AbortRequested(false), m_TaskManager(NULL), m_WaitingCallbacks("WaitingCallbacks"), m_UpdateTasks("UpdateTasks")
{ {
g_IsMaster = true; g_IsMaster = true;
m_TaskManager = new NLMISC::CTaskManager(); m_TaskManager = new NLMISC::CTaskManager();
@ -237,6 +231,8 @@ public:
PIPELINE::endedDirectTask(); PIPELINE::endedDirectTask();
} }
std::vector<IModuleProxy *> m_ModuleUpDelay;
virtual void onModuleUp(IModuleProxy *moduleProxy) virtual void onModuleUp(IModuleProxy *moduleProxy)
{ {
if (moduleProxy->getModuleClassName() == "ModulePipelineSlave") if (moduleProxy->getModuleClassName() == "ModulePipelineSlave")
@ -245,14 +241,22 @@ public:
nlassert(m_Slaves.find(moduleProxy) == m_Slaves.end()); nlassert(m_Slaves.find(moduleProxy) == m_Slaves.end());
m_SlavesMutex.lock(); if (m_AbortRequested)
{
nlinfo("Add to slave delay list");
m_ModuleUpDelay.push_back(moduleProxy);
}
else
{
m_SlavesMutex.lock();
CSlave *slave = new CSlave(this, moduleProxy); CSlave *slave = new CSlave(this, moduleProxy);
m_Slaves[moduleProxy] = slave; m_Slaves[moduleProxy] = slave;
m_SlavesMutex.unlock(); m_SlavesMutex.unlock();
slave->Proxy.submitToMaster(this); slave->Proxy.submitToMaster(this);
}
} }
} }
@ -261,27 +265,36 @@ public:
if (moduleProxy->getModuleClassName() == "ModulePipelineSlave") if (moduleProxy->getModuleClassName() == "ModulePipelineSlave")
{ {
nlinfo("Slave DOWN (%s)", moduleProxy->getModuleName().c_str()); nlinfo("Slave DOWN (%s)", moduleProxy->getModuleName().c_str());
nlassert(m_Slaves.find(moduleProxy) != m_Slaves.end());
m_SlavesMutex.lock();
TSlaveMap::iterator slaveIt = m_Slaves.find(moduleProxy);
CSlave *slave = slaveIt->second;
if (slave->ActiveTaskId) std::vector<IModuleProxy *>::iterator findDelay = std::find(m_ModuleUpDelay.begin(), m_ModuleUpDelay.end(), moduleProxy);
if (findDelay != m_ModuleUpDelay.end())
{ {
// if it goes down while busy on a task it crashed (or was poorly stopped by user...) nlinfo("Remove from slave delay list");
CInfoFlags::getInstance()->addFlag(PIPELINE_INFO_SLAVE_CRASHED); m_ModuleUpDelay.erase(findDelay);
// ... TODO ... }
slaveAbortedBuildTask(moduleProxy); // see if this works else
{
nlassert(m_Slaves.find(moduleProxy) != m_Slaves.end());
m_SlavesMutex.lock();
TSlaveMap::iterator slaveIt = m_Slaves.find(moduleProxy);
CSlave *slave = slaveIt->second;
if (slave->ActiveTaskId)
{
// if it goes down while busy on a task it crashed (or was poorly stopped by user...)
CInfoFlags::getInstance()->addFlag(PIPELINE_INFO_SLAVE_CRASHED);
// ... TODO ...
slaveAbortedBuildTask(moduleProxy); // see if this works
}
m_Slaves.erase(slaveIt);
delete slave;
// nldebug("Now %i slaves remaining", m_Slaves.size());
m_SlavesMutex.unlock();
} }
m_Slaves.erase(slaveIt);
delete slave;
// nldebug("Now %i slaves remaining", m_Slaves.size());
m_SlavesMutex.unlock();
} }
} }
@ -388,6 +401,16 @@ public:
nlassert(it->second->ActiveTaskId == 0); nlassert(it->second->ActiveTaskId == 0);
} }
m_SlavesMutex.unlock(); m_SlavesMutex.unlock();
if (m_AbortRequested)
{
m_AbortRequested = false;
CInfoFlags::getInstance()->removeFlag(PIPELINE_INFO_ABORTING);
// Go through delayed slave list
nldebug("Handle delayed slave list");
for (std::vector<IModuleProxy *>::iterator it = m_ModuleUpDelay.begin(), end = m_ModuleUpDelay.end(); it != end; ++it)
onModuleUp(*it);
}
PIPELINE::endedBuildReadyMaster(); PIPELINE::endedBuildReadyMaster();
} }
@ -603,6 +626,9 @@ public:
{ {
if (m_BuildWorking) if (m_BuildWorking)
{ {
m_AbortRequested = true;
CInfoFlags::getInstance()->addFlag(PIPELINE_INFO_ABORTING);
m_BuildTaskQueue.abortQueue(); m_BuildTaskQueue.abortQueue();
m_SlavesMutex.lock(); m_SlavesMutex.lock();

Loading…
Cancel
Save