KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > petals > component > common > listener > MessageExchangeListener


1 /**
2  * PETALS - PETALS Services Platform.
3  * Copyright (c) 2006 EBM Websourcing, http://www.ebmwebsourcing.com/
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  * -------------------------------------------------------------------------
19  * $Id: $
20  * -------------------------------------------------------------------------
21  */

22 package org.objectweb.petals.component.common.listener;
23
24 import java.util.logging.Level JavaDoc;
25 import java.util.logging.Logger JavaDoc;
26
27 import javax.jbi.messaging.DeliveryChannel;
28 import javax.jbi.messaging.ExchangeStatus;
29 import javax.jbi.messaging.MessageExchange;
30 import javax.jbi.messaging.MessagingException;
31
32 /**
33  * This class helps a Component to listen MessageExchange on the DeliveryChannel
34  * (accept() method). It acts as a Listener. When the listener calls the
35  * 'accept()' method, its thread is blocked, but not the object that
36  * instantiated it. The listener calls a 'process(Exchange)' on the
37  * MessageExchangeProcessor, which acts as a handler of MessageExchanges. This
38  * MessageExchangeProcessor has to be provided. <p/
39  * <p>
40  * The listener can be parametrized with a few parameters. See constructors.
41  * </p>
42  *
43  * @version $Revision: $ $Date: $
44  * @since Petals 1.0
45  * @author alouis
46  */

47 public class MessageExchangeListener extends Thread JavaDoc {
48
49     public enum IgnoredStatus {
50         NOTHING_IGNORED, DONE_IGNORED, ERROR_IGNORED, DONE_AND_ERROR_IGNORED
51     };
52
53     protected DeliveryChannel channel;
54
55     protected boolean running = false;
56
57     protected IgnoredStatus ignoredStatus;
58
59     protected long period;
60
61     protected Logger JavaDoc logger;
62
63     protected IMessageExchangeProcessor processor;
64
65     /**
66      * creates a listener on the specified channel. all exchanges are treated.
67      * no interval between two 'accept()';
68      *
69      * @param channel
70      * @param processor
71      */

72     public MessageExchangeListener(DeliveryChannel channel,
73             IMessageExchangeProcessor processor) {
74         this(channel, processor, IgnoredStatus.NOTHING_IGNORED);
75     }
76
77     /**
78      * creates a listener on the specified channel. exchanges are ignored
79      * depending on the specified exchange-status to ignore. no interval between
80      * two 'accept()';
81      *
82      * @param channel
83      * @param processor
84      * @param ignoredStatus
85      */

86     public MessageExchangeListener(DeliveryChannel channel,
87             IMessageExchangeProcessor processor, IgnoredStatus ignoredStatus) {
88         this(channel, processor, IgnoredStatus.NOTHING_IGNORED, 0);
89     }
90
91     /**
92      * creates a listener on the specified channel. exchanges are ignored
93      * depending on the specified exchange-status to ignore. wait for the
94      * specified interval between two 'accept()';
95      *
96      * @param channel
97      * @param processor
98      * @param ignoredStatus
99      * @param period
100      */

101     public MessageExchangeListener(DeliveryChannel channel,
102             IMessageExchangeProcessor processor, IgnoredStatus ignoredStatus,
103             long period) {
104         this.ignoredStatus = ignoredStatus;
105         this.channel = channel;
106         this.period = period;
107         this.processor = processor;
108     }
109     
110     /**
111      * creates a listener on the specified channel. exchanges are ignored
112      * depending on the specified exchange-status to ignore. wait for the
113      * specified interval between two 'accept()';
114      *
115      * @param channel
116      * @param processor
117      * @param ignoredStatus
118      * @param componentName name of the component. Used to define the Thread name
119      * @param period
120      */

121     public MessageExchangeListener(DeliveryChannel channel,
122             IMessageExchangeProcessor processor, IgnoredStatus ignoredStatus,
123             long period,String JavaDoc componentName) {
124         super(componentName+"-JBI listener thread");
125         this.ignoredStatus = ignoredStatus;
126         this.channel = channel;
127         this.period = period;
128         this.processor = processor;
129     }
130
131     /**
132      * Close the listener. This cause the terminaison of the loop on
133      * DeliveryChannel.accept(), and interrupt this listening thread if the
134      * thread is blocked on DeliveryChannel.accept();
135      */

136     public void close() {
137         terminate();
138         interrupt();
139     }
140
141     /**
142      * Start the thread listener ( Thread.start()). The listener waits on
143      * DeliveryChannel.accept(), treats the received MessageExchange, and waits
144      * again on DeliveryChannel.accept().
145      */

146     public void listen() {
147         start();
148     }
149
150     /**
151      * loop on accept
152      */

153     public void run() {
154         running = true;
155
156         while (running) {
157             MessageExchange exchange = null;
158
159             // receive a message
160
try {
161                 exchange = channel.accept();
162             } catch (MessagingException e) {
163                 logError(e);
164             }
165
166             // process the message
167
if (!isIgnored(exchange)) {
168                 process(exchange);
169             }
170
171             // wait before next accept
172
if (period > 0) {
173                 try {
174                     wait(period);
175                 } catch (InterruptedException JavaDoc e) {
176                     // nothing
177
}
178             }
179         }
180     }
181
182     public void setLogger(Logger JavaDoc logger) {
183         this.logger = logger;
184     }
185
186     /**
187      * Terminate the listening. On the next MessageExchange treated after the
188      * DeliveryChannel.accept(), the listener will stop waiting on
189      * DeliveryChannel.accept().
190      */

191     public void terminate() {
192         running = false;
193     }
194
195     /**
196      * depending on the IgnoredStatus, tells if the exchange has to be ignored.
197      * If the exchange is null, return true.
198      *
199      * @param exchange
200      * @return
201      */

202     protected boolean isIgnored(MessageExchange exchange) {
203         boolean result = false;
204
205         if (exchange != null) {
206             ExchangeStatus status = exchange.getStatus();
207             if (ExchangeStatus.DONE.equals(status)) {
208                 result = IgnoredStatus.DONE_IGNORED == ignoredStatus
209                         || IgnoredStatus.DONE_AND_ERROR_IGNORED == ignoredStatus;
210             } else if (ExchangeStatus.ERROR.equals(status)) {
211                 result = IgnoredStatus.ERROR_IGNORED == ignoredStatus
212                         || IgnoredStatus.DONE_AND_ERROR_IGNORED == ignoredStatus;
213             }
214         } else {
215             result = true;
216         }
217         return result;
218     }
219
220     protected void logError(Throwable JavaDoc e) {
221         if (logger != null)
222             logger.log(Level.SEVERE, e.getMessage(), e);
223         else
224             e.printStackTrace();
225     }
226
227     /**
228      * call the processor's process() method
229      *
230      * @param exchange
231      */

232     protected void process(MessageExchange exchange) {
233         if (processor != null)
234             try {
235                 processor.process(exchange);
236             } catch (Exception JavaDoc e) {
237                 logError(e);
238             }
239     }
240 }
241
Popular Tags