@ -32,11 +32,6 @@
# include <vector>
using namespace NLMISC ;
using namespace NLNET ;
using namespace std ;
// Disable this define to reduce the traffic between mirror services of property changes
# define DISABLE_PROPERTY_CHANGE_TRAFFIC_LIMITING
@ -89,7 +84,7 @@ struct TMessageCarrierWithDestId : public TMessageCarrier
{
TMessageCarrierWithDestId ( bool inputStream ) : TMessageCarrier ( inputStream ) { }
TServiceId DestId ;
NLNET: : TServiceId DestId ;
} ;
@ -125,7 +120,7 @@ public:
CDataSetMS * dataSet ( ) const { return _DataSet ; }
/// Return the buffer
CMemStream& getDeltaBuffer ( ) { return _DeltaBuffer ; }
NLMISC: : CMemStream& getDeltaBuffer ( ) { return _DeltaBuffer ; }
/**
* Clear the buffer
@ -429,7 +424,7 @@ public:
} */
/// Store (and push directly) a message
void pushMessage ( TServiceId destId , TServiceId senderId , NLMISC : : CMemStream & msg )
void pushMessage ( NLNET: : TServiceId destId , NLNET : : TServiceId senderId , NLMISC : : CMemStream & msg )
{
if ( _NbMessagesStored = = 0 )
{
@ -470,7 +465,7 @@ private:
CDataSetMS * _DataSet ;
///
CMemStream _DeltaBuffer ;
NLMISC: : CMemStream _DeltaBuffer ;
/// Number of changes since the last beginRowManagement() or beginPropChanges()
sint _NbChangesPushed ;
@ -514,8 +509,8 @@ inline void CDeltaToMS::pushRowManagement( TEntityTrackerIndex eti, const TDat
if ( eti = = ADDING )
{
//nldebug( "Pushing E%d addition", entityIndex );
_DeltaBuffer . fastWrite ( const_cast < CEntityId& > ( _DataSet - > getEntityId ( datasetRow ) ) ) ;
TServiceId8 spawnerId = _DataSet - > getSpawnerServiceId ( datasetRow . getIndex ( ) ) ;
_DeltaBuffer . fastWrite ( const_cast < NLMISC: : CEntityId& > ( _DataSet - > getEntityId ( datasetRow ) ) ) ;
NLNET: : TServiceId8 spawnerId = _DataSet - > getSpawnerServiceId ( datasetRow . getIndex ( ) ) ;
_DeltaBuffer . fastWrite ( spawnerId ) ;
}
@ -530,8 +525,8 @@ inline void CDeltaToMS::pushRowManagementSync( const TDataSetRow& datasetRow )
{
_DeltaBuffer . fastWrite ( datasetRow ) ;
//nldebug( "Pushing E%d addition", entityIndex );
_DeltaBuffer . fastWrite ( const_cast < CEntityId& > ( _DataSet - > getEntityId ( datasetRow ) ) ) ;
TServiceId8 spawnerId = _DataSet - > getSpawnerServiceId ( datasetRow . getIndex ( ) ) ;
_DeltaBuffer . fastWrite ( const_cast < NLMISC: : CEntityId& > ( _DataSet - > getEntityId ( datasetRow ) ) ) ;
NLNET: : TServiceId8 spawnerId = _DataSet - > getSpawnerServiceId ( datasetRow . getIndex ( ) ) ;
_DeltaBuffer . fastWrite ( spawnerId ) ;
+ + _NbChangesPushed ;
}
@ -540,7 +535,7 @@ inline void CDeltaToMS::pushRowManagementSync( const TDataSetRow& datasetRow )
// The vector stores pointers to avoid big reallocations and to allow linking with pointers
typedef std : : vector < CDeltaToMS * > TDeltaToMSList ;
typedef std : : list < TServiceId8 > TServiceIdList ;
typedef std : : list < NLNET: : TServiceId8 > TServiceIdList ;
/**
@ -560,7 +555,7 @@ public:
}
/// Add a remote mirror service
void addRemoteMS ( TSDataSetsMS & datasets , TServiceId serviceId )
void addRemoteMS ( TSDataSetsMS & datasets , NLNET: : TServiceId serviceId )
{
_ServiceIdList . push_back ( serviceId ) ;
@ -568,12 +563,12 @@ public:
for ( ids = datasets . begin ( ) ; ids ! = datasets . end ( ) ; + + ids )
{
_DeltaToMSArray [ serviceId . get ( ) ] . push_back ( new CDeltaToMS ( & ( GET_SDATASET ( ids ) ) ) ) ;
_DeltaToMSArray [ serviceId . get ( ) ] . back ( ) - > setCapacity ( min ( ( int ) GET_SDATASET ( ids ) . maxOutBandwidth ( ) + 16 , ( int ) 1024 * 1024 ) ) ; // after push_back to avoid a big copy
_DeltaToMSArray [ serviceId . get ( ) ] . back ( ) - > setCapacity ( std : : min ( ( int ) GET_SDATASET ( ids ) . maxOutBandwidth ( ) + 16 , ( int ) 1024 * 1024 ) ) ; // after push_back to avoid a big copy
}
}
/// Remove a remote mirror service
void removeRemoteMS ( TServiceId serviceId )
void removeRemoteMS ( NLNET: : TServiceId serviceId )
{
TServiceIdList : : iterator isl = find ( _ServiceIdList . begin ( ) , _ServiceIdList . end ( ) , serviceId ) ;
if ( isl ! = _ServiceIdList . end ( ) )
@ -588,38 +583,38 @@ public:
{
if ( _CorrespondingMS [ i ] = = serviceId )
{
removeRemoteClientService ( TServiceId ( i ) ) ;
removeRemoteClientService ( NLNET : : TServiceId ( i ) ) ;
}
}
}
/// Map service -> MS (supports multiple adding if the same)
void addRemoteClientService ( TServiceId8 clientServiceId , TServiceId msId )
void addRemoteClientService ( NLNET: : TServiceId8 clientServiceId , NLNET : : TServiceId msId )
{
nlassert ( ( _CorrespondingMS [ clientServiceId . get ( ) ] . get ( ) = = 0 ) | | ( _CorrespondingMS [ clientServiceId . get ( ) ] = = msId ) ) ;
_CorrespondingMS [ clientServiceId . get ( ) ] = msId ;
}
/// Unmap service -> MS
void removeRemoteClientService ( TServiceId8 clientServiceId )
void removeRemoteClientService ( NLNET: : TServiceId8 clientServiceId )
{
_CorrespondingMS [ clientServiceId . get ( ) ] . set ( 0 ) ;
}
/// Get a mapping service -> MS. Return 0 if not found
TServiceId8 getCorrespondingMS ( NLNET : : TServiceId8 remoteClientService ) const
NLNET: : TServiceId8 getCorrespondingMS ( NLNET : : TServiceId8 remoteClientService ) const
{
return _CorrespondingMS [ remoteClientService . get ( ) ] ;
}
// Random access to a serviceId, get the list of datasets
TDeltaToMSList & getDeltaToMSList ( TServiceId serviceId )
TDeltaToMSList & getDeltaToMSList ( NLNET: : TServiceId serviceId )
{
return _DeltaToMSArray [ serviceId . get ( ) ] ;
}
// Random access to a serviceId, then find a dataset
CDeltaToMS * getDeltaToMS ( TServiceId serviceId , CDataSetMS * dataset )
CDeltaToMS * getDeltaToMS ( NLNET: : TServiceId serviceId , CDataSetMS * dataset )
{
TDeltaToMSList & deltaList = _DeltaToMSArray [ serviceId . get ( ) ] ;
@ -651,12 +646,12 @@ public:
}
/// Store the message for later sending within delta packet to the corresponding mirror service
void pushMessageToRemoteQueue ( TServiceId destId , TServiceId senderId , NLMISC : : CMemStream & msgin ) ;
void pushMessageToRemoteQueue ( NLNET: : TServiceId destId , NLNET : : TServiceId senderId , NLMISC : : CMemStream & msgin ) ;
private :
/// Helper for pushMessageToRemoteQueue
void doPushMessageToDelta ( TServiceId msId , TServiceId destId , TServiceId senderId , NLMISC : : CMemStream & msg ) ;
void doPushMessageToDelta ( NLNET: : TServiceId msId , NLNET: : TServiceId destId , NLNET : : TServiceId senderId , NLMISC : : CMemStream & msg ) ;
/// List of remote mirror services
TServiceIdList _ServiceIdList ;
@ -665,7 +660,7 @@ private:
TDeltaToMSList _DeltaToMSArray [ 256 ] ;
/// Vector of MS serviceids indexed by their client services ids
TServiceId8 _CorrespondingMS [ 256 ] ;
NLNET: : TServiceId8 _CorrespondingMS [ 256 ] ;
} ;
@ -688,7 +683,7 @@ private:
* \ author Nevrax France
* \ date 2002
*/
class CMirrorService : public IService
class CMirrorService : public NLNET: : IService
{
public :
@ -713,22 +708,22 @@ public:
}
/// Declare the datasets used by a client service
void declareDataSets ( CMessage& msgin , NLNET : : TServiceId serviceId ) ;
void declareDataSets ( NLNET: : CMessage& msgin , NLNET : : TServiceId serviceId ) ;
/// Allocate a property segment with shared memory and return the smid to the sender
void allocateProperty ( CMessage& msgin , NLNET : : TServiceId serviceId ) ;
void allocateProperty ( NLNET: : CMessage& msgin , NLNET : : TServiceId serviceId ) ;
/// Give the list of properties not subscribed but allocated on this machine for owned datasets
void giveOtherProperties ( CMessage& msgin , NLNET : : TServiceId serviceId ) ;
void giveOtherProperties ( NLNET: : CMessage& msgin , NLNET : : TServiceId serviceId ) ;
/// Unallocate (destroy) all the allocated segments
void destroyPropertySegments ( ) ;
/// Declare/undeclare an entity type
void declareEntityTypeOwner ( CMessage& msgin , NLNET : : TServiceId serviceId ) ;
void declareEntityTypeOwner ( NLNET: : CMessage& msgin , NLNET : : TServiceId serviceId ) ;
/// Give a range (message from the range manager)
void giveRange ( CMessage& msgin , NLNET : : TServiceId serviceId ) ;
void giveRange ( NLNET: : CMessage& msgin , NLNET : : TServiceId serviceId ) ;
/// Process a subscription of dataset entities management, received from another MS
void processDataSetEntitiesSubscription ( const std : : string & dataSetName , NLNET : : TServiceId msId , NLNET : : TServiceId clientServiceId , const CMTRTag & newTagOfRemoteMS ) ;
@ -740,10 +735,10 @@ public:
void processPropSubscription ( CDataSetMS & dataset , TPropertyIndex propIndex , const std : : string & propName , NLNET : : TServiceId serviceId , bool local , bool readOnly , bool allocate , const std : : string & notifyGroupByPropName , bool writeOnly = false ) ;
/// Process an unsubscription of property from another MS (corresponding to one client service leaving)
void processPropUnsubscription ( CMessage& msgin , NLNET : : TServiceId serviceId ) ;
void processPropUnsubscription ( NLNET: : CMessage& msgin , NLNET : : TServiceId serviceId ) ;
/// Process a delta
void processReceivedDelta ( CMessage& msgin , NLNET : : TServiceId serviceId ) ;
void processReceivedDelta ( NLNET: : CMessage& msgin , NLNET : : TServiceId serviceId ) ;
/// Browse all datasets and build deltas to remote MS, then send them
void buildAndSendAllDeltas ( ) ;
@ -764,7 +759,7 @@ public:
void removeRemoteMS ( NLNET : : TServiceId serviceId ) ;
/// Add or remove a remote client service
void addRemoveRemoteClientService ( TServiceId clientServiceId , TServiceId msId , bool addOrRemove , const CMTRTag & tagOfNewClientService ) ;
void addRemoveRemoteClientService ( NLNET: : TServiceId clientServiceId , NLNET : : TServiceId msId , bool addOrRemove , const CMTRTag & tagOfNewClientService ) ;
/// Remove all trackers of the specified local client service, and tell all services that have them to do the same
void removeTrackers ( NLNET : : TServiceId serviceId ) ;
@ -794,28 +789,28 @@ public:
void setRangeManagerReady ( bool b ) ;
/// If all the expected MS and the range manager service are ready, send a message to the specified service if it is a local client
void testAndSendMirrorsOnline ( TServiceId servId ) ;
void testAndSendMirrorsOnline ( NLNET: : TServiceId servId ) ;
/// Wait before doing rescan for service
void deferRescanOfEntitiesForNewService ( TServiceId serviceId ) ;
void deferRescanOfEntitiesForNewService ( NLNET: : TServiceId serviceId ) ;
/// Wait one tick before applying the row removals from the quitting service, before releasing the trackers and unsubscribing
void deferTrackerRemovalForQuittingService ( TServiceId servId ) ;
void deferTrackerRemovalForQuittingService ( NLNET: : TServiceId servId ) ;
/// Do the rescans if it's time to do so
void applyPendingRescans ( ) ;
/// Do the rescan
void rescanEntitiesForNewService ( TServiceId clientServiceId ) ;
void rescanEntitiesForNewService ( NLNET: : TServiceId clientServiceId ) ;
/// Receive SYNC_MS
void receiveSyncFromRemoteMS ( CMessage& msgin , TServiceId srcRemoteMSId ) ;
void receiveSyncFromRemoteMS ( NLNET: : CMessage& msgin , NLNET : : TServiceId srcRemoteMSId ) ;
/// Remove all entities in the specified ranges (received from the range manager when a MS is down)
void releaseEntitiesInRanges ( CDataSetMS & dataset , const std : : vector < TRowRange > & erasedRanges ) ;
/// Receive SYNCMI
void receiveSyncMirrorInformation ( CMessage& msgin , NLNET : : TServiceId serviceId ) ;
void receiveSyncMirrorInformation ( NLNET: : CMessage& msgin , NLNET : : TServiceId serviceId ) ;
/// Scan for existing entities in the received ranges
void receiveAcknowledgeAddEntityTrackerMS ( NLNET : : CMessage & msgin , NLNET : : TServiceId serviceIdFrom , bool isTrackerAdded ) ;
@ -828,17 +823,17 @@ public:
void tickClientServices ( ) ;
void receiveMessageToForwardFromClient ( CMessage& msgin , NLNET : : TServiceId senderId ) ;
void receiveMessageToForwardMultipleFromClient ( CMessage& msgin , NLNET : : TServiceId senderId ) ;
void receiveMessageToForwardFromClient ( NLNET: : CMessage& msgin , NLNET : : TServiceId senderId ) ;
void receiveMessageToForwardMultipleFromClient ( NLNET: : CMessage& msgin , NLNET : : TServiceId senderId ) ;
void pushMessageToLocalQueue ( std : : vector < TMessageCarrier > & msgQueue , NLNET : : TServiceId senderId , NLMISC : : CMemStream & msg ) ;
void pushMessageToLocalQueueFromMessage ( std : : vector < TMessageCarrier > & msgQueue , NLNET : : CMessage & msgin ) ;
void receiveDeltaFromRemoteMS ( CMessage& msgin , NLNET : : TServiceId senderMSId ) ;
void receiveDeltaFromRemoteMS ( NLNET: : CMessage& msgin , NLNET : : TServiceId senderMSId ) ;
void serialToMessageFromLocalQueue ( CMessage& msgout , const TMessageCarrier & srcMsgInQueue ) ;
void serialToMessageFromLocalQueue ( NLNET: : CMessage& msgout , const TMessageCarrier & srcMsgInQueue ) ;
void testIfNotInQuittingServices ( TServiceId servId ) ;
void testIfNotInQuittingServices ( NLNET: : TServiceId servId ) ;
/// Return true if the delta of the current tick has been sent
bool deltaUpdateSent ( ) const { return _DeltaSent ; }
@ -874,7 +869,7 @@ public:
void examineDeltaUpdatesReceivedBeforeSync ( ) ;
///
void setNewTagOfRemoteMS ( const CMTRTag & tag , TServiceId msId ) ;
void setNewTagOfRemoteMS ( const CMTRTag & tag , NLNET: : TServiceId msId ) ;
///
void decNbExpectedATAcknowledges ( NLNET : : TServiceId clientServiceId , uint whichEorP ) ;
@ -888,13 +883,13 @@ public:
//void receiveDeclaredRange( NLNET::CMessage& msgin );
/// Return the tag of a service
const CMTRTag & getTagOfService ( TServiceId serviceId )
const CMTRTag & getTagOfService ( NLNET: : TServiceId serviceId )
{
return _AllServiceTags [ serviceId . get ( ) ] [ 0 ] ;
}
///
const CMTRTag & getPreviousTagOfService ( TServiceId serviceId )
const CMTRTag & getPreviousTagOfService ( NLNET: : TServiceId serviceId )
{
return _AllServiceTags [ serviceId . get ( ) ] [ 1 ] ;
}
@ -906,7 +901,7 @@ public:
// const CMTRTag& getTagOfService( uint16 serviceId ) { return getTagOfService( (TServiceId)serviceId ); }
/// Return true if the tag changed during the last dataset entity subscription from the specified MS
bool hasTagOfServiceChanged ( TServiceId serviceId )
bool hasTagOfServiceChanged ( NLNET: : TServiceId serviceId )
{
return ( _AllServiceTags [ serviceId . get ( ) ] [ 0 ] . rawTag ( ) ! = _AllServiceTags [ serviceId . get ( ) ] [ 1 ] . rawTag ( ) ) ;
}
@ -927,7 +922,7 @@ protected:
void broadcastSubscribeDataSetEntities ( const std : : string & dataSetName , NLNET : : TServiceId clientServiceId ) ;
/// Process a property subscription request from a client service, to forward it to all other mirror services
void broadcastSubscribeProperty ( CMessage& msgin , const std : : string & propName , NLNET : : TServiceId serviceId ) ;
void broadcastSubscribeProperty ( NLNET: : CMessage& msgin , const std : : string & propName , NLNET : : TServiceId serviceId ) ;
/// Allocate entity tracker
void allocateEntityTrackers ( CDataSetMS & dataset , NLNET : : TServiceId serviceId , NLNET : : TServiceId clientServiceId , bool local , const CMTRTag & tagOfNewPeer ) ;
@ -936,7 +931,7 @@ protected:
void deleteTracker ( CChangeTrackerMS & tracker , std : : vector < CChangeTrackerMS > & vect ) ;
/// Send RT to local services
void sendTrackersToRemoveToLocalServices ( map< NLNET : : TServiceId , vector < sint32 > > & servicesForTrackerRemoval ) ;
void sendTrackersToRemoveToLocalServices ( std: : map< NLNET : : TServiceId , std : : vector < sint32 > > & servicesForTrackerRemoval ) ;
/// Push the property changes on the local machine for a remote mirror service to its delta buffer
template < class T >
@ -944,7 +939,7 @@ protected:
/// Apply the property changes coming from a remote mirror service
template < class T >
void applyPropertyChanges ( CMessage& msgin , CDataSetMS & dataset , TPropertyIndex propIndex , NLNET : : TServiceId serviceId , T * )
void applyPropertyChanges ( NLNET: : CMessage& msgin , CDataSetMS & dataset , TPropertyIndex propIndex , NLNET : : TServiceId serviceId , T * )
{
//sint NbChangesRead;
//NbChangesRead = 0;
@ -1022,7 +1017,7 @@ protected:
void testAndSendMirrorsOnline ( ) ;
/// Send a synchronize message to a new remote MS
void synchronizeSubscriptionsToNewMS ( TServiceId newRemoteMSId ) ;
void synchronizeSubscriptionsToNewMS ( NLNET: : TServiceId newRemoteMSId ) ;
/// Compute output rate stats if needed (delta + messages)
void computeDeltaStats ( ) ;
@ -1040,7 +1035,7 @@ protected:
}
/// Set the new tag
void setNewTag ( TServiceId serviceId , const CMTRTag & newTag )
void setNewTag ( NLNET: : TServiceId serviceId , const CMTRTag & newTag )
{
_AllServiceTags [ serviceId . get ( ) ] [ 1 ] = _AllServiceTags [ serviceId . get ( ) ] [ 0 ] ;
_AllServiceTags [ serviceId . get ( ) ] [ 0 ] = newTag ;
@ -1054,7 +1049,7 @@ protected:
private :
typedef std : : pair < NLMISC : : TGameCycle , TServiceId> TServiceAndTimeLeft ;
typedef std : : pair < NLMISC : : TGameCycle , NLNET: : TServiceId> TServiceAndTimeLeft ;
/// Properties in the local mirror
TPropertiesInMirrorMS _PropertiesInMirror ;