Changed: Safely handle exit during status update.

--HG--
branch : build_pipeline_v3
hg/feature/build_pipeline_v3
kaetemi 13 years ago
parent 9bd3b473fa
commit 5d02bcff33

@ -36,6 +36,7 @@
#include <nel/misc/async_file_manager.h> #include <nel/misc/async_file_manager.h>
#include <nel/misc/path.h> #include <nel/misc/path.h>
#include <nel/misc/file.h> #include <nel/misc/file.h>
#include <nel/net/service.h>
// Project includes // Project includes
#include "pipeline_service.h" #include "pipeline_service.h"
@ -97,7 +98,14 @@ bool CDatabaseStatus::getFileStatus(CFileStatus &fileStatus, const std::string &
} }
namespace { namespace {
/*
class CUpdateFileStatusCancel : public CAsyncFileManager::ICancelCallback
{
// cancel callback
virtual bool callback(const IRunnable *prunnable) const;
};
CUpdateFileStatusCancel s_UpdateFileStatusCancel;
*/
class CUpdateFileStatus : public IRunnable class CUpdateFileStatus : public IRunnable
{ {
public: public:
@ -110,62 +118,81 @@ public:
virtual void run() virtual void run()
{ {
nldebug("Run this");
bool firstSeen = false;
uint32 time = CTime::getSecondsSince1970();
std::string statusPath = g_PipelineDirectory + PIPELINE_DATABASE_STATUS_SUBDIR + dropDatabaseDirectory(FilePath) + ".status";
CFileStatus fs; CFileStatus fs;
StatusMutex->enter(); if (!g_IsExiting)
if (CFile::fileExists(statusPath))
{
CIFile ifs(statusPath, false);
fs.serial(ifs);
ifs.close();
}
else
{
firstSeen = true;
}
StatusMutex->leave();
if (firstSeen)
{ {
fs.FirstSeen = time; bool firstSeen = false;
fs.CRC32 = 0; uint32 time = CTime::getSecondsSince1970();
nldebug("First seen file: '%s'", FilePath.c_str()); std::string statusPath = g_PipelineDirectory + PIPELINE_DATABASE_STATUS_SUBDIR + dropDatabaseDirectory(FilePath) + ".status";
StatusMutex->enter();
if (CFile::fileExists(statusPath))
{
CIFile ifs(statusPath, false);
fs.serial(ifs);
ifs.close();
}
else
{
firstSeen = true;
}
StatusMutex->leave();
if (firstSeen)
{
fs.FirstSeen = time;
fs.CRC32 = 0;
nldebug("First seen file: '%s'", FilePath.c_str());
// create dir
CFile::createDirectoryTree(g_PipelineDirectory + PIPELINE_DATABASE_STATUS_SUBDIR + dropDatabaseDirectory(CFile::getPath(FilePath)));
}
fs.LastChanged = CFile::getFileModificationDate(FilePath);
fs.LastUpdate = time;
// create dir nldebug("Calculate crc32 of file: '%s'", FilePath.c_str());
CFile::createDirectoryTree(g_PipelineDirectory + PIPELINE_DATABASE_STATUS_SUBDIR + dropDatabaseDirectory(CFile::getPath(FilePath))); nlSleep(1000);
// calculate crc32 etcetera etcetera
StatusMutex->enter();
{
COFile ofs(statusPath, false, false, true);
fs.serial(ofs);
ofs.flush();
ofs.close();
}
StatusMutex->leave();
Callback(FilePath, fs, true);
} }
fs.LastChanged = CFile::getFileModificationDate(FilePath); else
fs.LastUpdate = time;
nldebug("Calculate crc32 of file: '%s'", FilePath.c_str());
nlSleep(1000);
// calculate crc32 etcetera etcetera
StatusMutex->enter();
{ {
COFile ofs(statusPath, false, false, true); Callback(FilePath, fs, false);
fs.serial(ofs); }
ofs.flush();
ofs.close();
}
StatusMutex->leave();
nldebug("Callback");
Callback(FilePath, fs);
nldebug("Delete this");
delete this; delete this;
} }
}; };
/*
bool CUpdateFileStatusCancel::callback(const IRunnable *prunnable) const
{
std::string name;
prunnable->getName(name);
if (name.find("CUpdateFileStatus") != std::string::npos)
{
CUpdateFileStatus *ufs = const_cast<CUpdateFileStatus *>(static_cast<const CUpdateFileStatus *>(prunnable));
CFileStatus fs;
ufs->Callback(ufs->FilePath, fs, false);
// delete ufs; CALLER DELETES THIS!
return true;
}
return false;
}*/
} /* anonymous namespace */ } /* anonymous namespace */
void CDatabaseStatus::updateFileStatus(const TFileStatusCallback &callback, const std::string &filePath) IRunnable *CDatabaseStatus::updateFileStatus(const TFileStatusCallback &callback, const std::string &filePath)
{ {
if (!g_IsMaster) if (!g_IsMaster)
{ {
nlerror("Not master, not allowed."); nlerror("Not master, not allowed.");
return; return NULL;
} }
CUpdateFileStatus *ufs = new CUpdateFileStatus(); CUpdateFileStatus *ufs = new CUpdateFileStatus();
@ -173,6 +200,7 @@ void CDatabaseStatus::updateFileStatus(const TFileStatusCallback &callback, cons
ufs->FilePath = filePath; ufs->FilePath = filePath;
ufs->Callback = callback; ufs->Callback = callback;
CAsyncFileManager::getInstance().addLoadTask(ufs); CAsyncFileManager::getInstance().addLoadTask(ufs);
return ufs;
} }
// ****************************************************************** // ******************************************************************
@ -190,9 +218,11 @@ public:
bool Ready; bool Ready;
bool CallbackCalled; bool CallbackCalled;
void fileUpdated(const std::string &filePath, const CFileStatus &fileStatus) std::vector<IRunnable *> RequestTasks;
void fileUpdated(const std::string &filePath, const CFileStatus &fileStatus, bool success)
{ {
nldebug("File updated callback"); // warning: may be g_IsExiting during this callback!
bool done = false; bool done = false;
Mutex.enter(); Mutex.enter();
@ -204,7 +234,45 @@ public:
} }
Mutex.leave(); Mutex.leave();
if (done) Callback(); if (done) doneRemove();
if (g_IsExiting)
{
abortExit();
}
}
void abortExit()
{
nlinfo("Abort database status update.");
Mutex.enter();
for (std::vector<IRunnable *>::iterator it = RequestTasks.begin(), end = RequestTasks.end(); it != end; ++it)
{
if (CAsyncFileManager::getInstance().deleteTask(*it))
{
++FilesUpdated;
delete *it;
}
}
bool done = false;
if (!CallbackCalled)
{
done = true;
CallbackCalled = true;
}
Mutex.leave();
if (done) doneRemove();
}
void doneRemove()
{
nlinfo("Database status update done.");
Callback();
delete this;
} }
}; };
@ -225,16 +293,23 @@ void updateDirectoryStatus(CDatabaseStatus* ds, CDatabaseStatusUpdater &updater,
} }
else else
{ {
updater.Mutex.enter();
++updater.FilesRequested;
updater.Mutex.leave();
CFileStatus fileStatus; CFileStatus fileStatus;
if (!ds->getFileStatus(fileStatus, subPath)) if (!ds->getFileStatus(fileStatus, subPath))
{ {
ds->updateFileStatus(TFileStatusCallback(&updater, &CDatabaseStatusUpdater::fileUpdated), subPath); updater.Mutex.enter();
if (!updater.CallbackCalled) // on abort.
{
++updater.FilesRequested;
IRunnable *runnable = ds->updateFileStatus(TFileStatusCallback(&updater, &CDatabaseStatusUpdater::fileUpdated), subPath);
updater.RequestTasks.push_back(runnable);
}
updater.Mutex.leave();
} }
} }
if (g_IsExiting)
return;
} }
} }
@ -242,31 +317,31 @@ void updateDirectoryStatus(CDatabaseStatus* ds, CDatabaseStatusUpdater &updater,
void CDatabaseStatus::updateDatabaseStatus(const CCallback<void> &callback) void CDatabaseStatus::updateDatabaseStatus(const CCallback<void> &callback)
{ {
CDatabaseStatusUpdater updater; CDatabaseStatusUpdater *updater = new CDatabaseStatusUpdater();
updater.Callback = callback; updater->Callback = callback;
updater.FilesRequested = 0; updater->FilesRequested = 0;
updater.FilesUpdated = 0; updater->FilesUpdated = 0;
updater.Ready = false; updater->Ready = false;
updater.CallbackCalled = false; updater->CallbackCalled = false;
nlinfo("Starting iteration through database, queueing file status updates."); nlinfo("Starting iteration through database, queueing file status updates.");
// recursive loop // recursive loop
updateDirectoryStatus(this, updater, g_DatabaseDirectory); updateDirectoryStatus(this, *updater, g_DatabaseDirectory);
nlinfo("Iteration through database, queueing file status updates complete."); nlinfo("Iteration through database, queueing file status updates complete.");
bool done = false; bool done = false;
updater.Mutex.enter(); updater->Mutex.enter();
updater.Ready = true; updater->Ready = true;
if (updater.FilesRequested == updater.FilesUpdated && !updater.CallbackCalled) if (updater->FilesRequested == updater->FilesUpdated && !updater->CallbackCalled)
{ {
done = true; done = true;
updater.CallbackCalled = true; updater->CallbackCalled = true;
} }
updater.Mutex.leave(); updater->Mutex.leave();
if (done) updater.Callback(); if (done) updater->doneRemove();
} }
// ****************************************************************** // ******************************************************************

@ -39,6 +39,10 @@
// Project includes // Project includes
#include "callback.h" #include "callback.h"
namespace NLMISC {
class IRunnable;
}
namespace PIPELINE { namespace PIPELINE {
#define PIPELINE_DATABASE_STATUS_SUBDIR "database.status/" #define PIPELINE_DATABASE_STATUS_SUBDIR "database.status/"
@ -69,7 +73,7 @@ public:
void serial(NLMISC::IStream &stream); void serial(NLMISC::IStream &stream);
}; };
typedef CCallback<void, const std::string &/*filePath*/, const CFileStatus &/*fileStatus*/> TFileStatusCallback; typedef CCallback<void, const std::string &/*filePath*/, const CFileStatus &/*fileStatus*/, bool /*success*/> TFileStatusCallback;
/** /**
* \brief CDatabaseStatus * \brief CDatabaseStatus
@ -89,9 +93,9 @@ public:
/// Tries to read the last file status. Return false if the status is invalid. Call updateFileStatus if the result is false to update asynchronously. /// Tries to read the last file status. Return false if the status is invalid. Call updateFileStatus if the result is false to update asynchronously.
bool getFileStatus(CFileStatus &fileStatus, const std::string &filePath) const; bool getFileStatus(CFileStatus &fileStatus, const std::string &filePath) const;
/// Updates the file status asynchronously. The new file status is broadcast to clients and slaves afterwards. /// Updates the file status asynchronously. The new file status is broadcast to clients and slaves afterwards. Warning: If g_IsExiting during callback then update likely did not happen.
void updateFileStatus(const TFileStatusCallback &callback, const std::string &filePath); NLMISC::IRunnable *updateFileStatus(const TFileStatusCallback &callback, const std::string &filePath);
/// Forces an update of the complete database status. /// Forces an update of the complete database status. Warning: If g_IsExiting during callback then update is incomplete.
void updateDatabaseStatus(const CCallback<void> &callback); void updateDatabaseStatus(const CCallback<void> &callback);
void getFileErrors(CFileErrors &fileErrors, const std::string &filePath, uint32 newerThan = 0) const; void getFileErrors(CFileErrors &fileErrors, const std::string &filePath, uint32 newerThan = 0) const;

@ -43,6 +43,7 @@
#include <nel/georges/u_form_loader.h> #include <nel/georges/u_form_loader.h>
#include <nel/misc/mutex.h> #include <nel/misc/mutex.h>
#include <nel/misc/task_manager.h> #include <nel/misc/task_manager.h>
#include <nel/misc/async_file_manager.h>
// Project includes // Project includes
#include "pipeline_workspace.h" #include "pipeline_workspace.h"
@ -58,6 +59,7 @@ namespace PIPELINE {
bool g_IsMaster = false; bool g_IsMaster = false;
std::string g_DatabaseDirectory; std::string g_DatabaseDirectory;
std::string g_PipelineDirectory; std::string g_PipelineDirectory;
bool g_IsExiting = false;
namespace { namespace {
@ -251,6 +253,14 @@ public:
/// Finalization. Release the service. For example, this function frees all allocations made in the init() function. /// Finalization. Release the service. For example, this function frees all allocations made in the init() function.
virtual void release() virtual void release()
{ {
g_IsExiting = true;
while (NLMISC::CAsyncFileManager::getInstance().getNumWaitingTasks() > 0)
{
nlSleep(10);
}
NLMISC::CAsyncFileManager::terminate();
delete s_DatabaseStatus; delete s_DatabaseStatus;
s_DatabaseStatus = NULL; s_DatabaseStatus = NULL;

@ -42,6 +42,8 @@ extern bool g_IsMaster;
extern std::string g_DatabaseDirectory; extern std::string g_DatabaseDirectory;
extern std::string g_PipelineDirectory; extern std::string g_PipelineDirectory;
extern bool g_IsExiting;
} /* namespace PIPELINE */ } /* namespace PIPELINE */
#endif /* #ifndef PIPELINE_PIPELINE_SERVICE_H */ #endif /* #ifndef PIPELINE_PIPELINE_SERVICE_H */

Loading…
Cancel
Save