package com.tc.objectserver.handler;

import com.tc.async.api.AbstractEventHandler;
import com.tc.async.api.ConfigurationContext;
import com.tc.async.api.EventContext;
import com.tc.async.api.Sink;
import com.tc.object.gtx.GlobalTransactionID;
import com.tc.object.lockmanager.api.Notify;
import com.tc.object.tx.ServerTransactionID;
import com.tc.objectserver.api.ObjectInstanceMonitor;
import com.tc.objectserver.context.ApplyTransactionContext;
import com.tc.objectserver.context.BroadcastChangeContext;
import com.tc.objectserver.core.api.ServerConfigurationContext;
import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
import com.tc.objectserver.lockmanager.api.LockManager;
import com.tc.objectserver.lockmanager.api.NotifiedWaiters;
import com.tc.objectserver.managedobject.BackReferences;
import com.tc.objectserver.tx.ServerTransaction;
import com.tc.objectserver.tx.ServerTransactionManager;
import com.tc.objectserver.tx.TransactionalObjectManager;

import java.util.Iterator;
import java.util.LinkedList;

/**
 * Event handler for {@link ApplyTransactionContext} events.
 * <p>
 * For each event it obtains a global transaction ID for the transaction, then either applies the
 * transaction or skips it (when the global transaction manager reports that no apply is needed).
 * For transactions that are not passive it additionally forwards the transaction's notifies to the
 * lock manager and enqueues a {@link BroadcastChangeContext} on the broadcast-changes sink.
 */
public class ApplyTransactionChangeHandler extends AbstractEventHandler {
  // The three non-final collaborators and the sink are assigned in initialize(ConfigurationContext).
  private ServerTransactionManager             transactionManager;
  private LockManager                          lockManager;
  private Sink                                 broadcastChangesSink;
  private final ObjectInstanceMonitor          instanceMonitor;
  private final ServerGlobalTransactionManager gtxm;
  private TransactionalObjectManager           txnObjectMgr;

  /**
   * @param instanceMonitor monitor handed to the transaction manager on every apply
   * @param gtxm global transaction manager used to create global IDs, decide whether an apply is
   *        needed, and read the low watermark
   */
  public ApplyTransactionChangeHandler(ObjectInstanceMonitor instanceMonitor, ServerGlobalTransactionManager gtxm) {
    this.instanceMonitor = instanceMonitor;
    this.gtxm = gtxm;
  }

  /**
   * Processes one apply-transaction event.
   *
   * @param context the event; cast to {@link ApplyTransactionContext}
   */
  public void handleEvent(EventContext context) {
    ApplyTransactionContext applyContext = (ApplyTransactionContext) context;
    ServerTransaction transaction = applyContext.getTxn();

    NotifiedWaiters waiters = new NotifiedWaiters();
    ServerTransactionID serverTxnID = transaction.getServerTransactionID();
    BackReferences backRefs = new BackReferences();
    GlobalTransactionID globalTxnID = gtxm.createGlobalTransactionID(serverTxnID);

    applyOrSkip(applyContext, transaction, serverTxnID, globalTxnID, backRefs);

    // Passive transactions stop here: no lock notifies and no broadcast.
    if (transaction.isPassive()) {
      return;
    }

    deliverNotifies(transaction, waiters);

    // The low watermark is read only after the notifies have been delivered.
    BroadcastChangeContext broadcast = new BroadcastChangeContext(globalTxnID, transaction,
                                                                  gtxm.getLowGlobalTransactionIDWatermark(), waiters,
                                                                  backRefs);
    broadcastChangesSink.add(broadcast);
  }

  /**
   * Looks up the transaction manager, broadcast-changes sink, transactional object manager and lock
   * manager from the server configuration context.
   *
   * @param context the configuration context; cast to {@link ServerConfigurationContext}
   */
  public void initialize(ConfigurationContext context) {
    super.initialize(context);
    ServerConfigurationContext serverContext = (ServerConfigurationContext) context;
    this.transactionManager = serverContext.getTransactionManager();
    this.broadcastChangesSink = serverContext.getStage(ServerConfigurationContext.BROADCAST_CHANGES_STAGE).getSink();
    this.txnObjectMgr = serverContext.getTransactionalObjectManager();
    this.lockManager = serverContext.getLockManager();
  }

  /** Applies the transaction if the global transaction manager says it is needed; otherwise skips it and warns. */
  private void applyOrSkip(ApplyTransactionContext applyContext, ServerTransaction transaction,
                           ServerTransactionID serverTxnID, GlobalTransactionID globalTxnID, BackReferences backRefs) {
    if (!gtxm.needsApply(serverTxnID)) {
      transactionManager.skipApplyAndCommit(transaction);
      getLogger().warn("Not applying previously applied transaction: " + serverTxnID);
      return;
    }

    transactionManager.apply(globalTxnID, transaction, applyContext.getObjects(), backRefs, instanceMonitor);
    txnObjectMgr.applyTransactionComplete(serverTxnID);
  }

  /** Passes every notify carried by the transaction to the lock manager, accumulating into {@code waiters}. */
  private void deliverNotifies(ServerTransaction transaction, NotifiedWaiters waiters) {
    Iterator notifies = transaction.addNotifiesTo(new LinkedList()).iterator();
    while (notifies.hasNext()) {
      Notify current = (Notify) notifies.next();
      lockManager.notify(current.getLockID(), transaction.getChannelID(), current.getThreadID(), current.getIsAll(),
                         waiters);
    }
  }
}