KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > util > queue > TransactionalQueueSession


/*
 * $Id: TransactionalQueueSession.java 3798 2006-11-04 04:07:14Z aperepel $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
 *
 * The software in this package is published under the terms of the MuleSource MPL
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */

10
11 package org.mule.util.queue;
12
13 import org.mule.util.xa.AbstractXAResourceManager;
14 import org.mule.util.xa.DefaultXASession;
15
16 import java.io.IOException JavaDoc;
17
18 /**
19  * A Queue session that is used to manage the transaction context of a Queue
20  *
21  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
22  * @version $Revision: 3798 $
23  */

24 class TransactionalQueueSession extends DefaultXASession implements QueueSession
25 {
26
27     protected TransactionalQueueManager queueManager;
28
29     public TransactionalQueueSession(AbstractXAResourceManager resourceManager,
30                                      TransactionalQueueManager queueManager)
31     {
32         super(resourceManager);
33         this.queueManager = queueManager;
34     }
35
36     /*
37      * (non-Javadoc)
38      *
39      * @see org.mule.transaction.xa.queue.QueueSession#getQueue(java.lang.String)
40      */

41     public Queue getQueue(String JavaDoc name)
42     {
43         QueueInfo queue = queueManager.getQueue(name);
44         return new QueueImpl(queue);
45     }
46
47     protected class QueueImpl implements Queue
48     {
49
50         protected QueueInfo queue;
51
52         public QueueImpl(QueueInfo queue)
53         {
54             this.queue = queue;
55         }
56
57         public void put(Object JavaDoc item) throws InterruptedException JavaDoc
58         {
59             offer(item, Long.MAX_VALUE);
60         }
61
62         public boolean offer(Object JavaDoc item, long timeout) throws InterruptedException JavaDoc
63         {
64             if (localContext != null)
65             {
66                 return ((TransactionalQueueManager.QueueTransactionContext)localContext).offer(queue, item,
67                     timeout);
68             }
69             else
70             {
71                 try
72                 {
73                     Object JavaDoc id = queueManager.doStore(queue, item);
74                     try
75                     {
76                         if (!queue.offer(id, 0, timeout))
77                         {
78                             queueManager.doRemove(queue, item);
79                             return false;
80                         }
81                         else
82                         {
83                             return true;
84                         }
85                     }
86                     catch (InterruptedException JavaDoc e)
87                     {
88                         queueManager.doRemove(queue, item);
89                         throw e;
90                     }
91                 }
92                 catch (IOException JavaDoc e)
93                 {
94                     throw new RuntimeException JavaDoc(e);
95                 }
96             }
97         }
98
99         public Object JavaDoc take() throws InterruptedException JavaDoc
100         {
101             return poll(Long.MAX_VALUE);
102         }
103
104         public Object JavaDoc poll(long timeout) throws InterruptedException JavaDoc
105         {
106             try
107             {
108                 if (localContext != null)
109                 {
110                     return ((TransactionalQueueManager.QueueTransactionContext)localContext).poll(queue,
111                         timeout);
112                 }
113                 else
114                 {
115                     Object JavaDoc id = queue.poll(timeout);
116                     if (id != null)
117                     {
118                         Object JavaDoc item = queueManager.doLoad(queue, id);
119                         queueManager.doRemove(queue, id);
120                         return item;
121                     }
122                     return null;
123                 }
124             }
125             catch (IOException JavaDoc e)
126             {
127                 throw new RuntimeException JavaDoc(e);
128             }
129         }
130
131         public Object JavaDoc peek() throws InterruptedException JavaDoc
132         {
133             try
134             {
135                 if (localContext != null)
136                 {
137                     return ((TransactionalQueueManager.QueueTransactionContext)localContext).peek(queue);
138                 }
139                 else
140                 {
141                     Object JavaDoc id = queue.peek();
142                     if (id != null)
143                     {
144                         Object JavaDoc item = queueManager.doLoad(queue, id);
145                         queueManager.doRemove(queue, id);
146                         return item;
147                     }
148                     return null;
149                 }
150             }
151             catch (IOException JavaDoc e)
152             {
153                 throw new RuntimeException JavaDoc(e);
154             }
155         }
156
157         public int size()
158         {
159             if (localContext != null)
160             {
161                 return ((TransactionalQueueManager.QueueTransactionContext)localContext).size(queue);
162             }
163             else
164             {
165                 return queue.list.size();
166             }
167         }
168
169     }
170 }
171
Popular Tags