Implement NLNET sleepUntilDataAvailable for Win32 build

develop
kaetemi 3 years ago
parent 8a7e96f7a6
commit 0c5f4cdc1a
No known key found for this signature in database
GPG Key ID: 9873C4D40BB479BC

@ -56,6 +56,10 @@ extern uint32 NbNetworkTask;
enum TPipeWay { PipeRead, PipeWrite }; enum TPipeWay { PipeRead, PipeWrite };
#endif #endif
#ifdef NL_OS_WINDOWS
typedef void *HANDLE;
#endif
/** /**
* Layer 1 * Layer 1
@ -78,7 +82,7 @@ public:
/// Destructor /// Destructor
virtual ~CBufNetBase(); virtual ~CBufNetBase();
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
/** Init the pipe for data available with an external pipe. /** Init the pipe for data available with an external pipe.
* Call it only if you set initPipeForDataAvailable to false in the constructor. * Call it only if you set initPipeForDataAvailable to false in the constructor.
* Then don't call sleepUntilDataAvailable() but use select() on the pipe. * Then don't call sleepUntilDataAvailable() but use select() on the pipe.
@ -89,6 +93,11 @@ public:
_DataAvailablePipeHandle[PipeRead] = twoPipeHandles[PipeRead]; _DataAvailablePipeHandle[PipeRead] = twoPipeHandles[PipeRead];
_DataAvailablePipeHandle[PipeWrite] = twoPipeHandles[PipeWrite]; _DataAvailablePipeHandle[PipeWrite] = twoPipeHandles[PipeWrite];
} }
#elif defined(NL_OS_WINDOWS)
void setExternalPipeForDataAvailable(HANDLE eventHandle)
{
_DataAvailableHandle = eventHandle;
}
#endif #endif
/// Sets callback for detecting a disconnection (or NULL to disable callback) /// Sets callback for detecting a disconnection (or NULL to disable callback)
@ -201,9 +210,11 @@ protected:
/// Return _DataAvailable /// Return _DataAvailable
bool dataAvailableFlag() const { return _DataAvailable; } bool dataAvailableFlag() const { return _DataAvailable; }
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
/// Pipe to select() on data available /// Pipe to select() on data available
int _DataAvailablePipeHandle [2]; int _DataAvailablePipeHandle [2];
#elif defined(NL_OS_WINDOWS)
HANDLE _DataAvailableHandle;
#endif #endif
private: private:

@ -638,7 +638,7 @@ protected:
/// Auto-reconnect /// Auto-reconnect
void autoReconnect( CUnifiedConnection &uc, uint connectionIndex ); void autoReconnect( CUnifiedConnection &uc, uint connectionIndex );
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX) || defined(NL_OS_WINDOWS)
/// Sleep (implemented by select()) /// Sleep (implemented by select())
void sleepUntilDataAvailable( NLMISC::TTime msecMax ); void sleepUntilDataAvailable( NLMISC::TTime msecMax );
#endif #endif
@ -700,9 +700,11 @@ private:
/// for each services, which network to take /// for each services, which network to take
std::vector<std::string> _DefaultNetwork; std::vector<std::string> _DefaultNetwork;
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
/// Pipe to select() on data available (shared among all connections) /// Pipe to select() on data available (shared among all connections)
int _MainDataAvailablePipe [2]; int _MainDataAvailablePipe [2];
#elif defined(NL_OS_WINDOWS)
HANDLE _MainDataAvailableHandle;
#endif #endif
/// Service id of the running service /// Service id of the running service

@ -57,13 +57,15 @@ CBufNetBase::CBufNetBase() :
#ifdef MUTEX_DEBUG #ifdef MUTEX_DEBUG
initAcquireTimeMap(); initAcquireTimeMap();
#endif #endif
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
_IsDataAvailablePipeSelfManaged = isDataAvailablePipeSelfManaged; _IsDataAvailablePipeSelfManaged = isDataAvailablePipeSelfManaged;
if ( _IsDataAvailablePipeSelfManaged ) if ( _IsDataAvailablePipeSelfManaged )
{ {
if ( ::pipe( _DataAvailablePipeHandle ) != 0 ) if ( ::pipe( _DataAvailablePipeHandle ) != 0 )
nlwarning( "Unable to create D.A. pipe" ); nlwarning( "Unable to create D.A. pipe" );
} }
#elif defined(NL_OS_WINDOWS)
_DataAvailableHandle = NULL;
#endif #endif
} }
@ -99,7 +101,7 @@ void CBufNetBase::pushMessageIntoReceiveQueue( const std::vector<uint8>& buffer
//mbsize = recvfifo.value().size() / 1048576; //mbsize = recvfifo.value().size() / 1048576;
setDataAvailableFlag( true ); setDataAvailableFlag( true );
} }
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
// Wake-up main thread (outside the critical section of CFifoAccessor, to allow main thread to be // Wake-up main thread (outside the critical section of CFifoAccessor, to allow main thread to be
// read the fifo; if the main thread sees the Data Available flag is true but the pipe not written // read the fifo; if the main thread sees the Data Available flag is true but the pipe not written
// yet, it will block on read()). // yet, it will block on read()).
@ -109,6 +111,9 @@ void CBufNetBase::pushMessageIntoReceiveQueue( const std::vector<uint8>& buffer
nlwarning( "LNETL1: Write pipe failed in pushMessageIntoReceiveQueue" ); nlwarning( "LNETL1: Write pipe failed in pushMessageIntoReceiveQueue" );
} }
//nldebug( "Pipe: 1 byte written (%p)", this ); //nldebug( "Pipe: 1 byte written (%p)", this );
#elif defined(NL_OS_WINDOWS)
if (_DataAvailableHandle)
SetEvent(_DataAvailableHandle);
#endif #endif
//nldebug( "BNB: Released." ); //nldebug( "BNB: Released." );
//if ( mbsize > 1 ) //if ( mbsize > 1 )
@ -131,7 +136,7 @@ void CBufNetBase::pushMessageIntoReceiveQueue( const uint8 *buffer, uint32 size
//nldebug( "BNB: Pushed, releasing the receive queue..." ); //nldebug( "BNB: Pushed, releasing the receive queue..." );
//mbsize = recvfifo.value().size() / 1048576; //mbsize = recvfifo.value().size() / 1048576;
setDataAvailableFlag( true ); setDataAvailableFlag( true );
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
// Wake-up main thread // Wake-up main thread
uint8 b=0; uint8 b=0;
if ( write( _DataAvailablePipeHandle[PipeWrite], &b, 1 ) == -1 ) if ( write( _DataAvailablePipeHandle[PipeWrite], &b, 1 ) == -1 )
@ -139,6 +144,9 @@ void CBufNetBase::pushMessageIntoReceiveQueue( const uint8 *buffer, uint32 size
nlwarning( "LNETL1: Write pipe failed in pushMessageIntoReceiveQueue" ); nlwarning( "LNETL1: Write pipe failed in pushMessageIntoReceiveQueue" );
} }
nldebug( "Pipe: 1 byte written" ); nldebug( "Pipe: 1 byte written" );
#elif defined(NL_OS_WINDOWS)
if (_DataAvailableHandle)
SetEvent(_DataAvailableHandle);
#endif #endif
} }
//nldebug( "BNB: Released." ); //nldebug( "BNB: Released." );

@ -45,8 +45,8 @@ uint32 TotalCallbackCalled = 0;
uint32 TimeInCallback =0; uint32 TimeInCallback =0;
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX) || defined(NL_OS_WINDOWS)
/// Yield method (Unix only) /// Yield method
CVariable<uint32> UseYieldMethod("nel", "UseYieldMethod", "0=select 1=usleep 2=nanosleep 3=sched_yield 4=none", 0, 0, true ); CVariable<uint32> UseYieldMethod("nel", "UseYieldMethod", "0=select 1=usleep 2=nanosleep 3=sched_yield 4=none", 0, 0, true );
#endif #endif
@ -564,11 +564,15 @@ bool CUnifiedNetwork::init(const CInetAddress *addr, CCallbackNetBase::TRecordin
port = CNamingClient::queryServicePort (); port = CNamingClient::queryServicePort ();
} }
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
/// Init the main pipe to select() on data available /// Init the main pipe to select() on data available
if ( ::pipe( _MainDataAvailablePipe ) != 0 ) if ( ::pipe( _MainDataAvailablePipe ) != 0 )
nlwarning( "Unable to create main D.A. pipe" ); nlwarning( "Unable to create main D.A. pipe" );
//nldebug( "Pipe: created" ); //nldebug( "Pipe: created" );
#elif defined(NL_OS_WINDOWS)
_MainDataAvailableHandle = CreateEventW(NULL, FALSE, FALSE, NULL);
if (!_MainDataAvailableHandle)
nlwarning("Unable to create main D.A. event");
#endif #endif
// setup the server callback only if server port != 0, otherwise there's no server callback // setup the server callback only if server port != 0, otherwise there's no server callback
@ -578,9 +582,11 @@ bool CUnifiedNetwork::init(const CInetAddress *addr, CCallbackNetBase::TRecordin
{ {
nlassert (_CbServer == 0); nlassert (_CbServer == 0);
_CbServer = new CCallbackServer( CCallbackNetBase::Off, "", true, false ); // don't init one pipe per connection _CbServer = new CCallbackServer( CCallbackNetBase::Off, "", true, false ); // don't init one pipe per connection
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
_CbServer->setExternalPipeForDataAvailable( _MainDataAvailablePipe ); // the main pipe is shared for all connections _CbServer->setExternalPipeForDataAvailable( _MainDataAvailablePipe ); // the main pipe is shared for all connections
//nldebug( "Pipe: set (server %p)", _CbServer ); //nldebug( "Pipe: set (server %p)", _CbServer );
#elif defined(NL_OS_WINDOWS)
_CbServer->setExternalPipeForDataAvailable(_MainDataAvailableHandle);
#endif #endif
bool retry = false; bool retry = false;
do do
@ -754,9 +760,15 @@ void CUnifiedNetwork::release(bool mustFlushSendQueues, const std::vector<std::s
if (CNamingClient::connected ()) if (CNamingClient::connected ())
CNamingClient::disconnect (); CNamingClient::disconnect ();
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
::close( _MainDataAvailablePipe[PipeRead] ); ::close( _MainDataAvailablePipe[PipeRead] );
::close( _MainDataAvailablePipe[PipeWrite] ); ::close( _MainDataAvailablePipe[PipeWrite] );
#elif defined(NL_OS_WINDOWS)
if (_MainDataAvailableHandle)
{
CloseHandle(_MainDataAvailableHandle);
_MainDataAvailableHandle = NULL;
}
#endif #endif
} }
@ -856,9 +868,11 @@ void CUnifiedNetwork::addService(const string &name, const vector<CInetAddress>
// Create a new connection with the service, setup callback and connect // Create a new connection with the service, setup callback and connect
CCallbackClient *cbc = new CCallbackClient( CCallbackNetBase::Off, "", true, false ); // don't init one pipe per connection CCallbackClient *cbc = new CCallbackClient( CCallbackNetBase::Off, "", true, false ); // don't init one pipe per connection
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
cbc->setExternalPipeForDataAvailable( _MainDataAvailablePipe ); // the main pipe is shared for all connections cbc->setExternalPipeForDataAvailable( _MainDataAvailablePipe ); // the main pipe is shared for all connections
//nldebug( "Pipe: set (client %p)", cbc ); //nldebug( "Pipe: set (client %p)", cbc );
#elif defined(NL_OS_WINDOWS)
cbc->setExternalPipeForDataAvailable(_MainDataAvailableHandle);
#endif #endif
cbc->setDisconnectionCallback(uncbDisconnection, NULL); cbc->setDisconnectionCallback(uncbDisconnection, NULL);
cbc->setDefaultCallback(uncbMsgProcessing); cbc->setDefaultCallback(uncbMsgProcessing);
@ -1147,7 +1161,7 @@ void CUnifiedNetwork::update(TTime timeout)
t0 = currentTime - (timeout - remainingTime); t0 = currentTime - (timeout - remainingTime);
} }
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
// Sleep until the time expires or we receive a message // Sleep until the time expires or we receive a message
H_BEFORE(L5UpdateSleep); H_BEFORE(L5UpdateSleep);
switch ( UseYieldMethod.get() ) switch ( UseYieldMethod.get() )
@ -1159,6 +1173,18 @@ void CUnifiedNetwork::update(TTime timeout)
default: break; // don't sleep at all, makes all slow! default: break; // don't sleep at all, makes all slow!
} }
H_AFTER(L5UpdateSleep); H_AFTER(L5UpdateSleep);
#elif defined(NL_OS_WINDOWS)
// Sleep until the time expires or we receive a message
H_BEFORE(L5UpdateSleep);
switch (UseYieldMethod.get())
{
case 0: sleepUntilDataAvailable(remainingTime); break; // accurate sleep
case 1: nlSleep(1); break;
case 2: nlSleep(1); break;
case 3: SwitchToThread(); break;
default: break; // don't sleep at all, makes all slow!
}
H_AFTER(L5UpdateSleep);
#else #else
// Enable windows multithreading before rescanning all connections // Enable windows multithreading before rescanning all connections
H_TIME(L5UpdateSleep, nlSleep(1);); // 0 (yield) would be too harmful to other applications H_TIME(L5UpdateSleep, nlSleep(1);); // 0 (yield) would be too harmful to other applications
@ -1222,10 +1248,7 @@ void CUnifiedNetwork::autoReconnect( CUnifiedConnection &uc, uint connectionInde
} }
} }
#ifdef NL_OS_UNIX #if defined(NL_OS_UNIX)
/*
*
*/
void CUnifiedNetwork::sleepUntilDataAvailable( TTime msecMax ) void CUnifiedNetwork::sleepUntilDataAvailable( TTime msecMax )
{ {
// Prevent looping infinitely if an erroneous time was provided // Prevent looping infinitely if an erroneous time was provided
@ -1249,6 +1272,15 @@ void CUnifiedNetwork::sleepUntilDataAvailable( TTime msecMax )
nlwarning( "HNETL5: Select failed in sleepUntilDataAvailable"); nlwarning( "HNETL5: Select failed in sleepUntilDataAvailable");
//nldebug( "Slept %u ms", (uint)(CTime::getLocalTime()-before) ); //nldebug( "Slept %u ms", (uint)(CTime::getLocalTime()-before) );
} }
#elif defined(NL_OS_WINDOWS)
void CUnifiedNetwork::sleepUntilDataAvailable(TTime msecMax)
{
if (msecMax > 999)
msecMax = 999;
nlassert(_MainDataAvailableHandle);
WaitForSingleObject(_MainDataAvailableHandle, msecMax);
}
#endif #endif

Loading…
Cancel
Save