1 17 package org.quartz.impl.jdbcjobstore; 18 19 import java.sql.Connection ; 20 import java.util.HashSet ; 21 22 import javax.naming.InitialContext ; 23 import javax.naming.NamingException ; 24 import javax.transaction.Synchronization ; 25 import javax.transaction.SystemException ; 26 import javax.transaction.Transaction ; 27 import javax.transaction.TransactionManager ; 28 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 32 63 public class JTANonClusteredSemaphore implements Semaphore { 64 65 72 73 public static final String DEFAULT_TRANSACTION_MANANGER_LOCATION = "java:TransactionManager"; 74 75 ThreadLocal lockOwners = new ThreadLocal (); 76 77 HashSet locks = new HashSet (); 78 79 private final Log log = LogFactory.getLog(getClass()); 80 81 private String transactionManagerJNDIName = DEFAULT_TRANSACTION_MANANGER_LOCATION; 82 83 84 91 92 protected Log getLog() { 93 return log; 94 } 95 96 public void setTransactionManagerJNDIName(String transactionManagerJNDIName) { 97 this.transactionManagerJNDIName = transactionManagerJNDIName; 98 } 99 100 private HashSet getThreadLocks() { 101 HashSet threadLocks = (HashSet ) lockOwners.get(); 102 if (threadLocks == null) { 103 threadLocks = new HashSet (); 104 lockOwners.set(threadLocks); 105 } 106 return threadLocks; 107 } 108 109 115 public synchronized boolean obtainLock(Connection conn, String lockName) throws LockException { 116 117 lockName = lockName.intern(); 118 119 Log log = getLog(); 120 121 if(log.isDebugEnabled()) { 122 log.debug( 123 "Lock '" + lockName + "' is desired by: " 124 + Thread.currentThread().getName()); 125 } 126 127 if (!isLockOwner(conn, lockName)) { 128 if(log.isDebugEnabled()) { 129 log.debug( 130 "Lock '" + lockName + "' is being obtained: " 131 + Thread.currentThread().getName()); 132 } 133 134 while (locks.contains(lockName)) { 135 try { 136 this.wait(); 137 } catch (InterruptedException ie) { 138 if(log.isDebugEnabled()) { 139 log.debug( 140 "Lock '" + lockName + "' was not obtained by: " 141 + Thread.currentThread().getName()); 142 } 143 } 144 } 145 146 Transaction t = getTransaction(); 149 if (t != null) { 150 try { 151 t.registerSynchronization(new SemaphoreSynchronization(lockName)); 152 } catch (Exception e) { 153 throw new LockException("Failed to register semaphore with Transaction.", e); 154 } 155 } 156 157 if(log.isDebugEnabled()) { 158 log.debug( 159 "Lock '" + lockName + "' given to: " 160 + Thread.currentThread().getName()); 161 } 162 163 164 getThreadLocks().add(lockName); 165 locks.add(lockName); 166 } else if(log.isDebugEnabled()) { 167 log.debug( 168 "Lock '" + lockName + "' already owned by: " 169 + Thread.currentThread().getName() 170 + " -- but not owner!", 171 new Exception ("stack-trace of wrongful returner")); 172 } 173 174 return true; 175 } 176 177 184 protected Transaction getTransaction() throws LockException{ 185 InitialContext ic = null; 186 try { 187 ic = new InitialContext (); 188 TransactionManager tm = (TransactionManager )ic.lookup(transactionManagerJNDIName); 189 190 return tm.getTransaction(); 191 } catch (SystemException e) { 192 throw new LockException("Failed to get Transaction from TransactionManager", e); 193 } catch (NamingException e) { 194 throw new LockException("Failed to find TransactionManager in JNDI under name: " + transactionManagerJNDIName, e); 195 } finally { 196 if (ic != null) { 197 try { 198 ic.close(); 199 } catch (NamingException ignored) { 200 } 201 } 202 } 203 } 204 205 209 public synchronized void releaseLock(Connection conn, String lockName) throws LockException { 210 releaseLock(lockName, false); 211 } 212 213 225 protected synchronized void releaseLock( 226 String lockName, boolean fromSynchronization) throws LockException { 227 lockName = lockName.intern(); 228 229 if (isLockOwner(null, lockName)) { 230 231 if (fromSynchronization == false) { 232 Transaction t = getTransaction(); 233 if (t != null) { 234 if(getLog().isDebugEnabled()) { 235 getLog().debug( 236 "Lock '" + lockName + "' is in a JTA transaction. " + 237 "Return deferred by: " + Thread.currentThread().getName()); 238 } 239 240 return; 243 } 244 } 245 246 if(getLog().isDebugEnabled()) { 247 getLog().debug( 248 "Lock '" + lockName + "' returned by: " 249 + Thread.currentThread().getName()); 250 } 251 getThreadLocks().remove(lockName); 252 locks.remove(lockName); 253 this.notify(); 254 } else if (getLog().isDebugEnabled()) { 255 getLog().debug( 256 "Lock '" + lockName + "' attempt to return by: " 257 + Thread.currentThread().getName() 258 + " -- but not owner!", 259 new Exception ("stack-trace of wrongful returner")); 260 } 261 } 262 263 267 public synchronized boolean isLockOwner(Connection conn, String lockName) { 268 lockName = lockName.intern(); 269 270 return getThreadLocks().contains(lockName); 271 } 272 273 276 public boolean requiresConnection() { 277 return false; 278 } 279 280 285 private class SemaphoreSynchronization implements Synchronization { 286 private String lockName; 287 288 public SemaphoreSynchronization(String lockName) { 289 this.lockName = lockName; 290 } 291 292 public void beforeCompletion() { 293 } 295 296 public void afterCompletion(int status) { 297 try { 298 releaseLock(lockName, true); 299 } catch (LockException e) { 300 } 302 } 303 } 304 } 305 | Popular Tags |