1 21 22 package com.rift.coad.daemon.messageservice; 24 25 import java.util.Date ; 27 import java.util.List ; 28 import java.util.ArrayList ; 29 import java.util.Map ; 30 import java.util.HashMap ; 31 import java.util.Vector ; 32 import java.util.concurrent.ConcurrentHashMap ; 33 import javax.transaction.xa.XAException ; 34 import javax.transaction.xa.XAResource ; 35 import javax.transaction.xa.Xid ; 36 37 import org.apache.log4j.Logger; 39 40 import org.hibernate.*; 42 import org.hibernate.cfg.*; 43 44 import com.rift.coad.util.transaction.TransactionManager; 46 import com.rift.coad.util.lock.LockRef; 47 import com.rift.coad.util.lock.ObjectLockFactory; 48 import com.rift.coad.daemon.messageservice.db.*; 49 import com.rift.coad.hibernate.util.HibernateUtil; 50 51 52 57 public class MessageQueueManager implements XAResource { 58 59 62 public class Changes { 63 private Xid transactionId = null; 65 private List queues = new ArrayList (); 66 private List locks = new ArrayList (); 67 68 69 74 public Changes(Xid transactionId) { 75 this.transactionId = transactionId; 76 } 77 78 79 84 public void add(MessageQueue messageQueue, LockRef lockRef) throws 85 MessageServiceException { 86 try { 87 lockRef.setLockName(transactionId); 88 locks.add(lockRef); 89 queues.add(messageQueue); 90 } catch (Exception ex) { 91 log.error("Failed to add the " + 92 "change entries : " + ex.getMessage(),ex); 93 throw new MessageServiceException("Failed to add the " + 94 "change entries + " + ex.getMessage(),ex); 95 } 96 } 97 98 99 104 public List getQueues() { 105 return queues; 106 } 107 108 109 114 public List getLocks() { 115 return locks; 116 } 117 } 118 119 public final static String UNSORTED = "UNSORTED"; 121 public final static String DEAD_LETTER = "DEAD_LETTER"; 122 123 private static MessageQueueManager singleton = null; 125 126 protected Logger log = 128 Logger.getLogger(MessageQueueManager.class.getName()); 129 130 131 private ThreadLocal currentTransaction = new ThreadLocal (); 133 private Map keyLockMap = new HashMap (); 134 private Map messageQueues = new ConcurrentHashMap (); 135 private Map transactionChanges = new ConcurrentHashMap (); 136 private Vector listIndex = new Vector (); 137 private int pos = 0; 138 139 142 private MessageQueueManager() { 143 } 144 145 146 151 public static synchronized MessageQueueManager getInstance() { 152 if (singleton == null) { 153 singleton = new MessageQueueManager(); 154 } 155 return singleton; 156 } 157 158 159 166 public MessageQueue getQueue(String name) throws MessageServiceException { 167 LockRef lockRef = null; 168 try { 169 lockRef = getLock(name); 170 if (messageQueues.containsKey(name)) { 171 MessageQueue messageQueue = 172 (MessageQueue)messageQueues.get(name); 173 lockRef.release(); 174 return messageQueue; 175 } 176 Session session = HibernateUtil. 178 getInstance(MessageServiceImpl.class).getSession(); 179 List list = session.createQuery("FROM MessageQueue AS queue " + 180 "WHERE queue.messageQueueName = ?").setString(0,name).list(); 181 MessageQueue queue = new MessageQueue(name); 182 if (list.size() == 1) { 183 com.rift.coad.daemon.messageservice.db.MessageQueue dbQueue = 184 (com.rift.coad.daemon.messageservice.db.MessageQueue) 185 list.get(0); 186 if ((dbQueue.getNamed() != null) && 187 (dbQueue.getNamed() == 1)) { 188 log.error("This is a named queue [" + name + 189 "] and cannot be loaded into memory."); 190 throw new MessageServiceException 191 ("This is a named queue [" + name + 192 "] and cannot be loaded into memory."); 193 } 194 messageQueues.put(name,queue); 195 addQueueToIndex(queue); 196 return queue; 197 } 198 TransactionManager.getInstance().bindResource(this,false); 199 com.rift.coad.daemon.messageservice.db.MessageQueue dbQueue = new 200 com.rift.coad.daemon.messageservice.db.MessageQueue(name); 201 session.persist(dbQueue); 202 Changes changes = (Changes)currentTransaction.get(); 203 changes.add(queue,lockRef); 204 lockRef = null; 205 return queue; 206 } catch (MessageServiceException ex) { 207 throw ex; 208 } catch (Exception ex) { 209 log.error("Failed to retrieve th message queue [" +name + "] : " + 210 ex.getMessage(),ex); 211 throw new MessageServiceException 212 ("Failed to retrieve th message queue [" +name + "] : " + 213 ex.getMessage(),ex); 214 } finally { 215 if (lockRef != null) { 216 try { 217 lockRef.release(); 218 } catch (Exception ex2) { 219 log.error("Failed to unlock the queue [" + name + "] : " + 220 ex2.getMessage(),ex2); 221 } 222 } 223 } 224 } 225 226 227 234 public synchronized MessageProcessInfo getNextMessage(Date nextRunTime) 235 throws MessageServiceException { 236 Vector index = cloneIndex(); 237 int currentPos = pos; 238 Date currentDate = nextRunTime; 239 MessageManager result = null; 240 while (index.size() > 0) { 241 currentPos++; 242 if (currentPos >= index.size()) { 243 currentPos = 0; 244 } 245 MessageQueue messageQueue = (MessageQueue)index.get(currentPos); 246 Date nextDate = new Date (); 247 result = messageQueue.popFrontMessage(nextDate); 248 if (result != null) { 249 MessageProcessInfo messageProcessInfo = new 250 MessageProcessInfo(messageQueue,result); 251 return messageProcessInfo; 252 } 253 if ((currentDate == nextRunTime) || 254 (currentDate.getTime() > nextDate.getTime())) { 255 currentDate = nextDate; 256 } 257 if (currentPos == pos) { 258 break; 259 } 260 } 261 nextRunTime.setTime(currentDate.getTime()); 263 264 pos = currentPos; 266 267 return null; 269 } 270 271 272 279 public void commit(Xid xid, boolean b) throws XAException { 280 try { 281 Changes changes = (Changes)transactionChanges.get(xid); 282 transactionChanges.remove(xid); 283 List queues = changes.getQueues(); 284 List locks = changes.getLocks(); 285 for (int index = 0; index < queues.size(); index++) { 286 MessageQueue queue = (MessageQueue)queues.get(index); 287 messageQueues.put(queue.getName(),queue); 288 addQueueToIndex(queue); 289 } 290 for (int index = 0; index < locks.size(); index++) { 291 LockRef lockRef = (LockRef)locks.get(index); 292 lockRef.release(); 293 } 294 } catch (Exception ex) { 295 log.error("Failed to commit the changes : " + 296 ex.getMessage(),ex); 297 throw new XAException ("Failed to commit the changes : " + 298 ex.getMessage()); 299 } 300 } 301 302 303 310 public void end(Xid xid, int i) throws XAException { 311 } 312 313 314 320 public void forget(Xid xid) throws XAException { 321 try { 322 Changes changes = (Changes)transactionChanges.get(xid); 323 transactionChanges.remove(xid); 324 List locks = changes.getLocks(); 325 for (int index = 0; index < locks.size(); index++) { 326 LockRef lockRef = (LockRef)locks.get(index); 327 lockRef.release(); 328 } 329 } catch (Exception ex) { 330 log.error("Failed to forget the changes : " + 331 ex.getMessage(),ex); 332 throw new XAException ("Failed to forget the changes : " + 333 ex.getMessage()); 334 } 335 } 336 337 338 344 public int getTransactionTimeout() throws XAException { 345 return -1; 346 } 347 348 349 357 public boolean isSameRM(XAResource xAResource) throws XAException { 358 return this == xAResource; 359 } 360 361 368 public int prepare(Xid xid) throws XAException { 369 return XAResource.XA_OK; 370 } 371 372 373 381 public Xid [] recover(int i) throws XAException { 382 return null; 383 } 384 385 386 392 public void rollback(Xid xid) throws XAException { 393 try { 394 Changes changes = (Changes)transactionChanges.get(xid); 395 transactionChanges.remove(xid); 396 List locks = changes.getLocks(); 397 for (int index = 0; index < locks.size(); index++) { 398 LockRef lockRef = (LockRef)locks.get(index); 399 lockRef.release(); 400 } 401 } catch (Exception ex) { 402 log.error("Failed to rollback the changes : " + 403 ex.getMessage(),ex); 404 throw new XAException ("Failed to rollback the changes : " + 405 ex.getMessage()); 406 } 407 } 408 409 410 417 public boolean setTransactionTimeout(int i) throws XAException { 418 return true; 419 } 420 421 422 429 public void start(Xid xid, int i) throws XAException { 430 if (transactionChanges.containsKey(xid)) { 431 currentTransaction.set(transactionChanges.get(xid)); 432 } else { 433 Changes changes = new Changes(xid); 434 transactionChanges.put(xid,changes); 435 currentTransaction.set(changes); 436 } 437 } 438 439 440 447 private LockRef getLock(String name) throws MessageServiceException { 448 try { 449 Object key = null; 450 synchronized(keyLockMap) { 451 if (keyLockMap.containsKey(name)) { 452 key = keyLockMap.get(name); 453 } else { 454 key = new String (name); 455 keyLockMap.put(name,key); 456 } 457 } 458 LockRef lockRef = 459 ObjectLockFactory.getInstance().acquireWriteLock(key); 460 Changes changes = (Changes)currentTransaction.get(); 461 462 return lockRef; 463 } catch (Exception ex) { 464 log.error("Failed to retrieve a lock on the message queue : " + 465 ex.getMessage(),ex); 466 throw new MessageServiceException 467 ("Failed to retrieve a lock on the message queue : " + 468 ex.getMessage(),ex); 469 } 470 } 471 472 473 476 private void addQueueToIndex(MessageQueue messageQueue) { 477 synchronized(listIndex) { 478 listIndex.add(messageQueue); 479 } 480 } 481 482 483 488 private Vector cloneIndex() { 489 synchronized(listIndex) { 490 return (Vector )listIndex.clone(); 491 } 492 } 493 } 494 | Popular Tags |