KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > rift > coad > daemon > messageservice > MessageQueue


1 /*
2  * MessageService: The message service daemon
3  * Copyright (C) 2006 Rift IT Contracting
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18  *
19  * MessageQueue.java
20  */

21
22 // package path
23
package com.rift.coad.daemon.messageservice;
24
25 // java imports
26
import java.lang.ThreadLocal JavaDoc;
27 import java.util.Date JavaDoc;
28 import java.util.Queue JavaDoc;
29 import java.util.PriorityQueue JavaDoc;
30 import java.util.HashMap JavaDoc;
31 import java.util.Map JavaDoc;
32 import java.util.ArrayList JavaDoc;
33 import java.util.concurrent.ConcurrentHashMap JavaDoc;
34 import javax.transaction.xa.XAException JavaDoc;
35 import javax.transaction.xa.XAResource JavaDoc;
36 import javax.transaction.xa.Xid JavaDoc;
37
38 // logging import
39
import org.apache.log4j.Logger;
40
41 // coadunation imports
42
import com.rift.coad.util.transaction.TransactionManager;
43 import com.rift.coad.util.lock.LockRef;
44 import com.rift.coad.util.lock.ObjectLockFactory;
45
46 /**
47  * The message queue object is responsible for queuing messages in memory.
48  *
49  * @author Brett Chaldecott
50  */

51 public class MessageQueue implements XAResource JavaDoc {
52     
53     /**
54      * This object tracks the changes made by a transaction.
55      */

56     public class TransactionChange {
57         
58         // private member variables
59
private ArrayList JavaDoc addList = new ArrayList JavaDoc();
60         private ArrayList JavaDoc updateList = new ArrayList JavaDoc();
61         private ArrayList JavaDoc removeList = new ArrayList JavaDoc();
62         
63         /**
64          * The constructor of the transaction change object.
65          */

66         public TransactionChange() {
67             
68         }
69         
70         
71         /**
72          * This method adds to the add list
73          *
74          * @param message The message to add to the add list.
75          */

76         public void add(MessageManager messageManager) {
77             addList.add(messageManager);
78         }
79         
80         
81         /**
82          * This method returns the list of added entries.
83          *
84          * @return The list of added messages.
85          */

86         public ArrayList JavaDoc getAddList() {
87             return addList;
88         }
89         
90         
91         /**
92          * This method add an entry to the update list.
93          *
94          * @param message The message to add.
95          */

96         public void update(MessageManager messageManager) {
97             updateList.add(messageManager);
98         }
99         
100         
101         /**
102          * This method returns an array list containing all the updated messages.
103          *
104          * @return The array list of all the updated messages.
105          */

106         public ArrayList JavaDoc getUpdateList() {
107             return updateList;
108         }
109         
110         
111         /**
112          * This method marks a message within a transaction as being removed
113          * by that transaction.
114          *
115          * @param message The message to remove.
116          */

117         public void remove(MessageManager messageManager) {
118             removeList.add(messageManager);
119         }
120         
121         
122         /**
123          * This method returns list of removed entries.
124          *
125          */

126         public ArrayList JavaDoc getRemoveList() {
127             return removeList;
128         }
129     }
130     
131     
132     /**
133      * This is the id index.
134      */

135     public class IDIndex {
136         private Map JavaDoc baseIndex = new HashMap JavaDoc();
137         private ThreadLocal JavaDoc threadIndex = new ThreadLocal JavaDoc();
138         private Map JavaDoc transactionIndex = new HashMap JavaDoc();
139         
140         
141         /**
142          * The constructor of the ID index.
143          */

144         public IDIndex() {
145             
146         }
147         
148         
149         /**
150          * This method is called to commit the specified transaction.
151          *
152          * @param xid The id of the transaction to commit.
153          * @exception MessageServiceException
154          */

155         public synchronized void commit(Xid JavaDoc xid) throws
156                 MessageServiceException {
157             try {
158                 TransactionChange changes = (TransactionChange)transactionChange.
159                         get(xid);
160                 // add entries
161
ArrayList JavaDoc addedEntries = changes.getAddList();
162                 for (int index = 0; index < addedEntries.size(); index++) {
163                     MessageManager messageManager =
164                             (MessageManager)addedEntries.get(index);
165                     baseIndex.put(messageManager.getID(),messageManager);
166                 }
167                 
168                 // remove entries
169
ArrayList JavaDoc removedEntries = changes.getRemoveList();
170                 for (int index = 0; index < removedEntries.size(); index++) {
171                     MessageManager messageManager =
172                             (MessageManager)removedEntries.get(index);
173                     baseIndex.remove(messageManager.getID());
174                 }
175                 transactionIndex.remove(xid);
176             } catch (Exception JavaDoc ex) {
177                 log.error("Failed to commit the transaction : " + ex.getMessage()
178                 ,ex);
179                 throw new MessageServiceException("Failed to commit the " +
180                         "transaction : " + ex.getMessage());
181             }
182         }
183         
184         
185         /**
186          * The transaction has been completed and must be forgotten.
187          *
188          * @param xid The id of the transaction to forget.
189          * @exception MessageServiceException
190          */

191         public synchronized void forget(Xid JavaDoc xid) throws MessageServiceException {
192             transactionIndex.remove(xid);
193         }
194         
195         
196         /**
197          * This method is called to roll back the specified transaction.
198          *
199          * @param xid The id of the transaction to roll back.
200          * @exception MessageServiceException
201          */

202         public synchronized void rollback(Xid JavaDoc xid) throws MessageServiceException {
203             transactionIndex.remove(xid);
204         }
205         
206         
207         /**
208          * This method is called to start a transaction on a resource manager.
209          *
210          * @param xid The id of the new transaction.
211          * @exception MessageServiceException
212          */

213         public synchronized void start(Xid JavaDoc xid) throws MessageServiceException {
214             Map JavaDoc transactionScopedIndex = null;
215             if (transactionIndex.containsKey(xid)) {
216                 transactionScopedIndex = (Map JavaDoc)transactionIndex.get(
217                         xid);
218             } else {
219                 transactionScopedIndex = new HashMap JavaDoc(baseIndex);
220                 transactionIndex.put(xid,
221                         transactionScopedIndex);
222             }
223             threadIndex.set(transactionScopedIndex);
224         }
225         
226         
227         /**
228          * This method adds a message to the message queue.
229          *
230          * @param message The message to add to the message queue.
231          * @exception MessageServiceException
232          */

233         public void addMessage(MessageManager message) throws
234                 MessageServiceException {
235             Map JavaDoc index = (Map JavaDoc)threadIndex.get();
236             index.put(message.getID(),message);
237         }
238         
239         
240         /**
241          * This method removes the specified message from the list.
242          *
243          * @param messageId The id of the message to remove.
244          * @exception MessageServiceException
245          */

246         public void removeMessage(String JavaDoc messageId) throws
247                 MessageServiceException {
248             try {
249                 Map JavaDoc index = (Map JavaDoc)threadIndex.get();
250                 if (index.containsKey(messageId)) {
251                     MessageManager message = (MessageManager)index.get(messageId);
252                     TransactionChange change = (TransactionChange)
253                             transactionChange.get(transactionId.get());
254                     change.remove(message);
255                     index.remove(messageId);
256                 } else {
257                     throw new MessageServiceException("The message [" + messageId
258                             + "] was not found to remove");
259                 }
260             } catch (MessageServiceException ex) {
261                 throw ex;
262             } catch (Exception JavaDoc ex) {
263                 log.error("Failed to remove the message : " +
264                         ex.getMessage(),ex);
265                 throw new MessageServiceException(
266                         "Failed to remove the message : " + ex.getMessage(),ex);
267             }
268         }
269         
270         
271         /**
272          * This method retrieves the specified message from the list.
273          *
274          * @param messageId The id of the message to retrieve.
275          * @exception MessageServiceException
276          */

277         public MessageManager getMessage(String JavaDoc messageId) throws
278                 MessageServiceException {
279             Map JavaDoc index = (Map JavaDoc)threadIndex.get();
280             if (index.containsKey(messageId)) {
281                 return (MessageManager)index.get(messageId);
282             } else {
283                 throw new MessageServiceException("The message [" + messageId
284                         + "] was not found.");
285             }
286         }
287         
288         
289     }
290     
291     
292     /**
293      * This class is responsible for managing the queue of entries.
294      */

295     public class QueueIndex {
296         // private member variables
297
private PriorityQueue JavaDoc baseQueue = new PriorityQueue JavaDoc();
298         private Map JavaDoc processingEntries = new HashMap JavaDoc();
299         
300         
301         /**
302          * The constructor of the
303          */

304         public QueueIndex() {
305             
306         }
307         
308         
309         /**
310          * This method is called to commit the specified transaction.
311          *
312          * @param xid The id of the transaction to commit.
313          * @exception MessageServiceException
314          */

315         public synchronized void commit(Xid JavaDoc xid) throws
316                 MessageServiceException {
317             try {
318                 TransactionChange changes = (TransactionChange)transactionChange.
319                         get(xid);
320                 // add entries
321
ArrayList JavaDoc addedEntries = changes.getAddList();
322                 for (int index = 0; index < addedEntries.size(); index++) {
323                     MessageManager messageManager =
324                             (MessageManager)addedEntries.get(index);
325                     baseQueue.add(messageManager);
326                 }
327                 
328                 // remove entries
329
ArrayList JavaDoc removedEntries = changes.getRemoveList();
330                 for (int index = 0; index < removedEntries.size(); index++) {
331                     MessageManager messageManager =
332                             (MessageManager)removedEntries.get(index);
333                     if (baseQueue.contains(messageManager)) {
334                         baseQueue.remove(messageManager);
335                     } else if (processingEntries.containsKey(messageManager)) {
336                         LockRef lockRef = (LockRef)processingEntries.get(
337                                 messageManager);
338                         processingEntries.remove(messageManager);
339                         lockRef.release();
340                     }
341                 }
342             } catch (Exception JavaDoc ex) {
343                 log.error("Failed to commit the transaction : " + ex.getMessage()
344                         ,ex);
345                 throw new MessageServiceException("Failed to commit the " +
346                         "transaction : " + ex.getMessage());
347             }
348         }
349         
350         
351         /**
352          * The transaction has been completed and must be forgotten.
353          *
354          * @param xid The id of the transaction to forget.
355          * @exception MessageServiceException
356          */

357         public synchronized void forget(Xid JavaDoc xid) throws MessageServiceException {
358             
359         }
360         
361         
362         /**
363          * This method is called to roll back the specified transaction.
364          *
365          * @param xid The id of the transaction to roll back.
366          * @exception MessageServiceException
367          */

368         public synchronized void rollback(Xid JavaDoc xid) throws MessageServiceException {
369             
370         }
371         
372         
373         /**
374          * This method is called to start a transaction on a resource manager.
375          *
376          * @param xid The id of the new transaction.
377          * @exception MessageServiceException
378          */

379         public synchronized void start(Xid JavaDoc xid) throws MessageServiceException {
380             
381         }
382         
383         
384         /**
385          * This method returns the next message manager or null.
386          *
387          * @return The reference to message manager.
388          * @param nextRunTime The date wrapper object.
389          * @exception MessageServiceException
390          */

391         public synchronized MessageManager popFrontMessage(Date JavaDoc nextRunTime) throws
392                 MessageServiceException {
393             LockRef lockRef = null;
394             try {
395                 MessageManager messageManager = (MessageManager)baseQueue.peek();
396                 if (messageManager == null) {
397                     return null;
398                 }
399                 try {
400                     lockRef = ObjectLockFactory.getInstance().acquireWriteLock(
401                             messageManager,ObjectLockFactory.WAIT_ON_THREAD);
402                 } catch (Exception JavaDoc ex) {
403                     log.debug("Cannot aquire a lock on this object because : " +
404                             ex.getMessage(),ex);
405                     return null;
406                 }
407                 Date JavaDoc currentDate = new Date JavaDoc();
408                 Date JavaDoc nextProcessDate = messageManager.nextProcessTime();
409                 if (nextProcessDate == null) {
410                     throw new MessageServiceException(
411                         "The next process date is invalid cannot be null");
412                 } else if (nextProcessDate.getTime() <= currentDate.getTime()) {
413                     baseQueue.poll();
414                     processingEntries.put(messageManager,lockRef);
415                     lockRef = null;
416                     return messageManager;
417                 }
418                 nextRunTime.setTime(nextProcessDate.getTime());
419                 return null;
420             } catch (Exception JavaDoc ex) {
421                 log.error("Failed to pop a message off the queue : " +
422                         ex.getMessage(),ex);
423                 throw new MessageServiceException(
424                         "Failed to pop a message off the queue : " +
425                         ex.getMessage(),ex);
426             } finally {
427                 try {
428                     if (lockRef != null) {
429                         lockRef.release();
430                         lockRef = null;
431                     }
432                 } catch (Exception JavaDoc ex2) {
433                     log.error("Failed to release the lock :" + ex2.getMessage(),
434                             ex2);
435                 }
436             }
437         }
438
439
440         /**
441          * This method returns the next message manager or null.
442          *
443          * @return The reference to message manager.
444          * @param nextRunTime The date wrapper object.
445          * @exception MessageServiceException
446          */

447         public synchronized void pushBackMessage(MessageManager messageManager) throws
448                 MessageServiceException {
449             try {
450                 LockRef lockRef = (LockRef)processingEntries.get(messageManager);
451                 if (lockRef == null) {
452                     log.error("This message is not locked : " +
453                             messageManager.getID());
454                     throw new MessageServiceException(
455                             "This message is not locked : " +
456                             messageManager.getID());
457                 }
458                 baseQueue.add(messageManager);
459                 processingEntries.remove(messageManager);
460                 lockRef.release();
461             } catch (MessageServiceException ex) {
462                 throw ex;
463             } catch (Exception JavaDoc ex) {
464                 log.error("Failed to push a message back in the queue for " +
465                         "processing : " + ex.getMessage(),ex);
466                 throw new MessageServiceException(
467                         "Failed to push a message back in the queue for " +
468                         "processing : " + ex.getMessage(),ex);
469             }
470         }
471         
472     }
473     
474     // the logger reference
475
protected static Logger log =
476             Logger.getLogger(MessageQueue.class.getName());
477     
478     // private member variables
479
private ThreadLocal JavaDoc transactionId = new ThreadLocal JavaDoc();
480     private Map JavaDoc transactionChange = new ConcurrentHashMap JavaDoc();
481     private IDIndex idIndex = new IDIndex();
482     private QueueIndex queueIndex = new QueueIndex();
483     private String JavaDoc name = null;
484     
485     /**
486      * Creates a new instance of MessageQueue
487      */

488     public MessageQueue(String JavaDoc name) {
489         this.name = name;
490     }
491     
492     
493     /**
494      * This method is called to commit the specified transaction.
495      *
496      * @param xid The id of the transaction to commit.
497      * @param onePhase If true a one phase commit should be used.
498      * @exception XAException
499      */

500     public synchronized void commit(Xid JavaDoc xid, boolean onePhase) throws
501             XAException JavaDoc {
502         try {
503             idIndex.commit(xid);
504             queueIndex.commit(xid);
505             transactionChange.remove(xid);
506             ProcessMonitor.getInstance().notifyProcessor();
507         } catch (Exception JavaDoc ex) {
508             log.error("Failed to commit the changes : " +
509                     ex.getMessage(),ex);
510             throw new XAException JavaDoc("Failed to commit the changes : " +
511                     ex.getMessage());
512         }
513     }
514     
515     
516     /**
517      * The resource manager has dissociated this object from the transaction.
518      *
519      * @param xid The id of the transaction that is getting ended.
520      * @param flags The flags associated with this operation.
521      * @exception XAException
522      */

523     public void end(Xid JavaDoc xid, int flags) throws XAException JavaDoc {
524     }
525     
526     
527     /**
528      * The transaction has been completed and must be forgotten.
529      *
530      * @param xid The id of the transaction to forget.
531      * @exception XAException
532      */

533     public void forget(Xid JavaDoc xid) throws XAException JavaDoc {
534         try {
535             idIndex.forget(xid);
536             queueIndex.forget(xid);
537             transactionChange.remove(xid);
538         } catch (Exception JavaDoc ex) {
539             log.error("Failed to forget the changes : " +
540                     ex.getMessage(),ex);
541             throw new XAException JavaDoc("Failed to forget the changes : " +
542                     ex.getMessage());
543         }
544     }
545     
546     
547     /**
548      * This method returns the transaction timeout for this object.
549      *
550      * @return The int containing the transaction timeout.
551      * @exception XAException
552      */

553     public int getTransactionTimeout() throws XAException JavaDoc {
554         return -1;
555     }
556     
557     
558     /**
559      * This method returns true if this object is the resource manager getting
560      * queried.
561      *
562      * @return TRUE if this is the resource manager, FALSE if not.
563      * @param xaResource The resource to perform the check against.
564      * @exception XAException
565      */

566     public boolean isSameRM(XAResource JavaDoc xAResource) throws XAException JavaDoc {
567         return this == xAResource;
568     }
569     
570     
571     /**
572      * This is called before a transaction is committed.
573      *
574      * @return The results of the transaction.
575      * @param xid The id of the transaction to check against.
576      * @exception XAException
577      */

578     public int prepare(Xid JavaDoc xid) throws XAException JavaDoc {
579         return XAResource.XA_OK;
580     }
581     
582     
583     /**
584      * This method returns the list of transaction branches for this resource
585      * manager.
586      *
587      * @return The list of resource branches.
588      * @param flags The flags
589      * @exception XAException
590      */

591     public Xid JavaDoc[] recover(int flags) throws XAException JavaDoc {
592         return null;
593     }
594     
595     
596     /**
597      * This method is called to roll back the specified transaction.
598      *
599      * @param xid The id of the transaction to roll back.
600      * @exception XAException
601      */

602     public void rollback(Xid JavaDoc xid) throws XAException JavaDoc {
603         try {
604             idIndex.rollback(xid);
605             queueIndex.rollback(xid);
606             transactionChange.remove(xid);
607         } catch (Exception JavaDoc ex) {
608             log.error("Failed to rollback the changes : " +
609                     ex.getMessage(),ex);
610             throw new XAException JavaDoc("Failed to rollback the changes : " +
611                     ex.getMessage());
612         }
613     }
614     
615     
616     /**
617      * This method sets the transaction timeout for this resource manager.
618      *
619      * @return TRUE if the transaction timeout can be set successfully.
620      * @param transactionTimeout The new transaction timeout value.
621      * @exception XAException
622      */

623     public boolean setTransactionTimeout(int transactionTimeout) throws
624             XAException JavaDoc {
625         return true;
626     }
627     
628     
629     /**
630      * This method is called to start a transaction on a resource manager.
631      *
632      * @param xid The id of the new transaction.
633      * @param flags The flags associated with the transaction.
634      * @exception XAException
635      */

636     public void start(Xid JavaDoc xid, int flags) throws XAException JavaDoc {
637         try {
638             if (!transactionChange.containsKey(xid)) {
639                 transactionChange.put(xid,new TransactionChange());
640             }
641             transactionId.set(xid);
642             idIndex.start(xid);
643             queueIndex.start(xid);
644         } catch (Exception JavaDoc ex) {
645             log.error("Failed to start the transaction : " + ex.getMessage(),ex);
646             throw new XAException JavaDoc("Failed to start the transaction : " +
647                     ex.getMessage());
648         }
649     }
650     
651     
652     /**
653      * This method returns the name of the message queue.
654      *
655      * @return The name of the message queue.
656      */

657     public String JavaDoc getName() {
658         return name;
659     }
660     
661     
662     /**
663      * This method adds a message to the message queue.
664      *
665      * @param message The message to add to the message queue.
666      */

667     public void addMessage(MessageManager message) throws
668             MessageServiceException {
669         try {
670             TransactionManager.getInstance().bindResource(this,false);
671             TransactionChange change = (TransactionChange)transactionChange.get(
672                     transactionId.get());
673             change.add(message);
674             idIndex.addMessage(message);
675         } catch (Exception JavaDoc ex) {
676             log.error("Failed to add a message : " + ex.getMessage(),ex);
677             throw new MessageServiceException("Failed to add a message : " +
678                     ex.getMessage());
679         }
680     }
681     
682     
683     /**
684      * This method removes the specified message from the list.
685      *
686      * @param messageId The id of the message to remove.
687      */

688     public void removeMessage(String JavaDoc messageId) throws
689             MessageServiceException {
690         try {
691             TransactionManager.getInstance().bindResource(this,false);
692             idIndex.removeMessage(messageId);
693         } catch (Exception JavaDoc ex) {
694             log.error("Failed to remove a message : " + ex.getMessage(),ex);
695             throw new MessageServiceException("Failed to remove a message : " +
696                     ex.getMessage());
697         }
698     }
699     
700     
701     /**
702      * This method retrieves the specified message from the list.
703      *
704      * @param messageId The id of the message to retrieve.
705      */

706     public MessageManager getMessage(String JavaDoc messageId) throws
707             MessageServiceException {
708         try {
709             TransactionManager.getInstance().bindResource(this,false);
710             return idIndex.getMessage(messageId);
711         } catch (Exception JavaDoc ex) {
712             log.error("Failed to get a message : " + ex.getMessage(),ex);
713             throw new MessageServiceException("Failed to get a message : " +
714                     ex.getMessage());
715         }
716     }
717     
718     
719     /**
720      * This method returns the next message manager or null.
721      *
722      * @return The reference to message manager.
723      * @param nextRunTime The date wrapper object.
724      * @exception MessageServiceException
725      */

726     public MessageManager popFrontMessage(Date JavaDoc nextRunTime) throws
727             MessageServiceException {
728         return queueIndex.popFrontMessage(nextRunTime);
729     }
730     
731     
732     /**
733      * This method returns the next message manager or null.
734      *
735      * @return The reference to message manager.
736      * @param nextRunTime The date wrapper object.
737      * @exception MessageServiceException
738      */

739     public void pushBackMessage(MessageManager messageManager) throws
740             MessageServiceException {
741         queueIndex.pushBackMessage(messageManager);
742     }
743 }
744
Popular Tags