KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > util > queue > TransactionalQueueManager


1 /*
2  * $Id: TransactionalQueueManager.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.util.queue;
12
13 import org.apache.commons.logging.Log;
14 import org.apache.commons.logging.LogFactory;
15 import org.mule.util.queue.QueuePersistenceStrategy.Holder;
16 import org.mule.util.xa.AbstractTransactionContext;
17 import org.mule.util.xa.AbstractXAResourceManager;
18 import org.mule.util.xa.ResourceManagerException;
19 import org.mule.util.xa.ResourceManagerSystemException;
20
21 import javax.transaction.xa.XAResource JavaDoc;
22
23 import java.io.IOException JavaDoc;
24 import java.util.ArrayList JavaDoc;
25 import java.util.HashMap JavaDoc;
26 import java.util.Iterator JavaDoc;
27 import java.util.LinkedList JavaDoc;
28 import java.util.List JavaDoc;
29 import java.util.Map JavaDoc;
30
31 /**
32  * The Transactional Queue Manager is responsible for creating and Managing
33  * transactional Queues. Queues can also be persistent by setting a persistence
34  * strategy on the manager. Default straties are provided for Memory, Jounaling,
35  * Cache and File.
36  *
37  * @author <a HREF="mailto:gnt@codehaus.org">Guillaume Nodet</a>
38  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
39  * @version $Revision: 3798 $
40  */

41 public class TransactionalQueueManager extends AbstractXAResourceManager implements QueueManager
42 {
43
44     private static Log logger = LogFactory.getLog(TransactionalQueueManager.class);
45
46     private Map JavaDoc queues = new HashMap JavaDoc();
47
48     private QueuePersistenceStrategy memoryPersistenceStrategy = new MemoryPersistenceStrategy();
49     private QueuePersistenceStrategy persistenceStrategy;
50
51     private QueueConfiguration defaultQueueConfiguration = new QueueConfiguration(false);
52
53     public synchronized QueueSession getQueueSession()
54     {
55         return new TransactionalQueueSession(this, this);
56     }
57
58     public synchronized void setDefaultQueueConfiguration(QueueConfiguration config)
59     {
60         this.defaultQueueConfiguration = config;
61     }
62
63     public synchronized void setQueueConfiguration(String JavaDoc queueName, QueueConfiguration config)
64     {
65         getQueue(queueName).config = config;
66     }
67
68     protected synchronized QueueInfo getQueue(String JavaDoc name)
69     {
70         QueueInfo q = (QueueInfo)queues.get(name);
71         if (q == null)
72         {
73             q = new QueueInfo();
74             q.name = name;
75             q.list = new LinkedList JavaDoc();
76             q.config = defaultQueueConfiguration;
77             queues.put(name, q);
78         }
79         return q;
80     }
81
82     /*
83      * (non-Javadoc)
84      *
85      * @see org.mule.transaction.xa.AbstractResourceManager#getLogger()
86      */

87     protected Log getLogger()
88     {
89         return logger;
90     }
91
92     public void close()
93     {
94         try
95         {
96             stop(SHUTDOWN_MODE_NORMAL);
97         }
98         catch (ResourceManagerException e)
99         {
100             logger.error("Error disposing manager", e);
101         }
102     }
103
104     protected void doStart() throws ResourceManagerSystemException
105     {
106         if (persistenceStrategy != null)
107         {
108             try
109             {
110                 persistenceStrategy.open();
111             }
112             catch (IOException JavaDoc e)
113             {
114                 throw new ResourceManagerSystemException(e);
115             }
116         }
117     }
118
119     protected boolean shutdown(int mode, long timeoutMSecs)
120     {
121         try
122         {
123             if (persistenceStrategy != null)
124             {
125                 persistenceStrategy.close();
126             }
127         }
128         catch (IOException JavaDoc e)
129         {
130             logger.error("Error closing persistent store", e);
131         }
132         return super.shutdown(mode, timeoutMSecs);
133     }
134
135     protected void recover() throws ResourceManagerSystemException
136     {
137         if (persistenceStrategy != null)
138         {
139             try
140             {
141                 List JavaDoc msgs = persistenceStrategy.restore();
142                 for (Iterator JavaDoc it = msgs.iterator(); it.hasNext();)
143                 {
144                     Holder h = (Holder)it.next();
145                     getQueue(h.getQueue()).putNow(h.getId());
146                 }
147             }
148             catch (Exception JavaDoc e)
149             {
150                 throw new ResourceManagerSystemException(e);
151             }
152         }
153     }
154
155     /*
156      * (non-Javadoc)
157      *
158      * @see org.mule.transaction.xa.AbstractResourceManager#createTransactionContext()
159      */

160     protected AbstractTransactionContext createTransactionContext(Object JavaDoc session)
161     {
162         return new QueueTransactionContext();
163     }
164
165     /*
166      * (non-Javadoc)
167      *
168      * @see org.mule.transaction.xa.AbstractResourceManager#doBegin(org.mule.transaction.xa.AbstractTransactionContext)
169      */

170     protected void doBegin(AbstractTransactionContext context)
171     {
172         // Nothing special to do
173
}
174
175     /*
176      * (non-Javadoc)
177      *
178      * @see org.mule.transaction.xa.AbstractResourceManager#doPrepare(org.mule.transaction.xa.AbstractTransactionContext)
179      */

180     protected int doPrepare(AbstractTransactionContext context)
181     {
182         return XAResource.XA_OK;
183     }
184
185     /*
186      * (non-Javadoc)
187      *
188      * @see org.mule.transaction.xa.AbstractResourceManager#doCommit(org.mule.transaction.xa.AbstractTransactionContext)
189      */

190     protected void doCommit(AbstractTransactionContext context) throws ResourceManagerException
191     {
192         QueueTransactionContext ctx = (QueueTransactionContext)context;
193         try
194         {
195             if (ctx.added != null)
196             {
197                 for (Iterator JavaDoc it = ctx.added.entrySet().iterator(); it.hasNext();)
198                 {
199                     Map.Entry JavaDoc entry = (Map.Entry JavaDoc)it.next();
200                     QueueInfo queue = (QueueInfo)entry.getKey();
201                     List JavaDoc queueAdded = (List JavaDoc)entry.getValue();
202                     if (queueAdded != null && queueAdded.size() > 0)
203                     {
204                         for (Iterator JavaDoc itAdded = queueAdded.iterator(); itAdded.hasNext();)
205                         {
206                             Object JavaDoc object = itAdded.next();
207                             Object JavaDoc id = doStore(queue, object);
208                             queue.putNow(id);
209                         }
210                     }
211                 }
212             }
213             if (ctx.removed != null)
214             {
215                 for (Iterator JavaDoc it = ctx.removed.entrySet().iterator(); it.hasNext();)
216                 {
217                     Map.Entry JavaDoc entry = (Map.Entry JavaDoc)it.next();
218                     QueueInfo queue = (QueueInfo)entry.getKey();
219                     List JavaDoc queueRemoved = (List JavaDoc)entry.getValue();
220                     if (queueRemoved != null && queueRemoved.size() > 0)
221                     {
222                         for (Iterator JavaDoc itRemoved = queueRemoved.iterator(); itRemoved.hasNext();)
223                         {
224                             Object JavaDoc id = itRemoved.next();
225                             doRemove(queue, id);
226                         }
227                     }
228                 }
229             }
230         }
231         catch (Exception JavaDoc e)
232         {
233             // throw new ResourceManagerException("Could not commit
234
// transaction", e);
235
// TODO: add an i18n Message
236
throw new ResourceManagerException(e);
237         }
238         finally
239         {
240             ctx.added = null;
241             ctx.removed = null;
242         }
243     }
244
245     protected Object JavaDoc doStore(QueueInfo queue, Object JavaDoc object) throws IOException JavaDoc
246     {
247         QueuePersistenceStrategy ps = (queue.config.persistent)
248                         ? persistenceStrategy : memoryPersistenceStrategy;
249         Object JavaDoc id = ps.store(queue.name, object);
250         return id;
251     }
252
253     protected void doRemove(QueueInfo queue, Object JavaDoc id) throws IOException JavaDoc
254     {
255         QueuePersistenceStrategy ps = (queue.config.persistent)
256                         ? persistenceStrategy : memoryPersistenceStrategy;
257         ps.remove(queue.name, id);
258     }
259
260     protected Object JavaDoc doLoad(QueueInfo queue, Object JavaDoc id) throws IOException JavaDoc
261     {
262         QueuePersistenceStrategy ps = (queue.config.persistent)
263                         ? persistenceStrategy : memoryPersistenceStrategy;
264         Object JavaDoc obj = ps.load(queue.name, id);
265         return obj;
266     }
267
268     /*
269      * (non-Javadoc)
270      *
271      * @see org.mule.transaction.xa.AbstractResourceManager#doRollback(org.mule.transaction.xa.AbstractTransactionContext)
272      */

273     protected void doRollback(AbstractTransactionContext context) throws ResourceManagerException
274     {
275         QueueTransactionContext ctx = (QueueTransactionContext)context;
276         if (ctx.removed != null)
277         {
278             for (Iterator JavaDoc it = ctx.removed.entrySet().iterator(); it.hasNext();)
279             {
280                 Map.Entry JavaDoc entry = (Map.Entry JavaDoc)it.next();
281                 QueueInfo queue = (QueueInfo)entry.getKey();
282                 List JavaDoc queueRemoved = (List JavaDoc)entry.getValue();
283                 if (queueRemoved != null && queueRemoved.size() > 0)
284                 {
285                     for (Iterator JavaDoc itRemoved = queueRemoved.iterator(); itRemoved.hasNext();)
286                     {
287                         Object JavaDoc id = itRemoved.next();
288                         queue.putNow(id);
289                     }
290                 }
291             }
292         }
293         ctx.added = null;
294         ctx.removed = null;
295     }
296
297     protected class QueueTransactionContext extends AbstractTransactionContext
298     {
299         protected Map JavaDoc added;
300         protected Map JavaDoc removed;
301
302         public boolean offer(QueueInfo queue, Object JavaDoc item, long timeout) throws InterruptedException JavaDoc
303         {
304             readOnly = false;
305             if (added == null)
306             {
307                 added = new HashMap JavaDoc();
308             }
309             List JavaDoc queueAdded = (List JavaDoc)added.get(queue);
310             if (queueAdded == null)
311             {
312                 queueAdded = new ArrayList JavaDoc();
313                 added.put(queue, queueAdded);
314             }
315             // wait for enough room
316
if (queue.offer(null, queueAdded.size(), Long.MAX_VALUE))
317             {
318                 queueAdded.add(item);
319                 return true;
320             }
321             else
322             {
323                 return false;
324             }
325         }
326
327         public Object JavaDoc poll(QueueInfo queue, long timeout) throws IOException JavaDoc, InterruptedException JavaDoc
328         {
329             readOnly = false;
330             if (added != null)
331             {
332                 List JavaDoc queueAdded = (List JavaDoc)added.get(queue);
333                 if (queueAdded != null)
334                 {
335                     return queueAdded.remove(queueAdded.size() - 1);
336                 }
337             }
338             Object JavaDoc o = queue.poll(Long.MAX_VALUE);
339             if (o != null)
340             {
341                 if (removed == null)
342                 {
343                     removed = new HashMap JavaDoc();
344                 }
345                 List JavaDoc queueRemoved = (List JavaDoc)removed.get(queue);
346                 if (queueRemoved == null)
347                 {
348                     queueRemoved = new ArrayList JavaDoc();
349                     removed.put(queue, queueRemoved);
350                 }
351                 queueRemoved.add(o);
352                 o = doLoad(queue, o);
353             }
354             return o;
355         }
356
357         public Object JavaDoc peek(QueueInfo queue) throws IOException JavaDoc, InterruptedException JavaDoc
358         {
359             readOnly = false;
360             if (added != null)
361             {
362                 List JavaDoc queueAdded = (List JavaDoc)added.get(queue);
363                 if (queueAdded != null)
364                 {
365                     return queueAdded.get(queueAdded.size() - 1);
366                 }
367             }
368             Object JavaDoc o = queue.peek();
369             if (o != null)
370             {
371                 o = doLoad(queue, o);
372             }
373             return o;
374         }
375
376         public int size(QueueInfo queue)
377         {
378             int sz = queue.list.size();
379             if (added != null)
380             {
381                 List JavaDoc queueAdded = (List JavaDoc)added.get(queue);
382                 if (queueAdded != null)
383                 {
384                     sz += queueAdded.size();
385                 }
386             }
387             return sz;
388         }
389
390     }
391
392     /**
393      * @return Returns the persistenceStrategy.
394      */

395     public QueuePersistenceStrategy getPersistenceStrategy()
396     {
397         return persistenceStrategy;
398     }
399
400     /**
401      * @param persistenceStrategy The persistenceStrategy to set.
402      */

403     public void setPersistenceStrategy(QueuePersistenceStrategy persistenceStrategy)
404     {
405         if (operationMode != OPERATION_MODE_STOPPED)
406         {
407             throw new IllegalStateException JavaDoc();
408         }
409         this.persistenceStrategy = persistenceStrategy;
410     }
411
412     public QueuePersistenceStrategy getMemoryPersistenceStrategy()
413     {
414         return memoryPersistenceStrategy;
415     }
416
417     public void setMemoryPersistenceStrategy(QueuePersistenceStrategy memoryPersistenceStrategy)
418     {
419         if (operationMode != OPERATION_MODE_STOPPED)
420         {
421             throw new IllegalStateException JavaDoc();
422         }
423         this.memoryPersistenceStrategy = memoryPersistenceStrategy;
424     }
425 }
426
Popular Tags