1 2 3 package org.enhydra.shark.processlocking; 4 5 import java.util.*; 6 7 import com.lutris.appserver.server.sql.DBTransaction; 8 import com.lutris.appserver.server.sql.DatabaseManagerException; 9 import com.lutris.appserver.server.sql.LogicalDatabase; 10 import org.enhydra.dods.DODS; 11 import org.enhydra.shark.api.RootError; 12 import org.enhydra.shark.api.RootException; 13 import org.enhydra.shark.api.SharkTransaction; 14 import org.enhydra.shark.api.internal.processlocking.LockMaster; 15 import org.enhydra.shark.api.internal.working.CallbackUtilities; 16 import org.enhydra.shark.processlocking.data.LockEntryDO; 17 import org.enhydra.shark.processlocking.data.LockEntryDelete; 18 import org.enhydra.shark.processlocking.data.LockEntryQuery; 19 import org.enhydra.shark.utilities.dods.DODSUtilities; 20 21 32 public class DODSLockMaster implements LockMaster { 33 34 private static final String ENG_PARAM_NAME = "enginename"; 35 private static final String TOUT_PARM_NAME = "DODSLockMaster.Timeout"; 36 private static final String LWT_PARAM_NAME = "DODSLockMaster.LockWaitTime"; 37 private static final String DBG_PARAM_NAME = "DODSLockMaster.debug"; 38 private static final String LDB_PARAM_NAME = "DODSLockMaster.DatabaseName"; 39 40 private Long defaultTimeout; 41 private long lockWaitTime; 42 private CallbackUtilities callback; 43 private LogicalDatabase db; 44 private Map locks; 45 private String lockMasterName; 46 private boolean cleanStarted; 47 private boolean _debug_; 48 49 52 public DODSLockMaster() { 53 locks = new HashMap(); 54 defaultTimeout = null; 55 cleanStarted = false; 56 } 57 58 66 public void configure(CallbackUtilities cbImpl) throws RootException { 67 if (null == cbImpl) 68 throw new RootException("Cannot configure without call back impl."); 69 callback = cbImpl; 70 DODSUtilities.init(callback.getProperties()); 71 if (LockEntryDO 72 .getConfigurationAdministration() 73 .getCacheAdministration(0) 74 .getMaxCacheSize() > 0) { 75 callback.error("cache for lock entries isn't allowed!!!"); 76 throw new RootError("cache for lock entries isn't allowed!!!"); 77 } 78 lockMasterName = callback.getProperty(ENG_PARAM_NAME,"dodsLockMaster"); 79 defaultTimeout = new Long (callback.getProperty(TOUT_PARM_NAME, "-1")); 80 lockWaitTime = Long.parseLong(callback.getProperty(LWT_PARAM_NAME,"100")); 81 String dbName = callback 82 .getProperty(LDB_PARAM_NAME, DODS.getDatabaseManager().getDefaultDB()); 83 _debug_ = Boolean 84 .valueOf(callback.getProperty(DBG_PARAM_NAME, "false")) 85 .booleanValue(); 86 try { 87 db = DODS.getDatabaseManager().findLogicalDatabase(dbName); 88 } catch (DatabaseManagerException e) { 89 throw new RootException("Couldn't find logical database.", e); 90 } 91 if (!cleanStarted) { 92 _cleanAllLocks(); 93 cleanStarted = true; 94 } 95 callback.debug(new StringBuffer (lockMasterName) 96 .append(" startup, timeout is ") 97 .append(defaultTimeout) 98 .toString()); 99 } 100 101 115 public boolean lock(SharkTransaction t, 116 String processId, 117 Long timeout) throws RootException { 118 boolean ret = true; 119 if (null != processId) { 120 if (null == timeout) 121 timeout = defaultTimeout; 122 long limit = timeout.longValue(); 123 boolean checkTimeout = 0 < limit; 124 limit += System.currentTimeMillis(); 125 126 while (hasLock(processId)) { 127 ret = false; 128 try { 129 Thread.sleep(lockWaitTime); 130 } catch (Exception e) {} 131 if (checkTimeout && (System.currentTimeMillis() > limit)) { 132 RootException tme = new RootException 133 (new StringBuffer ("Timeout expired waiting on ") 134 .append(processId) 135 .toString()); 136 callback.error("DODSLockMaster", tme); 137 throw tme; 138 } 139 } 140 } 141 if (_debug_) System.err.println("LOCK:"+processId+":"+t); 142 return ret; 143 } 144 145 153 public boolean lock(SharkTransaction t, 154 String processId) throws RootException { 155 return lock(t, processId, null); 156 } 157 158 162 public void unlock(SharkTransaction t, 163 String processId) throws RootException { 164 if (null != processId) { 165 removeLock(processId); 166 if (_debug_) System.err.println("UNLOCK:"+processId+":"+t); 167 } 168 } 169 170 176 public synchronized void unlock(SharkTransaction t) throws RootException { 177 List processLocks = retrieveLocks(); 178 if (null == processLocks) { 179 throw new RootException("Transaction hasn't locked anything"); 180 } 181 DBTransaction dbt = createTransaction(); 182 try { 183 for (Iterator it = processLocks.iterator(); it.hasNext();) { 184 String processId = (String )it.next(); 185 LockEntryQuery qry = new LockEntryQuery(dbt); 186 qry.setQueryEngineName(lockMasterName); 187 qry.setQueryId(processId); 188 LockEntryDelete led = new LockEntryDelete(qry); 190 led.save(); 191 locks.remove(processId); 193 if (_debug_) System.err.println("gUNLOCK:"+processId+":"+t); 194 } 195 dbt.commit(); 196 } catch (Exception e) { 197 throw new RootException(e); 198 } finally { 199 dbt.release(); 200 } 201 } 202 203 210 public List getLocks(SharkTransaction t) throws RootException { 211 List transactionLocks = retrieveLocks(); 212 if (null == transactionLocks) 213 throw new RootException("Transaction hasn't locked anything"); 214 return transactionLocks; 215 } 216 217 218 private List retrieveLocks() { 219 List ret = new ArrayList(); 220 Set entries = locks.entrySet(); 221 Thread th = Thread.currentThread(); 222 for (Iterator it = entries.iterator(); it.hasNext();) { 223 Map.Entry me = (Map.Entry)it.next(); 224 if (th.equals(me.getValue())) { 225 ret.add(me.getKey()); 226 } 227 } 228 return ret; 229 } 230 231 232 private synchronized boolean hasLock(String processId) throws RootException { 233 Thread lockOwner = (Thread )locks.get(processId); 234 if (null == lockOwner) { 235 DBTransaction dbt = createTransaction(); 236 try { 237 LockEntryDO le = LockEntryDO.createVirgin(dbt); 238 le.setEngineName(lockMasterName); 239 le.setId(processId); 240 le.save(); 241 dbt.commit(); 242 } catch (Exception e) { 243 return true; 244 } finally { 245 dbt.release(); 246 } 247 locks.put(processId, Thread.currentThread()); 248 return false; 249 } else if (lockOwner.equals(Thread.currentThread())) { 250 return false; 251 } else { 252 return true; 253 } 254 255 } 256 257 258 private synchronized void removeLock(String processId) throws RootException { 259 Thread lockOwner = (Thread )locks.get(processId); 260 if (Thread.currentThread().equals(lockOwner)) { 261 DBTransaction dbt = createTransaction(); 262 try { 263 LockEntryQuery qry = new LockEntryQuery(dbt); 264 qry.setQueryEngineName(lockMasterName); 265 qry.setQueryId(processId); 266 qry.requireUniqueInstance(); 267 LockEntryDO row = qry.getNextDO(); 268 row.delete(); 269 dbt.commit(); 270 } catch (Exception e) { 271 throw new RootException(e); 272 } finally { 273 dbt.release(); 274 } 275 locks.remove(processId); 276 } else { 277 RootException tme = new RootException 278 (new StringBuffer ("Trying to unlock ") 279 .append(processId) 280 .append(" while it hasn't been locked ?!?") 281 .toString()); 282 callback.error("DODSLockMaster", tme); 283 throw tme; 284 } 285 } 286 287 288 private void _cleanAllLocks() throws RootException { 289 DBTransaction dbt = createTransaction(); 290 try { 291 LockEntryQuery leqry = new LockEntryQuery(dbt); 292 leqry.setQueryEngineName(lockMasterName); 293 new LockEntryDelete(leqry).save(); 294 dbt.commit(); 295 } catch (Exception e) { 296 throw new RootException(e); 297 } finally { 298 dbt.release(); 299 } 300 } 301 302 303 private DBTransaction createTransaction() throws RootException { 304 try { 305 return db.createTransaction(); 306 } catch (Throwable t) { 307 throw new RootException 308 (new StringBuffer ("Didn't create transaction, there are ") 309 .append(db.getActiveConnectionCount()) 310 .append(" active connections.") 311 .toString(), t); 312 } 313 } 314 } 315 316 317 | Popular Tags |