KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > rift > coad > daemon > messageservice > MessageQueueManager


1 /*
2  * MessageService: The message service daemon
3  * Copyright (C) 2006 Rift IT Contracting
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18  *
19  * MessageQueueManager.java
20  */

21
22 // package path
23
package com.rift.coad.daemon.messageservice;
24
25 // java imports
26
import java.util.Date JavaDoc;
27 import java.util.List JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.Map JavaDoc;
30 import java.util.HashMap JavaDoc;
31 import java.util.Vector JavaDoc;
32 import java.util.concurrent.ConcurrentHashMap JavaDoc;
33 import javax.transaction.xa.XAException JavaDoc;
34 import javax.transaction.xa.XAResource JavaDoc;
35 import javax.transaction.xa.Xid JavaDoc;
36
37 // logging import
38
import org.apache.log4j.Logger;
39
40 // hibernate imports
41
import org.hibernate.*;
42 import org.hibernate.cfg.*;
43
44 // coadunation imports
45
import com.rift.coad.util.transaction.TransactionManager;
46 import com.rift.coad.util.lock.LockRef;
47 import com.rift.coad.util.lock.ObjectLockFactory;
48 import com.rift.coad.daemon.messageservice.db.*;
49 import com.rift.coad.hibernate.util.HibernateUtil;
50
51
52 /**
53  * This object is responsible for managing the message queues.
54  *
55  * @author Brett Chaldecott
56  */

57 public class MessageQueueManager implements XAResource JavaDoc{
58     
59     /**
60      * This object tracks the changes made by a transaction
61      */

62     public class Changes {
63         // the class private member variables
64
private Xid JavaDoc transactionId = null;
65         private List JavaDoc queues = new ArrayList JavaDoc();
66         private List JavaDoc locks = new ArrayList JavaDoc();
67         
68         
69         /**
70          * The constructor of the changes object.
71          *
72          * @param transactionId The id of the current transaction
73          */

74         public Changes(Xid JavaDoc transactionId) {
75             this.transactionId = transactionId;
76         }
77         
78         
79         /**
80          * This method adds a new message queue to the list of changes.
81          *
82          * @param messageQueue
83          */

84         public void add(MessageQueue messageQueue, LockRef lockRef) throws
85                 MessageServiceException {
86             try {
87                 lockRef.setLockName(transactionId);
88                 locks.add(lockRef);
89                 queues.add(messageQueue);
90             } catch (Exception JavaDoc ex) {
91                 log.error("Failed to add the " +
92                         "change entries : " + ex.getMessage(),ex);
93                 throw new MessageServiceException("Failed to add the " +
94                         "change entries + " + ex.getMessage(),ex);
95             }
96         }
97         
98         
99         /**
100          * This method returns the list of queues.
101          *
102          * @return The list of queues.
103          */

104         public List JavaDoc getQueues() {
105             return queues;
106         }
107         
108         
109         /**
110          * This method returns the list of locks.
111          *
112          * @return The list of locks.
113          */

114         public List JavaDoc getLocks() {
115             return locks;
116         }
117     }
118     
119     // class constants
120
public final static String JavaDoc UNSORTED = "UNSORTED";
121     public final static String JavaDoc DEAD_LETTER = "DEAD_LETTER";
122     
123     // private singleton methods
124
private static MessageQueueManager singleton = null;
125     
126     // the logger reference
127
protected Logger log =
128             Logger.getLogger(MessageQueueManager.class.getName());
129     
130     
131     // private member variables
132
private ThreadLocal JavaDoc currentTransaction = new ThreadLocal JavaDoc();
133     private Map JavaDoc keyLockMap = new HashMap JavaDoc();
134     private Map JavaDoc messageQueues = new ConcurrentHashMap JavaDoc();
135     private Map JavaDoc transactionChanges = new ConcurrentHashMap JavaDoc();
136     private Vector JavaDoc listIndex = new Vector JavaDoc();
137     private int pos = 0;
138     
139     /**
140      * Creates a new instance of MessageQueueManager
141      */

142     private MessageQueueManager() {
143     }
144     
145     
146     /**
147      * This method returns an instance of the MessageQueueManager singleton.
148      *
149      * @return An instance of the message queue manager.
150      */

151     public static synchronized MessageQueueManager getInstance() {
152         if (singleton == null) {
153             singleton = new MessageQueueManager();
154         }
155         return singleton;
156     }
157     
158     
159     /**
160      * This method returns a reference to the message queue in questions.
161      *
162      * @return The reference to the message queue.
163      * @param name The name of the message queue to return.
164      * @exception MessageServiceException
165      */

166     public MessageQueue getQueue(String JavaDoc name) throws MessageServiceException {
167         LockRef lockRef = null;
168         try {
169             lockRef = getLock(name);
170             if (messageQueues.containsKey(name)) {
171                 MessageQueue messageQueue =
172                         (MessageQueue)messageQueues.get(name);
173                 lockRef.release();
174                 return messageQueue;
175             }
176             //MessageTransactionLock.getInstance().lock();
177
Session session = HibernateUtil.
178                     getInstance(MessageServiceImpl.class).getSession();
179             List JavaDoc list = session.createQuery("FROM MessageQueue AS queue " +
180                     "WHERE queue.messageQueueName = ?").setString(0,name).list();
181             MessageQueue queue = new MessageQueue(name);
182             if (list.size() == 1) {
183                 com.rift.coad.daemon.messageservice.db.MessageQueue dbQueue =
184                         (com.rift.coad.daemon.messageservice.db.MessageQueue)
185                         list.get(0);
186                 if ((dbQueue.getNamed() != null) &&
187                         (dbQueue.getNamed() == 1)) {
188                     log.error("This is a named queue [" + name +
189                             "] and cannot be loaded into memory.");
190                     throw new MessageServiceException
191                             ("This is a named queue [" + name +
192                             "] and cannot be loaded into memory.");
193                 }
194                 messageQueues.put(name,queue);
195                 addQueueToIndex(queue);
196                 return queue;
197             }
198             TransactionManager.getInstance().bindResource(this,false);
199             com.rift.coad.daemon.messageservice.db.MessageQueue dbQueue = new
200                     com.rift.coad.daemon.messageservice.db.MessageQueue(name);
201             session.persist(dbQueue);
202             Changes changes = (Changes)currentTransaction.get();
203             changes.add(queue,lockRef);
204             lockRef = null;
205             return queue;
206         } catch (MessageServiceException ex) {
207             throw ex;
208         } catch (Exception JavaDoc ex) {
209             log.error("Failed to retrieve th message queue [" +name + "] : " +
210                     ex.getMessage(),ex);
211             throw new MessageServiceException
212                     ("Failed to retrieve th message queue [" +name + "] : " +
213                     ex.getMessage(),ex);
214         } finally {
215             if (lockRef != null) {
216                 try {
217                     lockRef.release();
218                 } catch (Exception JavaDoc ex2) {
219                     log.error("Failed to unlock the queue [" + name + "] : " +
220                         ex2.getMessage(),ex2);
221                 }
222             }
223         }
224     }
225     
226     
227     /**
228      * This method returns the next message to process.
229      *
230      * @return NULL if no message is found, A message process object otherwise.
231      * @param nextRunTime The next run time.
232      * @exception MessageServiceException
233      */

234     public synchronized MessageProcessInfo getNextMessage(Date JavaDoc nextRunTime)
235             throws MessageServiceException {
236         Vector JavaDoc index = cloneIndex();
237         int currentPos = pos;
238         Date JavaDoc currentDate = nextRunTime;
239         MessageManager result = null;
240         while (index.size() > 0) {
241             currentPos++;
242             if (currentPos >= index.size()) {
243                 currentPos = 0;
244             }
245             MessageQueue messageQueue = (MessageQueue)index.get(currentPos);
246             Date JavaDoc nextDate = new Date JavaDoc();
247             result = messageQueue.popFrontMessage(nextDate);
248             if (result != null) {
249                 MessageProcessInfo messageProcessInfo = new
250                         MessageProcessInfo(messageQueue,result);
251                 return messageProcessInfo;
252             }
253             if ((currentDate == nextRunTime) ||
254                     (currentDate.getTime() > nextDate.getTime())) {
255                 currentDate = nextDate;
256             }
257             if (currentPos == pos) {
258                 break;
259             }
260         }
261         // set the next runtime delay
262
nextRunTime.setTime(currentDate.getTime());
263         
264         // reset the pos
265
pos = currentPos;
266         
267         // return the result
268
return null;
269     }
270     
271     
272     /**
273      * This method is called to commit the specified transaction.
274      *
275      * @param xid The id of the transaction to commit.
276      * @param onePhase If true a one phase commit should be used.
277      * @exception XAException
278      */

279     public void commit(Xid JavaDoc xid, boolean b) throws XAException JavaDoc {
280         try {
281             Changes changes = (Changes)transactionChanges.get(xid);
282             transactionChanges.remove(xid);
283             List JavaDoc queues = changes.getQueues();
284             List JavaDoc locks = changes.getLocks();
285             for (int index = 0; index < queues.size(); index++) {
286                 MessageQueue queue = (MessageQueue)queues.get(index);
287                 messageQueues.put(queue.getName(),queue);
288                 addQueueToIndex(queue);
289             }
290             for (int index = 0; index < locks.size(); index++) {
291                 LockRef lockRef = (LockRef)locks.get(index);
292                 lockRef.release();
293             }
294         } catch (Exception JavaDoc ex) {
295             log.error("Failed to commit the changes : " +
296                     ex.getMessage(),ex);
297             throw new XAException JavaDoc("Failed to commit the changes : " +
298                     ex.getMessage());
299         }
300     }
301     
302     
303     /**
304      * The resource manager has dissociated this object from the transaction.
305      *
306      * @param xid The id of the transaction that is getting ended.
307      * @param flags The flags associated with this operation.
308      * @exception XAException
309      */

310     public void end(Xid JavaDoc xid, int i) throws XAException JavaDoc {
311     }
312     
313     
314     /**
315      * The transaction has been completed and must be forgotten.
316      *
317      * @param xid The id of the transaction to forget.
318      * @exception XAException
319      */

320     public void forget(Xid JavaDoc xid) throws XAException JavaDoc {
321         try {
322             Changes changes = (Changes)transactionChanges.get(xid);
323             transactionChanges.remove(xid);
324             List JavaDoc locks = changes.getLocks();
325             for (int index = 0; index < locks.size(); index++) {
326                 LockRef lockRef = (LockRef)locks.get(index);
327                 lockRef.release();
328             }
329         } catch (Exception JavaDoc ex) {
330             log.error("Failed to forget the changes : " +
331                     ex.getMessage(),ex);
332             throw new XAException JavaDoc("Failed to forget the changes : " +
333                     ex.getMessage());
334         }
335     }
336     
337     
338     /**
339      * This method returns the transaction timeout for this object.
340      *
341      * @return The int containing the transaction timeout.
342      * @exception XAException
343      */

344     public int getTransactionTimeout() throws XAException JavaDoc {
345         return -1;
346     }
347     
348     
349     /**
350      * This method returns true if this object is the resource manager getting
351      * queried.
352      *
353      * @return TRUE if this is the resource manager, FALSE if not.
354      * @param xaResource The resource to perform the check against.
355      * @exception XAException
356      */

357     public boolean isSameRM(XAResource JavaDoc xAResource) throws XAException JavaDoc {
358         return this == xAResource;
359     }
360     
361     /**
362      * This is called before a transaction is committed.
363      *
364      * @return The results of the transaction.
365      * @param xid The id of the transaction to check against.
366      * @exception XAException
367      */

368     public int prepare(Xid JavaDoc xid) throws XAException JavaDoc {
369         return XAResource.XA_OK;
370     }
371     
372     
373     /**
374      * This method returns the list of transaction branches for this resource
375      * manager.
376      *
377      * @return The list of resource branches.
378      * @param flags The flags
379      * @exception XAException
380      */

381     public Xid JavaDoc[] recover(int i) throws XAException JavaDoc {
382         return null;
383     }
384     
385     
386     /**
387      * This method is called to roll back the specified transaction.
388      *
389      * @param xid The id of the transaction to roll back.
390      * @exception XAException
391      */

392     public void rollback(Xid JavaDoc xid) throws XAException JavaDoc {
393         try {
394             Changes changes = (Changes)transactionChanges.get(xid);
395             transactionChanges.remove(xid);
396             List JavaDoc locks = changes.getLocks();
397             for (int index = 0; index < locks.size(); index++) {
398                 LockRef lockRef = (LockRef)locks.get(index);
399                 lockRef.release();
400             }
401         } catch (Exception JavaDoc ex) {
402             log.error("Failed to rollback the changes : " +
403                     ex.getMessage(),ex);
404             throw new XAException JavaDoc("Failed to rollback the changes : " +
405                     ex.getMessage());
406         }
407     }
408     
409     
410     /**
411      * This method sets the transaction timeout for this resource manager.
412      *
413      * @return TRUE if the transaction timeout can be set successfully.
414      * @param transactionTimeout The new transaction timeout value.
415      * @exception XAException
416      */

417     public boolean setTransactionTimeout(int i) throws XAException JavaDoc {
418         return true;
419     }
420     
421     
422     /**
423      * This method is called to start a transaction on a resource manager.
424      *
425      * @param xid The id of the new transaction.
426      * @param flags The flags associated with the transaction.
427      * @exception XAException
428      */

429     public void start(Xid JavaDoc xid, int i) throws XAException JavaDoc {
430         if (transactionChanges.containsKey(xid)) {
431             currentTransaction.set(transactionChanges.get(xid));
432         } else {
433             Changes changes = new Changes(xid);
434             transactionChanges.put(xid,changes);
435             currentTransaction.set(changes);
436         }
437     }
438     
439     
440     /**
441      * This method returns the named lock
442      *
443      * @return The reference to the lock.
444      * @param The name of the queue that must be locked.
445      * @exception MessageServiceException
446      */

447     private LockRef getLock(String JavaDoc name) throws MessageServiceException {
448         try {
449             Object JavaDoc key = null;
450             synchronized(keyLockMap) {
451                 if (keyLockMap.containsKey(name)) {
452                     key = keyLockMap.get(name);
453                 } else {
454                     key = new String JavaDoc(name);
455                     keyLockMap.put(name,key);
456                 }
457             }
458             LockRef lockRef =
459                     ObjectLockFactory.getInstance().acquireWriteLock(key);
460             Changes changes = (Changes)currentTransaction.get();
461             
462             return lockRef;
463         } catch (Exception JavaDoc ex) {
464             log.error("Failed to retrieve a lock on the message queue : " +
465                     ex.getMessage(),ex);
466             throw new MessageServiceException
467                     ("Failed to retrieve a lock on the message queue : " +
468                     ex.getMessage(),ex);
469         }
470     }
471     
472     
473     /**
474      * This method is called to add an entry to the messsage queue
475      */

476     private void addQueueToIndex(MessageQueue messageQueue) {
477         synchronized(listIndex) {
478             listIndex.add(messageQueue);
479         }
480     }
481     
482     
483     /**
484      * This method is called to clone the index.
485      *
486      * @return The cloned index
487      */

488     private Vector JavaDoc cloneIndex() {
489         synchronized(listIndex) {
490             return (Vector JavaDoc)listIndex.clone();
491         }
492     }
493 }
494
Popular Tags