Use shard ID as part of shared memory key in MS to allow multiple shards on one server

ryzomclassic-develop
kaetemi 5 years ago
parent 7a71caca6e
commit 55c3939789

@ -91,8 +91,8 @@ void CDataSetMS::readdAllocatedTrackers( NLNET::CMessage& msgin, TServiceId s
TSelfPropTrackers::const_iterator ipt; TSelfPropTrackers::const_iterator ipt;
for ( ipt=selfPropTrackers.begin(); ipt!=selfPropTrackers.end(); ++ipt ) for ( ipt=selfPropTrackers.begin(); ipt!=selfPropTrackers.end(); ++ipt )
{ {
smidpool.reacquireId( (*ipt).smid() ); smidpool.reacquireId( (*ipt).smid() & 0xFFF );
mutidpool.reacquireId( (*ipt).mutid() ); mutidpool.reacquireId( (*ipt).mutid() & 0xFFF );
_SubscribersByProperty[(*ipt).propIndex()].push_back( _SubscribersByProperty[(*ipt).propIndex()].push_back(
CChangeTrackerMS( serviceId, true, CChangeTrackerMS( serviceId, true,
@ -107,15 +107,15 @@ void CDataSetMS::readdAllocatedTrackers( NLNET::CMessage& msgin, TServiceId s
// Entity trackers // Entity trackers
CChangeTrackerClient selfAddingTracker; CChangeTrackerClient selfAddingTracker;
msgin.serial( selfAddingTracker ); msgin.serial( selfAddingTracker );
smidpool.reacquireId( selfAddingTracker.smid() ); smidpool.reacquireId( selfAddingTracker.smid() & 0xFFF );
mutidpool.reacquireId( selfAddingTracker.mutid() ); mutidpool.reacquireId( selfAddingTracker.mutid() & 0xFFF );
_EntityTrackers[ADDING].push_back( CChangeTrackerMS( serviceId, true ) ); _EntityTrackers[ADDING].push_back( CChangeTrackerMS( serviceId, true ) );
_EntityTrackers[ADDING].back().reaccess( selfAddingTracker.smid() ); _EntityTrackers[ADDING].back().reaccess( selfAddingTracker.smid() );
_EntityTrackers[ADDING].back().createMutex( selfAddingTracker.mutid(), false ); _EntityTrackers[ADDING].back().createMutex( selfAddingTracker.mutid(), false );
CChangeTrackerClient selfRemovingTracker; CChangeTrackerClient selfRemovingTracker;
msgin.serial( selfRemovingTracker ); msgin.serial( selfRemovingTracker );
smidpool.reacquireId( selfRemovingTracker.smid() ); smidpool.reacquireId( selfRemovingTracker.smid() & 0xFFF );
mutidpool.reacquireId( selfRemovingTracker.mutid() ); mutidpool.reacquireId( selfRemovingTracker.mutid() & 0xFFF );
_EntityTrackers[REMOVING].push_back( CChangeTrackerMS( serviceId, true ) ); _EntityTrackers[REMOVING].push_back( CChangeTrackerMS( serviceId, true ) );
_EntityTrackers[REMOVING].back().reaccess( selfRemovingTracker.smid() ); _EntityTrackers[REMOVING].back().reaccess( selfRemovingTracker.smid() );
_EntityTrackers[REMOVING].back().createMutex( selfRemovingTracker.mutid(), false ); _EntityTrackers[REMOVING].back().createMutex( selfRemovingTracker.mutid(), false );

@ -166,6 +166,15 @@ void CMirrorService::init()
_IsPureReceiver = true; _IsPureReceiver = true;
nlinfo( "\tThis MS is in pure receiver mode" ); nlinfo( "\tThis MS is in pure receiver mode" );
} }
CConfigFile::CVar *varS = ConfigFile.getVarPtr("ShardId");
if (varS)
{
m_ShardId = varS->asInt();
if ((m_ShardId & 0xFFF) != m_ShardId)
nlwarning("\tConfigured ShardId %u is too large", (unsigned int)m_ShardId);
m_ShardId &= 0xFFF;
nlinfo("\tShardId for shared memory namespace is %u", (unsigned int)m_ShardId);
}
// Register to ServiceUp and ServiceDown // Register to ServiceUp and ServiceDown
CUnifiedNetwork::getInstance()->setServiceUpCallback( "*", cbServiceUp, 0 ); CUnifiedNetwork::getInstance()->setServiceUpCallback( "*", cbServiceUp, 0 );
@ -540,8 +549,8 @@ void CMirrorService::deleteTracker( CChangeTrackerMS& tracker, std::vector<CChan
if ( tracker.destroy() ) if ( tracker.destroy() )
{ {
_SMIdPool.releaseId( tracker.smid() ); _SMIdPool.releaseId( tracker.smid() & 0xFFF );
_MutIdPool.releaseId( tracker.mutid() ); _MutIdPool.releaseId( tracker.mutid() & 0xFFF );
} }
// Delete the tracker object // Delete the tracker object
@ -1454,6 +1463,8 @@ void CMirrorService::allocateProperty( CMessage& msgin, NLNET::TServiceId servic
} }
// Allocate shared memory // Allocate shared memory
nlassert((propinfo.SMId & 0xFFF) == propinfo.SMId);
propinfo.SMId ^= (m_ShardId << 12);
propinfo.Segment = CSharedMemory::createSharedMemory( toSharedMemId(propinfo.SMId), segmentSize ); propinfo.Segment = CSharedMemory::createSharedMemory( toSharedMemId(propinfo.SMId), segmentSize );
if ( (propinfo.Segment == NULL) && DestroyGhostSharedMemSegments ) if ( (propinfo.Segment == NULL) && DestroyGhostSharedMemSegments )
{ {
@ -1600,7 +1611,7 @@ void CMirrorService::destroyPropertySegments()
CSharedMemory::closeSharedMemory( GET_PROPERTY_INFO(ip).Segment ); CSharedMemory::closeSharedMemory( GET_PROPERTY_INFO(ip).Segment );
GET_PROPERTY_INFO(ip).Segment = NULL; GET_PROPERTY_INFO(ip).Segment = NULL;
CSharedMemory::destroySharedMemory( toSharedMemId(GET_PROPERTY_INFO(ip).SMId) ); CSharedMemory::destroySharedMemory( toSharedMemId(GET_PROPERTY_INFO(ip).SMId) );
_SMIdPool.releaseId( GET_PROPERTY_INFO(ip).SMId ); _SMIdPool.releaseId( GET_PROPERTY_INFO(ip).SMId & 0xFFF );
} }
} }
@ -1619,8 +1630,8 @@ void CMirrorService::destroyPropertySegments()
CChangeTrackerMS& tracker = (*isl); CChangeTrackerMS& tracker = (*isl);
if ( tracker.destroy() ) if ( tracker.destroy() )
{ {
_SMIdPool.releaseId( tracker.smid() ); _SMIdPool.releaseId( tracker.smid() & 0xFFF );
_MutIdPool.releaseId( tracker.mutid() ); _MutIdPool.releaseId( tracker.mutid() & 0xFFF );
} }
} }
} }
@ -1635,8 +1646,8 @@ void CMirrorService::destroyPropertySegments()
CChangeTrackerMS& tracker = (*isl); CChangeTrackerMS& tracker = (*isl);
if ( tracker.destroy() ) if ( tracker.destroy() )
{ {
_SMIdPool.releaseId( tracker.smid() ); _SMIdPool.releaseId( tracker.smid() & 0xFFF );
_MutIdPool.releaseId( tracker.mutid() ); _MutIdPool.releaseId( tracker.mutid() & 0xFFF );
} }
} }
} }
@ -2177,7 +2188,8 @@ void CMirrorService::allocateEntityTrackers( CDataSetMS& dataset, NLNET::TServic
smidAdd = _SMIdPool.getNewId(); smidAdd = _SMIdPool.getNewId();
if ( ! pEntityTrackerRemoving ) if ( ! pEntityTrackerRemoving )
smidRemove = _SMIdPool.getNewId(); smidRemove = _SMIdPool.getNewId();
if ( (smidAdd == InvalidSMId) || (smidRemove == InvalidSMId) ) if ((!pEntityTrackerAdding && (smidAdd == InvalidSMId))
|| (!pEntityTrackerRemoving && (smidRemove == InvalidSMId)))
{ {
nlwarning( "ROWMGT: No more free shared memory ids (entity tracker)" ); nlwarning( "ROWMGT: No more free shared memory ids (entity tracker)" );
beep( 660, 150 ); // TODO: error handling beep( 660, 150 ); // TODO: error handling
@ -2186,11 +2198,15 @@ void CMirrorService::allocateEntityTrackers( CDataSetMS& dataset, NLNET::TServic
if ( ! pEntityTrackerAdding ) if ( ! pEntityTrackerAdding )
{ {
nlassert((smidAdd & 0xFFF) == smidAdd);
smidAdd ^= (m_ShardId << 12);
CChangeTrackerMS& entityTrackerAdding = dataset.addEntityTracker( ADDING, serviceId, local, smidAdd ); CChangeTrackerMS& entityTrackerAdding = dataset.addEntityTracker( ADDING, serviceId, local, smidAdd );
pEntityTrackerAdding = &entityTrackerAdding; pEntityTrackerAdding = &entityTrackerAdding;
} }
if ( ! pEntityTrackerRemoving ) if ( ! pEntityTrackerRemoving )
{ {
nlassert((smidRemove & 0xFFF) == smidRemove);
smidRemove ^= (m_ShardId << 12);
CChangeTrackerMS& entityTrackerRemoving = dataset.addEntityTracker( REMOVING, serviceId, local, smidRemove ); CChangeTrackerMS& entityTrackerRemoving = dataset.addEntityTracker( REMOVING, serviceId, local, smidRemove );
pEntityTrackerRemoving = &entityTrackerRemoving; pEntityTrackerRemoving = &entityTrackerRemoving;
} }
@ -2563,6 +2579,8 @@ void CMirrorService::processPropSubscription( CDataSetMS& dataset, TPropertyInde
beep( 660, 150 ); // TODO: error handling beep( 660, 150 ); // TODO: error handling
return; return;
} }
nlassert((smid & 0xFFF) == smid);
smid ^= (m_ShardId << 12);
if ( ! newtracker->allocate( smid, ds->maxNbRows() ) ) if ( ! newtracker->allocate( smid, ds->maxNbRows() ) )
smid = InvalidSMId; smid = InvalidSMId;
} }
@ -3283,7 +3301,7 @@ void CMirrorService::receiveSyncMirrorInformation( CMessage& msgin, TServiceId s
if ( ! propInfo.allocated() ) if ( ! propInfo.allocated() )
{ {
// Reaccess shared memory // Reaccess shared memory
_SMIdPool.reacquireId( propInfoClient.SMId ); _SMIdPool.reacquireId( propInfoClient.SMId & 0xFFF );
_PropertiesInMirror[propName].reaccess( propInfoClient.SMId ); _PropertiesInMirror[propName].reaccess( propInfoClient.SMId );
// Set property pointers but don't init values // Set property pointers but don't init values

@ -695,7 +695,7 @@ public:
/// Constructor /// Constructor
CMirrorService() : CMirrorService() :
_NumberOfOnlineMS(1), m_ShardId(DEFAULT_SHARD_ID), _NumberOfOnlineMS(1),
_RangeManagerReady(false), _MirrorsOnline(false), _DeltaSent(false), _BlockedAwaitingATAck(false), _RangeManagerReady(false), _MirrorsOnline(false), _DeltaSent(false), _BlockedAwaitingATAck(false),
_EmittedKBytesPerSecond(0.0f), _EmittedBytesPartialSum(0), _TimeOfLatestEmittedBytesAvg(0), _EmittedKBytesPerSecond(0.0f), _EmittedBytesPartialSum(0), _TimeOfLatestEmittedBytesAvg(0),
_NbDeltaUpdatesReceived(0), _NbExpectedDeltaUpdates(0), _IsPureReceiver(false) _NbDeltaUpdatesReceived(0), _NbExpectedDeltaUpdates(0), _IsPureReceiver(false)
@ -1054,6 +1054,9 @@ private:
/// Properties in the local mirror /// Properties in the local mirror
TPropertiesInMirrorMS _PropertiesInMirror; TPropertiesInMirrorMS _PropertiesInMirror;
/// Shard id as in the config file, DEFAULT_SHARD_ID otherwise. Used for pool namespacing
uint32 m_ShardId;
/// Shared memory id pool, to generate new ids /// Shared memory id pool, to generate new ids
CSMIdPool _SMIdPool; CSMIdPool _SMIdPool;

Loading…
Cancel
Save