package org.mule.util.queue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.util.queue.QueuePersistenceStrategy.Holder;
import org.mule.util.xa.AbstractTransactionContext;
import org.mule.util.xa.AbstractXAResourceManager;
import org.mule.util.xa.ResourceManagerException;
import org.mule.util.xa.ResourceManagerSystemException;

import javax.transaction.xa.XAResource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Queue manager that keeps a map of named {@link QueueInfo} instances and applies
 * queue additions and removals when a transaction context is committed.
 * <p>
 * Items offered inside a transaction are buffered in the
 * {@link QueueTransactionContext} and only stored (via a
 * {@link QueuePersistenceStrategy}) and made visible in the queue on commit. Ids
 * polled inside a transaction are removed from the store on commit and put back
 * into the queue on rollback.
 * <p>
 * Queues whose configuration is flagged <code>persistent</code> use the configured
 * persistence strategy; all other queues use the memory persistence strategy.
 */
public class TransactionalQueueManager extends AbstractXAResourceManager implements QueueManager
{

    private static final Log logger = LogFactory.getLog(TransactionalQueueManager.class);

    /** Queue name (String) to {@link QueueInfo}. */
    private Map queues = new HashMap();

    private QueuePersistenceStrategy memoryPersistenceStrategy = new MemoryPersistenceStrategy();
    private QueuePersistenceStrategy persistenceStrategy;

    /** Configuration assigned to queues that are created lazily by {@link #getQueue(String)}. */
    private QueueConfiguration defaultQueueConfiguration = new QueueConfiguration(false);

    /**
     * Creates a new session bound to this manager.
     *
     * @return a new {@link TransactionalQueueSession}
     */
    public synchronized QueueSession getQueueSession()
    {
        return new TransactionalQueueSession(this, this);
    }

    /**
     * Sets the configuration used for queues created after this call. Queues that
     * already exist keep the configuration they were created with.
     *
     * @param config the new default configuration
     */
    public synchronized void setDefaultQueueConfiguration(QueueConfiguration config)
    {
        this.defaultQueueConfiguration = config;
    }

    /**
     * Sets the configuration of the named queue, creating the queue if necessary.
     *
     * @param queueName the name of the queue to configure
     * @param config the configuration to assign
     */
    public synchronized void setQueueConfiguration(String queueName, QueueConfiguration config)
    {
        getQueue(queueName).config = config;
    }

    /**
     * Returns the queue registered under the given name, creating and registering
     * it with the default configuration on first access.
     *
     * @param name the queue name
     * @return the existing or newly created queue, never <code>null</code>
     */
    protected synchronized QueueInfo getQueue(String name)
    {
        QueueInfo q = (QueueInfo)queues.get(name);
        if (q == null)
        {
            q = new QueueInfo();
            q.name = name;
            q.list = new LinkedList();
            q.config = defaultQueueConfiguration;
            queues.put(name, q);
        }
        return q;
    }

    /**
     * @return the logger of this class
     */
    protected Log getLogger()
    {
        return logger;
    }

    /**
     * Stops this manager in normal shutdown mode. A failure to stop is logged
     * and not propagated.
     */
    public void close()
    {
        try
        {
            stop(SHUTDOWN_MODE_NORMAL);
        }
        catch (ResourceManagerException e)
        {
            logger.error("Error disposing manager", e);
        }
    }

    /**
     * Opens the persistence strategy, if one is configured.
     *
     * @throws ResourceManagerSystemException wrapping any {@link IOException}
     *             raised while opening the store
     */
    protected void doStart() throws ResourceManagerSystemException
    {
        if (persistenceStrategy != null)
        {
            try
            {
                persistenceStrategy.open();
            }
            catch (IOException e)
            {
                throw new ResourceManagerSystemException(e);
            }
        }
    }

    /**
     * Closes the persistence strategy (if any) and then delegates to the
     * superclass. An {@link IOException} while closing is logged and does not
     * prevent the superclass shutdown from running.
     *
     * @param mode the shutdown mode, passed through to the superclass
     * @param timeoutMSecs the shutdown timeout, passed through to the superclass
     * @return the result of the superclass shutdown
     */
    protected boolean shutdown(int mode, long timeoutMSecs)
    {
        try
        {
            if (persistenceStrategy != null)
            {
                persistenceStrategy.close();
            }
        }
        catch (IOException e)
        {
            logger.error("Error closing persistent store", e);
        }
        return super.shutdown(mode, timeoutMSecs);
    }

    /**
     * Re-populates the queues from the persistence strategy, if one is
     * configured: every restored {@link Holder} has its id put into the queue
     * it names.
     *
     * @throws ResourceManagerSystemException wrapping any exception raised
     *             while restoring
     */
    protected void recover() throws ResourceManagerSystemException
    {
        if (persistenceStrategy != null)
        {
            try
            {
                List msgs = persistenceStrategy.restore();
                for (Iterator it = msgs.iterator(); it.hasNext();)
                {
                    Holder h = (Holder)it.next();
                    getQueue(h.getQueue()).putNow(h.getId());
                }
            }
            catch (Exception e)
            {
                throw new ResourceManagerSystemException(e);
            }
        }
    }

    /**
     * Creates a fresh {@link QueueTransactionContext}.
     *
     * @param session not used by this implementation
     * @return a new, empty transaction context
     */
    protected AbstractTransactionContext createTransactionContext(Object session)
    {
        return new QueueTransactionContext();
    }

    /**
     * No work is required when a transaction begins.
     *
     * @param context the transaction context (unused)
     */
    protected void doBegin(AbstractTransactionContext context)
    {
        // nothing to do
    }

    /**
     * Always votes to commit.
     *
     * @param context the transaction context (unused)
     * @return {@link XAResource#XA_OK}
     */
    protected int doPrepare(AbstractTransactionContext context)
    {
        return XAResource.XA_OK;
    }

    /**
     * Applies the buffered changes of the transaction: every added item is
     * stored and its id is put into the target queue, then every removed id is
     * deleted from its store. The context's buffers are cleared afterwards,
     * whether or not the commit succeeded.
     *
     * @param context the {@link QueueTransactionContext} to commit
     * @throws ResourceManagerException wrapping any exception raised while
     *             storing or removing
     */
    protected void doCommit(AbstractTransactionContext context) throws ResourceManagerException
    {
        QueueTransactionContext ctx = (QueueTransactionContext)context;
        try
        {
            if (ctx.added != null)
            {
                for (Iterator it = ctx.added.entrySet().iterator(); it.hasNext();)
                {
                    Map.Entry entry = (Map.Entry)it.next();
                    QueueInfo queue = (QueueInfo)entry.getKey();
                    List queueAdded = (List)entry.getValue();
                    if (queueAdded != null)
                    {
                        for (Iterator itAdded = queueAdded.iterator(); itAdded.hasNext();)
                        {
                            Object object = itAdded.next();
                            Object id = doStore(queue, object);
                            queue.putNow(id);
                        }
                    }
                }
            }
            if (ctx.removed != null)
            {
                for (Iterator it = ctx.removed.entrySet().iterator(); it.hasNext();)
                {
                    Map.Entry entry = (Map.Entry)it.next();
                    QueueInfo queue = (QueueInfo)entry.getKey();
                    List queueRemoved = (List)entry.getValue();
                    if (queueRemoved != null)
                    {
                        for (Iterator itRemoved = queueRemoved.iterator(); itRemoved.hasNext();)
                        {
                            Object id = itRemoved.next();
                            doRemove(queue, id);
                        }
                    }
                }
            }
        }
        catch (Exception e)
        {
            throw new ResourceManagerException(e);
        }
        finally
        {
            ctx.added = null;
            ctx.removed = null;
        }
    }

    /**
     * Picks the strategy for a queue: the configured persistence strategy when
     * the queue's configuration is flagged persistent, the memory strategy
     * otherwise.
     */
    private QueuePersistenceStrategy strategyFor(QueueInfo queue)
    {
        // NOTE(review): returns null for a persistent queue when no persistence
        // strategy has been set, so callers fail with a NullPointerException,
        // exactly as before this helper was extracted.
        return queue.config.persistent ? persistenceStrategy : memoryPersistenceStrategy;
    }

    /**
     * Stores an object using the strategy that matches the queue's configuration.
     *
     * @param queue the queue the object belongs to
     * @param object the object to store
     * @return the id returned by the strategy for the stored object
     * @throws IOException if the strategy fails to store the object
     */
    protected Object doStore(QueueInfo queue, Object object) throws IOException
    {
        return strategyFor(queue).store(queue.name, object);
    }

    /**
     * Removes a stored object using the strategy that matches the queue's
     * configuration.
     *
     * @param queue the queue the object belongs to
     * @param id the id of the stored object
     * @throws IOException if the strategy fails to remove the object
     */
    protected void doRemove(QueueInfo queue, Object id) throws IOException
    {
        strategyFor(queue).remove(queue.name, id);
    }

    /**
     * Loads a stored object using the strategy that matches the queue's
     * configuration.
     *
     * @param queue the queue the object belongs to
     * @param id the id of the stored object
     * @return the object returned by the strategy for that id
     * @throws IOException if the strategy fails to load the object
     */
    protected Object doLoad(QueueInfo queue, Object id) throws IOException
    {
        return strategyFor(queue).load(queue.name, id);
    }

    /**
     * Undoes the transaction: every id polled during the transaction is put
     * back into its queue, buffered additions are discarded, and the context's
     * buffers are cleared.
     *
     * @param context the {@link QueueTransactionContext} to roll back
     * @throws ResourceManagerException declared for subclasses; not thrown here
     */
    protected void doRollback(AbstractTransactionContext context) throws ResourceManagerException
    {
        QueueTransactionContext ctx = (QueueTransactionContext)context;
        if (ctx.removed != null)
        {
            for (Iterator it = ctx.removed.entrySet().iterator(); it.hasNext();)
            {
                Map.Entry entry = (Map.Entry)it.next();
                QueueInfo queue = (QueueInfo)entry.getKey();
                List queueRemoved = (List)entry.getValue();
                if (queueRemoved != null)
                {
                    for (Iterator itRemoved = queueRemoved.iterator(); itRemoved.hasNext();)
                    {
                        Object id = itRemoved.next();
                        queue.putNow(id);
                    }
                }
            }
        }
        ctx.added = null;
        ctx.removed = null;
    }

    /**
     * Per-transaction buffer of queue changes. <code>added</code> maps a
     * {@link QueueInfo} to the list of items offered in this transaction;
     * <code>removed</code> maps a {@link QueueInfo} to the list of ids polled
     * from the queue in this transaction. Both maps are created lazily.
     */
    protected class QueueTransactionContext extends AbstractTransactionContext
    {
        protected Map added;
        protected Map removed;

        /**
         * Buffers an item for addition to the queue on commit.
         *
         * @param queue the target queue
         * @param item the item to add
         * @param timeout passed to {@link QueueInfo#offer}; its unit and the
         *            meaning of non-positive values are defined there
         * @return <code>true</code> if the queue accepted the reservation and
         *         the item was buffered, <code>false</code> otherwise
         * @throws InterruptedException if interrupted while waiting on the queue
         */
        public boolean offer(QueueInfo queue, Object item, long timeout) throws InterruptedException
        {
            readOnly = false;
            if (added == null)
            {
                added = new HashMap();
            }
            List queueAdded = (List)added.get(queue);
            if (queueAdded == null)
            {
                queueAdded = new ArrayList();
                added.put(queue, queueAdded);
            }
            // Ask the queue for room for the items already buffered in this
            // transaction, honouring the caller's timeout (previously ignored
            // in favour of Long.MAX_VALUE).
            if (queue.offer(null, queueAdded.size(), timeout))
            {
                queueAdded.add(item);
                return true;
            }
            return false;
        }

        /**
         * Takes an item: first from the items buffered by this transaction,
         * otherwise from the queue itself. An id taken from the queue is
         * recorded in <code>removed</code> and the corresponding object is
         * loaded from the store.
         *
         * @param queue the queue to poll
         * @param timeout passed to {@link QueueInfo#poll}; its unit and the
         *            meaning of non-positive values are defined there
         * @return the polled item, or <code>null</code> if the queue returned none
         * @throws IOException if loading the object from the store fails
         * @throws InterruptedException if interrupted while waiting on the queue
         */
        public Object poll(QueueInfo queue, long timeout) throws IOException, InterruptedException
        {
            readOnly = false;
            if (added != null)
            {
                List queueAdded = (List)added.get(queue);
                // The buffer may exist but already be drained by earlier polls;
                // in that case fall through to the real queue instead of
                // indexing at -1.
                if (queueAdded != null && !queueAdded.isEmpty())
                {
                    // NOTE(review): takes the most recently offered item (last
                    // index), as the original code did - confirm the intended order.
                    return queueAdded.remove(queueAdded.size() - 1);
                }
            }
            Object o = queue.poll(timeout);
            if (o != null)
            {
                if (removed == null)
                {
                    removed = new HashMap();
                }
                List queueRemoved = (List)removed.get(queue);
                if (queueRemoved == null)
                {
                    queueRemoved = new ArrayList();
                    removed.put(queue, queueRemoved);
                }
                // Remember the id so commit can delete it and rollback can requeue it.
                queueRemoved.add(o);
                o = doLoad(queue, o);
            }
            return o;
        }

        /**
         * Returns, without removing it, the item a subsequent poll would
         * return from the transaction buffer, or else the loaded head of the
         * queue.
         *
         * @param queue the queue to inspect
         * @return the item, or <code>null</code> if the queue returned none
         * @throws IOException if loading the object from the store fails
         * @throws InterruptedException declared by the signature
         */
        public Object peek(QueueInfo queue) throws IOException, InterruptedException
        {
            readOnly = false;
            if (added != null)
            {
                List queueAdded = (List)added.get(queue);
                // Guard against a drained buffer, see poll().
                if (queueAdded != null && !queueAdded.isEmpty())
                {
                    return queueAdded.get(queueAdded.size() - 1);
                }
            }
            Object o = queue.peek();
            if (o != null)
            {
                o = doLoad(queue, o);
            }
            return o;
        }

        /**
         * @param queue the queue to measure
         * @return the size of the queue's list plus the number of items
         *         buffered for it in this transaction
         */
        public int size(QueueInfo queue)
        {
            int sz = queue.list.size();
            if (added != null)
            {
                List queueAdded = (List)added.get(queue);
                if (queueAdded != null)
                {
                    sz += queueAdded.size();
                }
            }
            return sz;
        }

    }

    /**
     * @return the strategy used for queues configured as persistent; may be
     *         <code>null</code> if none has been set
     */
    public QueuePersistenceStrategy getPersistenceStrategy()
    {
        return persistenceStrategy;
    }

    /**
     * Sets the strategy used for queues configured as persistent.
     *
     * @param persistenceStrategy the strategy to use
     * @throws IllegalStateException if the manager is not stopped
     */
    public void setPersistenceStrategy(QueuePersistenceStrategy persistenceStrategy)
    {
        if (operationMode != OPERATION_MODE_STOPPED)
        {
            throw new IllegalStateException();
        }
        this.persistenceStrategy = persistenceStrategy;
    }

    /**
     * @return the strategy used for queues not configured as persistent
     */
    public QueuePersistenceStrategy getMemoryPersistenceStrategy()
    {
        return memoryPersistenceStrategy;
    }

    /**
     * Sets the strategy used for queues not configured as persistent.
     *
     * @param memoryPersistenceStrategy the strategy to use
     * @throws IllegalStateException if the manager is not stopped
     */
    public void setMemoryPersistenceStrategy(QueuePersistenceStrategy memoryPersistenceStrategy)
    {
        if (operationMode != OPERATION_MODE_STOPPED)
        {
            throw new IllegalStateException();
        }
        this.memoryPersistenceStrategy = memoryPersistenceStrategy;
    }
}