KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > objectserver > tx > ServerTransactionManagerImpl


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.objectserver.tx;
6
7 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
8
9 import com.tc.logging.TCLogger;
10 import com.tc.logging.TCLogging;
11 import com.tc.net.protocol.tcm.ChannelID;
12 import com.tc.object.ObjectID;
13 import com.tc.object.dna.api.DNA;
14 import com.tc.object.dna.impl.VersionizedDNAWrapper;
15 import com.tc.object.gtx.GlobalTransactionID;
16 import com.tc.object.net.ChannelStats;
17 import com.tc.object.tx.ServerTransactionID;
18 import com.tc.object.tx.TransactionID;
19 import com.tc.objectserver.api.ObjectInstanceMonitor;
20 import com.tc.objectserver.api.ObjectManager;
21 import com.tc.objectserver.core.api.ManagedObject;
22 import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
23 import com.tc.objectserver.l1.api.ClientStateManager;
24 import com.tc.objectserver.l1.impl.TransactionAcknowledgeAction;
25 import com.tc.objectserver.lockmanager.api.LockManager;
26 import com.tc.objectserver.managedobject.BackReferences;
27 import com.tc.objectserver.persistence.api.PersistenceTransaction;
28 import com.tc.objectserver.persistence.api.TransactionStore;
29 import com.tc.stats.counter.Counter;
30
31 import java.util.ArrayList JavaDoc;
32 import java.util.Collection JavaDoc;
33 import java.util.Collections JavaDoc;
34 import java.util.HashMap JavaDoc;
35 import java.util.Iterator JavaDoc;
36 import java.util.List JavaDoc;
37 import java.util.Map JavaDoc;
38 import java.util.Set JavaDoc;
39 import java.util.Map.Entry;
40
41 public class ServerTransactionManagerImpl implements ServerTransactionManager, ServerTransactionManagerMBean {
42
43   private static final TCLogger logger = TCLogging
44                                                                        .getLogger(ServerTransactionManager.class);
45
46   private final Map JavaDoc transactionAccounts = Collections.synchronizedMap(new HashMap JavaDoc());
47   private final ClientStateManager stateManager;
48   private final ObjectManager objectManager;
49   private final TransactionAcknowledgeAction action;
50   private final LockManager lockManager;
51   private final List rootEventListeners = new CopyOnWriteArrayList();
52   private final List txnEventListeners = new CopyOnWriteArrayList();
53
54   private final Counter transactionRateCounter;
55
56   private final ChannelStats channelStats;
57
58   private final ServerGlobalTransactionManager gtxm;
59
60   public ServerTransactionManagerImpl(ServerGlobalTransactionManager gtxm, TransactionStore transactionStore,
61                                       LockManager lockManager, ClientStateManager stateManager,
62                                       ObjectManager objectManager, TransactionAcknowledgeAction action,
63                                       Counter transactionRateCounter, ChannelStats channelStats) {
64     this.gtxm = gtxm;
65     this.lockManager = lockManager;
66     this.objectManager = objectManager;
67     this.stateManager = stateManager;
68     this.action = action;
69     this.transactionRateCounter = transactionRateCounter;
70     this.channelStats = channelStats;
71     this.addTransactionListener(gtxm);
72   }
73
74   public void dump() {
75     StringBuffer JavaDoc buf = new StringBuffer JavaDoc("ServerTransactionManager");
76     buf.append("transactionAccounts: " + transactionAccounts);
77     buf.append("\n/ServerTransactionManager");
78     System.err.println(buf.toString());
79   }
80
81   // TODO:: shutdown clients should not be cleared immediately. some time to apply all the transactions
82
// on the wire should be given before removing if from accounting and releasing the lock.
83
public void shutdownClient(ChannelID waitee) {
84     transactionAccounts.remove(waitee);
85     Map JavaDoc currentStates = new HashMap JavaDoc(transactionAccounts);
86     for (Iterator JavaDoc i = currentStates.keySet().iterator(); i.hasNext();) {
87       ChannelID key = (ChannelID) i.next();
88
89       TransactionAccount client = getTransactionAccount(key);
90       if (client != null) {
91         for (Iterator JavaDoc it = client.requestersWaitingFor(waitee).iterator(); it.hasNext();) {
92           TransactionID reqID = (TransactionID) it.next();
93           acknowledgement(client.getClientID(), reqID, waitee);
94         }
95       }
96     }
97
98     stateManager.shutdownClient(waitee);
99     lockManager.clearAllLocksFor(waitee);
100     gtxm.shutdownClient(waitee);
101     fireClientDisconnectedEvent(waitee);
102   }
103
104   public void setResentTransactionIDs(ChannelID channelID, Collection JavaDoc transactionIDs) {
105     Collection JavaDoc stxIDs = new ArrayList();
106     for (Iterator JavaDoc iter = transactionIDs.iterator(); iter.hasNext();) {
107       TransactionID txn = (TransactionID) iter.next();
108       stxIDs.add(new ServerTransactionID(channelID, txn));
109     }
110     fireAddResentTransactionIDsEvent(stxIDs);
111   }
112
113   public void addWaitingForAcknowledgement(ChannelID waiter, TransactionID txnID, ChannelID waitee) {
114     TransactionAccount ci = getOrCreateTransactionAccount(waiter);
115     ci.addWaitee(waitee, txnID);
116   }
117
118   // For testing
119
public boolean isWaiting(ChannelID waiter, TransactionID txnID) {
120     TransactionAccount c = getTransactionAccount(waiter);
121     return c != null && c.hasWaitees(txnID);
122   }
123
124   private void acknowledge(ChannelID waiter, TransactionID txnID) {
125     final ServerTransactionID serverTxnID = new ServerTransactionID(waiter, txnID);
126     fireTransactionCompleteEvent(serverTxnID);
127     if (!gtxm.needsApply(serverTxnID)) {
128       // the GlobalTransactionID can by null if the server crashed before the global transaction was stored. We only
129
// want to accept acknowledgements for the global transaction id that we actually persisted.
130
action.acknowledgeTransaction(serverTxnID);
131     }
132   }
133
134   public void acknowledgement(ChannelID waiter, TransactionID txnID, ChannelID waitee) {
135
136     TransactionAccount transactionAccount = getTransactionAccount(waiter);
137     if (transactionAccount == null) {
138       // This can happen if an ack makes it into the system and the server crashed
139
// leading to a removed state;
140
logger.warn("Waiter not found in the states map: " + waiter);
141       return;
142     }
143
144     if (transactionAccount.removeWaitee(waitee, txnID)) {
145       acknowledge(waiter, txnID);
146     }
147   }
148
149   public void apply(GlobalTransactionID gtxID, ServerTransaction txn, Map JavaDoc objects, BackReferences includeIDs,
150                     ObjectInstanceMonitor instanceMonitor) {
151
152     final ChannelID channelID = txn.getChannelID();
153     final TransactionID txnID = txn.getTransactionID();
154     final List changes = txn.getChanges();
155
156     TransactionAccount ci;
157     if (txn.isPassive()) {
158       ci = getOrCreateNullTransactionAccount(channelID);
159     } else {
160       // There could potentically be a small leak if the clients crash and then shutdownClient() called before
161
// apply() is called. Will create a TransactionAccount which will never get removed.
162
ci = getOrCreateTransactionAccount(channelID);
163     }
164     ci.applyStarted(txnID);
165
166     for (Iterator JavaDoc i = changes.iterator(); i.hasNext();) {
167       DNA orgDNA = (DNA) i.next();
168       long version = orgDNA.getVersion();
169       if (version == DNA.NULL_VERSION) {
170         version = gtxID.toLong();
171       }
172       DNA change = new VersionizedDNAWrapper(orgDNA, version, true);
173       ManagedObject mo = (ManagedObject) objects.get(change.getObjectID());
174       mo.apply(change, txnID, includeIDs, instanceMonitor);
175       if (!change.isDelta() && !txn.isPassive()) {
176         // Only New objects reference are added here
177
stateManager.addReference(txn.getChannelID(), mo.getID());
178       }
179     }
180
181     Map JavaDoc newRoots = txn.getNewRoots();
182
183     if (newRoots.size() > 0) {
184       for (Iterator JavaDoc i = newRoots.entrySet().iterator(); i.hasNext();) {
185         Entry entry = (Entry) i.next();
186         String JavaDoc rootName = (String JavaDoc) entry.getKey();
187         ObjectID newID = (ObjectID) entry.getValue();
188         objectManager.createRoot(rootName, newID);
189       }
190     }
191     transactionRateCounter.increment();
192     if (!channelID.isNull()) channelStats.notifyTransaction(channelID);
193
194   }
195
196   public void skipApplyAndCommit(ServerTransaction txn) {
197     final ChannelID channelID = txn.getChannelID();
198     final TransactionID txnID = txn.getTransactionID();
199     TransactionAccount ci = getOrCreateTransactionAccount(channelID);
200     if (ci.skipApplyAndCommit(txnID)) {
201       acknowledge(channelID, txnID);
202     }
203     fireTransactionAppliedEvent(txn.getServerTransactionID());
204   }
205
206   public void release(PersistenceTransaction ptx, Collection JavaDoc objects, Map JavaDoc newRoots) {
207     // change done so now we can release the objects
208
objectManager.releaseAll(ptx, objects);
209
210     // NOTE: important to have released all objects in the TXN before
211
// calling this event as the listeners tries to lookup for the object and blocks
212
for (Iterator JavaDoc i = newRoots.entrySet().iterator(); i.hasNext();) {
213       Map.Entry JavaDoc entry = (Entry) i.next();
214       fireRootCreatedEvent((String JavaDoc) entry.getKey(), (ObjectID) entry.getValue());
215     }
216   }
217
218   public void incomingTransactions(ChannelID cid, Set JavaDoc serverTxnIDs, boolean relayed) {
219     TransactionAccount ci = getOrCreateTransactionAccount(cid);
220     for (Iterator JavaDoc i = serverTxnIDs.iterator(); i.hasNext();) {
221       final ServerTransactionID txnId = (ServerTransactionID) i.next();
222       final TransactionID txnID = txnId.getClientTransactionID();
223       if (!relayed) {
224         ci.relayTransactionComplete(txnID);
225       }
226     }
227     fireIncomingTransactionsEvent(cid, serverTxnIDs);
228   }
229
230   public void transactionsRelayed(ChannelID channelID, Set JavaDoc serverTxnIDs) {
231     TransactionAccount ci = getOrCreateTransactionAccount(channelID);
232     if (ci == null) {
233       logger.warn("transactionsRelayed(): TransactionAccount not found for " + channelID);
234       return;
235     }
236     for (Iterator JavaDoc i = serverTxnIDs.iterator(); i.hasNext();) {
237       final ServerTransactionID txnId = (ServerTransactionID) i.next();
238       final TransactionID txnID = txnId.getClientTransactionID();
239       if (ci.relayTransactionComplete(txnID)) {
240         acknowledge(channelID, txnID);
241       }
242     }
243   }
244
245   public void committed(Collection JavaDoc txnsIds) {
246     for (Iterator JavaDoc i = txnsIds.iterator(); i.hasNext();) {
247       final ServerTransactionID txnId = (ServerTransactionID) i.next();
248       final ChannelID waiter = txnId.getChannelID();
249       final TransactionID txnID = txnId.getClientTransactionID();
250
251       TransactionAccount ci = getTransactionAccount(waiter);
252       if (ci != null && ci.applyCommitted(txnID)) {
253         acknowledge(waiter, txnID);
254       }
255
256       // TODO :: Move this to apply() and not commit(). Also check out DEV-473
257
fireTransactionAppliedEvent(txnId);
258     }
259   }
260
261   public void broadcasted(ChannelID waiter, TransactionID txnID) {
262     TransactionAccount ci = getTransactionAccount(waiter);
263
264     if (ci != null && ci.broadcastCompleted(txnID)) {
265       acknowledge(waiter, txnID);
266     }
267   }
268
269   private TransactionAccount getOrCreateTransactionAccount(ChannelID clientID) {
270     synchronized (transactionAccounts) {
271       TransactionAccount ta = (TransactionAccount) transactionAccounts.get(clientID);
272       if ((ta == null) || (ta instanceof NullTransactionAccount)) {
273         Object JavaDoc old = transactionAccounts.put(clientID, (ta = new TransactionAccountImpl(clientID)));
274         if (old != null) {
275           logger.info("Transaction Account changed from : " + old + " to " + ta);
276         }
277       }
278       return ta;
279     }
280   }
281
282   private TransactionAccount getOrCreateNullTransactionAccount(ChannelID clientID) {
283     synchronized (transactionAccounts) {
284       TransactionAccount ta = (TransactionAccount) transactionAccounts.get(clientID);
285       if ((ta == null) || (ta instanceof TransactionAccountImpl)) {
286         Object JavaDoc old = transactionAccounts.put(clientID, (ta = new NullTransactionAccount(clientID)));
287         if (old != null) {
288           logger.info("Transaction Account changed from : " + old + " to " + ta);
289         }
290       }
291       return ta;
292
293     }
294   }
295
296   private TransactionAccount getTransactionAccount(ChannelID clientID) {
297     return (TransactionAccount) transactionAccounts.get(clientID);
298   }
299
300   public void addRootListener(ServerTransactionManagerEventListener listener) {
301     if (listener == null) { throw new IllegalArgumentException JavaDoc("listener cannot be null"); }
302     this.rootEventListeners.add(listener);
303   }
304
305   private void fireRootCreatedEvent(String JavaDoc rootName, ObjectID id) {
306     for (Iterator JavaDoc iter = rootEventListeners.iterator(); iter.hasNext();) {
307       try {
308         ServerTransactionManagerEventListener listener = (ServerTransactionManagerEventListener) iter.next();
309         listener.rootCreated(rootName, id);
310       } catch (Exception JavaDoc e) {
311         if (logger.isDebugEnabled()) {
312           logger.debug(e);
313         } else {
314           logger.warn("Exception in rootCreated event callback: " + e.getMessage());
315         }
316       }
317     }
318   }
319
320   public void addTransactionListener(ServerTransactionListener listener) {
321     if (listener == null) { throw new IllegalArgumentException JavaDoc("listener cannot be null"); }
322     this.txnEventListeners.add(listener);
323   }
324
325   private void fireIncomingTransactionsEvent(ChannelID cid, Set JavaDoc serverTxnIDs) {
326     for (Iterator JavaDoc iter = txnEventListeners.iterator(); iter.hasNext();) {
327       try {
328         ServerTransactionListener listener = (ServerTransactionListener) iter.next();
329         listener.incomingTransactions(cid, serverTxnIDs);
330       } catch (Exception JavaDoc e) {
331         if (logger.isDebugEnabled()) {
332           logger.debug(e);
333         } else {
334           logger.warn("Exception in Txn complete event callback: " + e.getMessage());
335         }
336       }
337     }
338   }
339
340   private void fireTransactionCompleteEvent(ServerTransactionID stxID) {
341     for (Iterator JavaDoc iter = txnEventListeners.iterator(); iter.hasNext();) {
342       try {
343         ServerTransactionListener listener = (ServerTransactionListener) iter.next();
344         listener.transactionCompleted(stxID);
345       } catch (Exception JavaDoc e) {
346         if (logger.isDebugEnabled()) {
347           logger.debug(e);
348         } else {
349           logger.warn("Exception in Txn complete event callback: " + e.getMessage());
350         }
351       }
352     }
353   }
354
355   private void fireTransactionAppliedEvent(ServerTransactionID stxID) {
356     for (Iterator JavaDoc iter = txnEventListeners.iterator(); iter.hasNext();) {
357       try {
358         ServerTransactionListener listener = (ServerTransactionListener) iter.next();
359         listener.transactionApplied(stxID);
360       } catch (Exception JavaDoc e) {
361         if (logger.isDebugEnabled()) {
362           logger.debug(e);
363         } else {
364           logger.warn("Exception in Txn Applied event callback: " + e.getMessage());
365         }
366       }
367     }
368   }
369
370   private void fireAddResentTransactionIDsEvent(Collection JavaDoc stxIDs) {
371     for (Iterator JavaDoc iter = txnEventListeners.iterator(); iter.hasNext();) {
372       try {
373         ServerTransactionListener listener = (ServerTransactionListener) iter.next();
374         listener.addResentServerTransactionIDs(stxIDs);
375       } catch (Exception JavaDoc e) {
376         if (logger.isDebugEnabled()) {
377           logger.debug(e);
378         } else {
379           logger.warn("Exception in addResentServerTransactionIDs() event callback: " + e.getMessage());
380         }
381       }
382     }
383   }
384
385   private void fireClientDisconnectedEvent(ChannelID waitee) {
386     for (Iterator JavaDoc iter = txnEventListeners.iterator(); iter.hasNext();) {
387       try {
388         ServerTransactionListener listener = (ServerTransactionListener) iter.next();
389         listener.clearAllTransactionsFor(waitee);
390       } catch (Exception JavaDoc e) {
391         if (logger.isDebugEnabled()) {
392           logger.debug(e);
393         } else {
394           logger.warn("Exception in addResentServerTransactionIDs() event callback: " + e.getMessage());
395         }
396       }
397     }
398   }
399 }
400
Popular Tags