You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ryzom-core/code/nel/src/net/buf_client.cpp

472 lines
12 KiB
C++

// NeL - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
// Copyright (C) 2010 Winch Gate Property Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "stdnet.h"
#include "nel/misc/hierarchical_timer.h"
#include "nel/net/buf_client.h"
#include "nel/misc/thread.h"
#include "nel/net/dummy_tcp_sock.h"
#include "nel/net/net_log.h"
#ifdef NL_OS_WINDOWS
# define NOMINMAX
# include <windows.h>
#elif defined NL_OS_UNIX
# include <netinet/in.h>
#endif
using namespace NLMISC;
using namespace std;
namespace NLNET {
uint32 NbClientReceiveTask = 0;
/***************************************************************************************************
* User main thread (initialization)
**************************************************************************************************/
/*
* Constructor
*/
#ifdef NL_OS_UNIX
CBufClient::CBufClient( bool nodelay, bool replaymode, bool initPipeForDataAvailable ) :
CBufNetBase( initPipeForDataAvailable ),
#else
CBufClient::CBufClient( bool nodelay, bool replaymode, bool ) :
CBufNetBase(),
#endif
_NoDelay( nodelay ),
_PrevBytesDownloaded( 0 ),
_PrevBytesUploaded( 0 ),
_RecvTask( NULL ),
_RecvThread( NULL )
/*_PrevBytesReceived( 0 ),
_PrevBytesSent( 0 )*/
{
nlnettrace( "CBufClient::CBufClient" ); // don't define a global object
if ( replaymode )
{
_BufSock = new CNonBlockingBufSock( new CDummyTcpSock(), CBufNetBase::DefaultMaxExpectedBlockSize );
}
else
{
_BufSock = new CNonBlockingBufSock( NULL, CBufNetBase::DefaultMaxExpectedBlockSize );
_RecvTask = new CClientReceiveTask( this, _BufSock );
}
}
/*
* Connects to the specified host
* Precond: not connected
*/
void CBufClient::connect( const CInetAddress& addr )
{
nlnettrace( "CBufClient::connect" );
nlassert( ! _BufSock->Sock->connected() );
_BufSock->setMaxExpectedBlockSize( maxExpectedBlockSize() );
_BufSock->connect( addr, _NoDelay, true );
_BufSock->setNonBlocking(); // ADDED: non-blocking client connection
_PrevBytesDownloaded = 0;
_PrevBytesUploaded = 0;
/*_PrevBytesReceived = 0;
_PrevBytesSent = 0;*/
// Allow reconnection
if ( _RecvThread != NULL )
{
delete _RecvThread;
}
_RecvThread = IThread::create( _RecvTask, 1024*4*4 );
_RecvThread->start();
}
/***************************************************************************************************
* User main thread (running)
**************************************************************************************************/
void CBufClient::displayThreadStat (NLMISC::CLog *log)
{
log->displayNL ("client thread %p nbloop %d", _RecvTask, _RecvTask->NbLoop);
}
/*
* Sends a message to the remote host
*/
void CBufClient::send( const NLMISC::CMemStream& buffer )
{
nlnettrace( "CBufClient::send" );
nlassert( buffer.length() > 0 );
nlassert( buffer.length() <= maxSentBlockSize() );
// slow down the layer H_AUTO (CBufServer_send);
if ( ! _BufSock->pushBuffer( buffer ) )
{
// Disconnection event if disconnected
_BufSock->advertiseDisconnection( this, NULL );
}
}
/*
* Checks if there are some data to receive
*/
bool CBufClient::dataAvailable()
{
// slow down the layer H_AUTO (CBufClient_dataAvailable);
{
/* If no data available, enter the 'while' loop and return false (1 volatile test)
* If there are user data available, enter the 'while' and return true immediately (1 volatile test + 1 short locking)
* If there is a disconnection event (rare), call the callback and loop
*/
while ( dataAvailableFlag() )
{
// Because _DataAvailable is true, the receive queue is not empty at this point
uint8 val;
{
CFifoAccessor recvfifo( &receiveQueue() );
val = recvfifo.value().frontLast ();
}
#ifdef NL_OS_UNIX
uint8 b;
if ( read( _DataAvailablePipeHandle[PipeRead], &b, 1 ) == -1 )
nlwarning( "LNETL1: Read pipe failed in dataAvailable" );
//nldebug( "Pipe: 1 byte read (client %p)", this );
#endif
// Test if it the next block is a system event
switch ( val )
{
// Normal message available
case CBufNetBase::User:
return true; // return immediatly, do not extract the message
// Process disconnection event
case CBufNetBase::Disconnection:
LNETL1_DEBUG( "LNETL1: Disconnection event" );
_BufSock->setConnectedState( false );
// Call callback if needed
if ( disconnectionCallback() != NULL )
{
disconnectionCallback()( id(), argOfDisconnectionCallback() );
}
// Unlike the server version, we do not delete the CBufSock object here,
// it will be done in the destructor of CBufClient
break;
default: // should not occur
{
CFifoAccessor recvfifo( &receiveQueue() );
vector<uint8> buffer;
recvfifo.value().front (buffer);
LNETL1_INFO( "LNETL1: Invalid block type: %hu (should be = %hu)", (uint16)(buffer[buffer.size()-1]), (uint16)val );
LNETL1_INFO( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
LNETL1_INFO( "LNETL1: Receive queue:" );
recvfifo.value().display();
nlerror( "LNETL1: Invalid system event type in client receive queue" );
}
}
// Extract system event
{
CFifoAccessor recvfifo( &receiveQueue() );
recvfifo.value().pop();
setDataAvailableFlag( ! recvfifo.value().empty() );
}
}
// _DataAvailable is false here
return false;
}
}
#ifdef NL_OS_UNIX
/* Wait until the receive queue contains something to read (implemented with a select()).
* This is where the connection/disconnection callbacks can be called.
* \param usecMax Max time to wait in microsecond (up to 1 sec)
*/
void CBufClient::sleepUntilDataAvailable( uint usecMax )
{
// Prevent looping infinitely if the system time was changed
if ( usecMax > 999999 ) // limit not told in Linux man but here: http://docs.hp.com/en/B9106-90009/select.2.html
usecMax = 999999;
fd_set readers;
timeval tv;
do
{
FD_ZERO( &readers );
FD_SET( _DataAvailablePipeHandle[PipeRead], &readers );
tv.tv_sec = 0;
tv.tv_usec = usecMax;
int res = ::select( _DataAvailablePipeHandle[PipeRead]+1, &readers, NULL, NULL, &tv );
if ( res == -1 )
nlerror( "LNETL1: Select failed in sleepUntilDataAvailable (code %u)", CSock::getLastError() );
}
while ( ! dataAvailable() ); // will loop if only a connection/disconnection event was read
}
#endif
/*
* Receives next block of data in the specified buffer (resizes the vector)
* Precond: dataAvailable() has returned true
*/
void CBufClient::receive( NLMISC::CMemStream& buffer )
{
nlnettrace( "CBufClient::receive" );
//nlassert( dataAvailable() );
// Extract buffer from the receive queue
{
CFifoAccessor recvfifo( &receiveQueue() );
nlassert( ! recvfifo.value().empty() );
recvfifo.value().front( buffer );
recvfifo.value().pop();
setDataAvailableFlag( ! recvfifo.value().empty() );
}
// Extract event type
nlassert( buffer.buffer()[buffer.size()-1] == CBufNetBase::User );
//commented for optimisation LNETL1_DEBUG( "LNETL1: Client read buffer (%d+%d B)", buffer.size(), sizeof(TSockId)+1 );
buffer.resize( buffer.size()-1 );
}
/*
* Update the network (call this method evenly)
*/
void CBufClient::update()
{
//nlnettrace( "CBufClient::update" );
// Update sending
bool sendingok = _BufSock->update();
// Disconnection event if disconnected
if ( ! ( _BufSock->Sock->connected() && sendingok ) )
{
if ( _BufSock->Sock->connected() )
{
_BufSock->Sock->disconnect();
}
_BufSock->advertiseDisconnection( this, NULL );
}
}
/*
* Disconnect the remote host
*/
void CBufClient::disconnect( bool quick )
{
nlnettrace( "CBufClient::disconnect" );
// Do not allow to disconnect a socket that is not connected
nlassert( _BufSock->connectedState() );
// When the NS tells us to remove this connection AND the connection has physically
// disconnected but not yet logically (i.e. disconnection event not processed yet),
// skip flushing and physical active disconnection
if ( _BufSock->Sock->connected() )
{
// Flush sending is asked for
if ( ! quick )
{
_BufSock->flush();
}
// Disconnect and prevent from advertising the disconnection
_BufSock->disconnect( false );
}
// Empty the receive queue
{
CFifoAccessor recvfifo( &receiveQueue() );
recvfifo.value().clear();
setDataAvailableFlag( false );
}
}
// Utility function for newBytes...()
inline uint64 updateStatCounter( uint64& counter, uint64 newvalue )
{
uint64 result = newvalue - counter;
counter = newvalue;
return result;
}
/*
* Returns the number of bytes downloaded since the previous call to this method
*/
uint64 CBufClient::newBytesDownloaded()
{
return updateStatCounter( _PrevBytesDownloaded, bytesDownloaded() );
}
/*
* Returns the number of bytes uploaded since the previous call to this method
*/
uint64 CBufClient::newBytesUploaded()
{
return updateStatCounter( _PrevBytesUploaded, bytesUploaded() );
}
/*
* Returns the number of bytes popped by receive() since the previous call to this method
*/
/*uint64 CBufClient::newBytesReceived()
{
return updateStatCounter( _PrevBytesReceived, bytesReceived() );
}*/
/*
* Returns the number of bytes pushed by send() since the previous call to this method
*/
/*uint64 CBufClient::newBytesSent()
{
return updateStatCounter( _PrevBytesSent, bytesSent() );
}*/
/*
* Destructor
*/
CBufClient::~CBufClient()
{
nlnettrace( "CBufClient::~CBufClient" );
// Disconnect if not done
if ( _BufSock->Sock->connected() )
{
nlassert( _BufSock->connectedState() );
disconnect( true );
}
// Clean thread termination
if ( _RecvThread != NULL )
{
LNETL1_DEBUG( "LNETL1: Waiting for the end of the receive thread..." );
_RecvThread->wait();
}
if ( _RecvTask != NULL )
delete _RecvTask;
if ( _RecvThread != NULL )
delete _RecvThread;
if ( _BufSock != NULL )
delete _BufSock;
nlnettrace( "Exiting CBufClient::~CBufClient" );
}
/***************************************************************************************************
* Receive thread
**************************************************************************************************/
/*
* Code of receiving thread for clients
*/
void CClientReceiveTask::run()
{
NbClientReceiveTask++;
NbNetworkTask++;
nlnettrace( "CClientReceiveTask::run" );
// 18/08/2005 : sonix : Changed time out from 60s to 1s, in some case, it
// can generate a 60 s wait on destruction of the CBufSock
// By the way, checking every 1s is not a time consuming
_NBBufSock->Sock->setTimeOutValue( 1, 0 );
bool connected = true;
while ( connected && _NBBufSock->Sock->connected())
{
try
{
// ADDED: non-blocking client connection
// Wait until some data are received (sleepin' select inside)
while ( ! _NBBufSock->Sock->dataAvailable() )
{
if ( ! _NBBufSock->Sock->connected() )
{
LNETL1_DEBUG( "LNETL1: Client connection %s closed", sockId()->asString().c_str() );
// The socket went to _Connected=false when throwing the exception
connected = false;
break;
}
}
// Process the data received
if ( _NBBufSock->receivePart( 1 ) ) // 1 for the event type
{
//commented out for optimisation: LNETL1_DEBUG( "LNETL1: Client %s received buffer (%u bytes)", _SockId->asString().c_str(), buffer.size()/*, stringFromVector(buffer).c_str()*/ );
// Add event type
_NBBufSock->fillEventTypeOnly();
// Push message into receive queue
_Client->pushMessageIntoReceiveQueue( _NBBufSock->receivedBuffer() );
}
NbLoop++;
}
catch (const ESocket&)
{
LNETL1_DEBUG( "LNETL1: Client connection %s broken", sockId()->asString().c_str() );
sockId()->Sock->disconnect();
connected = false;
}
}
nlnettrace( "Exiting CClientReceiveTask::run()" );
NbClientReceiveTask--;
NbNetworkTask--;
}
NLMISC_CATEGORISED_VARIABLE(nel, uint32, NbClientReceiveTask, "Number of client receive thread");
} // NLNET