KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > object > DistributedObjectClient


/*
 * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
 * notice. All rights reserved.
 */

5 package com.tc.object;
6
7 import com.tc.async.api.SEDA;
8 import com.tc.async.api.Sink;
9 import com.tc.async.api.Stage;
10 import com.tc.async.api.StageManager;
11 import com.tc.cluster.Cluster;
12 import com.tc.config.schema.dynamic.ConfigItem;
13 import com.tc.lang.TCThreadGroup;
14 import com.tc.logging.ChannelIDLogger;
15 import com.tc.logging.ChannelIDLoggerProvider;
16 import com.tc.logging.CustomerLogging;
17 import com.tc.logging.TCLogger;
18 import com.tc.logging.TCLogging;
19 import com.tc.management.L1Management;
20 import com.tc.management.beans.sessions.SessionMonitorMBean;
21 import com.tc.management.remote.protocol.terracotta.JmxRemoteTunnelMessage;
22 import com.tc.management.remote.protocol.terracotta.L1JmxReady;
23 import com.tc.management.remote.protocol.terracotta.TunnelingEventHandler;
24 import com.tc.net.MaxConnectionsExceededException;
25 import com.tc.net.core.ConnectionInfo;
26 import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
27 import com.tc.net.protocol.tcm.CommunicationsManager;
28 import com.tc.net.protocol.tcm.CommunicationsManagerImpl;
29 import com.tc.net.protocol.tcm.HydrateHandler;
30 import com.tc.net.protocol.tcm.NullMessageMonitor;
31 import com.tc.net.protocol.tcm.TCMessageType;
32 import com.tc.net.protocol.transport.NullConnectionPolicy;
33 import com.tc.object.bytecode.Manager;
34 import com.tc.object.bytecode.hook.impl.PreparedComponentsFromL2Connection;
35 import com.tc.object.cache.CacheConfigImpl;
36 import com.tc.object.cache.CacheManager;
37 import com.tc.object.cache.ClockEvictionPolicy;
38 import com.tc.object.config.DSOClientConfigHelper;
39 import com.tc.object.dna.impl.DNAEncoding;
40 import com.tc.object.event.DmiManager;
41 import com.tc.object.event.DmiManagerImpl;
42 import com.tc.object.field.TCFieldFactory;
43 import com.tc.object.gtx.ClientGlobalTransactionManager;
44 import com.tc.object.gtx.ClientGlobalTransactionManagerImpl;
45 import com.tc.object.handler.BatchTransactionAckHandler;
46 import com.tc.object.handler.ClientCoordinationHandler;
47 import com.tc.object.handler.DmiHandler;
48 import com.tc.object.handler.LockResponseHandler;
49 import com.tc.object.handler.ReceiveObjectHandler;
50 import com.tc.object.handler.ReceiveRootIDHandler;
51 import com.tc.object.handler.ReceiveTransactionCompleteHandler;
52 import com.tc.object.handler.ReceiveTransactionHandler;
53 import com.tc.object.handshakemanager.ClientHandshakeManager;
54 import com.tc.object.idprovider.api.ObjectIDProvider;
55 import com.tc.object.idprovider.impl.ObjectIDProviderImpl;
56 import com.tc.object.idprovider.impl.RemoteObjectIDBatchSequenceProvider;
57 import com.tc.object.loaders.ClassProvider;
58 import com.tc.object.lockmanager.api.ClientLockManager;
59 import com.tc.object.lockmanager.impl.ClientLockManagerImpl;
60 import com.tc.object.lockmanager.impl.RemoteLockManagerImpl;
61 import com.tc.object.lockmanager.impl.ThreadLockManagerImpl;
62 import com.tc.object.logging.RuntimeLogger;
63 import com.tc.object.logging.RuntimeLoggerImpl;
64 import com.tc.object.msg.AcknowledgeTransactionMessageImpl;
65 import com.tc.object.msg.BatchTransactionAcknowledgeMessageImpl;
66 import com.tc.object.msg.BroadcastTransactionMessageImpl;
67 import com.tc.object.msg.ClientHandshakeAckMessageImpl;
68 import com.tc.object.msg.ClientHandshakeMessageImpl;
69 import com.tc.object.msg.ClusterMembershipMessage;
70 import com.tc.object.msg.CommitTransactionMessageImpl;
71 import com.tc.object.msg.JMXMessage;
72 import com.tc.object.msg.LockRequestMessage;
73 import com.tc.object.msg.LockResponseMessage;
74 import com.tc.object.msg.ObjectIDBatchRequestMessage;
75 import com.tc.object.msg.ObjectIDBatchRequestResponseMessage;
76 import com.tc.object.msg.RequestManagedObjectMessageImpl;
77 import com.tc.object.msg.RequestManagedObjectResponseMessage;
78 import com.tc.object.msg.RequestRootMessageImpl;
79 import com.tc.object.msg.RequestRootResponseMessage;
80 import com.tc.object.net.DSOClientMessageChannel;
81 import com.tc.object.session.SessionManager;
82 import com.tc.object.session.SessionManagerImpl;
83 import com.tc.object.session.SessionProvider;
84 import com.tc.object.tx.ClientTransactionFactory;
85 import com.tc.object.tx.ClientTransactionFactoryImpl;
86 import com.tc.object.tx.ClientTransactionManager;
87 import com.tc.object.tx.ClientTransactionManagerImpl;
88 import com.tc.object.tx.LockAccounting;
89 import com.tc.object.tx.RemoteTransactionManager;
90 import com.tc.object.tx.RemoteTransactionManagerImpl;
91 import com.tc.object.tx.TransactionBatchAccounting;
92 import com.tc.object.tx.TransactionBatchFactory;
93 import com.tc.object.tx.TransactionBatchWriterFactory;
94 import com.tc.properties.TCProperties;
95 import com.tc.properties.TCPropertiesImpl;
96 import com.tc.util.Assert;
97 import com.tc.util.TCTimeoutException;
98 import com.tc.util.concurrent.ThreadUtil;
99 import com.tc.util.sequence.BatchSequence;
100 import com.tc.util.sequence.Sequence;
101 import com.tc.util.sequence.SimpleSequence;
102
103 import java.io.IOException JavaDoc;
104 import java.net.ConnectException JavaDoc;
105 import java.util.Collection JavaDoc;
106 import java.util.Collections JavaDoc;
107
108 /**
109  * This is the main point of entry into the DSO client.
110  */

111 public class DistributedObjectClient extends SEDA {
112
113   private static final TCLogger logger = CustomerLogging.getDSOGenericLogger();
114   private static final TCLogger consoleLogger = CustomerLogging.getConsoleLogger();
115
116   private final DSOClientConfigHelper config;
117   private final ClassProvider classProvider;
118   private final PreparedComponentsFromL2Connection connectionComponents;
119   private final Manager manager;
120   private final Cluster cluster;
121
122   private DSOClientMessageChannel channel;
123   private ClientLockManager lockManager;
124   private ClientObjectManagerImpl objectManager;
125   private ClientTransactionManager txManager;
126   private CommunicationsManager communicationsManager;
127   private RemoteTransactionManager rtxManager;
128   private PauseListener pauseListener;
129   private ClientHandshakeManager clientHandshakeManager;
130   private RuntimeLogger runtimeLogger;
131   private CacheManager cacheManager;
132   private L1Management l1Management;
133   private TCProperties l1Properties;
134   private DmiManager dmiManager;
135
136   public DistributedObjectClient(DSOClientConfigHelper config, TCThreadGroup threadGroup, ClassProvider classProvider,
137                                  PreparedComponentsFromL2Connection connectionComponents, Manager manager,
138                                  Cluster cluster) {
139     super(threadGroup);
140     Assert.assertNotNull(config);
141     this.config = config;
142     this.classProvider = classProvider;
143     this.connectionComponents = connectionComponents;
144     this.pauseListener = new NullPauseListener();
145     this.manager = manager;
146     this.cluster = cluster;
147   }
148
149   public void setPauseListener(PauseListener pauseListener) {
150     this.pauseListener = pauseListener;
151   }
152
153   public void start() {
154     int maxSize = 50000;
155     int faultCount = config.getFaultCount();
156
157     final Sequence sessionSequence = new SimpleSequence();
158     final SessionManager sessionManager = new SessionManagerImpl(sessionSequence);
159     final SessionProvider sessionProvider = (SessionProvider) sessionManager;
160
161     communicationsManager = new CommunicationsManagerImpl(new NullMessageMonitor(),
162                                                           new PlainNetworkStackHarnessFactory(),
163                                                           new NullConnectionPolicy());
164
165     logger.debug("Created CommunicationsManager.");
166
167     ConfigItem connectionInfoItem = this.connectionComponents.createConnectionInfoConfigItem();
168     ConnectionInfo[] connectionInfo = (ConnectionInfo[]) connectionInfoItem.getObject();
169
170     String JavaDoc serverHost = connectionInfo[0].getHostname();
171     int serverPort = connectionInfo[0].getPort();
172
173     channel = new DSOClientMessageChannelImpl(communicationsManager.createClientChannel(sessionProvider, -1,
174                                                                                         serverHost, serverPort, 10000,
175                                                                                         connectionInfoItem));
176     this.runtimeLogger = new RuntimeLoggerImpl(config);
177
178     logger.debug("Created channel.");
179
180     ChannelIDLoggerProvider cidLoggerProvider = new ChannelIDLoggerProvider(channel.getChannelIDProvider());
181
182     ClientTransactionFactory txFactory = new ClientTransactionFactoryImpl(runtimeLogger, channel.getChannelIDProvider());
183
184     TransactionBatchFactory txBatchFactory = new TransactionBatchWriterFactory(channel
185         .getCommitTransactionMessageFactory(), new DNAEncoding(classProvider));
186
187     rtxManager = new RemoteTransactionManagerImpl(new ChannelIDLogger(channel.getChannelIDProvider(), TCLogging
188         .getLogger(RemoteTransactionManagerImpl.class)), txBatchFactory, new TransactionBatchAccounting(),
189                                                   new LockAccounting(), sessionManager, channel);
190
191     ClientGlobalTransactionManager gtxManager = new ClientGlobalTransactionManagerImpl(rtxManager);
192
193     lockManager = new ClientLockManagerImpl(new ChannelIDLogger(channel.getChannelIDProvider(), TCLogging
194         .getLogger(ClientLockManager.class)), new RemoteLockManagerImpl(channel.getLockRequestMessageFactory(),
195                                                                         gtxManager), sessionManager);
196
197     RemoteObjectManager remoteObjectManager = new RemoteObjectManagerImpl(new ChannelIDLogger(channel
198         .getChannelIDProvider(), TCLogging.getLogger(RemoteObjectManager.class)), channel.getChannelIDProvider(),
199                                                                           channel.getRequestRootMessageFactory(),
200                                                                           channel
201                                                                               .getRequestManagedObjectMessageFactory(),
202                                                                           new NullObjectRequestMonitor(), faultCount,
203                                                                           sessionManager);
204
205     RemoteObjectIDBatchSequenceProvider remoteIDProvider = new RemoteObjectIDBatchSequenceProvider(channel
206         .getObjectIDBatchRequestMessageFactory());
207     BatchSequence sequence = new BatchSequence(remoteIDProvider, 50000);
208     ObjectIDProvider idProvider = new ObjectIDProviderImpl(sequence);
209
210     TCClassFactory classFactory = new TCClassFactoryImpl(new TCFieldFactory(config), config, classProvider);
211     TCObjectFactory objectFactory = new TCObjectFactoryImpl(classFactory);
212
213     objectManager = new ClientObjectManagerImpl(remoteObjectManager, config, idProvider, new ClockEvictionPolicy(-1),
214                                                 runtimeLogger, channel.getChannelIDProvider(), classProvider,
215                                                 classFactory, objectFactory, config.getPortability(), channel);
216
217     l1Properties = TCPropertiesImpl.getProperties().getPropertiesFor("l1");
218     TCProperties cacheManagerProperties = l1Properties.getPropertiesFor("cachemanager");
219     if (cacheManagerProperties.getBoolean("enabled")) {
220       this.cacheManager = new CacheManager(objectManager, new CacheConfigImpl(cacheManagerProperties));
221       if (logger.isDebugEnabled()) {
222         logger.debug("CacheManager Enabled : " + cacheManager);
223       }
224     } else {
225       logger.warn("CacheManager is Disabled");
226     }
227
228     // Set up the JMX management stuff
229
final TunnelingEventHandler teh = new TunnelingEventHandler(channel.channel());
230     l1Management = new L1Management(teh);
231     l1Management.start();
232
233     txManager = new ClientTransactionManagerImpl(channel.getChannelIDProvider(), objectManager,
234                                                  new ThreadLockManagerImpl(lockManager), txFactory, rtxManager,
235                                                  runtimeLogger, l1Management.findClientTxMonitorMBean());
236
237     StageManager stageManager = getStageManager();
238
239     // stageManager.turnTracingOn();
240
stageManager.setLoggerProvider(cidLoggerProvider);
241
242     Stage lockResponse = stageManager.createStage(ClientConfigurationContext.LOCK_RESPONSE_STAGE,
243                                                   new LockResponseHandler(sessionManager), 1, maxSize);
244     Stage receiveRootID = stageManager.createStage(ClientConfigurationContext.RECEIVE_ROOT_ID_STAGE,
245                                                    new ReceiveRootIDHandler(), 1, maxSize);
246     Stage receiveObject = stageManager.createStage(ClientConfigurationContext.RECEIVE_OBJECT_STAGE,
247                                                    new ReceiveObjectHandler(), 1, maxSize);
248     this.dmiManager = new DmiManagerImpl(classProvider, objectManager, runtimeLogger);
249     Stage dmiStage = stageManager.createStage(ClientConfigurationContext.DMI_STAGE, new DmiHandler(dmiManager), 1,
250                                               maxSize);
251
252     Stage receiveTransaction = stageManager
253         .createStage(ClientConfigurationContext.RECEIVE_TRANSACTION_STAGE,
254                      new ReceiveTransactionHandler(channel.getChannelIDProvider(), channel
255                          .getAcknowledgeTransactionMessageFactory(), gtxManager, sessionManager, dmiStage.getSink()),
256                      1, maxSize);
257     Stage oidRequestResponse = stageManager.createStage(ClientConfigurationContext.OBJECT_ID_REQUEST_RESPONSE_STAGE,
258                                                         remoteIDProvider, 1, maxSize);
259     Stage transactionResponse = stageManager.createStage(ClientConfigurationContext.RECEIVE_TRANSACTION_COMPLETE_STAGE,
260                                                          new ReceiveTransactionCompleteHandler(), 1, maxSize);
261     Stage hydrateStage = stageManager.createStage(ClientConfigurationContext.HYDRATE_MESSAGE_STAGE,
262                                                   new HydrateHandler(), 1, maxSize);
263     Stage batchTxnAckStage = stageManager.createStage(ClientConfigurationContext.BATCH_TXN_ACK_STAGE,
264                                                       new BatchTransactionAckHandler(), 1, maxSize);
265
266     // By design this stage needs to be single threaded. If it wasn't then cluster memebership messages could get
267
// processed before the client handshake ack, and this client would get a faulty view of the cluster at best, or
268
// more likely an AssertionError
269
Stage pauseStage = stageManager.createStage(ClientConfigurationContext.CLIENT_COORDINATION_STAGE,
270                                                 new ClientCoordinationHandler(cluster), 1, maxSize);
271
272     final Stage jmxRemoteTunnelStage = stageManager.createStage(ClientConfigurationContext.JMXREMOTE_TUNNEL_STAGE, teh,
273                                                                 1, maxSize);
274
275     // This set is designed to give the handshake manager an opportunity to pause stages when it is pausing due to
276
// disconnect. Unfortunately, the lock response stage can block, which I didn't realize at the time, so it's not
277
// being used.
278
Collection JavaDoc stagesToPauseOnDisconnect = Collections.EMPTY_LIST;
279     clientHandshakeManager = new ClientHandshakeManager(new ChannelIDLogger(channel.getChannelIDProvider(), TCLogging
280         .getLogger(ClientHandshakeManager.class)), channel.getChannelIDProvider(), channel
281         .getClientHandshakeMessageFactory(), objectManager, remoteObjectManager, lockManager, rtxManager, gtxManager,
282                                                         stagesToPauseOnDisconnect, pauseStage.getSink(),
283                                                         sessionManager, pauseListener, sequence, cluster);
284     channel.addListener(clientHandshakeManager);
285
286     ClientConfigurationContext cc = new ClientConfigurationContext(stageManager, lockManager, remoteObjectManager,
287                                                                    txManager, clientHandshakeManager);
288     stageManager.startAll(cc);
289
290     channel.addClassMapping(TCMessageType.BATCH_TRANSACTION_ACK_MESSAGE, BatchTransactionAcknowledgeMessageImpl.class);
291     channel.addClassMapping(TCMessageType.REQUEST_ROOT_MESSAGE, RequestRootMessageImpl.class);
292     channel.addClassMapping(TCMessageType.LOCK_REQUEST_MESSAGE, LockRequestMessage.class);
293     channel.addClassMapping(TCMessageType.LOCK_RESPONSE_MESSAGE, LockResponseMessage.class);
294     channel.addClassMapping(TCMessageType.LOCK_RECALL_MESSAGE, LockResponseMessage.class);
295     channel.addClassMapping(TCMessageType.LOCK_QUERY_RESPONSE_MESSAGE, LockResponseMessage.class);
296     channel.addClassMapping(TCMessageType.COMMIT_TRANSACTION_MESSAGE, CommitTransactionMessageImpl.class);
297     channel.addClassMapping(TCMessageType.REQUEST_ROOT_RESPONSE_MESSAGE, RequestRootResponseMessage.class);
298     channel.addClassMapping(TCMessageType.REQUEST_MANAGED_OBJECT_MESSAGE, RequestManagedObjectMessageImpl.class);
299     channel.addClassMapping(TCMessageType.REQUEST_MANAGED_OBJECT_RESPONSE_MESSAGE,
300                             RequestManagedObjectResponseMessage.class);
301     channel.addClassMapping(TCMessageType.BROADCAST_TRANSACTION_MESSAGE, BroadcastTransactionMessageImpl.class);
302     channel.addClassMapping(TCMessageType.OBJECT_ID_BATCH_REQUEST_MESSAGE, ObjectIDBatchRequestMessage.class);
303     channel.addClassMapping(TCMessageType.OBJECT_ID_BATCH_REQUEST_RESPONSE_MESSAGE,
304                             ObjectIDBatchRequestResponseMessage.class);
305     channel.addClassMapping(TCMessageType.ACKNOWLEDGE_TRANSACTION_MESSAGE, AcknowledgeTransactionMessageImpl.class);
306     channel.addClassMapping(TCMessageType.CLIENT_HANDSHAKE_MESSAGE, ClientHandshakeMessageImpl.class);
307     channel.addClassMapping(TCMessageType.CLIENT_HANDSHAKE_ACK_MESSAGE, ClientHandshakeAckMessageImpl.class);
308     channel.addClassMapping(TCMessageType.JMX_MESSAGE, JMXMessage.class);
309     channel.addClassMapping(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE, JmxRemoteTunnelMessage.class);
310     channel.addClassMapping(TCMessageType.CLUSTER_MEMBERSHIP_EVENT_MESSAGE, ClusterMembershipMessage.class);
311     channel.addClassMapping(TCMessageType.CLIENT_JMX_READY_MESSAGE, L1JmxReady.class);
312
313     logger.debug("Added class mappings.");
314
315     Sink hydrateSink = hydrateStage.getSink();
316     channel.routeMessageType(TCMessageType.LOCK_RESPONSE_MESSAGE, lockResponse.getSink(), hydrateSink);
317     channel.routeMessageType(TCMessageType.LOCK_QUERY_RESPONSE_MESSAGE, lockResponse.getSink(), hydrateSink);
318     channel.routeMessageType(TCMessageType.LOCK_RECALL_MESSAGE, lockResponse.getSink(), hydrateSink);
319     channel.routeMessageType(TCMessageType.REQUEST_ROOT_RESPONSE_MESSAGE, receiveRootID.getSink(), hydrateSink);
320     channel.routeMessageType(TCMessageType.REQUEST_MANAGED_OBJECT_RESPONSE_MESSAGE, receiveObject.getSink(),
321                              hydrateSink);
322     channel.routeMessageType(TCMessageType.BROADCAST_TRANSACTION_MESSAGE, receiveTransaction.getSink(), hydrateSink);
323     channel.routeMessageType(TCMessageType.OBJECT_ID_BATCH_REQUEST_RESPONSE_MESSAGE, oidRequestResponse.getSink(),
324                              hydrateSink);
325     channel.routeMessageType(TCMessageType.ACKNOWLEDGE_TRANSACTION_MESSAGE, transactionResponse.getSink(), hydrateSink);
326     channel.routeMessageType(TCMessageType.BATCH_TRANSACTION_ACK_MESSAGE, batchTxnAckStage.getSink(), hydrateSink);
327     channel.routeMessageType(TCMessageType.CLIENT_HANDSHAKE_ACK_MESSAGE, pauseStage.getSink(), hydrateSink);
328     channel.routeMessageType(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE, jmxRemoteTunnelStage.getSink(),
329                              hydrateSink);
330     channel.routeMessageType(TCMessageType.CLUSTER_MEMBERSHIP_EVENT_MESSAGE, pauseStage.getSink(), hydrateSink);
331
332     while (true) {
333       try {
334         logger.debug("Trying to open channel....");
335         channel.open();
336         logger.debug("Channel open");
337         break;
338       } catch (TCTimeoutException tcte) {
339         consoleLogger.warn("Timeout connecting to server: " + serverHost + ":" + serverPort + ". " + tcte.getMessage());
340         ThreadUtil.reallySleep(5000);
341       } catch (ConnectException JavaDoc e) {
342         consoleLogger.warn("Connection refused from server: " + serverHost + ":" + serverPort);
343         ThreadUtil.reallySleep(5000);
344       } catch (MaxConnectionsExceededException e) {
345         consoleLogger.warn("Connection refused MAXIMUM CONNECTIONS TO SERVER EXCEEDED: " + serverHost + ":"
346                            + serverPort);
347         ThreadUtil.reallySleep(5000);
348       } catch (IOException JavaDoc ioe) {
349         ioe.printStackTrace();
350         throw new RuntimeException JavaDoc(ioe);
351       }
352     }
353
354     clientHandshakeManager.waitForHandshake();
355
356     cluster.addClusterEventListener(l1Management.getTerracottaCluster());
357   }
358
359   public void stop() {
360     manager.stop();
361   }
362
363   public ClientLockManager getLockManager() {
364     return lockManager;
365   }
366
367   public ClientTransactionManager getTransactionManager() {
368     return txManager;
369   }
370
371   public ClientObjectManager getObjectManager() {
372     return objectManager;
373   }
374
375   public RemoteTransactionManager getRemoteTransactionManager() {
376     return rtxManager;
377   }
378
379   public CommunicationsManager getCommunicationsManager() {
380     return communicationsManager;
381   }
382
383   public DSOClientMessageChannel getChannel() {
384     return channel;
385   }
386
387   public ClientHandshakeManager getClientHandshakeManager() {
388     return clientHandshakeManager;
389   }
390
391   public RuntimeLogger getRuntimeLogger() {
392     return runtimeLogger;
393   }
394
395   public SessionMonitorMBean getSessionMonitorMBean() {
396     return l1Management.findSessionMonitorMBean();
397   }
398
399   public DmiManager getDmiManager() {
400     return dmiManager;
401   }
402
403 }
404
Popular Tags