KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > rift > coad > daemon > messageservice > named > NamedMemoryQueue


1 /*
2  * MessageService: The message service daemon
3  * Copyright (C) 2007 Rift IT Contracting
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18  *
19  * NamedMemoryQueue.java
20  */

21
22 // package path
23
package com.rift.coad.daemon.messageservice.named;
24
25 // java imports
26
import java.util.ArrayList JavaDoc;
27 import java.util.Date JavaDoc;
28 import java.util.Map JavaDoc;
29 import java.util.HashMap JavaDoc;
30 import java.util.Iterator JavaDoc;
31 import java.util.List JavaDoc;
32 import java.util.Queue JavaDoc;
33 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
34 import java.util.concurrent.ConcurrentHashMap JavaDoc;
35 import javax.transaction.xa.XAException JavaDoc;
36 import javax.transaction.xa.XAResource JavaDoc;
37 import javax.transaction.xa.Xid JavaDoc;
38
39
40 // hibernate imports
41
import org.hibernate.*;
42 import org.hibernate.cfg.*;
43
44 // logging import
45
import org.apache.log4j.Logger;
46
47 // coadunation imports
48
import com.rift.coad.daemon.messageservice.Message;
49 import com.rift.coad.daemon.messageservice.MessageServiceException;
50 import com.rift.coad.daemon.messageservice.MessageManager;
51 import com.rift.coad.daemon.messageservice.MessageServiceImpl;
52 import com.rift.coad.daemon.messageservice.ProcessMonitor;
53 import com.rift.coad.daemon.messageservice.message.MessageManagerFactory;
54 import com.rift.coad.daemon.messageservice.db.*;
55 import com.rift.coad.hibernate.util.HibernateUtil;
56 import com.rift.coad.util.transaction.TransactionManager;
57 import com.rift.coad.util.transaction.UserTransactionWrapper;
58         
59 /**
60  * This object is responsible for acting as the non-volatile part of the named
61  * queue.
62  *
63  * @author Brett Chaldecott
64  */

65 public class NamedMemoryQueue implements XAResource JavaDoc {
66     
67     /**
68      * This object tracks the changes to a
69      */

70     public class Changes {
71         
72         // private member variables
73
private List JavaDoc removeList = new ArrayList JavaDoc();
74         private List JavaDoc addList = new ArrayList JavaDoc();
75         
76         
77         /**
78          * The changes made in a transaction
79          */

80         public Changes() {
81         }
82         
83         
84         /**
85          * This method adds an id to the list.
86          */

87         public void addRemoveMessage(MessageManager messageManager) {
88             removeList.add(messageManager);
89         }
90         
91         
92         /**
93          * This method returns the ids of the messages.
94          *
95          * @return The list of ids.
96          */

97         public List JavaDoc getRemoveList() {
98             return removeList;
99         }
100         
101         /**
102          * This method add a message to the list of messages
103          */

104         public void addNewMessage(MessageManager messageManager) {
105             addList.add(messageManager);
106         }
107         
108         /**
109          * This method returns the list of message that have been added in the
110          * transaction.
111          *
112          * @return The list of ids.
113          */

114         public List JavaDoc getAddList() {
115             return addList;
116         }
117     }
118     
119     
120     // singleton methods
121
private static Map JavaDoc singletons = new ConcurrentHashMap JavaDoc();
122     private static Map JavaDoc keyIndex = new HashMap JavaDoc();
123     
124     protected static Logger log =
125             Logger.getLogger(NamedMemoryQueue.class.getName());
126     
127     // private member variables
128
private String JavaDoc queueName = null;
129     private Queue JavaDoc queue = new ConcurrentLinkedQueue JavaDoc();
130     private UserTransactionWrapper utw = null;
131     private Map JavaDoc changes = new ConcurrentHashMap JavaDoc();
132     private ThreadLocal JavaDoc currentTransaction = new ThreadLocal JavaDoc();
133     
134     /**
135      * Creates a new instance of QueueMemoryIndex
136      *
137      * @param queueName The name of the queue to instanciate.
138      * @exception MessageServiceException
139      */

140     public NamedMemoryQueue(String JavaDoc queueName) throws MessageServiceException {
141         this.queueName = queueName;
142     }
143     
144     
145     /**
146      * This object is responsible for returning returning an instance of the
147      * named memory queue identified by the queuename. If not found it returns
148      * null.
149      *
150      * @return The reference to the named memory queueu.
151      * @param queueName The name of the queue to retrieve.
152      * @exception MessageServiceException
153      */

154     public static NamedMemoryQueue getInstance(String JavaDoc queueName) throws
155             MessageServiceException {
156         Object JavaDoc syncObj = getSyncObject(queueName);
157         synchronized(syncObj) {
158             NamedMemoryQueue singleton =
159                     (NamedMemoryQueue)singletons.get(queueName);
160             if (singleton == null) {
161                 singleton = new NamedMemoryQueue(queueName);
162                 singletons.put(queueName,singleton);
163             }
164             return singleton;
165         }
166     }
167     
168     
169     /**
170      * This method lists the queues currently in memory.
171      *
172      * @param list The list of named queues.
173      * @exception MessageServiceException
174      */

175     public static List JavaDoc listQueues() throws MessageServiceException {
176         return new ArrayList JavaDoc(singletons.keySet());
177     }
178     
179     
180     /**
181      * This method returns the synchronization key.
182      *
183      * @return The reference to the object that the synchronization can be done
184      * on.
185      * @param queueName The name of the queue.
186      */

187     private static synchronized Object JavaDoc getSyncObject(String JavaDoc queueName) {
188         Object JavaDoc syncObj = keyIndex.get(queueName);
189         if (syncObj == null) {
190             syncObj = new String JavaDoc(queueName);
191             keyIndex.put(queueName,syncObj);
192         }
193         return syncObj;
194     }
195     
196     
197     /**
198      * This method adds a message to the queue.
199      *
200      * @param messageManager The message manager.
201      */

202     public void addMessage(MessageManager messageManager) throws
203             MessageServiceException {
204         try {
205             TransactionManager.getInstance().bindResource(this,false);
206             ((Changes)currentTransaction.get()).addNewMessage(
207                     messageManager);
208         } catch (Exception JavaDoc ex) {
209             log.error("Failed to add a message : " +
210                     ex.getMessage(),ex);
211             throw new MessageServiceException("Failed to add a message : " +
212                     ex.getMessage(),ex);
213         }
214     }
215         
216     
217     /**
218      * This method returns the first message on the list.
219      */

220     public synchronized Message poll(long delay) throws
221             MessageServiceException {
222         try {
223             Date JavaDoc startTime = new Date JavaDoc();
224             while (queue.size() == 0) {
225                 Date JavaDoc currentTime = new Date JavaDoc();
226                 long difference = (startTime.getTime() + delay) -
227                         currentTime.getTime();
228                 if (difference <= 0) {
229                     return null;
230                 }
231                 wait(difference);
232             }
233             TransactionManager.getInstance().bindResource(this,false);
234             MessageManager messageManager = (MessageManager)queue.poll();
235             Message message = messageManager.getMessage();
236             messageManager.remove();
237             ((Changes)currentTransaction.get()).addRemoveMessage(
238                     messageManager);
239             log.debug("Return the message : " + message.getMessageId());
240             return message;
241         } catch (MessageServiceException ex) {
242             log.error("Failed to poll for a message : " +
243                     ex.getMessage(),ex);
244             throw ex;
245         } catch (Exception JavaDoc ex) {
246             log.error("Failed to poll for a message : " +
247                     ex.getMessage(),ex);
248             throw new MessageServiceException(
249                     "Failed to poll for a message : " +
250                     ex.getMessage(),ex);
251         } catch (Throwable JavaDoc ex) {
252             log.error("Caught an unexpected exception : " +
253                     ex.getMessage(),ex);
254             throw new MessageServiceException(
255                     "Caught an unexpected exception : " +
256                     ex.getMessage(),ex);
257         }
258     }
259     
260     
261     /**
262      * This method returns the list of messages for this queue.
263      *
264      * @return The list of messages in the queue.
265      * @exception MessageServiceException
266      */

267     public synchronized List JavaDoc getMessages() throws MessageServiceException {
268         try {
269             List JavaDoc list = new ArrayList JavaDoc();
270             for (Iterator JavaDoc iter = queue.iterator(); iter.hasNext();) {
271                 MessageManager messageManager = (MessageManager)iter.next();
272                 list.add(messageManager.getMessage());
273             }
274             return list;
275         } catch (Exception JavaDoc ex) {
276             log.error("Failed to retrieve the list of messages : " +
277                     ex.getMessage(),ex);
278             throw new MessageServiceException(
279                     "Failed to retrieve the list of messages : " +
280                     ex.getMessage(),ex);
281         }
282     }
283     
284     
285     /**
286      * This method is responsible for purging the contents of the named message
287      * queue.
288      *
289      * @exception MessageServiceException
290      */

291     public synchronized void purge() throws MessageServiceException {
292         try {
293             for (Iterator JavaDoc iter = queue.iterator(); iter.hasNext();) {
294                 MessageManager messageManager = (MessageManager)iter.next();
295                 messageManager.remove();
296             }
297             queue.clear();
298         } catch (Exception JavaDoc ex) {
299             log.error("Failed to purge the queue : " +
300                     ex.getMessage(),ex);
301             throw new MessageServiceException(
302                     "Failed to purge the queue : " +
303                     ex.getMessage(),ex);
304         }
305     }
306     
307     
308     /**
309      * This method is called to commit the specified transaction.
310      *
311      * @param xid The id of the transaction to commit.
312      * @param onePhase If true a one phase commit should be used.
313      * @exception XAException
314      */

315     public void commit(Xid JavaDoc xid, boolean b) throws XAException JavaDoc {
316         Changes changes = (Changes)this.changes.remove(xid);
317         for (Iterator JavaDoc iter = changes.getAddList().iterator(); iter.hasNext();) {
318             queue.add(iter.next());
319         }
320         synchronized(this) {
321             notifyAll();
322         }
323     }
324     
325     
326     /**
327      * The resource manager has dissociated this object from the transaction.
328      *
329      * @param xid The id of the transaction that is getting ended.
330      * @param flags The flags associated with this operation.
331      * @exception XAException
332      */

333     public void end(Xid JavaDoc xid, int i) throws XAException JavaDoc {
334     }
335     
336     
337     /**
338      * The transaction has been completed and must be forgotten.
339      *
340      * @param xid The id of the transaction to forget.
341      * @exception XAException
342      */

343     public void forget(Xid JavaDoc xid) throws XAException JavaDoc {
344         changes.remove(xid);
345     }
346     
347     
348     /**
349      * This method returns the transaction timeout for this object.
350      *
351      * @return The int containing the transaction timeout.
352      * @exception XAException
353      */

354     public int getTransactionTimeout() throws XAException JavaDoc {
355         return -1;
356     }
357     
358     
359     /**
360      * This method returns true if this object is the resource manager getting
361      * queried.
362      *
363      * @return TRUE if this is the resource manager, FALSE if not.
364      * @param xaResource The resource to perform the check against.
365      * @exception XAException
366      */

367     public boolean isSameRM(XAResource JavaDoc xAResource) throws XAException JavaDoc {
368         return this == xAResource;
369     }
370     
371     
372     /**
373      * This is called before a transaction is committed.
374      *
375      * @return The results of the transaction.
376      * @param xid The id of the transaction to check against.
377      * @exception XAException
378      */

379     public int prepare(Xid JavaDoc xid) throws XAException JavaDoc {
380         return XAResource.XA_OK;
381     }
382     
383     
384     /**
385      * This method returns the list of transaction branches for this resource
386      * manager.
387      *
388      * @return The list of resource branches.
389      * @param flags The flags
390      * @exception XAException
391      */

392     public Xid JavaDoc[] recover(int i) throws XAException JavaDoc {
393         return null;
394     }
395     
396     
397     /**
398      * This method is called to roll back the specified transaction.
399      *
400      * @param xid The id of the transaction to roll back.
401      * @exception XAException
402      */

403     public void rollback(Xid JavaDoc xid) throws XAException JavaDoc {
404         Changes changes = (Changes)this.changes.get(xid);
405         if (changes == null) {
406             return;
407         }
408         for (Iterator JavaDoc iter = changes.getRemoveList().iterator(); iter.hasNext();) {
409             queue.add(iter.next());
410         }
411         synchronized (this) {
412             notifyAll();
413         }
414     }
415     
416     
417     /**
418      * This method sets the transaction timeout for this resource manager.
419      *
420      * @return TRUE if the transaction timeout can be set successfully.
421      * @param transactionTimeout The new transaction timeout value.
422      * @exception XAException
423      */

424     public boolean setTransactionTimeout(int i) throws XAException JavaDoc {
425         return true;
426     }
427     
428     
429     /**
430      * This method is called to start a transaction on a resource manager.
431      *
432      * @param xid The id of the new transaction.
433      * @param flags The flags associated with the transaction.
434      * @exception XAException
435      */

436     public void start(Xid JavaDoc xid, int i) throws XAException JavaDoc {
437         Changes changes = (Changes)this.changes.get(xid);
438         if (changes == null) {
439             changes = new Changes();
440             this.changes.put(xid,changes);
441         }
442         currentTransaction.set(changes);
443     }
444 }
445
Popular Tags