1 5 package com.tc.object.handler; 6 7 import com.tc.async.api.AbstractEventHandler; 8 import com.tc.async.api.ConfigurationContext; 9 import com.tc.async.api.EventContext; 10 import com.tc.async.api.Sink; 11 import com.tc.net.protocol.tcm.ChannelIDProvider; 12 import com.tc.object.ClientConfigurationContext; 13 import com.tc.object.dmi.DmiDescriptor; 14 import com.tc.object.gtx.ClientGlobalTransactionManager; 15 import com.tc.object.gtx.GlobalTransactionID; 16 import com.tc.object.lockmanager.api.ClientLockManager; 17 import com.tc.object.lockmanager.api.LockContext; 18 import com.tc.object.msg.AcknowledgeTransactionMessage; 19 import com.tc.object.msg.AcknowledgeTransactionMessageFactory; 20 import com.tc.object.msg.BroadcastTransactionMessageImpl; 21 import com.tc.object.session.SessionManager; 22 import com.tc.object.tx.ClientTransactionManager; 23 import com.tc.util.Assert; 24 25 import java.util.Collection ; 26 import java.util.Iterator ; 27 import java.util.LinkedList ; 28 import java.util.List ; 29 30 33 public class ReceiveTransactionHandler extends AbstractEventHandler { 34 private ClientTransactionManager txManager; 35 private ClientLockManager lockManager; 36 private final SessionManager sessionManager; 37 private final ClientGlobalTransactionManager gtxManager; 38 private final AcknowledgeTransactionMessageFactory atmFactory; 39 private final ChannelIDProvider cidProvider; 40 private final Sink dmiSink; 41 42 public ReceiveTransactionHandler(ChannelIDProvider provider, AcknowledgeTransactionMessageFactory atmFactory, 43 ClientGlobalTransactionManager gtxManager, SessionManager sessionManager, 44 Sink dmiSink) { 45 this.cidProvider = provider; 46 this.atmFactory = atmFactory; 47 this.gtxManager = gtxManager; 48 this.sessionManager = sessionManager; 49 this.dmiSink = dmiSink; 50 } 51 52 public void handleEvent(EventContext context) { 53 final BroadcastTransactionMessageImpl btm = (BroadcastTransactionMessageImpl) context; 54 55 if (false) System.err.println(cidProvider.getChannelID() + ": ReceiveTransactionHandler: committer=" 56 + btm.getCommitterID() + ", " + btm.getTransactionID() + btm.getGlobalTransactionID() 57 + ", notified: " + btm.addNotifiesTo(new LinkedList ()) + ", lookup ObjectIDs: " 58 + btm.getLookupObjectIDs()); 59 60 Assert.eval(btm.getLockIDs().length > 0); 61 GlobalTransactionID lowWaterMark = btm.getLowGlobalTransactionIDWatermark(); 62 if (!lowWaterMark.isNull()) { 63 gtxManager.setLowWatermark(lowWaterMark); 64 } 65 if (gtxManager.startApply(btm.getCommitterID(), btm.getTransactionID(), btm.getGlobalTransactionID())) { 66 if (btm.getObjectChanges().size() > 0 || btm.getLookupObjectIDs().size() > 0 || btm.getNewRoots().size() > 0) { 67 68 if (false) System.err.println(cidProvider.getChannelID() + " Applying - committer=" + btm.getCommitterID() 69 + " , " + btm.getTransactionID() + " , " + btm.getGlobalTransactionID()); 70 71 txManager.apply(btm.getTransactionType(), btm.getLockIDs(), btm.getObjectChanges(), btm.getLookupObjectIDs(), 72 btm.getNewRoots()); 73 } 74 } 75 76 Collection notifies = btm.addNotifiesTo(new LinkedList ()); 77 for (Iterator i = notifies.iterator(); i.hasNext();) { 78 LockContext lc = (LockContext) i.next(); 79 lockManager.notified(lc.getLockID(), lc.getThreadID()); 80 } 81 82 List dmis = btm.getDmiDescriptors(); 83 for (Iterator i = dmis.iterator(); i.hasNext();) { 84 DmiDescriptor dd = (DmiDescriptor) i.next(); 85 dmiSink.add(dd); 86 } 87 88 if (sessionManager.isCurrentSession(btm.getLocalSessionID())) { 91 AcknowledgeTransactionMessage ack = atmFactory.newAcknowledgeTransactionMessage(); 92 ack.initialize(btm.getCommitterID(), btm.getTransactionID()); 93 ack.send(); 94 } 95 btm.recycle(); 96 } 97 98 public void initialize(ConfigurationContext context) { 99 super.initialize(context); 100 ClientConfigurationContext ccc = (ClientConfigurationContext) context; 101 this.txManager = ccc.getTransactionManager(); 102 this.lockManager = ccc.getLockManager(); 103 } 104 105 } | Popular Tags |