1 5 package com.tc.object; 6 7 import com.tc.async.api.SEDA; 8 import com.tc.async.api.Sink; 9 import com.tc.async.api.Stage; 10 import com.tc.async.api.StageManager; 11 import com.tc.cluster.Cluster; 12 import com.tc.config.schema.dynamic.ConfigItem; 13 import com.tc.lang.TCThreadGroup; 14 import com.tc.logging.ChannelIDLogger; 15 import com.tc.logging.ChannelIDLoggerProvider; 16 import com.tc.logging.CustomerLogging; 17 import com.tc.logging.TCLogger; 18 import com.tc.logging.TCLogging; 19 import com.tc.management.L1Management; 20 import com.tc.management.beans.sessions.SessionMonitorMBean; 21 import com.tc.management.remote.protocol.terracotta.JmxRemoteTunnelMessage; 22 import com.tc.management.remote.protocol.terracotta.L1JmxReady; 23 import com.tc.management.remote.protocol.terracotta.TunnelingEventHandler; 24 import com.tc.net.MaxConnectionsExceededException; 25 import com.tc.net.core.ConnectionInfo; 26 import com.tc.net.protocol.PlainNetworkStackHarnessFactory; 27 import com.tc.net.protocol.tcm.CommunicationsManager; 28 import com.tc.net.protocol.tcm.CommunicationsManagerImpl; 29 import com.tc.net.protocol.tcm.HydrateHandler; 30 import com.tc.net.protocol.tcm.NullMessageMonitor; 31 import com.tc.net.protocol.tcm.TCMessageType; 32 import com.tc.net.protocol.transport.NullConnectionPolicy; 33 import com.tc.object.bytecode.Manager; 34 import com.tc.object.bytecode.hook.impl.PreparedComponentsFromL2Connection; 35 import com.tc.object.cache.CacheConfigImpl; 36 import com.tc.object.cache.CacheManager; 37 import com.tc.object.cache.ClockEvictionPolicy; 38 import com.tc.object.config.DSOClientConfigHelper; 39 import com.tc.object.dna.impl.DNAEncoding; 40 import com.tc.object.event.DmiManager; 41 import com.tc.object.event.DmiManagerImpl; 42 import com.tc.object.field.TCFieldFactory; 43 import com.tc.object.gtx.ClientGlobalTransactionManager; 44 import com.tc.object.gtx.ClientGlobalTransactionManagerImpl; 45 import com.tc.object.handler.BatchTransactionAckHandler; 46 import com.tc.object.handler.ClientCoordinationHandler; 47 import com.tc.object.handler.DmiHandler; 48 import com.tc.object.handler.LockResponseHandler; 49 import com.tc.object.handler.ReceiveObjectHandler; 50 import com.tc.object.handler.ReceiveRootIDHandler; 51 import com.tc.object.handler.ReceiveTransactionCompleteHandler; 52 import com.tc.object.handler.ReceiveTransactionHandler; 53 import com.tc.object.handshakemanager.ClientHandshakeManager; 54 import com.tc.object.idprovider.api.ObjectIDProvider; 55 import com.tc.object.idprovider.impl.ObjectIDProviderImpl; 56 import com.tc.object.idprovider.impl.RemoteObjectIDBatchSequenceProvider; 57 import com.tc.object.loaders.ClassProvider; 58 import com.tc.object.lockmanager.api.ClientLockManager; 59 import com.tc.object.lockmanager.impl.ClientLockManagerImpl; 60 import com.tc.object.lockmanager.impl.RemoteLockManagerImpl; 61 import com.tc.object.lockmanager.impl.ThreadLockManagerImpl; 62 import com.tc.object.logging.RuntimeLogger; 63 import com.tc.object.logging.RuntimeLoggerImpl; 64 import com.tc.object.msg.AcknowledgeTransactionMessageImpl; 65 import com.tc.object.msg.BatchTransactionAcknowledgeMessageImpl; 66 import com.tc.object.msg.BroadcastTransactionMessageImpl; 67 import com.tc.object.msg.ClientHandshakeAckMessageImpl; 68 import com.tc.object.msg.ClientHandshakeMessageImpl; 69 import com.tc.object.msg.ClusterMembershipMessage; 70 import com.tc.object.msg.CommitTransactionMessageImpl; 71 import com.tc.object.msg.JMXMessage; 72 import com.tc.object.msg.LockRequestMessage; 73 import com.tc.object.msg.LockResponseMessage; 74 import com.tc.object.msg.ObjectIDBatchRequestMessage; 75 import com.tc.object.msg.ObjectIDBatchRequestResponseMessage; 76 import com.tc.object.msg.RequestManagedObjectMessageImpl; 77 import com.tc.object.msg.RequestManagedObjectResponseMessage; 78 import com.tc.object.msg.RequestRootMessageImpl; 79 import com.tc.object.msg.RequestRootResponseMessage; 80 import com.tc.object.net.DSOClientMessageChannel; 81 import com.tc.object.session.SessionManager; 82 import com.tc.object.session.SessionManagerImpl; 83 import com.tc.object.session.SessionProvider; 84 import com.tc.object.tx.ClientTransactionFactory; 85 import com.tc.object.tx.ClientTransactionFactoryImpl; 86 import com.tc.object.tx.ClientTransactionManager; 87 import com.tc.object.tx.ClientTransactionManagerImpl; 88 import com.tc.object.tx.LockAccounting; 89 import com.tc.object.tx.RemoteTransactionManager; 90 import com.tc.object.tx.RemoteTransactionManagerImpl; 91 import com.tc.object.tx.TransactionBatchAccounting; 92 import com.tc.object.tx.TransactionBatchFactory; 93 import com.tc.object.tx.TransactionBatchWriterFactory; 94 import com.tc.properties.TCProperties; 95 import com.tc.properties.TCPropertiesImpl; 96 import com.tc.util.Assert; 97 import com.tc.util.TCTimeoutException; 98 import com.tc.util.concurrent.ThreadUtil; 99 import com.tc.util.sequence.BatchSequence; 100 import com.tc.util.sequence.Sequence; 101 import com.tc.util.sequence.SimpleSequence; 102 103 import java.io.IOException ; 104 import java.net.ConnectException ; 105 import java.util.Collection ; 106 import java.util.Collections ; 107 108 111 public class DistributedObjectClient extends SEDA { 112 113 private static final TCLogger logger = CustomerLogging.getDSOGenericLogger(); 114 private static final TCLogger consoleLogger = CustomerLogging.getConsoleLogger(); 115 116 private final DSOClientConfigHelper config; 117 private final ClassProvider classProvider; 118 private final PreparedComponentsFromL2Connection connectionComponents; 119 private final Manager manager; 120 private final Cluster cluster; 121 122 private DSOClientMessageChannel channel; 123 private ClientLockManager lockManager; 124 private ClientObjectManagerImpl objectManager; 125 private ClientTransactionManager txManager; 126 private CommunicationsManager communicationsManager; 127 private RemoteTransactionManager rtxManager; 128 private PauseListener pauseListener; 129 private ClientHandshakeManager clientHandshakeManager; 130 private RuntimeLogger runtimeLogger; 131 private CacheManager cacheManager; 132 private L1Management l1Management; 133 private TCProperties l1Properties; 134 private DmiManager dmiManager; 135 136 public DistributedObjectClient(DSOClientConfigHelper config, TCThreadGroup threadGroup, ClassProvider classProvider, 137 PreparedComponentsFromL2Connection connectionComponents, Manager manager, 138 Cluster cluster) { 139 super(threadGroup); 140 Assert.assertNotNull(config); 141 this.config = config; 142 this.classProvider = classProvider; 143 this.connectionComponents = connectionComponents; 144 this.pauseListener = new NullPauseListener(); 145 this.manager = manager; 146 this.cluster = cluster; 147 } 148 149 public void setPauseListener(PauseListener pauseListener) { 150 this.pauseListener = pauseListener; 151 } 152 153 public void start() { 154 int maxSize = 50000; 155 int faultCount = config.getFaultCount(); 156 157 final Sequence sessionSequence = new SimpleSequence(); 158 final SessionManager sessionManager = new SessionManagerImpl(sessionSequence); 159 final SessionProvider sessionProvider = (SessionProvider) sessionManager; 160 161 communicationsManager = new CommunicationsManagerImpl(new NullMessageMonitor(), 162 new PlainNetworkStackHarnessFactory(), 163 new NullConnectionPolicy()); 164 165 logger.debug("Created CommunicationsManager."); 166 167 ConfigItem connectionInfoItem = this.connectionComponents.createConnectionInfoConfigItem(); 168 ConnectionInfo[] connectionInfo = (ConnectionInfo[]) connectionInfoItem.getObject(); 169 170 String serverHost = connectionInfo[0].getHostname(); 171 int serverPort = connectionInfo[0].getPort(); 172 173 channel = new DSOClientMessageChannelImpl(communicationsManager.createClientChannel(sessionProvider, -1, 174 serverHost, serverPort, 10000, 175 connectionInfoItem)); 176 this.runtimeLogger = new RuntimeLoggerImpl(config); 177 178 logger.debug("Created channel."); 179 180 ChannelIDLoggerProvider cidLoggerProvider = new ChannelIDLoggerProvider(channel.getChannelIDProvider()); 181 182 ClientTransactionFactory txFactory = new ClientTransactionFactoryImpl(runtimeLogger, channel.getChannelIDProvider()); 183 184 TransactionBatchFactory txBatchFactory = new TransactionBatchWriterFactory(channel 185 .getCommitTransactionMessageFactory(), new DNAEncoding(classProvider)); 186 187 rtxManager = new RemoteTransactionManagerImpl(new ChannelIDLogger(channel.getChannelIDProvider(), TCLogging 188 .getLogger(RemoteTransactionManagerImpl.class)), txBatchFactory, new TransactionBatchAccounting(), 189 new LockAccounting(), sessionManager, channel); 190 191 ClientGlobalTransactionManager gtxManager = new ClientGlobalTransactionManagerImpl(rtxManager); 192 193 lockManager = new ClientLockManagerImpl(new ChannelIDLogger(channel.getChannelIDProvider(), TCLogging 194 .getLogger(ClientLockManager.class)), new RemoteLockManagerImpl(channel.getLockRequestMessageFactory(), 195 gtxManager), sessionManager); 196 197 RemoteObjectManager remoteObjectManager = new RemoteObjectManagerImpl(new ChannelIDLogger(channel 198 .getChannelIDProvider(), TCLogging.getLogger(RemoteObjectManager.class)), channel.getChannelIDProvider(), 199 channel.getRequestRootMessageFactory(), 200 channel 201 .getRequestManagedObjectMessageFactory(), 202 new NullObjectRequestMonitor(), faultCount, 203 sessionManager); 204 205 RemoteObjectIDBatchSequenceProvider remoteIDProvider = new RemoteObjectIDBatchSequenceProvider(channel 206 .getObjectIDBatchRequestMessageFactory()); 207 BatchSequence sequence = new BatchSequence(remoteIDProvider, 50000); 208 ObjectIDProvider idProvider = new ObjectIDProviderImpl(sequence); 209 210 TCClassFactory classFactory = new TCClassFactoryImpl(new TCFieldFactory(config), config, classProvider); 211 TCObjectFactory objectFactory = new TCObjectFactoryImpl(classFactory); 212 213 objectManager = new ClientObjectManagerImpl(remoteObjectManager, config, idProvider, new ClockEvictionPolicy(-1), 214 runtimeLogger, channel.getChannelIDProvider(), classProvider, 215 classFactory, objectFactory, config.getPortability(), channel); 216 217 l1Properties = TCPropertiesImpl.getProperties().getPropertiesFor("l1"); 218 TCProperties cacheManagerProperties = l1Properties.getPropertiesFor("cachemanager"); 219 if (cacheManagerProperties.getBoolean("enabled")) { 220 this.cacheManager = new CacheManager(objectManager, new CacheConfigImpl(cacheManagerProperties)); 221 if (logger.isDebugEnabled()) { 222 logger.debug("CacheManager Enabled : " + cacheManager); 223 } 224 } else { 225 logger.warn("CacheManager is Disabled"); 226 } 227 228 final TunnelingEventHandler teh = new TunnelingEventHandler(channel.channel()); 230 l1Management = new L1Management(teh); 231 l1Management.start(); 232 233 txManager = new ClientTransactionManagerImpl(channel.getChannelIDProvider(), objectManager, 234 new ThreadLockManagerImpl(lockManager), txFactory, rtxManager, 235 runtimeLogger, l1Management.findClientTxMonitorMBean()); 236 237 StageManager stageManager = getStageManager(); 238 239 stageManager.setLoggerProvider(cidLoggerProvider); 241 242 Stage lockResponse = stageManager.createStage(ClientConfigurationContext.LOCK_RESPONSE_STAGE, 243 new LockResponseHandler(sessionManager), 1, maxSize); 244 Stage receiveRootID = stageManager.createStage(ClientConfigurationContext.RECEIVE_ROOT_ID_STAGE, 245 new ReceiveRootIDHandler(), 1, maxSize); 246 Stage receiveObject = stageManager.createStage(ClientConfigurationContext.RECEIVE_OBJECT_STAGE, 247 new ReceiveObjectHandler(), 1, maxSize); 248 this.dmiManager = new DmiManagerImpl(classProvider, objectManager, runtimeLogger); 249 Stage dmiStage = stageManager.createStage(ClientConfigurationContext.DMI_STAGE, new DmiHandler(dmiManager), 1, 250 maxSize); 251 252 Stage receiveTransaction = stageManager 253 .createStage(ClientConfigurationContext.RECEIVE_TRANSACTION_STAGE, 254 new ReceiveTransactionHandler(channel.getChannelIDProvider(), channel 255 .getAcknowledgeTransactionMessageFactory(), gtxManager, sessionManager, dmiStage.getSink()), 256 1, maxSize); 257 Stage oidRequestResponse = stageManager.createStage(ClientConfigurationContext.OBJECT_ID_REQUEST_RESPONSE_STAGE, 258 remoteIDProvider, 1, maxSize); 259 Stage transactionResponse = stageManager.createStage(ClientConfigurationContext.RECEIVE_TRANSACTION_COMPLETE_STAGE, 260 new ReceiveTransactionCompleteHandler(), 1, maxSize); 261 Stage hydrateStage = stageManager.createStage(ClientConfigurationContext.HYDRATE_MESSAGE_STAGE, 262 new HydrateHandler(), 1, maxSize); 263 Stage batchTxnAckStage = stageManager.createStage(ClientConfigurationContext.BATCH_TXN_ACK_STAGE, 264 new BatchTransactionAckHandler(), 1, maxSize); 265 266 Stage pauseStage = stageManager.createStage(ClientConfigurationContext.CLIENT_COORDINATION_STAGE, 270 new ClientCoordinationHandler(cluster), 1, maxSize); 271 272 final Stage jmxRemoteTunnelStage = stageManager.createStage(ClientConfigurationContext.JMXREMOTE_TUNNEL_STAGE, teh, 273 1, maxSize); 274 275 Collection stagesToPauseOnDisconnect = Collections.EMPTY_LIST; 279 clientHandshakeManager = new ClientHandshakeManager(new ChannelIDLogger(channel.getChannelIDProvider(), TCLogging 280 .getLogger(ClientHandshakeManager.class)), channel.getChannelIDProvider(), channel 281 .getClientHandshakeMessageFactory(), objectManager, remoteObjectManager, lockManager, rtxManager, gtxManager, 282 stagesToPauseOnDisconnect, pauseStage.getSink(), 283 sessionManager, pauseListener, sequence, cluster); 284 channel.addListener(clientHandshakeManager); 285 286 ClientConfigurationContext cc = new ClientConfigurationContext(stageManager, lockManager, remoteObjectManager, 287 txManager, clientHandshakeManager); 288 stageManager.startAll(cc); 289 290 channel.addClassMapping(TCMessageType.BATCH_TRANSACTION_ACK_MESSAGE, BatchTransactionAcknowledgeMessageImpl.class); 291 channel.addClassMapping(TCMessageType.REQUEST_ROOT_MESSAGE, RequestRootMessageImpl.class); 292 channel.addClassMapping(TCMessageType.LOCK_REQUEST_MESSAGE, LockRequestMessage.class); 293 channel.addClassMapping(TCMessageType.LOCK_RESPONSE_MESSAGE, LockResponseMessage.class); 294 channel.addClassMapping(TCMessageType.LOCK_RECALL_MESSAGE, LockResponseMessage.class); 295 channel.addClassMapping(TCMessageType.LOCK_QUERY_RESPONSE_MESSAGE, LockResponseMessage.class); 296 channel.addClassMapping(TCMessageType.COMMIT_TRANSACTION_MESSAGE, CommitTransactionMessageImpl.class); 297 channel.addClassMapping(TCMessageType.REQUEST_ROOT_RESPONSE_MESSAGE, RequestRootResponseMessage.class); 298 channel.addClassMapping(TCMessageType.REQUEST_MANAGED_OBJECT_MESSAGE, RequestManagedObjectMessageImpl.class); 299 channel.addClassMapping(TCMessageType.REQUEST_MANAGED_OBJECT_RESPONSE_MESSAGE, 300 RequestManagedObjectResponseMessage.class); 301 channel.addClassMapping(TCMessageType.BROADCAST_TRANSACTION_MESSAGE, BroadcastTransactionMessageImpl.class); 302 channel.addClassMapping(TCMessageType.OBJECT_ID_BATCH_REQUEST_MESSAGE, ObjectIDBatchRequestMessage.class); 303 channel.addClassMapping(TCMessageType.OBJECT_ID_BATCH_REQUEST_RESPONSE_MESSAGE, 304 ObjectIDBatchRequestResponseMessage.class); 305 channel.addClassMapping(TCMessageType.ACKNOWLEDGE_TRANSACTION_MESSAGE, AcknowledgeTransactionMessageImpl.class); 306 channel.addClassMapping(TCMessageType.CLIENT_HANDSHAKE_MESSAGE, ClientHandshakeMessageImpl.class); 307 channel.addClassMapping(TCMessageType.CLIENT_HANDSHAKE_ACK_MESSAGE, ClientHandshakeAckMessageImpl.class); 308 channel.addClassMapping(TCMessageType.JMX_MESSAGE, JMXMessage.class); 309 channel.addClassMapping(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE, JmxRemoteTunnelMessage.class); 310 channel.addClassMapping(TCMessageType.CLUSTER_MEMBERSHIP_EVENT_MESSAGE, ClusterMembershipMessage.class); 311 channel.addClassMapping(TCMessageType.CLIENT_JMX_READY_MESSAGE, L1JmxReady.class); 312 313 logger.debug("Added class mappings."); 314 315 Sink hydrateSink = hydrateStage.getSink(); 316 channel.routeMessageType(TCMessageType.LOCK_RESPONSE_MESSAGE, lockResponse.getSink(), hydrateSink); 317 channel.routeMessageType(TCMessageType.LOCK_QUERY_RESPONSE_MESSAGE, lockResponse.getSink(), hydrateSink); 318 channel.routeMessageType(TCMessageType.LOCK_RECALL_MESSAGE, lockResponse.getSink(), hydrateSink); 319 channel.routeMessageType(TCMessageType.REQUEST_ROOT_RESPONSE_MESSAGE, receiveRootID.getSink(), hydrateSink); 320 channel.routeMessageType(TCMessageType.REQUEST_MANAGED_OBJECT_RESPONSE_MESSAGE, receiveObject.getSink(), 321 hydrateSink); 322 channel.routeMessageType(TCMessageType.BROADCAST_TRANSACTION_MESSAGE, receiveTransaction.getSink(), hydrateSink); 323 channel.routeMessageType(TCMessageType.OBJECT_ID_BATCH_REQUEST_RESPONSE_MESSAGE, oidRequestResponse.getSink(), 324 hydrateSink); 325 channel.routeMessageType(TCMessageType.ACKNOWLEDGE_TRANSACTION_MESSAGE, transactionResponse.getSink(), hydrateSink); 326 channel.routeMessageType(TCMessageType.BATCH_TRANSACTION_ACK_MESSAGE, batchTxnAckStage.getSink(), hydrateSink); 327 channel.routeMessageType(TCMessageType.CLIENT_HANDSHAKE_ACK_MESSAGE, pauseStage.getSink(), hydrateSink); 328 channel.routeMessageType(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE, jmxRemoteTunnelStage.getSink(), 329 hydrateSink); 330 channel.routeMessageType(TCMessageType.CLUSTER_MEMBERSHIP_EVENT_MESSAGE, pauseStage.getSink(), hydrateSink); 331 332 while (true) { 333 try { 334 logger.debug("Trying to open channel...."); 335 channel.open(); 336 logger.debug("Channel open"); 337 break; 338 } catch (TCTimeoutException tcte) { 339 consoleLogger.warn("Timeout connecting to server: " + serverHost + ":" + serverPort + ". " + tcte.getMessage()); 340 ThreadUtil.reallySleep(5000); 341 } catch (ConnectException e) { 342 consoleLogger.warn("Connection refused from server: " + serverHost + ":" + serverPort); 343 ThreadUtil.reallySleep(5000); 344 } catch (MaxConnectionsExceededException e) { 345 consoleLogger.warn("Connection refused MAXIMUM CONNECTIONS TO SERVER EXCEEDED: " + serverHost + ":" 346 + serverPort); 347 ThreadUtil.reallySleep(5000); 348 } catch (IOException ioe) { 349 ioe.printStackTrace(); 350 throw new RuntimeException (ioe); 351 } 352 } 353 354 clientHandshakeManager.waitForHandshake(); 355 356 cluster.addClusterEventListener(l1Management.getTerracottaCluster()); 357 } 358 359 public void stop() { 360 manager.stop(); 361 } 362 363 public ClientLockManager getLockManager() { 364 return lockManager; 365 } 366 367 public ClientTransactionManager getTransactionManager() { 368 return txManager; 369 } 370 371 public ClientObjectManager getObjectManager() { 372 return objectManager; 373 } 374 375 public RemoteTransactionManager getRemoteTransactionManager() { 376 return rtxManager; 377 } 378 379 public CommunicationsManager getCommunicationsManager() { 380 return communicationsManager; 381 } 382 383 public DSOClientMessageChannel getChannel() { 384 return channel; 385 } 386 387 public ClientHandshakeManager getClientHandshakeManager() { 388 return clientHandshakeManager; 389 } 390 391 public RuntimeLogger getRuntimeLogger() { 392 return runtimeLogger; 393 } 394 395 public SessionMonitorMBean getSessionMonitorMBean() { 396 return l1Management.findSessionMonitorMBean(); 397 } 398 399 public DmiManager getDmiManager() { 400 return dmiManager; 401 } 402 403 } 404 | Popular Tags |