1 5 package com.tc.object.lockmanager.impl; 6 7 import com.tc.async.api.EventContext; 8 import com.tc.async.api.Sink; 9 import com.tc.async.impl.NullSink; 10 import com.tc.net.protocol.tcm.ChannelID; 11 import com.tc.object.lockmanager.api.ClientLockManager; 12 import com.tc.object.lockmanager.api.LockContext; 13 import com.tc.object.lockmanager.api.LockFlushCallback; 14 import com.tc.object.lockmanager.api.LockID; 15 import com.tc.object.lockmanager.api.LockRequest; 16 import com.tc.object.lockmanager.api.RemoteLockManager; 17 import com.tc.object.lockmanager.api.ThreadID; 18 import com.tc.object.lockmanager.api.WaitContext; 19 import com.tc.object.lockmanager.api.WaitLockRequest; 20 import com.tc.object.session.SessionProvider; 21 import com.tc.object.tx.WaitInvocation; 22 import com.tc.objectserver.api.TestSink; 23 import com.tc.objectserver.context.LockResponseContext; 24 import com.tc.objectserver.lockmanager.api.NotifiedWaiters; 25 import com.tc.objectserver.lockmanager.api.NullChannelManager; 26 import com.tc.objectserver.lockmanager.impl.LockManagerImpl; 27 28 import java.util.ArrayList ; 29 import java.util.Collection ; 30 import java.util.HashSet ; 31 import java.util.Iterator ; 32 import java.util.Set ; 33 34 public class ClientServerLockManagerGlue implements RemoteLockManager, Runnable { 35 36 private static final Sink NULL_SINK = new NullSink(); 37 38 private LockManagerImpl serverLockManager; 39 private ClientLockManager clientLockManager; 40 41 private TestSink sink = new TestSink(); 42 private ChannelID channelID = new ChannelID(1); 43 private boolean stop = false; 44 private Thread eventNotifier; 45 46 private final SessionProvider sessionProvider; 47 48 public ClientServerLockManagerGlue(SessionProvider sessionProvider) { 49 super(); 50 this.sessionProvider = sessionProvider; 51 eventNotifier = new Thread (this, "ClientServerLockManagerGlue"); 52 eventNotifier.setDaemon(true); 53 eventNotifier.start(); 54 } 55 56 public void requestLock(LockID lockID, ThreadID threadID, int lockType) { 57 serverLockManager.requestLock(lockID, channelID, threadID, lockType, sink); 58 } 59 60 public void releaseLock(LockID lockID, ThreadID threadID) { 61 serverLockManager.unlock(lockID, channelID, threadID); 62 } 63 64 public void releaseLockWait(LockID lockID, ThreadID threadID, WaitInvocation call) { 65 serverLockManager.wait(lockID, channelID, threadID, call, sink); 66 } 67 68 public void recallCommit(LockID lockID, Collection lockContext, Collection waitContext, Collection pendingRequests) { 69 Collection serverLC = new ArrayList (); 70 for (Iterator i = lockContext.iterator(); i.hasNext();) { 71 LockRequest request = (LockRequest) i.next(); 72 LockContext ctxt = new LockContext(request.lockID(), channelID, request.threadID(), request.lockLevel()); 73 serverLC.add(ctxt); 74 } 75 76 Collection serverWC = new ArrayList (); 77 for (Iterator i = waitContext.iterator(); i.hasNext();) { 78 WaitLockRequest request = (WaitLockRequest) i.next(); 79 WaitContext ctxt = new WaitContext(request.lockID(), channelID, request.threadID(), request.lockLevel(), request 80 .getWaitInvocation()); 81 serverWC.add(ctxt); 82 } 83 84 Collection serverPC = new ArrayList (); 85 for (Iterator i = pendingRequests.iterator(); i.hasNext();) { 86 LockRequest request = (LockRequest) i.next(); 87 LockContext ctxt = new LockContext(request.lockID(), channelID, request.threadID(), request.lockLevel()); 88 serverPC.add(ctxt); 89 } 90 91 serverLockManager.recallCommit(lockID, channelID, serverLC, serverWC, serverPC, sink); 92 } 93 94 public void flush(LockID lockID) { 95 return; 96 } 97 98 public boolean isTransactionsForLockFlushed(LockID lockID, LockFlushCallback callback) { 99 return true; 100 } 101 102 public void set(ClientLockManager clmgr, LockManagerImpl slmgr) { 103 this.clientLockManager = clmgr; 104 this.serverLockManager = slmgr; 105 this.serverLockManager.start(); 106 } 107 108 public void run() { 109 while (!stop) { 110 EventContext ec = null; 111 try { 112 ec = sink.take(); 113 } catch (InterruptedException e) { 114 } 116 if (ec instanceof LockResponseContext) { 117 LockResponseContext lrc = (LockResponseContext) ec; 118 if (lrc.isLockAward()) { 119 clientLockManager.awardLock(sessionProvider.getSessionID(), lrc.getLockID(), lrc.getThreadID(), lrc 120 .getLockLevel()); 121 } 122 } 123 } 125 } 126 127 public LockManagerImpl restartServer() { 128 int policy = this.serverLockManager.getLockPolicy(); 129 this.serverLockManager = new LockManagerImpl(new NullChannelManager()); 130 if (!clientLockManager.isStarting()) clientLockManager.starting(); 131 for (Iterator i = clientLockManager.addAllHeldLocksTo(new HashSet ()).iterator(); i.hasNext();) { 132 LockRequest request = (LockRequest) i.next(); 133 serverLockManager 134 .reestablishLock(request.lockID(), channelID, request.threadID(), request.lockLevel(), NULL_SINK); 135 } 136 137 for (Iterator i = clientLockManager.addAllWaitersTo(new HashSet ()).iterator(); i.hasNext();) { 138 WaitLockRequest request = (WaitLockRequest) i.next(); 139 serverLockManager.reestablishWait(request.lockID(), channelID, request.threadID(), request.lockLevel(), request 140 .getWaitInvocation(), sink); 141 } 142 143 for (Iterator i = clientLockManager.addAllPendingLockRequestsTo(new HashSet ()).iterator(); i.hasNext();) { 144 LockRequest request = (LockRequest) i.next(); 145 serverLockManager.requestLock(request.lockID(), channelID, request.threadID(), request.lockLevel(), sink); 146 } 147 148 if (policy == LockManagerImpl.ALTRUISTIC_LOCK_POLICY) { 149 this.serverLockManager.setLockPolicy(policy); 150 } 151 this.serverLockManager.start(); 152 return this.serverLockManager; 153 } 154 155 public void notify(LockID lockID1, ThreadID tx2, boolean all) { 156 NotifiedWaiters waiters = new NotifiedWaiters(); 157 serverLockManager.notify(lockID1, channelID, tx2, all, waiters); 158 Set s = waiters.getNotifiedFor(channelID); 159 for (Iterator i = s.iterator(); i.hasNext();) { 160 LockContext lc = (LockContext) i.next(); 161 clientLockManager.notified(lc.getLockID(), lc.getThreadID()); 162 } 163 } 164 165 public void stop() { 166 stop = true; 167 eventNotifier.interrupt(); 168 } 169 170 public void queryLock(LockID lockID, ThreadID threadID) { 171 serverLockManager.queryLock(lockID, channelID, threadID, sink); 172 } 173 174 public void tryRequestLock(LockID lockID, ThreadID threadID, int lockType) { 175 serverLockManager.tryRequestLock(lockID, channelID, threadID, lockType, sink); 176 } 177 178 public void interrruptWait(LockID lockID, ThreadID threadID) { 179 serverLockManager.interrupt(lockID, channelID, threadID); 180 181 } 182 } 183 | Popular Tags |