KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > objectserver > handler > BroadcastChangeHandler


/*
 * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
 * notice. All rights reserved.
 */

5 package com.tc.objectserver.handler;
6
7 import com.tc.async.api.AbstractEventHandler;
8 import com.tc.async.api.ConfigurationContext;
9 import com.tc.async.api.EventContext;
10 import com.tc.async.api.EventHandlerException;
11 import com.tc.async.api.Sink;
12 import com.tc.net.protocol.tcm.ChannelID;
13 import com.tc.net.protocol.tcm.MessageChannel;
14 import com.tc.net.protocol.tcm.TCMessageType;
15 import com.tc.object.ObjectRequestID;
16 import com.tc.object.dmi.DmiDescriptor;
17 import com.tc.object.msg.BatchTransactionAcknowledgeMessage;
18 import com.tc.object.msg.BroadcastTransactionMessage;
19 import com.tc.object.net.DSOChannelManager;
20 import com.tc.object.net.NoSuchChannelException;
21 import com.tc.object.tx.TransactionID;
22 import com.tc.object.tx.TxnBatchID;
23 import com.tc.objectserver.context.BroadcastChangeContext;
24 import com.tc.objectserver.context.ManagedObjectRequestContext;
25 import com.tc.objectserver.core.api.ServerConfigurationContext;
26 import com.tc.objectserver.l1.api.ClientStateManager;
27 import com.tc.objectserver.tx.NoSuchBatchException;
28 import com.tc.objectserver.tx.ServerTransactionManager;
29 import com.tc.objectserver.tx.TransactionBatchManager;
30
31 import java.util.ArrayList JavaDoc;
32 import java.util.Collections JavaDoc;
33 import java.util.HashSet JavaDoc;
34 import java.util.List JavaDoc;
35 import java.util.Map JavaDoc;
36 import java.util.Set JavaDoc;
37
38 /**
39  * Broadcast the change to all connected clients
40  *
41  * @author steve
42  */

43 public class BroadcastChangeHandler extends AbstractEventHandler {
44   private DSOChannelManager channelManager;
45   private ClientStateManager clientStateManager;
46   private ServerTransactionManager transactionManager;
47   private Sink managedObjectRequestSink;
48   private Sink respondObjectRequestSink;
49   private final TransactionBatchManager transactionBatchManager;
50
51   public BroadcastChangeHandler(TransactionBatchManager transactionBatchManager) {
52     this.transactionBatchManager = transactionBatchManager;
53   }
54
55   public void handleEvent(EventContext context) throws EventHandlerException {
56     BroadcastChangeContext bcc = (BroadcastChangeContext) context;
57
58     final ChannelID committerID = bcc.getChannelID();
59     final TransactionID txnID = bcc.getTransactionID();
60
61     final MessageChannel[] channels = channelManager.getActiveChannels();
62
63     for (int i = 0; i < channels.length; i++) {
64       MessageChannel client = channels[i];
65       ChannelID clientID = client.getChannelID();
66
67       Map JavaDoc newRoots = bcc.getNewRoots();
68       Set JavaDoc notifiedWaiters = bcc.getNewlyPendingWaiters().getNotifiedFor(clientID);
69       List JavaDoc prunedChanges = Collections.EMPTY_LIST;
70       Set JavaDoc lookupObjectIDs = new HashSet JavaDoc();
71
72       if (!clientID.equals(committerID)) {
73         prunedChanges = clientStateManager.createPrunedChangesAndAddObjectIDTo(bcc.getChanges(), bcc.getIncludeIDs(),
74                                                                                clientID, lookupObjectIDs);
75       }
76
77       DmiDescriptor[] prunedDmis = pruneDmiDescriptors(bcc.getDmiDescriptors(), clientID, clientStateManager);
78       final boolean includeDmi = !clientID.equals(committerID) && prunedDmis.length > 0;
79       if (!prunedChanges.isEmpty() || !lookupObjectIDs.isEmpty() || !notifiedWaiters.isEmpty() || !newRoots.isEmpty()
80           || includeDmi) {
81         transactionManager.addWaitingForAcknowledgement(committerID, txnID, clientID);
82         if (lookupObjectIDs.size() > 0) {
83           // TODO:: Request ID is not used anywhere. RemoveIT.
84
// XXX:: It is important to keep the maxReachableSize to <= 0 so that we dont go into recursive lookups @see
85
// ObjectManagerImpl
86
this.managedObjectRequestSink.add(new ManagedObjectRequestContext(clientID, ObjectRequestID.NULL_ID,
87                                                                             lookupObjectIDs, -1,
88                                                                             this.respondObjectRequestSink));
89         }
90         final DmiDescriptor[] dmi = (includeDmi) ? prunedDmis : DmiDescriptor.EMPTY_ARRAY;
91         BroadcastTransactionMessage responseMessage = (BroadcastTransactionMessage) client
92             .createMessage(TCMessageType.BROADCAST_TRANSACTION_MESSAGE);
93         responseMessage.initialize(prunedChanges, lookupObjectIDs, bcc.getSerializer(), bcc.getLockIDs(),
94                                    getNextChangeIDFor(clientID), txnID, committerID, bcc.getGlobalTransactionID(), bcc
95                                        .getTransactionType(), bcc.getLowGlobalTransactionIDWatermark(),
96                                    notifiedWaiters, newRoots, dmi);
97
98         responseMessage.send();
99       }
100     }
101     transactionManager.broadcasted(committerID, txnID);
102     try {
103       TxnBatchID batchID = bcc.getBatchID();
104       if (transactionBatchManager.batchComponentComplete(committerID, batchID, txnID)) {
105         try {
106           BatchTransactionAcknowledgeMessage msg = channelManager.newBatchTransactionAcknowledgeMessage(committerID);
107           msg.initialize(batchID);
108           msg.send();
109         } catch (NoSuchChannelException e) {
110           getLogger().warn("Can't send transaction batch acknowledge message to unconnected client: " + committerID);
111         }
112       }
113     } catch (NoSuchBatchException e) {
114       throw new EventHandlerException(e);
115     }
116   }
117
118   private static DmiDescriptor[] pruneDmiDescriptors(DmiDescriptor[] dmiDescriptors, ChannelID clientID,
119                                                      ClientStateManager clientStateManager) {
120     if(dmiDescriptors.length == 0) {
121       return dmiDescriptors;
122     }
123     
124     List JavaDoc list = new ArrayList JavaDoc();
125     for (int i = 0; i < dmiDescriptors.length; i++) {
126       DmiDescriptor dd = dmiDescriptors[i];
127       if (dd.isFaultReceiver() || clientStateManager.hasReference(clientID, dd.getReceiverId())) {
128         list.add(dd);
129       }
130     }
131     DmiDescriptor[] rv = new DmiDescriptor[list.size()];
132     list.toArray(rv);
133     return rv;
134   }
135
136   private synchronized long getNextChangeIDFor(ChannelID id) {
137     // FIXME Fix this facility. Should keep a counter for every client and
138
// increment on every
139
return 0;
140   }
141
142   public void initialize(ConfigurationContext context) {
143     super.initialize(context);
144     ServerConfigurationContext scc = (ServerConfigurationContext) context;
145     this.channelManager = scc.getChannelManager();
146     this.clientStateManager = scc.getClientStateManager();
147     this.transactionManager = scc.getTransactionManager();
148     this.managedObjectRequestSink = scc.getStage(ServerConfigurationContext.MANAGED_OBJECT_REQUEST_STAGE).getSink();
149     this.respondObjectRequestSink = scc.getStage(ServerConfigurationContext.RESPOND_TO_OBJECT_REQUEST_STAGE).getSink();
150   }
151 }
152
Popular Tags