1 23 24 package org.apache.slide.store.txfile; 25 26 import org.apache.commons.transaction.file.FileResourceManager; 27 import org.apache.commons.transaction.file.ResourceManager; 28 import org.apache.commons.transaction.file.ResourceManagerException; 29 import org.apache.commons.transaction.util.xa.XidWrapper; 30 import org.apache.slide.common.*; 31 import org.apache.slide.macro.ConflictException; 32 33 import java.io.File ; 34 import java.util.Hashtable ; 35 36 import javax.transaction.Status ; 37 import javax.transaction.xa.XAException ; 38 import javax.transaction.xa.XAResource ; 39 import javax.transaction.xa.Xid ; 40 41 import org.apache.slide.util.logger.TxLogger; 42 import org.apache.slide.util.logger.Logger; 43 44 54 public abstract class AbstractTxFileStoreService extends AbstractServiceBase implements Status { 55 56 protected static final int DEBUG_LEVEL = Logger.DEBUG; 57 58 protected static final String STORE_DIR_PARAMETER = "rootpath"; 59 protected static final String WORK_DIR_PARAMETER = "workpath"; 60 protected static final String TIMEOUT_PARAMETER = "timeout"; 61 protected static final String URLENCODE_PATH_PARAMETER = "url-encode-path"; 62 protected static final String DEBUG_MODE_PARAMETER = "debug"; 63 64 protected FileResourceManager rm; 65 protected boolean started = false; 66 protected String storeDir; 67 protected String workDir; 68 69 protected ThreadLocal activeTransactionBranch = new ThreadLocal (); 71 72 public void setParameters(Hashtable parameters) 73 throws ServiceParameterErrorException, ServiceParameterMissingException { 74 75 storeDir = (String ) parameters.get(STORE_DIR_PARAMETER); 76 workDir = (String ) parameters.get(WORK_DIR_PARAMETER); 77 78 if (storeDir == null) { 79 throw new ServiceParameterMissingException(this, STORE_DIR_PARAMETER); 80 } 81 if (workDir == null) { 82 throw new ServiceParameterMissingException(this, WORK_DIR_PARAMETER); 83 } 84 85 new File (storeDir).mkdirs(); 86 new File (workDir).mkdirs(); 87 88 boolean debug = false; 89 String debugString = (String ) parameters.get(DEBUG_MODE_PARAMETER); 90 if (debugString != null) { 91 debug = "true".equals(debugString); 92 } 93 94 boolean urlEncodePath = false; 95 String urlEncodePathString = (String ) parameters.get(URLENCODE_PATH_PARAMETER); 96 if (urlEncodePathString != null) { 97 urlEncodePath = "true".equals(urlEncodePathString); 98 } 99 100 rm = 101 new FileResourceManager( 102 storeDir, 103 workDir, 104 urlEncodePath, 105 new TxLogger(getLogger(), FileResourceManager.class.getName()), 106 debug); 107 108 getLogger().log( 109 "File Store configured to " + storeDir + ", working directory " + workDir, 110 getLogChannel(), 111 Logger.INFO); 112 113 String timeoutString = (String ) parameters.get(TIMEOUT_PARAMETER); 114 if (timeoutString != null) { 115 try { 116 int timeout = Integer.parseInt(timeoutString); 117 rm.setDefaultTransactionTimeout(timeout * 1000); 118 getLogger().log("Set timeout to " + timeoutString, getLogChannel(), Logger.INFO); 119 } catch (NumberFormatException nfe) { 120 getLogger().log( 121 "Can not set timeout, '" + timeoutString + "' must be an integer!", 122 getLogChannel(), 123 Logger.WARNING); 124 } 125 } 126 127 } 128 129 public String toString() { 130 return "TxFileStore at " + storeDir + " working on " + workDir; 131 } 132 133 public void connect() throws ServiceConnectionFailedException { 134 try { 135 rm.start(); 136 started = true; 137 } catch (ResourceManagerException e) { 138 throw new ServiceConnectionFailedException(this, e); 139 } 140 } 141 142 public void disconnect() throws ServiceDisconnectionFailedException { 143 try { 144 if (!rm.stop(ResourceManager.SHUTDOWN_MODE_NORMAL)) { 145 throw new ServiceDisconnectionFailedException(this, "Shut down timed out"); 146 } 147 started = false; 148 } catch (ResourceManagerException e) { 149 throw new ServiceDisconnectionFailedException(this, e); 150 } 151 } 152 153 public boolean isConnected() throws ServiceAccessException { 154 return started; 155 } 156 157 public void reset() { 158 rm.reset(); 159 } 160 161 public int getTransactionTimeout() throws XAException { 162 try { 163 long msecs = rm.getTransactionTimeout(getActiveTxId()); 164 return Math.round(msecs / (float) 1000); 165 } catch (ResourceManagerException e) { 166 throw createXAException(e); 167 } 168 } 169 170 public boolean setTransactionTimeout(int seconds) throws XAException { 171 try { 172 rm.setTransactionTimeout(getActiveTxId(), seconds * 1000); 173 return true; 174 } catch (ResourceManagerException e) { 175 throw createXAException(e); 176 } 177 } 178 179 public boolean isSameRM(XAResource xares) throws XAException { 180 return (xares instanceof AbstractTxFileStoreService && ((AbstractTxFileStoreService) xares).rm.equals(this.rm)); 181 } 182 183 public synchronized Xid [] recover(int flag) throws XAException { 184 return null; 186 } 187 188 public synchronized void forget(Xid xid) throws XAException { 189 } 191 192 public synchronized int prepare(Xid xid) throws XAException { 193 Object txId = wrap(xid); 194 Thread currentThread = Thread.currentThread(); 195 getLogger().log( 196 "Thread " + currentThread + " prepares transaction branch " + txId, 197 getLogChannel(), 198 DEBUG_LEVEL); 199 try { 200 int status = rm.prepareTransaction(txId); 201 switch (status) { 202 case ResourceManager.PREPARE_SUCCESS_READONLY : 203 return XA_RDONLY; 204 case ResourceManager.PREPARE_SUCCESS : 205 return XA_OK; 206 default : 207 throw new XAException (XAException.XA_RBROLLBACK); 208 } 209 } catch (ResourceManagerException e) { 210 getLogger().log( 211 "Thread " + currentThread + " failed to prepare transaction branch " + txId, 212 e, 213 getLogChannel(), 214 Logger.CRITICAL); 215 throw createXAException(e); 216 } 217 } 218 219 public synchronized void rollback(Xid xid) throws XAException { 220 Object txId = wrap(xid); 221 Thread currentThread = Thread.currentThread(); 222 getLogger().log( 223 "Thread " + currentThread + " rolls back transaction branch " + txId, 224 getLogChannel(), 225 DEBUG_LEVEL); 226 227 try { 228 rm.rollbackTransaction(txId); 229 activeTransactionBranch.set(null); 230 } catch (ResourceManagerException e) { 231 getLogger().log( 232 "Thread " + currentThread + " failed to roll back transaction branch " + txId, 233 e, 234 getLogChannel(), 235 Logger.CRITICAL); 236 throw createXAException(e); 237 } 238 } 239 240 public synchronized void commit(Xid xid, boolean onePhase) throws XAException { 241 Object txId = wrap(xid); 242 Thread currentThread = Thread.currentThread(); 243 getLogger().log( 244 "Thread " + currentThread + " commits transaction branch " + txId, 245 getLogChannel(), 246 DEBUG_LEVEL); 247 248 try { 249 if (!onePhase && rm.getTransactionState(txId) != STATUS_PREPARED) { 250 throw new XAException (XAException.XAER_INVAL); 251 } 252 253 rm.commitTransaction(txId); 254 activeTransactionBranch.set(null); 255 } catch (ResourceManagerException e) { 256 getLogger().log( 257 "Thread " + currentThread + " failed to commit transaction branch " + txId, 258 e, 259 getLogChannel(), 260 Logger.CRITICAL); 261 throw createXAException(e); 262 } 263 } 264 265 public synchronized void end(Xid xid, int flags) throws XAException { 266 Object txId = wrap(xid); 267 Thread currentThread = Thread.currentThread(); 268 getLogger().log( 269 "Thread " 270 + currentThread 271 + (flags == TMSUSPEND ? " suspends" : flags == TMFAIL ? " fails" : " ends") 272 + " work on behalf of transaction branch " 273 + txId, 274 getLogChannel(), 275 DEBUG_LEVEL); 276 277 switch (flags) { 278 case TMSUSPEND : 279 activeTransactionBranch.set(null); 280 break; 281 case TMFAIL : 282 try { 283 rm.markTransactionForRollback(wrap(xid)); 284 } catch (ResourceManagerException e) { 285 throw createXAException(e); 286 } 287 break; 288 case TMSUCCESS : 289 break; 291 } 292 } 293 294 public synchronized void start(Xid xid, int flags) throws XAException { 295 Object txId = wrap(xid); 296 Thread currentThread = Thread.currentThread(); 297 getLogger().log( 298 "Thread " 299 + currentThread 300 + (flags == TMNOFLAGS ? " starts" : flags == TMJOIN ? " joins" : " resumes") 301 + " work on behalf of transaction branch " 302 + txId, 303 getLogChannel(), 304 DEBUG_LEVEL); 305 306 switch (flags) { 307 case TMNOFLAGS : 309 if (getActiveTxId() != null) { 310 throw new XAException (XAException.XAER_INVAL); 311 } 312 try { 313 rm.startTransaction(txId); 314 activeTransactionBranch.set(txId); 315 } catch (ResourceManagerException e) { 316 throw createXAException(e); 317 } 318 break; 319 case TMJOIN : 320 if (getActiveTxId() != null) { 321 throw new XAException (XAException.XAER_INVAL); 322 } 323 try { 324 if (rm.getTransactionState(txId) == STATUS_NO_TRANSACTION) { 325 throw new XAException (XAException.XAER_INVAL); 326 } 327 } catch (ResourceManagerException e) { 328 throw createXAException(e); 329 } 330 activeTransactionBranch.set(txId); 331 break; 332 case TMRESUME : 333 activeTransactionBranch.set(txId); 334 break; 335 } 336 } 337 338 public synchronized void throwInternalError(String cause) throws ServiceAccessException { 339 Object txId = getActiveTxId(); 340 341 getLogger().log( 342 "Thread " 343 + Thread.currentThread() 344 + " marked transaction branch " 345 + txId 346 + " for rollback. Cause: " 347 + cause, 348 getLogChannel(), 349 Logger.WARNING); 350 351 try { 352 rm.markTransactionForRollback(txId); 353 } catch (ResourceManagerException re) { 354 throw new ServiceAccessException(this, re); 355 } 356 357 throw new ServiceAccessException(this, cause); 358 359 } 360 361 public synchronized void throwInternalError(Throwable cause) throws ServiceAccessException { 362 Object txId = getActiveTxId(); 363 364 getLogger().log( 365 "Thread " 366 + Thread.currentThread() 367 + " marked transaction branch " 368 + txId 369 + " for rollback. Cause: " 370 + cause, 371 getLogChannel(), 372 Logger.WARNING); 373 374 try { 375 rm.markTransactionForRollback(txId); 376 } catch (ResourceManagerException re) { 377 throw new ServiceAccessException(this, re); 378 } 379 380 throw new ServiceAccessException(this, cause); 381 382 } 383 384 public synchronized void throwInternalError(Throwable cause, String uri) throws ServiceAccessException { 388 Object txId = getActiveTxId(); 389 390 if ((cause instanceof ResourceManagerException) 391 && ((ResourceManagerException) cause).getStatus() == ResourceManagerException.ERR_NO_LOCK) { 392 393 if (txId != null) { 396 try { 397 rm.markTransactionForRollback(txId); 398 } catch (ResourceManagerException re) { 399 throw new ServiceAccessException(this, re); 400 } 401 } 402 getLogger().log( 403 "DEADLOCK VICTIM: Thread " 404 + Thread.currentThread() 405 + " marked transaction branch " 406 + txId 407 + " for rollback", 408 getLogChannel(), 409 Logger.INFO); 410 411 throw new ServiceAccessException(this, new ConflictException(uri)); 412 413 } else { 414 415 getLogger().log( 416 "Could not process URI '" 417 + uri 418 + "'! Thread " 419 + Thread.currentThread() 420 + " marking transaction branch " 421 + txId 422 + " for rollback", 423 cause, 424 getLogChannel(), 425 Logger.WARNING); 426 427 if (txId != null) { 428 try { 429 rm.markTransactionForRollback(txId); 430 } catch (ResourceManagerException re) { 431 throw new ServiceAccessException(this, re); 432 } 433 } 434 435 throw new ServiceAccessException(this, cause); 436 437 } 438 } 439 440 protected Object getActiveTxId() { 441 Object txId = activeTransactionBranch.get(); 442 return txId; 443 } 444 445 protected XAException createXAException(ResourceManagerException e) { 446 if (e.getStatus() == ResourceManagerException.ERR_DUP_TX) { 447 return new XAException (XAException.XAER_DUPID); 448 } else if (e.getStatus() == ResourceManagerException.ERR_TXID_INVALID) { 449 return new XAException (XAException.XAER_NOTA); 450 } else { 451 return new XAException (e.toString()); 452 } 453 } 454 455 protected String wrap(Xid xid) { 456 String sxid = XidWrapper.wrap(xid).toString(); 457 sxid = sxid.replace('\'', '.').replace('"', '.').replace(':', '.').replace('/', '.').replace('\\', '.'); 460 return sxid; 461 } 462 463 abstract protected String getLogChannel(); 464 } 465 | Popular Tags |