KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > objectserver > impl > DistributedObjectServer


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.objectserver.impl;
6
7 import bsh.EvalError;
8 import bsh.Interpreter;
9
10 import com.sleepycat.je.DatabaseException;
11 import com.tc.async.api.SEDA;
12 import com.tc.async.api.Sink;
13 import com.tc.async.api.Stage;
14 import com.tc.async.api.StageManager;
15 import com.tc.async.impl.NullSink;
16 import com.tc.config.schema.setup.L2TVSConfigurationSetupManager;
17 import com.tc.exception.TCRuntimeException;
18 import com.tc.io.TCFile;
19 import com.tc.io.TCFileImpl;
20 import com.tc.io.TCRandomFileAccessImpl;
21 import com.tc.l2.api.L2Coordinator;
22 import com.tc.l2.ha.L2HACoordinator;
23 import com.tc.l2.ha.L2HADisabledCooridinator;
24 import com.tc.lang.TCThreadGroup;
25 import com.tc.logging.CustomerLogging;
26 import com.tc.logging.TCLogger;
27 import com.tc.logging.TCLogging;
28 import com.tc.management.L2Management;
29 import com.tc.management.beans.TCServerInfoMBean;
30 import com.tc.management.remote.connect.ClientConnectEventHandler;
31 import com.tc.management.remote.protocol.terracotta.ClientTunnelingEventHandler;
32 import com.tc.management.remote.protocol.terracotta.JmxRemoteTunnelMessage;
33 import com.tc.management.remote.protocol.terracotta.L1JmxReady;
34 import com.tc.net.NIOWorkarounds;
35 import com.tc.net.TCSocketAddress;
36 import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
37 import com.tc.net.protocol.tcm.CommunicationsManager;
38 import com.tc.net.protocol.tcm.CommunicationsManagerImpl;
39 import com.tc.net.protocol.tcm.HydrateHandler;
40 import com.tc.net.protocol.tcm.NetworkListener;
41 import com.tc.net.protocol.tcm.NullMessageMonitor;
42 import com.tc.net.protocol.tcm.TCMessageType;
43 import com.tc.net.protocol.transport.ConnectionIDFactory;
44 import com.tc.net.protocol.transport.ConnectionPolicy;
45 import com.tc.object.cache.CacheConfigImpl;
46 import com.tc.object.cache.CacheManager;
47 import com.tc.object.cache.EvictionPolicy;
48 import com.tc.object.cache.LFUConfigImpl;
49 import com.tc.object.cache.LFUEvictionPolicy;
50 import com.tc.object.cache.LRUEvictionPolicy;
51 import com.tc.object.cache.NullCache;
52 import com.tc.object.config.schema.NewL2DSOConfig;
53 import com.tc.object.config.schema.PersistenceMode;
54 import com.tc.object.msg.AcknowledgeTransactionMessageImpl;
55 import com.tc.object.msg.BatchTransactionAcknowledgeMessageImpl;
56 import com.tc.object.msg.BroadcastTransactionMessageImpl;
57 import com.tc.object.msg.ClientHandshakeAckMessageImpl;
58 import com.tc.object.msg.ClientHandshakeMessageImpl;
59 import com.tc.object.msg.ClusterMembershipMessage;
60 import com.tc.object.msg.CommitTransactionMessageImpl;
61 import com.tc.object.msg.JMXMessage;
62 import com.tc.object.msg.LockRequestMessage;
63 import com.tc.object.msg.LockResponseMessage;
64 import com.tc.object.msg.MessageRecycler;
65 import com.tc.object.msg.ObjectIDBatchRequestMessage;
66 import com.tc.object.msg.ObjectIDBatchRequestResponseMessage;
67 import com.tc.object.msg.RequestManagedObjectMessageImpl;
68 import com.tc.object.msg.RequestManagedObjectResponseMessage;
69 import com.tc.object.msg.RequestRootMessageImpl;
70 import com.tc.object.msg.RequestRootResponseMessage;
71 import com.tc.object.net.ChannelStats;
72 import com.tc.object.net.ChannelStatsImpl;
73 import com.tc.object.net.DSOChannelManager;
74 import com.tc.object.net.DSOChannelManagerImpl;
75 import com.tc.object.net.DSOChannelManagerMBean;
76 import com.tc.object.session.NullSessionManager;
77 import com.tc.object.session.SessionManager;
78 import com.tc.object.session.SessionProvider;
79 import com.tc.objectserver.DSOApplicationEvents;
80 import com.tc.objectserver.api.ObjectManagerMBean;
81 import com.tc.objectserver.api.ObjectRequestManager;
82 import com.tc.objectserver.core.api.DSOGlobalServerStats;
83 import com.tc.objectserver.core.api.DSOGlobalServerStatsImpl;
84 import com.tc.objectserver.core.api.ServerConfigurationContext;
85 import com.tc.objectserver.core.impl.MarkAndSweepGarbageCollector;
86 import com.tc.objectserver.core.impl.ServerConfigurationContextImpl;
87 import com.tc.objectserver.core.impl.ServerManagementContext;
88 import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
89 import com.tc.objectserver.gtx.ServerGlobalTransactionManagerImpl;
90 import com.tc.objectserver.handler.ApplyCompleteTransactionHandler;
91 import com.tc.objectserver.handler.ApplyTransactionChangeHandler;
92 import com.tc.objectserver.handler.BroadcastChangeHandler;
93 import com.tc.objectserver.handler.ChannelLifeCycleHandler;
94 import com.tc.objectserver.handler.ClientHandshakeHandler;
95 import com.tc.objectserver.handler.CommitTransactionChangeHandler;
96 import com.tc.objectserver.handler.JMXEventsHandler;
97 import com.tc.objectserver.handler.ManagedObjectFaultHandler;
98 import com.tc.objectserver.handler.ManagedObjectFlushHandler;
99 import com.tc.objectserver.handler.ManagedObjectRequestHandler;
100 import com.tc.objectserver.handler.ProcessTransactionHandler;
101 import com.tc.objectserver.handler.RecallObjectsHandler;
102 import com.tc.objectserver.handler.RequestLockUnLockHandler;
103 import com.tc.objectserver.handler.RequestObjectIDBatchHandler;
104 import com.tc.objectserver.handler.RequestRootHandler;
105 import com.tc.objectserver.handler.RespondToObjectRequestHandler;
106 import com.tc.objectserver.handler.RespondToRequestLockHandler;
107 import com.tc.objectserver.handler.TransactionAcknowledgementHandler;
108 import com.tc.objectserver.handler.TransactionLookupHandler;
109 import com.tc.objectserver.handshakemanager.ServerClientHandshakeManager;
110 import com.tc.objectserver.l1.api.ClientStateManager;
111 import com.tc.objectserver.l1.impl.ClientStateManagerImpl;
112 import com.tc.objectserver.l1.impl.TransactionAcknowledgeAction;
113 import com.tc.objectserver.l1.impl.TransactionAcknowledgeActionImpl;
114 import com.tc.objectserver.lockmanager.api.LockManager;
115 import com.tc.objectserver.lockmanager.api.LockManagerMBean;
116 import com.tc.objectserver.lockmanager.impl.LockManagerImpl;
117 import com.tc.objectserver.managedobject.ManagedObjectChangeListenerProviderImpl;
118 import com.tc.objectserver.managedobject.ManagedObjectStateFactory;
119 import com.tc.objectserver.persistence.api.ClientStatePersistor;
120 import com.tc.objectserver.persistence.api.ManagedObjectStore;
121 import com.tc.objectserver.persistence.api.PersistenceTransactionProvider;
122 import com.tc.objectserver.persistence.api.Persistor;
123 import com.tc.objectserver.persistence.api.TransactionPersistor;
124 import com.tc.objectserver.persistence.api.TransactionStore;
125 import com.tc.objectserver.persistence.impl.InMemoryPersistor;
126 import com.tc.objectserver.persistence.impl.NullPersistenceTransactionProvider;
127 import com.tc.objectserver.persistence.impl.NullTransactionPersistor;
128 import com.tc.objectserver.persistence.impl.PersistentBatchSequenceProvider;
129 import com.tc.objectserver.persistence.impl.TransactionStoreImpl;
130 import com.tc.objectserver.persistence.sleepycat.ConnectionIDFactoryImpl;
131 import com.tc.objectserver.persistence.sleepycat.CustomSerializationAdapterFactory;
132 import com.tc.objectserver.persistence.sleepycat.DBEnvironment;
133 import com.tc.objectserver.persistence.sleepycat.DBException;
134 import com.tc.objectserver.persistence.sleepycat.SerializationAdapterFactory;
135 import com.tc.objectserver.persistence.sleepycat.SleepycatPersistor;
136 import com.tc.objectserver.tx.CommitTransactionMessageRecycler;
137 import com.tc.objectserver.tx.CommitTransactionMessageToTransactionBatchReader;
138 import com.tc.objectserver.tx.ServerTransactionManager;
139 import com.tc.objectserver.tx.ServerTransactionManagerImpl;
140 import com.tc.objectserver.tx.ServerTransactionManagerMBean;
141 import com.tc.objectserver.tx.TransactionBatchManager;
142 import com.tc.objectserver.tx.TransactionBatchManagerImpl;
143 import com.tc.objectserver.tx.TransactionSequencer;
144 import com.tc.objectserver.tx.TransactionalObjectManagerImpl;
145 import com.tc.objectserver.tx.TransactionalStageCoordinator;
146 import com.tc.objectserver.tx.TransactionalStagesCoordinatorImpl;
147 import com.tc.properties.TCProperties;
148 import com.tc.properties.TCPropertiesImpl;
149 import com.tc.stats.counter.sampled.SampledCounter;
150 import com.tc.stats.counter.sampled.SampledCounterConfig;
151 import com.tc.stats.counter.sampled.SampledCounterManager;
152 import com.tc.stats.counter.sampled.SampledCounterManagerImpl;
153 import com.tc.util.Assert;
154 import com.tc.util.SequenceValidator;
155 import com.tc.util.StartupLock;
156 import com.tc.util.TCTimeoutException;
157 import com.tc.util.TCTimerImpl;
158 import com.tc.util.io.FileUtils;
159 import com.tc.util.sequence.BatchSequence;
160 import com.tc.util.sequence.Sequence;
161 import com.tc.util.sequence.SimpleSequence;
162 import com.tc.util.startuplock.FileNotCreatedException;
163 import com.tc.util.startuplock.LocationNotCreatedException;
164
165 import java.io.File JavaDoc;
166 import java.io.IOException JavaDoc;
167 import java.util.Collections JavaDoc;
168 import java.util.HashMap JavaDoc;
169 import java.util.Properties JavaDoc;
170 import java.util.Set JavaDoc;
171
172 import javax.management.MBeanServer JavaDoc;
173 import javax.management.NotCompliantMBeanException JavaDoc;
174
175 /**
176  * Startup and shutdown point. Builds and starts the server
177  *
178  * @author steve
179  */

180 public class DistributedObjectServer extends SEDA {
181   private final ConnectionPolicy connectionPolicy;
182
183   private static final TCLogger logger = CustomerLogging.getDSOGenericLogger();
184   private static final TCLogger consoleLogger = CustomerLogging.getConsoleLogger();
185
186   private final L2TVSConfigurationSetupManager configSetupManager;
187   private final Sink httpSink;
188   private NetworkListener l1Listener;
189   private CommunicationsManager communicationsManager;
190   private ServerConfigurationContext context;
191   private ObjectManagerImpl objectManager;
192   private TransactionalObjectManagerImpl txnObjectManager;
193   private SampledCounterManager sampledCounterManager;
194   private LockManager lockManager;
195   private ServerManagementContext managementContext;
196   private StartupLock startupLock;
197
198   private ClientStateManagerImpl clientStateManager;
199
200   private ManagedObjectStore objectStore;
201   private Persistor persistor;
202   private ServerTransactionManager transactionManager;
203
204   private CacheManager cacheManager;
205
206   private final TCServerInfoMBean tcServerInfoMBean;
207   private L2Management l2Management;
208   private L2Coordinator l2Coordinator;
209
210   private TCProperties l2Properties;
211
212   private ConnectionIDFactoryImpl connectionIdFactory;
213
214   public DistributedObjectServer(L2TVSConfigurationSetupManager configSetupManager, TCThreadGroup threadGroup,
215                                  ConnectionPolicy connectionPolicy, TCServerInfoMBean tcServerInfoMBean) {
216     this(configSetupManager, threadGroup, connectionPolicy, new NullSink(), tcServerInfoMBean);
217   }
218
219   public DistributedObjectServer(L2TVSConfigurationSetupManager configSetupManager, TCThreadGroup threadGroup,
220                                  ConnectionPolicy connectionPolicy, Sink httpSink, TCServerInfoMBean tcServerInfoMBean) {
221     super(threadGroup);
222
223     // This assertion is here because we want to assume that all threads spawned by the server (including any created in
224
// 3rd party libs) inherit their thread group from the current thread . Consider this before removing the assertion.
225
// Even in tests, we probably don't want different thread group configurations
226
Assert.assertEquals(threadGroup, Thread.currentThread().getThreadGroup());
227
228     this.configSetupManager = configSetupManager;
229     this.connectionPolicy = connectionPolicy;
230     this.httpSink = httpSink;
231     this.tcServerInfoMBean = tcServerInfoMBean;
232   }
233
234   public void dump() {
235     if (this.lockManager != null) {
236       this.lockManager.dump();
237     }
238
239     if (this.objectManager != null) {
240       this.objectManager.dump();
241     }
242
243     if (this.txnObjectManager != null) {
244       this.txnObjectManager.dump();
245     }
246
247     if (this.transactionManager != null) {
248       this.transactionManager.dump();
249     }
250   }
251
252   public synchronized void start() throws IOException JavaDoc, DatabaseException, LocationNotCreatedException,
253       FileNotCreatedException {
254
255     try {
256       startJMXServer();
257     } catch (Exception JavaDoc e) {
258       String JavaDoc msg = "Unable to start the JMX server. Do you have another Terracotta Server running?";
259       consoleLogger.error(msg);
260       logger.error(msg, e);
261       System.exit(-1);
262     }
263
264     NIOWorkarounds.solaris10Workaround();
265
266     configSetupManager.commonl2Config().changesInItemIgnored(configSetupManager.commonl2Config().dataPath());
267     NewL2DSOConfig l2DSOConfig = configSetupManager.dsoL2Config();
268     l2DSOConfig.changesInItemIgnored(l2DSOConfig.persistenceMode());
269     PersistenceMode persistenceMode = (PersistenceMode) l2DSOConfig.persistenceMode().getObject();
270
271     final boolean swapEnabled = true; // 2006-01-31 andrew -- no longer possible to use in-memory only; DSO folks say
272
// it's broken
273
final boolean persistent = persistenceMode.equals(PersistenceMode.PERMANENT_STORE);
274
275     TCFile location = new TCFileImpl(this.configSetupManager.commonl2Config().dataPath().getFile());
276     startupLock = new StartupLock(location);
277
278     if (!startupLock.canProceed(new TCRandomFileAccessImpl(), persistent)) {
279       consoleLogger.error("Another L2 process is using the directory " + location + " as data directory.");
280       if (!persistent) {
281         consoleLogger.error("This is not allowed with persistence mode set to temporary-swap-only.");
282       }
283       consoleLogger.error("Exiting...");
284       System.exit(1);
285     }
286
287     int maxStageSize = 5000;
288
289     StageManager stageManager = getStageManager();
290     SessionManager sessionManager = new NullSessionManager();
291     SessionProvider sessionProvider = (SessionProvider) sessionManager;
292     l2Properties = TCPropertiesImpl.getProperties().getPropertiesFor("l2");
293
294     EvictionPolicy swapCache;
295     final ClientStatePersistor clientStateStore;
296     final PersistenceTransactionProvider persistenceTransactionProvider;
297     final TransactionPersistor transactionPersistor;
298     final Sequence globalTransactionIDSequence;
299     logger.debug("server swap enabled: " + swapEnabled);
300     final ManagedObjectChangeListenerProviderImpl managedObjectChangeListenerProvider = new ManagedObjectChangeListenerProviderImpl();
301     if (swapEnabled) {
302       File dbhome = new File(configSetupManager.commonl2Config().dataPath().getFile(), "objectdb");
303       logger.debug("persistent: " + persistent);
304       if (!persistent) {
305         if (dbhome.exists()) {
306           logger.debug("deleting persistence database...");
307           FileUtils.forceDelete(dbhome, "jdb");
308           logger.debug("persistence database deleted.");
309         }
310       }
311       logger.debug("persistence database home: " + dbhome);
312
313       DBEnvironment dbenv = new DBEnvironment(persistent, dbhome, l2Properties.getPropertiesFor("berkeleydb")
314           .addAllPropertiesTo(new Properties()));
315       SerializationAdapterFactory serializationAdapterFactory = new CustomSerializationAdapterFactory();
316       persistor = new SleepycatPersistor(TCLogging.getLogger(SleepycatPersistor.class), dbenv,
317                                          serializationAdapterFactory);
318
319       String JavaDoc cachePolicy = l2Properties.getProperty("objectmanager.cachePolicy").toUpperCase();
320       if (cachePolicy.equals("LRU")) {
321         swapCache = new LRUEvictionPolicy(-1);
322       } else if (cachePolicy.equals("LFU")) {
323         swapCache = new LFUEvictionPolicy(-1, new LFUConfigImpl(l2Properties.getPropertiesFor("lfu")));
324       } else {
325         throw new AssertionError JavaDoc("Unknown Cache Policy : " + cachePolicy
326                                  + " Accepted Values are : <LRU>/<LFU> Please check tc.properties");
327       }
328       objectStore = new PersistentManagedObjectStore(persistor.getManagedObjectPersistor());
329     } else {
330       persistor = new InMemoryPersistor();
331       swapCache = new NullCache();
332       objectStore = new InMemoryManagedObjectStore(new HashMap JavaDoc());
333     }
334
335     persistenceTransactionProvider = persistor.getPersistenceTransactionProvider();
336     PersistenceTransactionProvider nullPersistenceTransactionProvider = new NullPersistenceTransactionProvider();
337     PersistenceTransactionProvider transactionStorePTP;
338     if (persistent) {
339       // XXX: This construction/initialization order is pretty lame. Perhaps
340
// making the sequence provider its own
341
// handler isn't the right thing to do.
342
PersistentBatchSequenceProvider sequenceProvider = new PersistentBatchSequenceProvider(persistor
343           .getGlobalTransactionIDSequence());
344       Stage requestBatchStage = stageManager
345           .createStage(ServerConfigurationContext.REQUEST_BATCH_GLOBAL_TRANSACTION_ID_SEQUENCE_STAGE, sequenceProvider,
346                        1, maxStageSize);
347       sequenceProvider.setRequestBatchSink(requestBatchStage.getSink());
348       globalTransactionIDSequence = new BatchSequence(sequenceProvider, 1000);
349
350       transactionPersistor = persistor.getTransactionPersistor();
351       transactionStorePTP = persistenceTransactionProvider;
352     } else {
353       transactionPersistor = new NullTransactionPersistor();
354       transactionStorePTP = nullPersistenceTransactionProvider;
355       globalTransactionIDSequence = new SimpleSequence();
356     }
357
358     clientStateStore = persistor.getClientStatePersistor();
359
360     ManagedObjectStateFactory.createInstance(managedObjectChangeListenerProvider, persistor);
361
362     communicationsManager = new CommunicationsManagerImpl(new NullMessageMonitor(),
363                                                           new PlainNetworkStackHarnessFactory(), connectionPolicy);
364
365     final DSOApplicationEvents appEvents;
366     try {
367       appEvents = new DSOApplicationEvents();
368     } catch (NotCompliantMBeanException JavaDoc ncmbe) {
369       throw new TCRuntimeException("Unable to construct the " + DSOApplicationEvents.class.getName()
370                                    + " MBean; this is a programming error. Please go fix that class.", ncmbe);
371     }
372
373     clientStateManager = new ClientStateManagerImpl(TCLogging.getLogger(ClientStateManager.class));
374
375     l2DSOConfig.changesInItemIgnored(l2DSOConfig.garbageCollectionEnabled());
376     boolean gcEnabled = l2DSOConfig.garbageCollectionEnabled().getBoolean();
377     logger.debug("GC enabled: " + gcEnabled);
378
379     l2DSOConfig.changesInItemIgnored(l2DSOConfig.garbageCollectionInterval());
380     long gcInterval = l2DSOConfig.garbageCollectionInterval().getInt();
381     if (gcEnabled) logger.debug("GC interval: " + gcInterval + " seconds");
382
383     l2DSOConfig.changesInItemIgnored(l2DSOConfig.garbageCollectionVerbose());
384     boolean verboseGC = l2DSOConfig.garbageCollectionVerbose().getBoolean();
385     if (gcEnabled) logger.debug("Verbose GC enabled: " + verboseGC);
386     sampledCounterManager = new SampledCounterManagerImpl();
387     SampledCounter objectCreationRate = sampledCounterManager.createCounter(new SampledCounterConfig(1, 900, true, 0L));
388     SampledCounter objectFaultRate = sampledCounterManager.createCounter(new SampledCounterConfig(1, 900, true, 0L));
389     ObjectManagerStatsImpl objMgrStats = new ObjectManagerStatsImpl(objectCreationRate, objectFaultRate);
390
391     SequenceValidator sequenceValidator = new SequenceValidator(0);
392     ManagedObjectFaultHandler managedObjectFaultHandler = new ManagedObjectFaultHandler();
393     // Server initiated request processing queues shouldn't have any max queue size.
394
Stage faultManagedObjectStage = stageManager.createStage(ServerConfigurationContext.MANAGED_OBJECT_FAULT_STAGE,
395                                                              managedObjectFaultHandler, 4, -1);
396     ManagedObjectFlushHandler managedObjectFlushHandler = new ManagedObjectFlushHandler();
397     Stage flushManagedObjectStage = stageManager.createStage(ServerConfigurationContext.MANAGED_OBJECT_FLUSH_STAGE,
398                                                              managedObjectFlushHandler, (persistent ? 1 : 4), -1);
399
400     TCProperties objManagerProperties = l2Properties.getPropertiesFor("objectmanager");
401
402     objectManager = new ObjectManagerImpl(new ObjectManagerConfig(gcInterval * 1000, gcEnabled, verboseGC, persistent,
403                                                                   objManagerProperties.getInt("deleteBatchSize")),
404                                           getThreadGroup(), clientStateManager, objectStore, swapCache,
405                                           persistenceTransactionProvider, faultManagedObjectStage.getSink(),
406                                           flushManagedObjectStage.getSink(), l2Management
407                                               .findObjectManagementMonitorMBean());
408     objectManager.setStatsListener(objMgrStats);
409     objectManager.setGarbageCollector(new MarkAndSweepGarbageCollector(objectManager, clientStateManager, verboseGC));
410     managedObjectChangeListenerProvider.setListener(objectManager);
411
412     TCProperties cacheManagerProperties = l2Properties.getPropertiesFor("cachemanager");
413     if (cacheManagerProperties.getBoolean("enabled")) {
414       cacheManager = new CacheManager(objectManager, new CacheConfigImpl(cacheManagerProperties));
415       if (logger.isDebugEnabled()) {
416         logger.debug("CacheManager Enabled : " + cacheManager);
417       }
418     } else {
419       logger.warn("CacheManager is Disabled");
420     }
421
422     connectionIdFactory = new ConnectionIDFactoryImpl(clientStateStore);
423
424     l2DSOConfig.changesInItemIgnored(l2DSOConfig.listenPort());
425     int serverPort = l2DSOConfig.listenPort().getInt();
426     l1Listener = communicationsManager.createListener(sessionProvider,
427                                                       new TCSocketAddress(TCSocketAddress.WILDCARD_ADDR, serverPort),
428                                                       true, connectionIdFactory, httpSink);
429
430     ClientTunnelingEventHandler cteh = new ClientTunnelingEventHandler();
431
432     DSOChannelManager channelManager = new DSOChannelManagerImpl(l1Listener.getChannelManager());
433     channelManager.addEventListener(cteh);
434     channelManager.addEventListener(connectionIdFactory);
435
436     ChannelStats channelStats = new ChannelStatsImpl(sampledCounterManager, channelManager);
437
438     lockManager = new LockManagerImpl(channelManager);
439     TransactionAcknowledgeAction taa = new TransactionAcknowledgeActionImpl(channelManager);
440     ObjectInstanceMonitorImpl instanceMonitor = new ObjectInstanceMonitorImpl();
441     TransactionBatchManager transactionBatchManager = new TransactionBatchManagerImpl();
442     SampledCounter globalTxnCounter = sampledCounterManager.createCounter(new SampledCounterConfig(1, 300, true, 0L));
443
444     final TransactionStore transactionStore = new TransactionStoreImpl(transactionPersistor,
445                                                                        globalTransactionIDSequence);
446     ServerGlobalTransactionManager gtxm = new ServerGlobalTransactionManagerImpl(sequenceValidator, transactionStore,
447                                                                                  transactionStorePTP);
448     transactionManager = new ServerTransactionManagerImpl(gtxm, transactionStore, lockManager, clientStateManager,
449                                                           objectManager, taa, globalTxnCounter, channelStats);
450     MessageRecycler recycler = new CommitTransactionMessageRecycler(transactionManager);
451
452     stageManager.createStage(ServerConfigurationContext.TRANSACTION_LOOKUP_STAGE, new TransactionLookupHandler(), 1,
453                              maxStageSize);
454
455     // Lookup stage should never be blocked trying to add to apply stage
456
stageManager.createStage(ServerConfigurationContext.APPLY_CHANGES_STAGE,
457                              new ApplyTransactionChangeHandler(instanceMonitor, gtxm), 1, -1);
458
459     stageManager.createStage(ServerConfigurationContext.APPLY_COMPLETE_STAGE, new ApplyCompleteTransactionHandler(), 1,
460                              maxStageSize);
461
462     // Server initiated request processing stages should not be bounded
463
stageManager.createStage(ServerConfigurationContext.RECALL_OBJECTS_STAGE, new RecallObjectsHandler(), 1, -1);
464
465     int commitThreads = (persistent ? 4 : 1);
466     stageManager
467         .createStage(ServerConfigurationContext.COMMIT_CHANGES_STAGE,
468                      new CommitTransactionChangeHandler(gtxm, transactionStorePTP), commitThreads, maxStageSize);
469
470     TransactionalStageCoordinator txnStageCoordinator = new TransactionalStagesCoordinatorImpl(stageManager);
471     txnObjectManager = new TransactionalObjectManagerImpl(objectManager, new TransactionSequencer(), gtxm,
472                                                           txnStageCoordinator);
473     objectManager.setTransactionalObjectManager(txnObjectManager);
474     Stage processTx = stageManager.createStage(ServerConfigurationContext.PROCESS_TRANSACTION_STAGE,
475                                                new ProcessTransactionHandler(transactionBatchManager, txnObjectManager,
476                                                                              sequenceValidator, recycler), 1,
477                                                maxStageSize);
478
479     Stage rootRequest = stageManager.createStage(ServerConfigurationContext.MANAGED_ROOT_REQUEST_STAGE,
480                                                  new RequestRootHandler(), 1, maxStageSize);
481
482     stageManager.createStage(ServerConfigurationContext.BROADCAST_CHANGES_STAGE,
483                              new BroadcastChangeHandler(transactionBatchManager), 1, maxStageSize);
484     stageManager.createStage(ServerConfigurationContext.RESPOND_TO_LOCK_REQUEST_STAGE,
485                              new RespondToRequestLockHandler(), 1, maxStageSize);
486     Stage requestLock = stageManager.createStage(ServerConfigurationContext.REQUEST_LOCK_STAGE,
487                                                  new RequestLockUnLockHandler(), 1, maxStageSize);
488     Stage channelLifecycleStage = stageManager
489         .createStage(ServerConfigurationContext.CHANNEL_LIFE_CYCLE_STAGE,
490                      new ChannelLifeCycleHandler(communicationsManager, transactionManager, transactionBatchManager,
491                                                  channelManager), 1, maxStageSize);
492     channelManager.addEventListener(new ChannelLifeCycleHandler.EventListener(channelLifecycleStage.getSink()));
493
494     SampledCounter globalObjectFaultCounter = sampledCounterManager.createCounter(new SampledCounterConfig(1, 300,
495                                                                                                            true, 0L));
496     SampledCounter globalObjectFlushCounter = sampledCounterManager.createCounter(new SampledCounterConfig(1, 300,
497                                                                                                            true, 0L));
498     Stage objectRequest = stageManager.createStage(ServerConfigurationContext.MANAGED_OBJECT_REQUEST_STAGE,
499                                                    new ManagedObjectRequestHandler(globalObjectFaultCounter,
500                                                                                    globalObjectFlushCounter), 1,
501                                                    maxStageSize);
502     stageManager.createStage(ServerConfigurationContext.RESPOND_TO_OBJECT_REQUEST_STAGE,
503                              new RespondToObjectRequestHandler(), 4, maxStageSize);
504     Stage oidRequest = stageManager.createStage(ServerConfigurationContext.OBJECT_ID_BATCH_REQUEST_STAGE,
505                                                 new RequestObjectIDBatchHandler(objectStore), 1, maxStageSize);
506     Stage transactionAck = stageManager.createStage(ServerConfigurationContext.TRANSACTION_ACKNOWLEDGEMENT_STAGE,
507                                                     new TransactionAcknowledgementHandler(), 1, maxStageSize);
508     Stage clientHandshake = stageManager.createStage(ServerConfigurationContext.CLIENT_HANDSHAKE_STAGE,
509                                                      new ClientHandshakeHandler(), 1, maxStageSize);
510     Stage hydrateStage = stageManager.createStage(ServerConfigurationContext.HYDRATE_MESSAGE_SINK,
511                                                   new HydrateHandler(), 1, maxStageSize);
512
513     Stage jmxEventsStage = stageManager.createStage(ServerConfigurationContext.JMX_EVENTS_STAGE,
514                                                     new JMXEventsHandler(appEvents), 1, maxStageSize);
515
516     final Stage jmxRemoteConnectStage = stageManager.createStage(ServerConfigurationContext.JMXREMOTE_CONNECT_STAGE,
517                                                                  new ClientConnectEventHandler(), 1, maxStageSize);
518     cteh.setConnectStageSink(jmxRemoteConnectStage.getSink());
519     final Stage jmxRemoteTunnelStage = stageManager.createStage(ServerConfigurationContext.JMXREMOTE_TUNNEL_STAGE,
520                                                                 cteh, 1, maxStageSize);
521
522     l1Listener.addClassMapping(TCMessageType.BATCH_TRANSACTION_ACK_MESSAGE,
523                                BatchTransactionAcknowledgeMessageImpl.class);
524     l1Listener.addClassMapping(TCMessageType.REQUEST_ROOT_MESSAGE, RequestRootMessageImpl.class);
525     l1Listener.addClassMapping(TCMessageType.LOCK_REQUEST_MESSAGE, LockRequestMessage.class);
526     l1Listener.addClassMapping(TCMessageType.LOCK_RESPONSE_MESSAGE, LockResponseMessage.class);
527     l1Listener.addClassMapping(TCMessageType.LOCK_RECALL_MESSAGE, LockResponseMessage.class);
528     l1Listener.addClassMapping(TCMessageType.LOCK_QUERY_RESPONSE_MESSAGE, LockResponseMessage.class);
529     l1Listener.addClassMapping(TCMessageType.COMMIT_TRANSACTION_MESSAGE, CommitTransactionMessageImpl.class);
530     l1Listener.addClassMapping(TCMessageType.REQUEST_ROOT_RESPONSE_MESSAGE, RequestRootResponseMessage.class);
531     l1Listener.addClassMapping(TCMessageType.REQUEST_MANAGED_OBJECT_MESSAGE, RequestManagedObjectMessageImpl.class);
532     l1Listener.addClassMapping(TCMessageType.REQUEST_MANAGED_OBJECT_RESPONSE_MESSAGE,
533                                RequestManagedObjectResponseMessage.class);
534     l1Listener.addClassMapping(TCMessageType.BROADCAST_TRANSACTION_MESSAGE, BroadcastTransactionMessageImpl.class);
535     l1Listener.addClassMapping(TCMessageType.OBJECT_ID_BATCH_REQUEST_MESSAGE, ObjectIDBatchRequestMessage.class);
536     l1Listener.addClassMapping(TCMessageType.OBJECT_ID_BATCH_REQUEST_RESPONSE_MESSAGE,
537                                ObjectIDBatchRequestResponseMessage.class);
538     l1Listener.addClassMapping(TCMessageType.ACKNOWLEDGE_TRANSACTION_MESSAGE, AcknowledgeTransactionMessageImpl.class);
539     l1Listener.addClassMapping(TCMessageType.CLIENT_HANDSHAKE_MESSAGE, ClientHandshakeMessageImpl.class);
540     l1Listener.addClassMapping(TCMessageType.CLIENT_HANDSHAKE_ACK_MESSAGE, ClientHandshakeAckMessageImpl.class);
541     l1Listener.addClassMapping(TCMessageType.JMX_MESSAGE, JMXMessage.class);
542     l1Listener.addClassMapping(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE, JmxRemoteTunnelMessage.class);
543     l1Listener.addClassMapping(TCMessageType.CLUSTER_MEMBERSHIP_EVENT_MESSAGE, ClusterMembershipMessage.class);
544     l1Listener.addClassMapping(TCMessageType.CLIENT_JMX_READY_MESSAGE, L1JmxReady.class);
545
546     Sink hydrateSink = hydrateStage.getSink();
547     l1Listener.routeMessageType(TCMessageType.COMMIT_TRANSACTION_MESSAGE, processTx.getSink(), hydrateSink);
548     l1Listener.routeMessageType(TCMessageType.LOCK_REQUEST_MESSAGE, requestLock.getSink(), hydrateSink);
549     l1Listener.routeMessageType(TCMessageType.REQUEST_ROOT_MESSAGE, rootRequest.getSink(), hydrateSink);
550     l1Listener.routeMessageType(TCMessageType.REQUEST_MANAGED_OBJECT_MESSAGE, objectRequest.getSink(), hydrateSink);
551     l1Listener.routeMessageType(TCMessageType.OBJECT_ID_BATCH_REQUEST_MESSAGE, oidRequest.getSink(), hydrateSink);
552     l1Listener.routeMessageType(TCMessageType.ACKNOWLEDGE_TRANSACTION_MESSAGE, transactionAck.getSink(), hydrateSink);
553     l1Listener.routeMessageType(TCMessageType.CLIENT_HANDSHAKE_MESSAGE, clientHandshake.getSink(), hydrateSink);
554     l1Listener.routeMessageType(TCMessageType.JMX_MESSAGE, jmxEventsStage.getSink(), hydrateSink);
555     l1Listener.routeMessageType(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE, jmxRemoteTunnelStage.getSink(),
556                                 hydrateSink);
557     l1Listener.routeMessageType(TCMessageType.CLIENT_JMX_READY_MESSAGE, jmxRemoteTunnelStage.getSink(), hydrateSink);
558
559     ObjectRequestManager objectRequestManager = new ObjectRequestManagerImpl(objectManager, transactionManager);
560
561     l2DSOConfig.changesInItemIgnored(l2DSOConfig.clientReconnectWindow());
562     long reconnectTimeout = l2DSOConfig.clientReconnectWindow().getInt();
563     logger.debug("Client Reconnect Window: " + reconnectTimeout + " seconds");
564     reconnectTimeout *= 1000;
565     ServerClientHandshakeManager clientHandshakeManager = new ServerClientHandshakeManager(
566                                                                                            TCLogging
567                                                                                                .getLogger(ServerClientHandshakeManager.class),
568                                                                                            channelManager,
569                                                                                            objectManager,
570                                                                                            sequenceValidator,
571                                                                                            clientStateManager,
572                                                                                            lockManager,
573                                                                                            transactionManager,
574                                                                                            stageManager
575                                                                                                .getStage(
576                                                                                                          ServerConfigurationContext.RESPOND_TO_LOCK_REQUEST_STAGE)
577                                                                                                .getSink(),
578                                                                                            objectStore,
579                                                                                            new TCTimerImpl(
580                                                                                                            "Reconnect timer",
581                                                                                                            true),
582                                                                                            reconnectTimeout, persistent);
583
584     boolean networkedHA = l2Properties.getBoolean("ha.network.enabled");
585     if (networkedHA) {
586       logger.info("L2 Networked HA Enabled ");
587       l2Coordinator = new L2HACoordinator(consoleLogger, this, stageManager, persistor.getClusterStateStore(),
588                                           objectManager);
589     } else {
590       l2Coordinator = new L2HADisabledCooridinator();
591     }
592
593     context = new ServerConfigurationContextImpl(stageManager, objectManager, objectRequestManager, objectStore,
594                                                  lockManager, channelManager, clientStateManager, transactionManager,
595                                                  txnObjectManager, clientHandshakeManager, channelStats, l2Coordinator,
596                                                  new CommitTransactionMessageToTransactionBatchReader());
597
598     stageManager.startAll(context);
599
600     DSOGlobalServerStats serverStats = new DSOGlobalServerStatsImpl(globalObjectFlushCounter, globalObjectFaultCounter,
601                                                                     globalTxnCounter, objMgrStats);
602
603     // XXX: yucky casts
604
managementContext = new ServerManagementContext((ServerTransactionManagerMBean) transactionManager,
605                                                     (ObjectManagerMBean) objectManager, (LockManagerMBean) lockManager,
606                                                     (DSOChannelManagerMBean) channelManager, serverStats, channelStats,
607                                                     instanceMonitor, appEvents);
608
609     if (l2Properties.getBoolean("beanshell.enabled")) startBeanShell(l2Properties.getInt("beanshell.port"));
610     
611     if (networkedHA) {
612       l2Coordinator.start();
613     } else {
614       // In non-network enabled HA, Only active server reached here.
615
startActiveMode();
616     }
617   }
618
619   public boolean startActiveMode() throws IOException JavaDoc {
620     Set JavaDoc existingConnections = Collections.unmodifiableSet(connectionIdFactory.loadConnectionIDs());
621     context.getClientHandshakeManager().setStarting(existingConnections);
622     l1Listener.start(existingConnections);
623     consoleLogger.info("Terracotta Server has started up as ACTIVE node on port " + l1Listener.getBindPort()
624                        + " successfully, and is now ready for work.");
625     return true;
626   }
627
628   public boolean stopActiveMode() throws TCTimeoutException {
629     // TODO:: Make this not take timeout and force stop
630
consoleLogger.info("Stopping ACTIVE Terracotta Server on port " + l1Listener.getBindPort() + ".");
631     l1Listener.stop(10000);
632     l1Listener.getChannelManager().closeAllChannels();
633     return true;
634   }
635
636   private void startBeanShell(int port) {
637     try {
638       Interpreter i = new Interpreter();
639       i.set("dsoServer", this);
640       i.set("objectManager", objectManager);
641       i.set("txnObjectManager", txnObjectManager);
642       i.set("portnum", port);
643       i.eval("setAccessibility(true)"); // turn off access restrictions
644
i.eval("server(portnum)");
645       consoleLogger.info("Bean shell is started on port " + port);
646     } catch (EvalError e) {
647       e.printStackTrace();
648     }
649   }
650
651   public int getListenPort() {
652     return this.l1Listener.getBindPort();
653   }
654
655   public synchronized void stop() {
656     try {
657       if (lockManager != null) lockManager.stop();
658     } catch (InterruptedException JavaDoc e) {
659       logger.error(e);
660     }
661
662     getStageManager().stopAll();
663
664     if (l1Listener != null) {
665       try {
666         l1Listener.stop(5000);
667       } catch (TCTimeoutException e) {
668         logger.warn("timeout trying to stop listener: " + e.getMessage());
669       }
670     }
671
672     if ((communicationsManager != null)) {
673       communicationsManager.shutdown();
674     }
675
676     if (objectManager != null) {
677       try {
678         objectManager.stop();
679       } catch (Throwable JavaDoc e) {
680         logger.error(e);
681       }
682     }
683
684     clientStateManager.stop();
685
686     try {
687       objectStore.shutdown();
688     } catch (Throwable JavaDoc e) {
689       logger.warn(e);
690     }
691
692     try {
693       persistor.close();
694     } catch (DBException e) {
695       logger.warn(e);
696     }
697
698     if (sampledCounterManager != null) {
699       try {
700         sampledCounterManager.shutdown();
701       } catch (Exception JavaDoc e) {
702         logger.error(e);
703       }
704     }
705
706     try {
707       stopJMXServer();
708     } catch (Throwable JavaDoc t) {
709       logger.error("Error shutting down jmx server", t);
710     }
711
712     basicStop();
713   }
714
715   public void quickStop() {
716     try {
717       stopJMXServer();
718     } catch (Throwable JavaDoc t) {
719       logger.error("Error shutting down jmx server", t);
720     }
721
722     // XXX: not calling basicStop() here, it creates a race condition with the Sleepycat's own writer lock (see
723
// LKC-3239) Provided we ever fix graceful server shutdown, we'll want to uncommnet this at that time and/or get rid
724
// of this method completely
725

726     // basicStop();
727
}
728
729   private void basicStop() {
730     if (startupLock != null) {
731       startupLock.release();
732     }
733   }
734
735   public ConnectionIDFactory getConnectionIdFactory() {
736     return connectionIdFactory;
737   }
738
739   public ManagedObjectStore getManagedObjectStore() {
740     return objectStore;
741   }
742
743   public ServerConfigurationContext getContext() {
744     return context;
745   }
746
747   public ServerManagementContext getManagementContext() {
748     return managementContext;
749   }
750
751   public MBeanServer JavaDoc getMBeanServer() {
752     return l2Management.getMBeanServer();
753   }
754
755   private void startJMXServer() throws Exception JavaDoc {
756     l2Management = new L2Management(tcServerInfoMBean, configSetupManager);
757     
758     /*
759      * Some tests use this if they run with jdk1.4 and start multiple in-process
760      * DistributedObjectServers. When we no longer support 1.4, this can be
761      * removed. See com.tctest.LockManagerSystemTest.
762      */

763     if(!Boolean.getBoolean("org.terracotta.server.disableJmxConnector")) {
764       l2Management.start();
765     }
766   }
767
768   private void stopJMXServer() throws Exception JavaDoc {
769     try {
770       if (l2Management != null) {
771         l2Management.stop();
772       }
773     } finally {
774       l2Management = null;
775     }
776   }
777 }
778
Popular Tags