1 5 package com.tc.objectserver.impl; 6 7 import bsh.EvalError; 8 import bsh.Interpreter; 9 10 import com.sleepycat.je.DatabaseException; 11 import com.tc.async.api.SEDA; 12 import com.tc.async.api.Sink; 13 import com.tc.async.api.Stage; 14 import com.tc.async.api.StageManager; 15 import com.tc.async.impl.NullSink; 16 import com.tc.config.schema.setup.L2TVSConfigurationSetupManager; 17 import com.tc.exception.TCRuntimeException; 18 import com.tc.io.TCFile; 19 import com.tc.io.TCFileImpl; 20 import com.tc.io.TCRandomFileAccessImpl; 21 import com.tc.l2.api.L2Coordinator; 22 import com.tc.l2.ha.L2HACoordinator; 23 import com.tc.l2.ha.L2HADisabledCooridinator; 24 import com.tc.lang.TCThreadGroup; 25 import com.tc.logging.CustomerLogging; 26 import com.tc.logging.TCLogger; 27 import com.tc.logging.TCLogging; 28 import com.tc.management.L2Management; 29 import com.tc.management.beans.TCServerInfoMBean; 30 import com.tc.management.remote.connect.ClientConnectEventHandler; 31 import com.tc.management.remote.protocol.terracotta.ClientTunnelingEventHandler; 32 import com.tc.management.remote.protocol.terracotta.JmxRemoteTunnelMessage; 33 import com.tc.management.remote.protocol.terracotta.L1JmxReady; 34 import com.tc.net.NIOWorkarounds; 35 import com.tc.net.TCSocketAddress; 36 import com.tc.net.protocol.PlainNetworkStackHarnessFactory; 37 import com.tc.net.protocol.tcm.CommunicationsManager; 38 import com.tc.net.protocol.tcm.CommunicationsManagerImpl; 39 import com.tc.net.protocol.tcm.HydrateHandler; 40 import com.tc.net.protocol.tcm.NetworkListener; 41 import com.tc.net.protocol.tcm.NullMessageMonitor; 42 import com.tc.net.protocol.tcm.TCMessageType; 43 import com.tc.net.protocol.transport.ConnectionIDFactory; 44 import com.tc.net.protocol.transport.ConnectionPolicy; 45 import com.tc.object.cache.CacheConfigImpl; 46 import com.tc.object.cache.CacheManager; 47 import com.tc.object.cache.EvictionPolicy; 48 import com.tc.object.cache.LFUConfigImpl; 49 import com.tc.object.cache.LFUEvictionPolicy; 50 import com.tc.object.cache.LRUEvictionPolicy; 51 import com.tc.object.cache.NullCache; 52 import com.tc.object.config.schema.NewL2DSOConfig; 53 import com.tc.object.config.schema.PersistenceMode; 54 import com.tc.object.msg.AcknowledgeTransactionMessageImpl; 55 import com.tc.object.msg.BatchTransactionAcknowledgeMessageImpl; 56 import com.tc.object.msg.BroadcastTransactionMessageImpl; 57 import com.tc.object.msg.ClientHandshakeAckMessageImpl; 58 import com.tc.object.msg.ClientHandshakeMessageImpl; 59 import com.tc.object.msg.ClusterMembershipMessage; 60 import com.tc.object.msg.CommitTransactionMessageImpl; 61 import com.tc.object.msg.JMXMessage; 62 import com.tc.object.msg.LockRequestMessage; 63 import com.tc.object.msg.LockResponseMessage; 64 import com.tc.object.msg.MessageRecycler; 65 import com.tc.object.msg.ObjectIDBatchRequestMessage; 66 import com.tc.object.msg.ObjectIDBatchRequestResponseMessage; 67 import com.tc.object.msg.RequestManagedObjectMessageImpl; 68 import com.tc.object.msg.RequestManagedObjectResponseMessage; 69 import com.tc.object.msg.RequestRootMessageImpl; 70 import com.tc.object.msg.RequestRootResponseMessage; 71 import com.tc.object.net.ChannelStats; 72 import com.tc.object.net.ChannelStatsImpl; 73 import com.tc.object.net.DSOChannelManager; 74 import com.tc.object.net.DSOChannelManagerImpl; 75 import com.tc.object.net.DSOChannelManagerMBean; 76 import com.tc.object.session.NullSessionManager; 77 import com.tc.object.session.SessionManager; 78 import com.tc.object.session.SessionProvider; 79 import com.tc.objectserver.DSOApplicationEvents; 80 import com.tc.objectserver.api.ObjectManagerMBean; 81 import com.tc.objectserver.api.ObjectRequestManager; 82 import com.tc.objectserver.core.api.DSOGlobalServerStats; 83 import com.tc.objectserver.core.api.DSOGlobalServerStatsImpl; 84 import com.tc.objectserver.core.api.ServerConfigurationContext; 85 import com.tc.objectserver.core.impl.MarkAndSweepGarbageCollector; 86 import com.tc.objectserver.core.impl.ServerConfigurationContextImpl; 87 import com.tc.objectserver.core.impl.ServerManagementContext; 88 import com.tc.objectserver.gtx.ServerGlobalTransactionManager; 89 import com.tc.objectserver.gtx.ServerGlobalTransactionManagerImpl; 90 import com.tc.objectserver.handler.ApplyCompleteTransactionHandler; 91 import com.tc.objectserver.handler.ApplyTransactionChangeHandler; 92 import com.tc.objectserver.handler.BroadcastChangeHandler; 93 import com.tc.objectserver.handler.ChannelLifeCycleHandler; 94 import com.tc.objectserver.handler.ClientHandshakeHandler; 95 import com.tc.objectserver.handler.CommitTransactionChangeHandler; 96 import com.tc.objectserver.handler.JMXEventsHandler; 97 import com.tc.objectserver.handler.ManagedObjectFaultHandler; 98 import com.tc.objectserver.handler.ManagedObjectFlushHandler; 99 import com.tc.objectserver.handler.ManagedObjectRequestHandler; 100 import com.tc.objectserver.handler.ProcessTransactionHandler; 101 import com.tc.objectserver.handler.RecallObjectsHandler; 102 import com.tc.objectserver.handler.RequestLockUnLockHandler; 103 import com.tc.objectserver.handler.RequestObjectIDBatchHandler; 104 import com.tc.objectserver.handler.RequestRootHandler; 105 import com.tc.objectserver.handler.RespondToObjectRequestHandler; 106 import com.tc.objectserver.handler.RespondToRequestLockHandler; 107 import com.tc.objectserver.handler.TransactionAcknowledgementHandler; 108 import com.tc.objectserver.handler.TransactionLookupHandler; 109 import com.tc.objectserver.handshakemanager.ServerClientHandshakeManager; 110 import com.tc.objectserver.l1.api.ClientStateManager; 111 import com.tc.objectserver.l1.impl.ClientStateManagerImpl; 112 import com.tc.objectserver.l1.impl.TransactionAcknowledgeAction; 113 import com.tc.objectserver.l1.impl.TransactionAcknowledgeActionImpl; 114 import com.tc.objectserver.lockmanager.api.LockManager; 115 import com.tc.objectserver.lockmanager.api.LockManagerMBean; 116 import com.tc.objectserver.lockmanager.impl.LockManagerImpl; 117 import com.tc.objectserver.managedobject.ManagedObjectChangeListenerProviderImpl; 118 import com.tc.objectserver.managedobject.ManagedObjectStateFactory; 119 import com.tc.objectserver.persistence.api.ClientStatePersistor; 120 import com.tc.objectserver.persistence.api.ManagedObjectStore; 121 import com.tc.objectserver.persistence.api.PersistenceTransactionProvider; 122 import com.tc.objectserver.persistence.api.Persistor; 123 import com.tc.objectserver.persistence.api.TransactionPersistor; 124 import com.tc.objectserver.persistence.api.TransactionStore; 125 import com.tc.objectserver.persistence.impl.InMemoryPersistor; 126 import com.tc.objectserver.persistence.impl.NullPersistenceTransactionProvider; 127 import com.tc.objectserver.persistence.impl.NullTransactionPersistor; 128 import com.tc.objectserver.persistence.impl.PersistentBatchSequenceProvider; 129 import com.tc.objectserver.persistence.impl.TransactionStoreImpl; 130 import com.tc.objectserver.persistence.sleepycat.ConnectionIDFactoryImpl; 131 import com.tc.objectserver.persistence.sleepycat.CustomSerializationAdapterFactory; 132 import com.tc.objectserver.persistence.sleepycat.DBEnvironment; 133 import com.tc.objectserver.persistence.sleepycat.DBException; 134 import com.tc.objectserver.persistence.sleepycat.SerializationAdapterFactory; 135 import com.tc.objectserver.persistence.sleepycat.SleepycatPersistor; 136 import com.tc.objectserver.tx.CommitTransactionMessageRecycler; 137 import com.tc.objectserver.tx.CommitTransactionMessageToTransactionBatchReader; 138 import com.tc.objectserver.tx.ServerTransactionManager; 139 import com.tc.objectserver.tx.ServerTransactionManagerImpl; 140 import com.tc.objectserver.tx.ServerTransactionManagerMBean; 141 import com.tc.objectserver.tx.TransactionBatchManager; 142 import com.tc.objectserver.tx.TransactionBatchManagerImpl; 143 import com.tc.objectserver.tx.TransactionSequencer; 144 import com.tc.objectserver.tx.TransactionalObjectManagerImpl; 145 import com.tc.objectserver.tx.TransactionalStageCoordinator; 146 import com.tc.objectserver.tx.TransactionalStagesCoordinatorImpl; 147 import com.tc.properties.TCProperties; 148 import com.tc.properties.TCPropertiesImpl; 149 import com.tc.stats.counter.sampled.SampledCounter; 150 import com.tc.stats.counter.sampled.SampledCounterConfig; 151 import com.tc.stats.counter.sampled.SampledCounterManager; 152 import com.tc.stats.counter.sampled.SampledCounterManagerImpl; 153 import com.tc.util.Assert; 154 import com.tc.util.SequenceValidator; 155 import com.tc.util.StartupLock; 156 import com.tc.util.TCTimeoutException; 157 import com.tc.util.TCTimerImpl; 158 import com.tc.util.io.FileUtils; 159 import com.tc.util.sequence.BatchSequence; 160 import com.tc.util.sequence.Sequence; 161 import com.tc.util.sequence.SimpleSequence; 162 import com.tc.util.startuplock.FileNotCreatedException; 163 import com.tc.util.startuplock.LocationNotCreatedException; 164 165 import java.io.File ; 166 import java.io.IOException ; 167 import java.util.Collections ; 168 import java.util.HashMap ; 169 import java.util.Properties ; 170 import java.util.Set ; 171 172 import javax.management.MBeanServer ; 173 import javax.management.NotCompliantMBeanException ; 174 175 180 public class DistributedObjectServer extends SEDA { 181 private final ConnectionPolicy connectionPolicy; 182 183 private static final TCLogger logger = CustomerLogging.getDSOGenericLogger(); 184 private static final TCLogger consoleLogger = CustomerLogging.getConsoleLogger(); 185 186 private final L2TVSConfigurationSetupManager configSetupManager; 187 private final Sink httpSink; 188 private NetworkListener l1Listener; 189 private CommunicationsManager communicationsManager; 190 private ServerConfigurationContext context; 191 private ObjectManagerImpl objectManager; 192 private TransactionalObjectManagerImpl txnObjectManager; 193 private SampledCounterManager sampledCounterManager; 194 private LockManager lockManager; 195 private ServerManagementContext managementContext; 196 private StartupLock startupLock; 197 198 private ClientStateManagerImpl clientStateManager; 199 200 private ManagedObjectStore objectStore; 201 private Persistor persistor; 202 private ServerTransactionManager transactionManager; 203 204 private CacheManager cacheManager; 205 206 private final TCServerInfoMBean tcServerInfoMBean; 207 private L2Management l2Management; 208 private L2Coordinator l2Coordinator; 209 210 private TCProperties l2Properties; 211 212 private ConnectionIDFactoryImpl connectionIdFactory; 213 214 public DistributedObjectServer(L2TVSConfigurationSetupManager configSetupManager, TCThreadGroup threadGroup, 215 ConnectionPolicy connectionPolicy, TCServerInfoMBean tcServerInfoMBean) { 216 this(configSetupManager, threadGroup, connectionPolicy, new NullSink(), tcServerInfoMBean); 217 } 218 219 public DistributedObjectServer(L2TVSConfigurationSetupManager configSetupManager, TCThreadGroup threadGroup, 220 ConnectionPolicy connectionPolicy, Sink httpSink, TCServerInfoMBean tcServerInfoMBean) { 221 super(threadGroup); 222 223 Assert.assertEquals(threadGroup, Thread.currentThread().getThreadGroup()); 227 228 this.configSetupManager = configSetupManager; 229 this.connectionPolicy = connectionPolicy; 230 this.httpSink = httpSink; 231 this.tcServerInfoMBean = tcServerInfoMBean; 232 } 233 234 public void dump() { 235 if (this.lockManager != null) { 236 this.lockManager.dump(); 237 } 238 239 if (this.objectManager != null) { 240 this.objectManager.dump(); 241 } 242 243 if (this.txnObjectManager != null) { 244 this.txnObjectManager.dump(); 245 } 246 247 if (this.transactionManager != null) { 248 this.transactionManager.dump(); 249 } 250 } 251 252 public synchronized void start() throws IOException , DatabaseException, LocationNotCreatedException, 253 FileNotCreatedException { 254 255 try { 256 startJMXServer(); 257 } catch (Exception e) { 258 String msg = "Unable to start the JMX server. Do you have another Terracotta Server running?"; 259 consoleLogger.error(msg); 260 logger.error(msg, e); 261 System.exit(-1); 262 } 263 264 NIOWorkarounds.solaris10Workaround(); 265 266 configSetupManager.commonl2Config().changesInItemIgnored(configSetupManager.commonl2Config().dataPath()); 267 NewL2DSOConfig l2DSOConfig = configSetupManager.dsoL2Config(); 268 l2DSOConfig.changesInItemIgnored(l2DSOConfig.persistenceMode()); 269 PersistenceMode persistenceMode = (PersistenceMode) l2DSOConfig.persistenceMode().getObject(); 270 271 final boolean swapEnabled = true; final boolean persistent = persistenceMode.equals(PersistenceMode.PERMANENT_STORE); 274 275 TCFile location = new TCFileImpl(this.configSetupManager.commonl2Config().dataPath().getFile()); 276 startupLock = new StartupLock(location); 277 278 if (!startupLock.canProceed(new TCRandomFileAccessImpl(), persistent)) { 279 consoleLogger.error("Another L2 process is using the directory " + location + " as data directory."); 280 if (!persistent) { 281 consoleLogger.error("This is not allowed with persistence mode set to temporary-swap-only."); 282 } 283 consoleLogger.error("Exiting..."); 284 System.exit(1); 285 } 286 287 int maxStageSize = 5000; 288 289 StageManager stageManager = getStageManager(); 290 SessionManager sessionManager = new NullSessionManager(); 291 SessionProvider sessionProvider = (SessionProvider) sessionManager; 292 l2Properties = TCPropertiesImpl.getProperties().getPropertiesFor("l2"); 293 294 EvictionPolicy swapCache; 295 final ClientStatePersistor clientStateStore; 296 final PersistenceTransactionProvider persistenceTransactionProvider; 297 final TransactionPersistor transactionPersistor; 298 final Sequence globalTransactionIDSequence; 299 logger.debug("server swap enabled: " + swapEnabled); 300 final ManagedObjectChangeListenerProviderImpl managedObjectChangeListenerProvider = new ManagedObjectChangeListenerProviderImpl(); 301 if (swapEnabled) { 302 File dbhome = new File(configSetupManager.commonl2Config().dataPath().getFile(), "objectdb"); 303 logger.debug("persistent: " + persistent); 304 if (!persistent) { 305 if (dbhome.exists()) { 306 logger.debug("deleting persistence database..."); 307 FileUtils.forceDelete(dbhome, "jdb"); 308 logger.debug("persistence database deleted."); 309 } 310 } 311 logger.debug("persistence database home: " + dbhome); 312 313 DBEnvironment dbenv = new DBEnvironment(persistent, dbhome, l2Properties.getPropertiesFor("berkeleydb") 314 .addAllPropertiesTo(new Properties())); 315 SerializationAdapterFactory serializationAdapterFactory = new CustomSerializationAdapterFactory(); 316 persistor = new SleepycatPersistor(TCLogging.getLogger(SleepycatPersistor.class), dbenv, 317 serializationAdapterFactory); 318 319 String cachePolicy = l2Properties.getProperty("objectmanager.cachePolicy").toUpperCase(); 320 if (cachePolicy.equals("LRU")) { 321 swapCache = new LRUEvictionPolicy(-1); 322 } else if (cachePolicy.equals("LFU")) { 323 swapCache = new LFUEvictionPolicy(-1, new LFUConfigImpl(l2Properties.getPropertiesFor("lfu"))); 324 } else { 325 throw new AssertionError ("Unknown Cache Policy : " + cachePolicy 326 + " Accepted Values are : <LRU>/<LFU> Please check tc.properties"); 327 } 328 objectStore = new PersistentManagedObjectStore(persistor.getManagedObjectPersistor()); 329 } else { 330 persistor = new InMemoryPersistor(); 331 swapCache = new NullCache(); 332 objectStore = new InMemoryManagedObjectStore(new HashMap ()); 333 } 334 335 persistenceTransactionProvider = persistor.getPersistenceTransactionProvider(); 336 PersistenceTransactionProvider nullPersistenceTransactionProvider = new NullPersistenceTransactionProvider(); 337 PersistenceTransactionProvider transactionStorePTP; 338 if (persistent) { 339 PersistentBatchSequenceProvider sequenceProvider = new PersistentBatchSequenceProvider(persistor 343 .getGlobalTransactionIDSequence()); 344 Stage requestBatchStage = stageManager 345 .createStage(ServerConfigurationContext.REQUEST_BATCH_GLOBAL_TRANSACTION_ID_SEQUENCE_STAGE, sequenceProvider, 346 1, maxStageSize); 347 sequenceProvider.setRequestBatchSink(requestBatchStage.getSink()); 348 globalTransactionIDSequence = new BatchSequence(sequenceProvider, 1000); 349 350 transactionPersistor = persistor.getTransactionPersistor(); 351 transactionStorePTP = persistenceTransactionProvider; 352 } else { 353 transactionPersistor = new NullTransactionPersistor(); 354 transactionStorePTP = nullPersistenceTransactionProvider; 355 globalTransactionIDSequence = new SimpleSequence(); 356 } 357 358 clientStateStore = persistor.getClientStatePersistor(); 359 360 ManagedObjectStateFactory.createInstance(managedObjectChangeListenerProvider, persistor); 361 362 communicationsManager = new CommunicationsManagerImpl(new NullMessageMonitor(), 363 new PlainNetworkStackHarnessFactory(), connectionPolicy); 364 365 final DSOApplicationEvents appEvents; 366 try { 367 appEvents = new DSOApplicationEvents(); 368 } catch (NotCompliantMBeanException ncmbe) { 369 throw new TCRuntimeException("Unable to construct the " + DSOApplicationEvents.class.getName() 370 + " MBean; this is a programming error. Please go fix that class.", ncmbe); 371 } 372 373 clientStateManager = new ClientStateManagerImpl(TCLogging.getLogger(ClientStateManager.class)); 374 375 l2DSOConfig.changesInItemIgnored(l2DSOConfig.garbageCollectionEnabled()); 376 boolean gcEnabled = l2DSOConfig.garbageCollectionEnabled().getBoolean(); 377 logger.debug("GC enabled: " + gcEnabled); 378 379 l2DSOConfig.changesInItemIgnored(l2DSOConfig.garbageCollectionInterval()); 380 long gcInterval = l2DSOConfig.garbageCollectionInterval().getInt(); 381 if (gcEnabled) logger.debug("GC interval: " + gcInterval + " seconds"); 382 383 l2DSOConfig.changesInItemIgnored(l2DSOConfig.garbageCollectionVerbose()); 384 boolean verboseGC = l2DSOConfig.garbageCollectionVerbose().getBoolean(); 385 if (gcEnabled) logger.debug("Verbose GC enabled: " + verboseGC); 386 sampledCounterManager = new SampledCounterManagerImpl(); 387 SampledCounter objectCreationRate = sampledCounterManager.createCounter(new SampledCounterConfig(1, 900, true, 0L)); 388 SampledCounter objectFaultRate = sampledCounterManager.createCounter(new SampledCounterConfig(1, 900, true, 0L)); 389 ObjectManagerStatsImpl objMgrStats = new ObjectManagerStatsImpl(objectCreationRate, objectFaultRate); 390 391 SequenceValidator sequenceValidator = new SequenceValidator(0); 392 ManagedObjectFaultHandler managedObjectFaultHandler = new ManagedObjectFaultHandler(); 393 Stage faultManagedObjectStage = stageManager.createStage(ServerConfigurationContext.MANAGED_OBJECT_FAULT_STAGE, 395 managedObjectFaultHandler, 4, -1); 396 ManagedObjectFlushHandler managedObjectFlushHandler = new ManagedObjectFlushHandler(); 397 Stage flushManagedObjectStage = stageManager.createStage(ServerConfigurationContext.MANAGED_OBJECT_FLUSH_STAGE, 398 managedObjectFlushHandler, (persistent ? 1 : 4), -1); 399 400 TCProperties objManagerProperties = l2Properties.getPropertiesFor("objectmanager"); 401 402 objectManager = new ObjectManagerImpl(new ObjectManagerConfig(gcInterval * 1000, gcEnabled, verboseGC, persistent, 403 objManagerProperties.getInt("deleteBatchSize")), 404 getThreadGroup(), clientStateManager, objectStore, swapCache, 405 persistenceTransactionProvider, faultManagedObjectStage.getSink(), 406 flushManagedObjectStage.getSink(), l2Management 407 .findObjectManagementMonitorMBean()); 408 objectManager.setStatsListener(objMgrStats); 409 objectManager.setGarbageCollector(new MarkAndSweepGarbageCollector(objectManager, clientStateManager, verboseGC)); 410 managedObjectChangeListenerProvider.setListener(objectManager); 411 412 TCProperties cacheManagerProperties = l2Properties.getPropertiesFor("cachemanager"); 413 if (cacheManagerProperties.getBoolean("enabled")) { 414 cacheManager = new CacheManager(objectManager, new CacheConfigImpl(cacheManagerProperties)); 415 if (logger.isDebugEnabled()) { 416 logger.debug("CacheManager Enabled : " + cacheManager); 417 } 418 } else { 419 logger.warn("CacheManager is Disabled"); 420 } 421 422 connectionIdFactory = new ConnectionIDFactoryImpl(clientStateStore); 423 424 l2DSOConfig.changesInItemIgnored(l2DSOConfig.listenPort()); 425 int serverPort = l2DSOConfig.listenPort().getInt(); 426 l1Listener = communicationsManager.createListener(sessionProvider, 427 new TCSocketAddress(TCSocketAddress.WILDCARD_ADDR, serverPort), 428 true, connectionIdFactory, httpSink); 429 430 ClientTunnelingEventHandler cteh = new ClientTunnelingEventHandler(); 431 432 DSOChannelManager channelManager = new DSOChannelManagerImpl(l1Listener.getChannelManager()); 433 channelManager.addEventListener(cteh); 434 channelManager.addEventListener(connectionIdFactory); 435 436 ChannelStats channelStats = new ChannelStatsImpl(sampledCounterManager, channelManager); 437 438 lockManager = new LockManagerImpl(channelManager); 439 TransactionAcknowledgeAction taa = new TransactionAcknowledgeActionImpl(channelManager); 440 ObjectInstanceMonitorImpl instanceMonitor = new ObjectInstanceMonitorImpl(); 441 TransactionBatchManager transactionBatchManager = new TransactionBatchManagerImpl(); 442 SampledCounter globalTxnCounter = sampledCounterManager.createCounter(new SampledCounterConfig(1, 300, true, 0L)); 443 444 final TransactionStore transactionStore = new TransactionStoreImpl(transactionPersistor, 445 globalTransactionIDSequence); 446 ServerGlobalTransactionManager gtxm = new ServerGlobalTransactionManagerImpl(sequenceValidator, transactionStore, 447 transactionStorePTP); 448 transactionManager = new ServerTransactionManagerImpl(gtxm, transactionStore, lockManager, clientStateManager, 449 objectManager, taa, globalTxnCounter, channelStats); 450 MessageRecycler recycler = new CommitTransactionMessageRecycler(transactionManager); 451 452 stageManager.createStage(ServerConfigurationContext.TRANSACTION_LOOKUP_STAGE, new TransactionLookupHandler(), 1, 453 maxStageSize); 454 455 stageManager.createStage(ServerConfigurationContext.APPLY_CHANGES_STAGE, 457 new ApplyTransactionChangeHandler(instanceMonitor, gtxm), 1, -1); 458 459 stageManager.createStage(ServerConfigurationContext.APPLY_COMPLETE_STAGE, new ApplyCompleteTransactionHandler(), 1, 460 maxStageSize); 461 462 stageManager.createStage(ServerConfigurationContext.RECALL_OBJECTS_STAGE, new RecallObjectsHandler(), 1, -1); 464 465 int commitThreads = (persistent ? 4 : 1); 466 stageManager 467 .createStage(ServerConfigurationContext.COMMIT_CHANGES_STAGE, 468 new CommitTransactionChangeHandler(gtxm, transactionStorePTP), commitThreads, maxStageSize); 469 470 TransactionalStageCoordinator txnStageCoordinator = new TransactionalStagesCoordinatorImpl(stageManager); 471 txnObjectManager = new TransactionalObjectManagerImpl(objectManager, new TransactionSequencer(), gtxm, 472 txnStageCoordinator); 473 objectManager.setTransactionalObjectManager(txnObjectManager); 474 Stage processTx = stageManager.createStage(ServerConfigurationContext.PROCESS_TRANSACTION_STAGE, 475 new ProcessTransactionHandler(transactionBatchManager, txnObjectManager, 476 sequenceValidator, recycler), 1, 477 maxStageSize); 478 479 Stage rootRequest = stageManager.createStage(ServerConfigurationContext.MANAGED_ROOT_REQUEST_STAGE, 480 new RequestRootHandler(), 1, maxStageSize); 481 482 stageManager.createStage(ServerConfigurationContext.BROADCAST_CHANGES_STAGE, 483 new BroadcastChangeHandler(transactionBatchManager), 1, maxStageSize); 484 stageManager.createStage(ServerConfigurationContext.RESPOND_TO_LOCK_REQUEST_STAGE, 485 new RespondToRequestLockHandler(), 1, maxStageSize); 486 Stage requestLock = stageManager.createStage(ServerConfigurationContext.REQUEST_LOCK_STAGE, 487 new RequestLockUnLockHandler(), 1, maxStageSize); 488 Stage channelLifecycleStage = stageManager 489 .createStage(ServerConfigurationContext.CHANNEL_LIFE_CYCLE_STAGE, 490 new ChannelLifeCycleHandler(communicationsManager, transactionManager, transactionBatchManager, 491 channelManager), 1, maxStageSize); 492 channelManager.addEventListener(new ChannelLifeCycleHandler.EventListener(channelLifecycleStage.getSink())); 493 494 SampledCounter globalObjectFaultCounter = sampledCounterManager.createCounter(new SampledCounterConfig(1, 300, 495 true, 0L)); 496 SampledCounter globalObjectFlushCounter = sampledCounterManager.createCounter(new SampledCounterConfig(1, 300, 497 true, 0L)); 498 Stage objectRequest = stageManager.createStage(ServerConfigurationContext.MANAGED_OBJECT_REQUEST_STAGE, 499 new ManagedObjectRequestHandler(globalObjectFaultCounter, 500 globalObjectFlushCounter), 1, 501 maxStageSize); 502 stageManager.createStage(ServerConfigurationContext.RESPOND_TO_OBJECT_REQUEST_STAGE, 503 new RespondToObjectRequestHandler(), 4, maxStageSize); 504 Stage oidRequest = stageManager.createStage(ServerConfigurationContext.OBJECT_ID_BATCH_REQUEST_STAGE, 505 new RequestObjectIDBatchHandler(objectStore), 1, maxStageSize); 506 Stage transactionAck = stageManager.createStage(ServerConfigurationContext.TRANSACTION_ACKNOWLEDGEMENT_STAGE, 507 new TransactionAcknowledgementHandler(), 1, maxStageSize); 508 Stage clientHandshake = stageManager.createStage(ServerConfigurationContext.CLIENT_HANDSHAKE_STAGE, 509 new ClientHandshakeHandler(), 1, maxStageSize); 510 Stage hydrateStage = stageManager.createStage(ServerConfigurationContext.HYDRATE_MESSAGE_SINK, 511 new HydrateHandler(), 1, maxStageSize); 512 513 Stage jmxEventsStage = stageManager.createStage(ServerConfigurationContext.JMX_EVENTS_STAGE, 514 new JMXEventsHandler(appEvents), 1, maxStageSize); 515 516 final Stage jmxRemoteConnectStage = stageManager.createStage(ServerConfigurationContext.JMXREMOTE_CONNECT_STAGE, 517 new ClientConnectEventHandler(), 1, maxStageSize); 518 cteh.setConnectStageSink(jmxRemoteConnectStage.getSink()); 519 final Stage jmxRemoteTunnelStage = stageManager.createStage(ServerConfigurationContext.JMXREMOTE_TUNNEL_STAGE, 520 cteh, 1, maxStageSize); 521 522 l1Listener.addClassMapping(TCMessageType.BATCH_TRANSACTION_ACK_MESSAGE, 523 BatchTransactionAcknowledgeMessageImpl.class); 524 l1Listener.addClassMapping(TCMessageType.REQUEST_ROOT_MESSAGE, RequestRootMessageImpl.class); 525 l1Listener.addClassMapping(TCMessageType.LOCK_REQUEST_MESSAGE, LockRequestMessage.class); 526 l1Listener.addClassMapping(TCMessageType.LOCK_RESPONSE_MESSAGE, LockResponseMessage.class); 527 l1Listener.addClassMapping(TCMessageType.LOCK_RECALL_MESSAGE, LockResponseMessage.class); 528 l1Listener.addClassMapping(TCMessageType.LOCK_QUERY_RESPONSE_MESSAGE, LockResponseMessage.class); 529 l1Listener.addClassMapping(TCMessageType.COMMIT_TRANSACTION_MESSAGE, CommitTransactionMessageImpl.class); 530 l1Listener.addClassMapping(TCMessageType.REQUEST_ROOT_RESPONSE_MESSAGE, RequestRootResponseMessage.class); 531 l1Listener.addClassMapping(TCMessageType.REQUEST_MANAGED_OBJECT_MESSAGE, RequestManagedObjectMessageImpl.class); 532 l1Listener.addClassMapping(TCMessageType.REQUEST_MANAGED_OBJECT_RESPONSE_MESSAGE, 533 RequestManagedObjectResponseMessage.class); 534 l1Listener.addClassMapping(TCMessageType.BROADCAST_TRANSACTION_MESSAGE, BroadcastTransactionMessageImpl.class); 535 l1Listener.addClassMapping(TCMessageType.OBJECT_ID_BATCH_REQUEST_MESSAGE, ObjectIDBatchRequestMessage.class); 536 l1Listener.addClassMapping(TCMessageType.OBJECT_ID_BATCH_REQUEST_RESPONSE_MESSAGE, 537 ObjectIDBatchRequestResponseMessage.class); 538 l1Listener.addClassMapping(TCMessageType.ACKNOWLEDGE_TRANSACTION_MESSAGE, AcknowledgeTransactionMessageImpl.class); 539 l1Listener.addClassMapping(TCMessageType.CLIENT_HANDSHAKE_MESSAGE, ClientHandshakeMessageImpl.class); 540 l1Listener.addClassMapping(TCMessageType.CLIENT_HANDSHAKE_ACK_MESSAGE, ClientHandshakeAckMessageImpl.class); 541 l1Listener.addClassMapping(TCMessageType.JMX_MESSAGE, JMXMessage.class); 542 l1Listener.addClassMapping(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE, JmxRemoteTunnelMessage.class); 543 l1Listener.addClassMapping(TCMessageType.CLUSTER_MEMBERSHIP_EVENT_MESSAGE, ClusterMembershipMessage.class); 544 l1Listener.addClassMapping(TCMessageType.CLIENT_JMX_READY_MESSAGE, L1JmxReady.class); 545 546 Sink hydrateSink = hydrateStage.getSink(); 547 l1Listener.routeMessageType(TCMessageType.COMMIT_TRANSACTION_MESSAGE, processTx.getSink(), hydrateSink); 548 l1Listener.routeMessageType(TCMessageType.LOCK_REQUEST_MESSAGE, requestLock.getSink(), hydrateSink); 549 l1Listener.routeMessageType(TCMessageType.REQUEST_ROOT_MESSAGE, rootRequest.getSink(), hydrateSink); 550 l1Listener.routeMessageType(TCMessageType.REQUEST_MANAGED_OBJECT_MESSAGE, objectRequest.getSink(), hydrateSink); 551 l1Listener.routeMessageType(TCMessageType.OBJECT_ID_BATCH_REQUEST_MESSAGE, oidRequest.getSink(), hydrateSink); 552 l1Listener.routeMessageType(TCMessageType.ACKNOWLEDGE_TRANSACTION_MESSAGE, transactionAck.getSink(), hydrateSink); 553 l1Listener.routeMessageType(TCMessageType.CLIENT_HANDSHAKE_MESSAGE, clientHandshake.getSink(), hydrateSink); 554 l1Listener.routeMessageType(TCMessageType.JMX_MESSAGE, jmxEventsStage.getSink(), hydrateSink); 555 l1Listener.routeMessageType(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE, jmxRemoteTunnelStage.getSink(), 556 hydrateSink); 557 l1Listener.routeMessageType(TCMessageType.CLIENT_JMX_READY_MESSAGE, jmxRemoteTunnelStage.getSink(), hydrateSink); 558 559 ObjectRequestManager objectRequestManager = new ObjectRequestManagerImpl(objectManager, transactionManager); 560 561 l2DSOConfig.changesInItemIgnored(l2DSOConfig.clientReconnectWindow()); 562 long reconnectTimeout = l2DSOConfig.clientReconnectWindow().getInt(); 563 logger.debug("Client Reconnect Window: " + reconnectTimeout + " seconds"); 564 reconnectTimeout *= 1000; 565 ServerClientHandshakeManager clientHandshakeManager = new ServerClientHandshakeManager( 566 TCLogging 567 .getLogger(ServerClientHandshakeManager.class), 568 channelManager, 569 objectManager, 570 sequenceValidator, 571 clientStateManager, 572 lockManager, 573 transactionManager, 574 stageManager 575 .getStage( 576 ServerConfigurationContext.RESPOND_TO_LOCK_REQUEST_STAGE) 577 .getSink(), 578 objectStore, 579 new TCTimerImpl( 580 "Reconnect timer", 581 true), 582 reconnectTimeout, persistent); 583 584 boolean networkedHA = l2Properties.getBoolean("ha.network.enabled"); 585 if (networkedHA) { 586 logger.info("L2 Networked HA Enabled "); 587 l2Coordinator = new L2HACoordinator(consoleLogger, this, stageManager, persistor.getClusterStateStore(), 588 objectManager); 589 } else { 590 l2Coordinator = new L2HADisabledCooridinator(); 591 } 592 593 context = new ServerConfigurationContextImpl(stageManager, objectManager, objectRequestManager, objectStore, 594 lockManager, channelManager, clientStateManager, transactionManager, 595 txnObjectManager, clientHandshakeManager, channelStats, l2Coordinator, 596 new CommitTransactionMessageToTransactionBatchReader()); 597 598 stageManager.startAll(context); 599 600 DSOGlobalServerStats serverStats = new DSOGlobalServerStatsImpl(globalObjectFlushCounter, globalObjectFaultCounter, 601 globalTxnCounter, objMgrStats); 602 603 managementContext = new ServerManagementContext((ServerTransactionManagerMBean) transactionManager, 605 (ObjectManagerMBean) objectManager, (LockManagerMBean) lockManager, 606 (DSOChannelManagerMBean) channelManager, serverStats, channelStats, 607 instanceMonitor, appEvents); 608 609 if (l2Properties.getBoolean("beanshell.enabled")) startBeanShell(l2Properties.getInt("beanshell.port")); 610 611 if (networkedHA) { 612 l2Coordinator.start(); 613 } else { 614 startActiveMode(); 616 } 617 } 618 619 public boolean startActiveMode() throws IOException { 620 Set existingConnections = Collections.unmodifiableSet(connectionIdFactory.loadConnectionIDs()); 621 context.getClientHandshakeManager().setStarting(existingConnections); 622 l1Listener.start(existingConnections); 623 consoleLogger.info("Terracotta Server has started up as ACTIVE node on port " + l1Listener.getBindPort() 624 + " successfully, and is now ready for work."); 625 return true; 626 } 627 628 public boolean stopActiveMode() throws TCTimeoutException { 629 consoleLogger.info("Stopping ACTIVE Terracotta Server on port " + l1Listener.getBindPort() + "."); 631 l1Listener.stop(10000); 632 l1Listener.getChannelManager().closeAllChannels(); 633 return true; 634 } 635 636 private void startBeanShell(int port) { 637 try { 638 Interpreter i = new Interpreter(); 639 i.set("dsoServer", this); 640 i.set("objectManager", objectManager); 641 i.set("txnObjectManager", txnObjectManager); 642 i.set("portnum", port); 643 i.eval("setAccessibility(true)"); i.eval("server(portnum)"); 645 consoleLogger.info("Bean shell is started on port " + port); 646 } catch (EvalError e) { 647 e.printStackTrace(); 648 } 649 } 650 651 public int getListenPort() { 652 return this.l1Listener.getBindPort(); 653 } 654 655 public synchronized void stop() { 656 try { 657 if (lockManager != null) lockManager.stop(); 658 } catch (InterruptedException e) { 659 logger.error(e); 660 } 661 662 getStageManager().stopAll(); 663 664 if (l1Listener != null) { 665 try { 666 l1Listener.stop(5000); 667 } catch (TCTimeoutException e) { 668 logger.warn("timeout trying to stop listener: " + e.getMessage()); 669 } 670 } 671 672 if ((communicationsManager != null)) { 673 communicationsManager.shutdown(); 674 } 675 676 if (objectManager != null) { 677 try { 678 objectManager.stop(); 679 } catch (Throwable e) { 680 logger.error(e); 681 } 682 } 683 684 clientStateManager.stop(); 685 686 try { 687 objectStore.shutdown(); 688 } catch (Throwable e) { 689 logger.warn(e); 690 } 691 692 try { 693 persistor.close(); 694 } catch (DBException e) { 695 logger.warn(e); 696 } 697 698 if (sampledCounterManager != null) { 699 try { 700 sampledCounterManager.shutdown(); 701 } catch (Exception e) { 702 logger.error(e); 703 } 704 } 705 706 try { 707 stopJMXServer(); 708 } catch (Throwable t) { 709 logger.error("Error shutting down jmx server", t); 710 } 711 712 basicStop(); 713 } 714 715 public void quickStop() { 716 try { 717 stopJMXServer(); 718 } catch (Throwable t) { 719 logger.error("Error shutting down jmx server", t); 720 } 721 722 726 } 728 729 private void basicStop() { 730 if (startupLock != null) { 731 startupLock.release(); 732 } 733 } 734 735 public ConnectionIDFactory getConnectionIdFactory() { 736 return connectionIdFactory; 737 } 738 739 public ManagedObjectStore getManagedObjectStore() { 740 return objectStore; 741 } 742 743 public ServerConfigurationContext getContext() { 744 return context; 745 } 746 747 public ServerManagementContext getManagementContext() { 748 return managementContext; 749 } 750 751 public MBeanServer getMBeanServer() { 752 return l2Management.getMBeanServer(); 753 } 754 755 private void startJMXServer() throws Exception { 756 l2Management = new L2Management(tcServerInfoMBean, configSetupManager); 757 758 763 if(!Boolean.getBoolean("org.terracotta.server.disableJmxConnector")) { 764 l2Management.start(); 765 } 766 } 767 768 private void stopJMXServer() throws Exception { 769 try { 770 if (l2Management != null) { 771 l2Management.stop(); 772 } 773 } finally { 774 l2Management = null; 775 } 776 } 777 } 778 | Popular Tags |