1 21 22 package com.rift.coad.daemon.messageservice.named; 24 25 import java.util.ArrayList ; 27 import java.util.Date ; 28 import java.util.Map ; 29 import java.util.HashMap ; 30 import java.util.Iterator ; 31 import java.util.List ; 32 import java.util.Queue ; 33 import java.util.concurrent.ConcurrentLinkedQueue ; 34 import java.util.concurrent.ConcurrentHashMap ; 35 import javax.transaction.xa.XAException ; 36 import javax.transaction.xa.XAResource ; 37 import javax.transaction.xa.Xid ; 38 39 40 import org.hibernate.*; 42 import org.hibernate.cfg.*; 43 44 import org.apache.log4j.Logger; 46 47 import com.rift.coad.daemon.messageservice.Message; 49 import com.rift.coad.daemon.messageservice.MessageServiceException; 50 import com.rift.coad.daemon.messageservice.MessageManager; 51 import com.rift.coad.daemon.messageservice.MessageServiceImpl; 52 import com.rift.coad.daemon.messageservice.ProcessMonitor; 53 import com.rift.coad.daemon.messageservice.message.MessageManagerFactory; 54 import com.rift.coad.daemon.messageservice.db.*; 55 import com.rift.coad.hibernate.util.HibernateUtil; 56 import com.rift.coad.util.transaction.TransactionManager; 57 import com.rift.coad.util.transaction.UserTransactionWrapper; 58 59 65 public class NamedMemoryQueue implements XAResource { 66 67 70 public class Changes { 71 72 private List removeList = new ArrayList (); 74 private List addList = new ArrayList (); 75 76 77 80 public Changes() { 81 } 82 83 84 87 public void addRemoveMessage(MessageManager messageManager) { 88 removeList.add(messageManager); 89 } 90 91 92 97 public List getRemoveList() { 98 return removeList; 99 } 100 101 104 public void addNewMessage(MessageManager messageManager) { 105 addList.add(messageManager); 106 } 107 108 114 public List getAddList() { 115 return addList; 116 } 117 } 118 119 120 private static Map singletons = new ConcurrentHashMap (); 122 private static Map keyIndex = new HashMap (); 123 124 protected static Logger log = 125 Logger.getLogger(NamedMemoryQueue.class.getName()); 126 127 private String queueName = null; 129 private Queue queue = new ConcurrentLinkedQueue (); 130 private UserTransactionWrapper utw = null; 131 private Map changes = new ConcurrentHashMap (); 132 private ThreadLocal currentTransaction = new ThreadLocal (); 133 134 140 public NamedMemoryQueue(String queueName) throws MessageServiceException { 141 this.queueName = queueName; 142 } 143 144 145 154 public static NamedMemoryQueue getInstance(String queueName) throws 155 MessageServiceException { 156 Object syncObj = getSyncObject(queueName); 157 synchronized(syncObj) { 158 NamedMemoryQueue singleton = 159 (NamedMemoryQueue)singletons.get(queueName); 160 if (singleton == null) { 161 singleton = new NamedMemoryQueue(queueName); 162 singletons.put(queueName,singleton); 163 } 164 return singleton; 165 } 166 } 167 168 169 175 public static List listQueues() throws MessageServiceException { 176 return new ArrayList (singletons.keySet()); 177 } 178 179 180 187 private static synchronized Object getSyncObject(String queueName) { 188 Object syncObj = keyIndex.get(queueName); 189 if (syncObj == null) { 190 syncObj = new String (queueName); 191 keyIndex.put(queueName,syncObj); 192 } 193 return syncObj; 194 } 195 196 197 202 public void addMessage(MessageManager messageManager) throws 203 MessageServiceException { 204 try { 205 TransactionManager.getInstance().bindResource(this,false); 206 ((Changes)currentTransaction.get()).addNewMessage( 207 messageManager); 208 } catch (Exception ex) { 209 log.error("Failed to add a message : " + 210 ex.getMessage(),ex); 211 throw new MessageServiceException("Failed to add a message : " + 212 ex.getMessage(),ex); 213 } 214 } 215 216 217 220 public synchronized Message poll(long delay) throws 221 MessageServiceException { 222 try { 223 Date startTime = new Date (); 224 while (queue.size() == 0) { 225 Date currentTime = new Date (); 226 long difference = (startTime.getTime() + delay) - 227 currentTime.getTime(); 228 if (difference <= 0) { 229 return null; 230 } 231 wait(difference); 232 } 233 TransactionManager.getInstance().bindResource(this,false); 234 MessageManager messageManager = (MessageManager)queue.poll(); 235 Message message = messageManager.getMessage(); 236 messageManager.remove(); 237 ((Changes)currentTransaction.get()).addRemoveMessage( 238 messageManager); 239 log.debug("Return the message : " + message.getMessageId()); 240 return message; 241 } catch (MessageServiceException ex) { 242 log.error("Failed to poll for a message : " + 243 ex.getMessage(),ex); 244 throw ex; 245 } catch (Exception ex) { 246 log.error("Failed to poll for a message : " + 247 ex.getMessage(),ex); 248 throw new MessageServiceException( 249 "Failed to poll for a message : " + 250 ex.getMessage(),ex); 251 } catch (Throwable ex) { 252 log.error("Caught an unexpected exception : " + 253 ex.getMessage(),ex); 254 throw new MessageServiceException( 255 "Caught an unexpected exception : " + 256 ex.getMessage(),ex); 257 } 258 } 259 260 261 267 public synchronized List getMessages() throws MessageServiceException { 268 try { 269 List list = new ArrayList (); 270 for (Iterator iter = queue.iterator(); iter.hasNext();) { 271 MessageManager messageManager = (MessageManager)iter.next(); 272 list.add(messageManager.getMessage()); 273 } 274 return list; 275 } catch (Exception ex) { 276 log.error("Failed to retrieve the list of messages : " + 277 ex.getMessage(),ex); 278 throw new MessageServiceException( 279 "Failed to retrieve the list of messages : " + 280 ex.getMessage(),ex); 281 } 282 } 283 284 285 291 public synchronized void purge() throws MessageServiceException { 292 try { 293 for (Iterator iter = queue.iterator(); iter.hasNext();) { 294 MessageManager messageManager = (MessageManager)iter.next(); 295 messageManager.remove(); 296 } 297 queue.clear(); 298 } catch (Exception ex) { 299 log.error("Failed to purge the queue : " + 300 ex.getMessage(),ex); 301 throw new MessageServiceException( 302 "Failed to purge the queue : " + 303 ex.getMessage(),ex); 304 } 305 } 306 307 308 315 public void commit(Xid xid, boolean b) throws XAException { 316 Changes changes = (Changes)this.changes.remove(xid); 317 for (Iterator iter = changes.getAddList().iterator(); iter.hasNext();) { 318 queue.add(iter.next()); 319 } 320 synchronized(this) { 321 notifyAll(); 322 } 323 } 324 325 326 333 public void end(Xid xid, int i) throws XAException { 334 } 335 336 337 343 public void forget(Xid xid) throws XAException { 344 changes.remove(xid); 345 } 346 347 348 354 public int getTransactionTimeout() throws XAException { 355 return -1; 356 } 357 358 359 367 public boolean isSameRM(XAResource xAResource) throws XAException { 368 return this == xAResource; 369 } 370 371 372 379 public int prepare(Xid xid) throws XAException { 380 return XAResource.XA_OK; 381 } 382 383 384 392 public Xid [] recover(int i) throws XAException { 393 return null; 394 } 395 396 397 403 public void rollback(Xid xid) throws XAException { 404 Changes changes = (Changes)this.changes.get(xid); 405 if (changes == null) { 406 return; 407 } 408 for (Iterator iter = changes.getRemoveList().iterator(); iter.hasNext();) { 409 queue.add(iter.next()); 410 } 411 synchronized (this) { 412 notifyAll(); 413 } 414 } 415 416 417 424 public boolean setTransactionTimeout(int i) throws XAException { 425 return true; 426 } 427 428 429 436 public void start(Xid xid, int i) throws XAException { 437 Changes changes = (Changes)this.changes.get(xid); 438 if (changes == null) { 439 changes = new Changes(); 440 this.changes.put(xid,changes); 441 } 442 currentTransaction.set(changes); 443 } 444 } 445 | Popular Tags |