KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > TransactedPollingMessageReceiver


1 /*
2  * $Id: TransactedPollingMessageReceiver.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers;
12
13 import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
14
15 import javax.resource.spi.work.Work JavaDoc;
16
17 import java.util.Iterator JavaDoc;
18 import java.util.List JavaDoc;
19
20 import org.mule.config.ThreadingProfile;
21 import org.mule.transaction.TransactionCallback;
22 import org.mule.transaction.TransactionTemplate;
23 import org.mule.umo.UMOComponent;
24 import org.mule.umo.UMOException;
25 import org.mule.umo.endpoint.UMOEndpoint;
26 import org.mule.umo.lifecycle.InitialisationException;
27 import org.mule.umo.provider.UMOConnector;
28
29 /**
30  * The TransactedPollingMessageReceiver is an abstract receiver that handles polling
31  * and transaction management. Derived implementations of these class must be thread
32  * safe as several threads can be started at once for an improveded throuput.
33  *
34  * @author <a HREF=mailto:gnt@codehaus.org">Guillaume Nodet</a>
35  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
36  * @version $Revision: 3798 $
37  */

38 public abstract class TransactedPollingMessageReceiver extends PollingMessageReceiver
39 {
40     /** determines whether messages will be received in a transaction template */
41     protected boolean receiveMessagesInTransaction = true;
42
43     /** determines whether Multiple receivers are created to improve throughput */
44     protected boolean useMultipleReceivers = true;
45
46     public TransactedPollingMessageReceiver(UMOConnector connector,
47                                             UMOComponent component,
48                                             final UMOEndpoint endpoint,
49                                             Long JavaDoc frequency) throws InitialisationException
50     {
51         super(connector, component, endpoint, frequency);
52
53         if (endpoint.getTransactionConfig().getFactory() != null)
54         {
55             receiveMessagesInTransaction = true;
56         }
57         else
58         {
59             receiveMessagesInTransaction = false;
60         }
61     }
62
63     public void doStart() throws UMOException
64     {
65         // Connector property overrides any implied value
66
useMultipleReceivers = connector.isCreateMultipleTransactedReceivers();
67         ThreadingProfile tp = connector.getReceiverThreadingProfile();
68         if (useMultipleReceivers && receiveMessagesInTransaction && tp.isDoThreading())
69         {
70             for (int i = 0; i < tp.getMaxThreadsActive(); i++)
71             {
72                 super.doStart();
73             }
74         }
75         else
76         {
77             super.doStart();
78         }
79     }
80
81     public void poll() throws Exception JavaDoc
82     {
83         TransactionTemplate tt = new TransactionTemplate(endpoint.getTransactionConfig(),
84             connector.getExceptionListener());
85         if (receiveMessagesInTransaction)
86         {
87             // Receive messages and process them in a single transaction
88
// Do not enable threading here, but serveral workers
89
// may have been started
90
TransactionCallback cb = new TransactionCallback()
91             {
92                 public Object JavaDoc doInTransaction() throws Exception JavaDoc
93                 {
94                     List JavaDoc messages = getMessages();
95                     if (messages != null && messages.size() > 0)
96                     {
97                         for (Iterator JavaDoc it = messages.iterator(); it.hasNext();)
98                         {
99                             Object JavaDoc message = it.next();
100                             if (logger.isTraceEnabled())
101                             {
102                                 logger.trace("Received Message: " + message);
103                             }
104                             processMessage(message);
105                         }
106                     }
107                     return null;
108                 }
109             };
110             tt.execute(cb);
111         }
112         else
113         {
114             // Receive messages and launch a worker thread
115
// for each message
116
List JavaDoc messages = getMessages();
117             if (messages != null && messages.size() > 0)
118             {
119                 final CountDownLatch countdown = new CountDownLatch(messages.size());
120                 for (Iterator JavaDoc it = messages.iterator(); it.hasNext();)
121                 {
122                     final Object JavaDoc message = it.next();
123                     if (logger.isTraceEnabled())
124                     {
125                         logger.trace("Received Message: " + message);
126                     }
127                     try
128                     {
129                         getWorkManager().scheduleWork(new MessageProcessorWorker(tt, countdown, message));
130                     }
131                     catch (Exception JavaDoc e)
132                     {
133                         countdown.countDown();
134                         throw e;
135                     }
136                 }
137                 countdown.await();
138             }
139         }
140     }
141
142     protected class MessageProcessorWorker implements Work JavaDoc, TransactionCallback
143     {
144         private TransactionTemplate tt;
145         private Object JavaDoc message;
146         private CountDownLatch latch;
147
148         public MessageProcessorWorker(TransactionTemplate tt, CountDownLatch latch, Object JavaDoc message)
149         {
150             this.tt = tt;
151             this.message = message;
152             this.latch = latch;
153         }
154
155         public void release()
156         {
157             // nothing to do
158
}
159
160         public void run()
161         {
162             try
163             {
164                 tt.execute(this);
165             }
166             catch (Exception JavaDoc e)
167             {
168                 handleException(e);
169             }
170             finally
171             {
172                 latch.countDown();
173             }
174         }
175
176         public Object JavaDoc doInTransaction() throws Exception JavaDoc
177         {
178             processMessage(message);
179             return null;
180         }
181
182     }
183
184     protected abstract List JavaDoc getMessages() throws Exception JavaDoc;
185
186     protected abstract void processMessage(Object JavaDoc message) throws Exception JavaDoc;
187
188 }
189
Popular Tags