1 10 11 package org.mule.util.xa; 12 13 import java.util.ArrayList ; 14 import java.util.Collection ; 15 import java.util.Collections ; 16 import java.util.Iterator ; 17 18 import javax.transaction.Status ; 19 20 import org.apache.commons.logging.Log; 21 import org.mule.config.i18n.Message; 22 import org.mule.config.i18n.Messages; 23 24 29 public abstract class AbstractResourceManager 30 { 31 32 35 public static final int SHUTDOWN_MODE_NORMAL = 0; 36 37 40 public static final int SHUTDOWN_MODE_ROLLBACK = 1; 41 42 45 public static final int SHUTDOWN_MODE_KILL = 2; 46 47 protected static final int OPERATION_MODE_STOPPED = 0; 48 protected static final int OPERATION_MODE_STOPPING = 1; 49 protected static final int OPERATION_MODE_STARTED = 2; 50 protected static final int OPERATION_MODE_STARTING = 3; 51 protected static final int OPERATION_MODE_RECOVERING = 4; 52 53 protected static final int DEFAULT_TIMEOUT_MSECS = 5000; 54 protected static final int DEFAULT_COMMIT_TIMEOUT_FACTOR = 2; 55 56 protected Collection globalTransactions = Collections.synchronizedCollection(new ArrayList ()); 57 protected int operationMode = OPERATION_MODE_STOPPED; 58 protected long defaultTimeout = DEFAULT_TIMEOUT_MSECS; 59 protected Log logger = getLogger(); 60 protected boolean dirty = false; 61 62 protected abstract Log getLogger(); 63 64 public synchronized void start() throws ResourceManagerSystemException 65 { 66 logger.info("Starting ResourceManager"); 67 operationMode = OPERATION_MODE_STARTING; 68 doStart(); 70 recover(); 71 operationMode = OPERATION_MODE_STARTED; 73 if (dirty) 74 { 75 logger 76 .warn("Started ResourceManager, but in dirty mode only (Recovery of pending transactions failed)"); 77 } 78 else 79 { 80 logger.info("Started ResourceManager"); 81 } 82 } 83 84 protected void doStart() throws ResourceManagerSystemException 85 { 86 } 88 89 protected void recover() throws ResourceManagerSystemException 90 { 91 } 93 94 public synchronized void stop() throws ResourceManagerSystemException 95 { 96 stop(SHUTDOWN_MODE_NORMAL); 97 } 98 99 public synchronized boolean stop(int mode) throws ResourceManagerSystemException 100 { 101 return stop(mode, getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR); 102 } 103 104 public synchronized boolean stop(int mode, long timeOut) throws ResourceManagerSystemException 105 { 106 logger.info("Stopping ResourceManager"); 107 operationMode = OPERATION_MODE_STOPPING; 108 boolean success = shutdown(mode, timeOut); 111 if (success) 114 { 115 operationMode = OPERATION_MODE_STOPPED; 116 logger.info("Stopped ResourceManager"); 117 } 118 else 119 { 120 logger.warn("Failed to stop ResourceManager"); 121 } 122 123 return success; 124 } 125 126 protected boolean shutdown(int mode, long timeoutMSecs) 127 { 128 switch (mode) 129 { 130 case SHUTDOWN_MODE_NORMAL : 131 return waitForAllTxToStop(timeoutMSecs); 132 case SHUTDOWN_MODE_ROLLBACK : 133 throw new UnsupportedOperationException (); 134 case SHUTDOWN_MODE_KILL : 136 return true; 137 default : 138 return false; 139 } 140 } 141 142 145 public long getDefaultTransactionTimeout() 146 { 147 return defaultTimeout; 148 } 149 150 155 public void setDefaultTransactionTimeout(long timeout) 156 { 157 defaultTimeout = timeout; 158 } 159 160 175 public AbstractTransactionContext startTransaction(Object session) throws ResourceManagerException 176 { 177 return createTransactionContext(session); 178 } 179 180 public void beginTransaction(AbstractTransactionContext context) throws ResourceManagerException 181 { 182 assureStarted(); 184 185 synchronized (context) 186 { 187 if (logger.isDebugEnabled()) 188 { 189 logger.debug("Beginning transaction " + context); 190 } 191 doBegin(context); 192 context.status = Status.STATUS_ACTIVE; 193 if (logger.isDebugEnabled()) 194 { 195 logger.debug("Began transaction " + context); 196 } 197 } 198 globalTransactions.add(context); 199 } 200 201 public int prepareTransaction(AbstractTransactionContext context) throws ResourceManagerException 202 { 203 assureReady(); 204 synchronized (context) 205 { 206 if (logger.isDebugEnabled()) 207 { 208 logger.debug("Preparing transaction " + context); 209 } 210 context.status = Status.STATUS_PREPARING; 211 int status = doPrepare(context); 212 context.status = Status.STATUS_PREPARED; 213 if (logger.isDebugEnabled()) 214 { 215 logger.debug("Prepared transaction " + context); 216 } 217 return status; 218 } 219 } 220 221 public void rollbackTransaction(AbstractTransactionContext context) throws ResourceManagerException 222 { 223 assureReady(); 224 synchronized (context) 225 { 226 if (logger.isDebugEnabled()) 227 { 228 logger.debug("Rolling back transaction " + context); 229 } 230 try 231 { 232 context.status = Status.STATUS_ROLLING_BACK; 233 doRollback(context); 234 context.status = Status.STATUS_ROLLEDBACK; 235 } 236 catch (Error e) 237 { 238 setDirty(context, e); 239 throw e; 240 } 241 catch (RuntimeException e) 242 { 243 setDirty(context, e); 244 throw e; 245 } 246 catch (ResourceManagerSystemException e) 247 { 248 setDirty(context, e); 249 throw e; 250 } 251 finally 252 { 253 globalTransactions.remove(context); 254 context.finalCleanUp(); 255 context.notifyFinish(); 257 } 258 if (logger.isDebugEnabled()) 259 { 260 logger.debug("Rolled back transaction " + context); 261 } 262 } 263 } 264 265 public void setTransactionRollbackOnly(AbstractTransactionContext context) 266 throws ResourceManagerException 267 { 268 context.status = Status.STATUS_MARKED_ROLLBACK; 269 } 270 271 public void commitTransaction(AbstractTransactionContext context) throws ResourceManagerException 272 { 273 assureReady(); 274 if (context.status == Status.STATUS_MARKED_ROLLBACK) 275 { 276 throw new ResourceManagerException(new Message(Messages.TX_MARKED_FOR_ROLLBACK)); 277 } 278 synchronized (context) 279 { 280 if (logger.isDebugEnabled()) 281 { 282 logger.debug("Committing transaction " + context); 283 } 284 try 285 { 286 context.status = Status.STATUS_COMMITTING; 287 doCommit(context); 288 context.status = Status.STATUS_COMMITTED; 289 } 290 catch (Error e) 291 { 292 setDirty(context, e); 293 throw e; 294 } 295 catch (RuntimeException e) 296 { 297 setDirty(context, e); 298 throw e; 299 } 300 catch (ResourceManagerSystemException e) 301 { 302 setDirty(context, e); 303 throw e; 304 } 305 catch (ResourceManagerException e) 306 { 307 logger.warn("Could not commit tx " + context + ", rolling back instead", e); 308 doRollback(context); 309 } 310 finally 311 { 312 globalTransactions.remove(context); 313 context.finalCleanUp(); 314 context.notifyFinish(); 316 } 317 if (logger.isDebugEnabled()) 318 { 319 logger.debug("Committed transaction " + context); 320 } 321 } 322 } 323 324 protected abstract AbstractTransactionContext createTransactionContext(Object session); 325 326 protected abstract void doBegin(AbstractTransactionContext context); 327 328 protected abstract int doPrepare(AbstractTransactionContext context); 329 330 protected abstract void doCommit(AbstractTransactionContext context) throws ResourceManagerException; 331 332 protected abstract void doRollback(AbstractTransactionContext context) throws ResourceManagerException; 333 334 338 protected boolean waitForAllTxToStop(long timeoutMSecs) 339 { 340 long startTime = System.currentTimeMillis(); 341 342 350 Collection transactionsToStop; 351 synchronized (globalTransactions) 352 { 353 transactionsToStop = new ArrayList (globalTransactions); 354 } 355 for (Iterator it = transactionsToStop.iterator(); it.hasNext();) 356 { 357 long remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs; 358 359 if (remainingTimeout <= 0) 360 { 361 return false; 362 } 363 364 AbstractTransactionContext context = (AbstractTransactionContext)it.next(); 365 synchronized (context) 366 { 367 if (!context.finished) 368 { 369 logger.info("Waiting for tx " + context + " to finish for " + remainingTimeout 370 + " milli seconds"); 371 } 372 while (!context.finished && remainingTimeout > 0) 373 { 374 try 375 { 376 context.wait(remainingTimeout); 377 } 378 catch (InterruptedException e) 379 { 380 return false; 381 } 382 remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs; 383 } 384 if (context.finished) 385 { 386 logger.info("Tx " + context + " finished"); 387 } 388 else 389 { 390 logger.warn("Tx " + context + " failed to finish in given time"); 391 } 392 } 393 } 394 395 return (globalTransactions.size() == 0); 396 } 397 398 405 protected void setDirty(AbstractTransactionContext context, Throwable t) 406 { 407 logger.error("Fatal error during critical commit/rollback of transaction " + context 408 + ", setting resource manager to dirty.", t); 409 dirty = true; 410 } 411 412 417 protected void assureStarted() throws ResourceManagerSystemException 418 { 419 if (operationMode != OPERATION_MODE_STARTED) 420 { 421 throw new ResourceManagerSystemException(new Message(Messages.RESOURCE_MANAGER_NOT_STARTED)); 422 } 423 if (dirty) 426 { 427 throw new ResourceManagerSystemException(new Message(Messages.RESOURCE_MANAGER_DIRTY)); 428 } 429 } 430 431 437 protected void assureReady() throws ResourceManagerSystemException 438 { 439 if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STOPPING) 440 { 441 throw new ResourceManagerSystemException(new Message(Messages.RESOURCE_MANAGER_NOT_READY)); 442 } 443 if (dirty) 446 { 447 throw new ResourceManagerSystemException(new Message(Messages.RESOURCE_MANAGER_DIRTY)); 448 } 449 } 450 451 } 452 | Popular Tags |