KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > axis > ime > internal > MessageExchangeImpl


1 /*
2  * Copyright 2001-2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.axis.ime.internal;
18
19 import org.apache.axis.AxisFault;
20 import org.apache.axis.MessageContext;
21 import org.apache.axis.components.logger.LogFactory;
22 import org.apache.axis.components.uuid.UUIDGenFactory;
23 import org.apache.axis.ime.MessageExchange;
24 import org.apache.axis.ime.MessageExchangeConstants;
25 import org.apache.axis.ime.MessageExchangeCorrelator;
26 import org.apache.axis.ime.MessageExchangeEvent;
27 import org.apache.axis.ime.MessageExchangeEventListener;
28 import org.apache.axis.ime.MessageExchangeLifecycle;
29 import org.apache.axis.ime.event.MessageFaultEvent;
30 import org.apache.axis.ime.event.MessageReceiveEvent;
31 import org.apache.commons.logging.Log;
32
33 import java.util.Hashtable JavaDoc;
34
35 /**
36  * @author James M Snell (jasnell@us.ibm.com)
37  * @author Ray Chun (rchun@sonicsoftware.com)
38  */

39 public class MessageExchangeImpl
40         implements MessageExchange, MessageExchangeLifecycle {
41
42     protected static Log log =
43         LogFactory.getLog(MessageExchangeImpl.class.getName());
44
45     public static final long NO_TIMEOUT = -1;
46     public static final long DEFAULT_TIMEOUT = 1000 * 30;
47
48     private MessageExchangeEventListener eventListener;
49     private MessageExchangeProvider provider;
50     protected Holder holder;
51
52     public MessageExchangeImpl(
53             MessageExchangeProvider provider) {
54     }
55
56     /**
57      * @see org.apache.axis.ime.MessageExchange#send(MessageContext)
58      */

59     public MessageExchangeCorrelator send(
60             MessageContext context)
61             throws AxisFault {
62         return send(context,null);
63     }
64
65     /**
66      * @see org.apache.axis.ime.MessageExchange#send(MessageContext)
67      */

68     public MessageExchangeCorrelator send(
69             MessageContext context,
70             MessageExchangeEventListener listener)
71             throws AxisFault {
72         if (log.isDebugEnabled()) {
73             log.debug("Enter: MessageExchangeImpl::send");
74         }
75         MessageExchangeCorrelator correlator =
76                 (MessageExchangeCorrelator) context.getProperty(
77                         MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY);
78         if (correlator == null) {
79             correlator = new SimpleMessageExchangeCorrelator(
80                     UUIDGenFactory.getUUIDGen().nextUUID());
81             context.setProperty(
82                     MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY,
83                     correlator);
84         }
85         MessageExchangeSendContext sendContext =
86             MessageExchangeSendContext.newInstance(
87                 correlator,
88                 context,
89                 listener);
90         if (listener != null) {
91             provider.processReceive(sendContext);
92         }
93         provider.processSend(sendContext);
94         if (log.isDebugEnabled()) {
95             log.debug("Exit: MessageExchangeImpl::send");
96         }
97         return correlator;
98     }
99
100     /**
101      * @see org.apache.axis.ime.MessageExchange#receive()
102      */

103     public MessageContext receive()
104             throws AxisFault {
105         return receive(null,NO_TIMEOUT);
106     }
107
108     /**
109      * @see org.apache.axis.ime.MessageExchange#receive(long)
110      */

111     public MessageContext receive(
112             long timeout)
113             throws AxisFault {
114         return receive(null,timeout);
115     }
116
117     /**
118      * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator)
119      */

120     public MessageContext receive(
121             MessageExchangeCorrelator correlator)
122             throws AxisFault {
123         return receive(correlator,NO_TIMEOUT);
124     }
125
126     /**
127      * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator,long)
128      */

129     public MessageContext receive(
130             MessageExchangeCorrelator correlator,
131             long timeout)
132             throws AxisFault {
133         if (log.isDebugEnabled()) {
134             log.debug("Enter: MessageExchangeImpl::receive");
135         }
136         holder = new Holder();
137         MessageExchangeEventListener oldListener =
138           getMessageExchangeEventListener();
139         Listener listener = new Listener(holder);
140         setMessageExchangeEventListener(listener);
141         try {
142             this.receive(correlator,listener);
143             if (timeout != NO_TIMEOUT)
144               holder.waitForNotify(timeout);
145             else
146               holder.waitForNotify();
147         } catch (InterruptedException JavaDoc ie) {
148             throw AxisFault.makeFault(ie);
149         } finally {
150           setMessageExchangeEventListener(oldListener);
151         }
152         if (log.isDebugEnabled()) {
153             log.debug("Exit: MessageExchangeImpl::receive");
154         }
155         if (holder.context != null) {
156             return holder.context;
157         }
158         if (holder.exception != null) {
159             throw AxisFault.makeFault((Exception JavaDoc) holder.exception);
160         }
161         return null;
162     }
163
164     /**
165      * @see org.apache.axis.ime.MessageExchange#receive(MessageContextListener)
166      */

167     public void receive(
168             MessageExchangeEventListener listener)
169             throws AxisFault {
170         receive(null,listener);
171     }
172
173     /**
174      * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator,MessageContextListener)
175      */

176     public void receive(
177             MessageExchangeCorrelator correlator,
178             MessageExchangeEventListener listener)
179             throws AxisFault {
180         if (log.isDebugEnabled()) {
181             log.debug("Enter: MessageExchangeImpl::receive");
182         }
183         provider.processReceive(
184             MessageExchangeReceiveContext.newInstance(
185                 correlator,
186                 listener));
187         if (log.isDebugEnabled()) {
188             log.debug("Exit: MessageExchangeImpl::receive");
189         }
190
191     }
192
193     /**
194      * @see org.apache.axis.ime.MessageExchange#sendAndReceive(MessageContext)
195      */

196     public MessageContext sendAndReceive(
197             MessageContext context)
198             throws AxisFault {
199         return sendAndReceive(context,NO_TIMEOUT);
200     }
201
202     /**
203      * @see org.apache.axis.ime.MessageExchange#sendAndReceive(MessageContext,long)
204      */

205     public MessageContext sendAndReceive(
206             MessageContext context,
207             long timeout)
208             throws AxisFault {
209         if (log.isDebugEnabled()) {
210             log.debug("Enter: MessageExchangeImpl::sendAndReceive");
211         }
212         holder = new Holder();
213         MessageExchangeEventListener oldListener =
214           getMessageExchangeEventListener();
215         Listener listener = new Listener(holder);
216         setMessageExchangeEventListener(listener);
217         try {
218             this.send(context,listener);
219             if (timeout != NO_TIMEOUT)
220               holder.waitForNotify(timeout);
221             else
222               holder.waitForNotify();
223         } catch (InterruptedException JavaDoc ie) {
224             throw AxisFault.makeFault(ie);
225         } finally {
226           setMessageExchangeEventListener(oldListener);
227         }
228         if (log.isDebugEnabled()) {
229             log.debug("Exit: MessageExchangeImpl::sendAndReceive");
230         }
231         if (holder.context != null) {
232             return holder.context;
233         }
234         if (holder.exception != null) {
235             throw AxisFault.makeFault((Exception JavaDoc) holder.exception);
236         }
237         return null;
238     }
239
240     /**
241      * see org.apache.axis.ime.MessageExchange#setMessageExchangeFaultListener(MessageExchangeFaultListener)
242      */

243     public synchronized void setMessageExchangeEventListener(
244             MessageExchangeEventListener listener) {
245         this.eventListener = listener;
246     }
247
248     /**
249      * see org.apache.axis.ime.MessageExchange#getMessageExchangeStatusListener()
250      */

251     public synchronized MessageExchangeEventListener getMessageExchangeEventListener() {
252         return this.eventListener;
253     }
254
255     /**
256      * Unsupported for now
257      * @see org.apache.axis.ime.MessageExchange@setOption(String,Object)
258      */

259     public void setOption(
260             String JavaDoc OptionId,
261             Object JavaDoc OptionValue) {
262         provider.setOption(OptionId, OptionValue);
263     }
264
265     /**
266      * Unsupported for now
267      * @see org.apache.axis.ime.MessageExchange@getOption(String)
268      */

269     public Object JavaDoc getOption(
270             String JavaDoc OptionId) {
271         return provider.getOption(OptionId);
272     }
273
274     /**
275      * Unsupported for now
276      * @see org.apache.axis.ime.MessageExchange@getOption(String,Object)
277      */

278     public Object JavaDoc getOption(
279             String JavaDoc OptionId,
280             Object JavaDoc defaultValue) {
281         return provider.getOption(OptionId, defaultValue);
282     }
283
284     /**
285      * Unsupported for now
286      * @see org.apache.axis.ime.MessageExchange@getProperties()
287      */

288     public Hashtable JavaDoc getOptions() {
289         return provider.getOptions();
290     }
291
292     /**
293      * Unsupported for now
294      * @see org.apache.axis.ime.MessageExchange@setProperties(java.lang.Hashtable)
295      */

296     public void setOptions(Hashtable JavaDoc options) {
297         provider.setOptions(options);
298     }
299
300     /**
301      * Unsupported for now
302      * @see org.apache.axis.ime.MessageExchange@clearProperties()
303      */

304     public void clearOptions() {
305         provider.clearOptions();
306     }
307     
308     
309
310   // -- Utility Classes --- //
311

312     private class Holder {
313         private MessageExchangeCorrelator correlator;
314         private MessageContext context;
315         private Throwable JavaDoc exception;
316         private boolean done = false;
317
318         public synchronized void set(
319                 MessageExchangeCorrelator correlator,
320                 MessageContext context) {
321             this.correlator = correlator;
322             this.context = context;
323             done = true;
324             notifyAll();
325         }
326
327         public synchronized void set(
328                 MessageExchangeCorrelator correlator,
329                 Throwable JavaDoc throwable) {
330             this.correlator = correlator;
331             this.exception = throwable;
332             done = true;
333             notifyAll();
334         }
335
336         public synchronized void waitForNotify()
337                 throws InterruptedException JavaDoc {
338             if (!done) wait();
339             return;
340         }
341
342         public synchronized void waitForNotify(long timeout)
343                 throws InterruptedException JavaDoc {
344             if (!done) wait(timeout);
345             return;
346         }
347
348     }
349
350     public class Listener
351             implements MessageExchangeEventListener {
352
353         protected Holder holder;
354
355         public Listener(Holder holder) {
356             this.holder = holder;
357         }
358
359         /**
360          * @see org.apache.axis.ime.MessageExchangeReceiveListener#onReceive(MessageExchangeCorrelator, MessageContext)
361          */

362         public void onEvent(
363                 MessageExchangeEvent event) {
364             if (event instanceof MessageReceiveEvent) {
365                 MessageReceiveEvent receiveEvent = (MessageReceiveEvent)event;
366                 holder.set(
367                         receiveEvent.getMessageExchangeCorrelator(),
368                         receiveEvent.getMessageContext());
369             }
370             else if (event instanceof MessageFaultEvent) {
371                 MessageFaultEvent faultEvent = (MessageFaultEvent)event;
372                 holder.set(faultEvent.getMessageExchangeCorrelator(), faultEvent.getException());
373         }
374         }
375     }
376
377
378
379   // -- MessageExchangeLifecycle Implementation --- //
380

381     /**
382      * @see org.apache.axis.ime.MessageExchangeLifecycle#awaitShutdown()
383      */

384     public void awaitShutdown()
385             throws InterruptedException JavaDoc {
386         if (log.isDebugEnabled()) {
387             log.debug("Enter: MessageExchangeImpl::awaitShutdown");
388         }
389         provider.awaitShutdown();
390         if (log.isDebugEnabled()) {
391             log.debug("Exit: MessageExchangeImpl::awaitShutdown");
392         }
393     }
394
395     /**
396      * @see org.apache.axis.ime.MessageExchangeLifecycle#cleanup()
397      */

398     public void cleanup()
399             throws InterruptedException JavaDoc {
400         if (log.isDebugEnabled()) {
401             log.debug("Enter: MessageExchangeImpl::cleanup");
402         }
403         provider.cleanup();
404         if (log.isDebugEnabled()) {
405             log.debug("Exit: MessageExchangeImpl::cleanup");
406         }
407     }
408
409     /**
410      * @see org.apache.axis.ime.MessageExchangeLifecycle#awaitShutdown(long)
411      */

412     public void awaitShutdown(long timeout)
413             throws InterruptedException JavaDoc {
414         if (log.isDebugEnabled()) {
415             log.debug("Enter: MessageExchangeImpl::awaitShutdown");
416         }
417         provider.awaitShutdown(timeout);
418         if (log.isDebugEnabled()) {
419             log.debug("Exit: MessageExchangeImpl::awaitShutdown");
420         }
421     }
422
423     /**
424      * @see org.apache.axis.ime.MessageExchangeLifecycle#init()
425      */

426     public void init() {
427         if (log.isDebugEnabled()) {
428             log.debug("Enter: MessageExchangeImpl::init");
429         }
430         provider.init();
431         if (log.isDebugEnabled()) {
432             log.debug("Exit: MessageExchangeImpl::init");
433         }
434     }
435
436     /**
437      * @see org.apache.axis.ime.MessageExchangeLifecycle#shutdown()
438      */

439     public void shutdown() {
440         if (log.isDebugEnabled()) {
441             log.debug("Enter: MessageExchangeImpl::shutdown");
442         }
443         provider.shutdown();
444         if (log.isDebugEnabled()) {
445             log.debug("Exit: MessageExchangeImpl::shutdown");
446         }
447     }
448
449     /**
450      * @see org.apache.axis.ime.MessageExchangeLifecycle#shutdown(boolean)
451      */

452     public void shutdown(boolean force) {
453         if (log.isDebugEnabled()) {
454             log.debug("Enter: MessageExchangeImpl::shutdown");
455         }
456         provider.shutdown(force);
457         if (log.isDebugEnabled()) {
458             log.debug("Exit: MessageExchangeImpl::shutdown");
459         }
460     }
461
462 }
463
Popular Tags