1 5 package com.tc.objectserver.handler; 6 7 import com.tc.async.api.AbstractEventHandler; 8 import com.tc.async.api.ConfigurationContext; 9 import com.tc.async.api.EventContext; 10 import com.tc.async.api.EventHandlerException; 11 import com.tc.async.api.Sink; 12 import com.tc.net.protocol.tcm.ChannelID; 13 import com.tc.net.protocol.tcm.MessageChannel; 14 import com.tc.net.protocol.tcm.TCMessageType; 15 import com.tc.object.ObjectRequestID; 16 import com.tc.object.dmi.DmiDescriptor; 17 import com.tc.object.msg.BatchTransactionAcknowledgeMessage; 18 import com.tc.object.msg.BroadcastTransactionMessage; 19 import com.tc.object.net.DSOChannelManager; 20 import com.tc.object.net.NoSuchChannelException; 21 import com.tc.object.tx.TransactionID; 22 import com.tc.object.tx.TxnBatchID; 23 import com.tc.objectserver.context.BroadcastChangeContext; 24 import com.tc.objectserver.context.ManagedObjectRequestContext; 25 import com.tc.objectserver.core.api.ServerConfigurationContext; 26 import com.tc.objectserver.l1.api.ClientStateManager; 27 import com.tc.objectserver.tx.NoSuchBatchException; 28 import com.tc.objectserver.tx.ServerTransactionManager; 29 import com.tc.objectserver.tx.TransactionBatchManager; 30 31 import java.util.ArrayList ; 32 import java.util.Collections ; 33 import java.util.HashSet ; 34 import java.util.List ; 35 import java.util.Map ; 36 import java.util.Set ; 37 38 43 public class BroadcastChangeHandler extends AbstractEventHandler { 44 private DSOChannelManager channelManager; 45 private ClientStateManager clientStateManager; 46 private ServerTransactionManager transactionManager; 47 private Sink managedObjectRequestSink; 48 private Sink respondObjectRequestSink; 49 private final TransactionBatchManager transactionBatchManager; 50 51 public BroadcastChangeHandler(TransactionBatchManager transactionBatchManager) { 52 this.transactionBatchManager = transactionBatchManager; 53 } 54 55 public void handleEvent(EventContext context) throws EventHandlerException { 56 BroadcastChangeContext bcc = (BroadcastChangeContext) context; 57 58 final ChannelID committerID = bcc.getChannelID(); 59 final TransactionID txnID = bcc.getTransactionID(); 60 61 final MessageChannel[] channels = channelManager.getActiveChannels(); 62 63 for (int i = 0; i < channels.length; i++) { 64 MessageChannel client = channels[i]; 65 ChannelID clientID = client.getChannelID(); 66 67 Map newRoots = bcc.getNewRoots(); 68 Set notifiedWaiters = bcc.getNewlyPendingWaiters().getNotifiedFor(clientID); 69 List prunedChanges = Collections.EMPTY_LIST; 70 Set lookupObjectIDs = new HashSet (); 71 72 if (!clientID.equals(committerID)) { 73 prunedChanges = clientStateManager.createPrunedChangesAndAddObjectIDTo(bcc.getChanges(), bcc.getIncludeIDs(), 74 clientID, lookupObjectIDs); 75 } 76 77 DmiDescriptor[] prunedDmis = pruneDmiDescriptors(bcc.getDmiDescriptors(), clientID, clientStateManager); 78 final boolean includeDmi = !clientID.equals(committerID) && prunedDmis.length > 0; 79 if (!prunedChanges.isEmpty() || !lookupObjectIDs.isEmpty() || !notifiedWaiters.isEmpty() || !newRoots.isEmpty() 80 || includeDmi) { 81 transactionManager.addWaitingForAcknowledgement(committerID, txnID, clientID); 82 if (lookupObjectIDs.size() > 0) { 83 this.managedObjectRequestSink.add(new ManagedObjectRequestContext(clientID, ObjectRequestID.NULL_ID, 87 lookupObjectIDs, -1, 88 this.respondObjectRequestSink)); 89 } 90 final DmiDescriptor[] dmi = (includeDmi) ? prunedDmis : DmiDescriptor.EMPTY_ARRAY; 91 BroadcastTransactionMessage responseMessage = (BroadcastTransactionMessage) client 92 .createMessage(TCMessageType.BROADCAST_TRANSACTION_MESSAGE); 93 responseMessage.initialize(prunedChanges, lookupObjectIDs, bcc.getSerializer(), bcc.getLockIDs(), 94 getNextChangeIDFor(clientID), txnID, committerID, bcc.getGlobalTransactionID(), bcc 95 .getTransactionType(), bcc.getLowGlobalTransactionIDWatermark(), 96 notifiedWaiters, newRoots, dmi); 97 98 responseMessage.send(); 99 } 100 } 101 transactionManager.broadcasted(committerID, txnID); 102 try { 103 TxnBatchID batchID = bcc.getBatchID(); 104 if (transactionBatchManager.batchComponentComplete(committerID, batchID, txnID)) { 105 try { 106 BatchTransactionAcknowledgeMessage msg = channelManager.newBatchTransactionAcknowledgeMessage(committerID); 107 msg.initialize(batchID); 108 msg.send(); 109 } catch (NoSuchChannelException e) { 110 getLogger().warn("Can't send transaction batch acknowledge message to unconnected client: " + committerID); 111 } 112 } 113 } catch (NoSuchBatchException e) { 114 throw new EventHandlerException(e); 115 } 116 } 117 118 private static DmiDescriptor[] pruneDmiDescriptors(DmiDescriptor[] dmiDescriptors, ChannelID clientID, 119 ClientStateManager clientStateManager) { 120 if(dmiDescriptors.length == 0) { 121 return dmiDescriptors; 122 } 123 124 List list = new ArrayList (); 125 for (int i = 0; i < dmiDescriptors.length; i++) { 126 DmiDescriptor dd = dmiDescriptors[i]; 127 if (dd.isFaultReceiver() || clientStateManager.hasReference(clientID, dd.getReceiverId())) { 128 list.add(dd); 129 } 130 } 131 DmiDescriptor[] rv = new DmiDescriptor[list.size()]; 132 list.toArray(rv); 133 return rv; 134 } 135 136 private synchronized long getNextChangeIDFor(ChannelID id) { 137 return 0; 140 } 141 142 public void initialize(ConfigurationContext context) { 143 super.initialize(context); 144 ServerConfigurationContext scc = (ServerConfigurationContext) context; 145 this.channelManager = scc.getChannelManager(); 146 this.clientStateManager = scc.getClientStateManager(); 147 this.transactionManager = scc.getTransactionManager(); 148 this.managedObjectRequestSink = scc.getStage(ServerConfigurationContext.MANAGED_OBJECT_REQUEST_STAGE).getSink(); 149 this.respondObjectRequestSink = scc.getStage(ServerConfigurationContext.RESPOND_TO_OBJECT_REQUEST_STAGE).getSink(); 150 } 151 } 152 | Popular Tags |