KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > wsif > util > jms > WSIFJMSDestination


1 /*
2  * The Apache Software License, Version 1.1
3  *
4  *
5  * Copyright (c) 2002 The Apache Software Foundation. All rights
6  * reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Apache Software Foundation (http://www.apache.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "WSIF" and "Apache Software Foundation" must
28  * not be used to endorse or promote products derived from this
29  * software without prior written permission. For written
30  * permission, please contact apache@apache.org.
31  *
32  * 5. Products derived from this software may not be called "Apache",
33  * nor may "Apache" appear in their name, without prior written
34  * permission of the Apache Software Foundation.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
40  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
41  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
42  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
43  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
44  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
45  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
46  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
47  * SUCH DAMAGE.
48  * ====================================================================
49  *
50  * This software consists of voluntary contributions made by many
51  * individuals on behalf of the Apache Software Foundation and was
52  * originally based on software copyright (c) 2001, 2002, International
53  * Business Machines, Inc., http://www.apache.org. For more
54  * information on the Apache Software Foundation, please see
55  * <http://www.apache.org/>.
56  */

57
58 package org.apache.wsif.util.jms;
59
60 import java.io.Serializable JavaDoc;
61 import java.util.HashMap JavaDoc;
62
63 import javax.jms.Destination JavaDoc;
64 import javax.jms.JMSException JavaDoc;
65 import javax.jms.Message JavaDoc;
66 import javax.jms.ObjectMessage JavaDoc;
67 import javax.jms.Queue JavaDoc;
68 import javax.jms.QueueConnection JavaDoc;
69 import javax.jms.QueueReceiver JavaDoc;
70 import javax.jms.QueueSender JavaDoc;
71 import javax.jms.QueueSession JavaDoc;
72 import javax.jms.Session JavaDoc;
73 import javax.jms.TextMessage JavaDoc;
74 import org.apache.wsif.WSIFException;
75 import org.apache.wsif.logging.Trc;
76
77 /**
78  * A WSIFJMSDestination is a pair of queues, one that read from and
79  * the other that is written to. This class provides various methods
80  * for different flavours of reading and writing messages to those
81  * queues. This class hides the JMS interface.
82  *
83  * @author Mark Whitlock <whitlock@apache.org>
84  */

85 public class WSIFJMSDestination {
86     protected WSIFJMSFinder finder;
87     protected QueueConnection JavaDoc connection = null;
88     protected QueueSession JavaDoc session = null;
89     protected Queue JavaDoc readQ = null;
90     protected Queue JavaDoc writeQ = null;
91     protected QueueSender JavaDoc sender = null;
92
93     protected boolean asyncMode = false;
94     protected Queue JavaDoc syncTempQueue = null;
95
96     protected WSIFJMSProperties inProps;
97     protected WSIFJMSProperties outProps;
98     protected Message JavaDoc lastMessage = null;
99     protected long timeout;
100     protected String JavaDoc replyToName = null;
101
102     /**
103      * Public constructor.
104      * @param finder used to find JMS objects.
105      */

106     public WSIFJMSDestination(WSIFJMSFinder finder) throws WSIFException {
107         this(finder, WSIFJMSConstants.WAIT_FOREVER);
108         Trc.entry(this, finder);
109         Trc.exit();
110     }
111
112     /**
113      * Public constructor.
114      * @param finder used to find JMS objects.
115      * @param timeout is the maximum time to wait on a synchronous receive
116      */

117     public WSIFJMSDestination(WSIFJMSFinder finder, long timeout)
118         throws WSIFException {
119         this(finder, null, timeout);
120         Trc.entry(this, finder, new Long JavaDoc(timeout));
121         Trc.exit();
122     }
123
124     /**
125      * Public constructor.
126      * @param finder used to find JMS objects.
127      * @param altdestName is an alterative JMS provider destination name
128      * @param timeout is the maximum time to wait on a synchronous receive
129      */

130     public WSIFJMSDestination(
131         WSIFJMSFinder finder,
132         String JavaDoc altDestName,
133         long timeout)
134         throws WSIFException {
135         Trc.entry(this, finder, altDestName, new Long JavaDoc(timeout));
136
137         inProps = new WSIFJMSProperties(WSIFJMSProperties.IN);
138         outProps = new WSIFJMSProperties(WSIFJMSProperties.OUT);
139         this.timeout = timeout;
140         this.finder = finder;
141
142         try {
143             connection = finder.getFactory().createQueueConnection();
144             session =
145                 connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
146
147             Destination JavaDoc initDest = finder.getInitialDestination();
148             if (initDest != null && altDestName != null)
149                 throw new WSIFException("Both jndiDestinationName and jmsproviderDestinationName cannot be specified");
150             if (initDest == null && altDestName == null)
151                 throw new WSIFException("Either jndiDestinationName or jmsproviderDestinationName must be specified");
152
153             if (altDestName != null)
154                 initDest = session.createQueue(altDestName);
155
156             writeQ = (Queue JavaDoc) initDest;
157             readQ = null;
158
159             connection.start();
160
161         } catch (JMSException JavaDoc je) {
162             Trc.exception(je);
163             throw WSIFJMSConstants.ToWsifException(je);
164         }
165         if (Trc.ON)
166             Trc.exit(deep());
167     }
168
169     /**
170      * Close all objects.
171      */

172     public void close() throws WSIFException {
173         Trc.entry(this);
174         try {
175             QueueSender JavaDoc sndr = sender;
176             QueueSession JavaDoc sssn = session;
177             QueueConnection JavaDoc cnnctn = connection;
178
179             sender = null; // Ensure these are nulled (flagging the close()),
180
session = null; // even if a close() throws a JMSException
181
connection = null;
182
183             if (sndr != null)
184                 sndr.close();
185             if (sssn != null)
186                 sssn.close();
187             if (cnnctn != null)
188                 cnnctn.close();
189         } catch (JMSException JavaDoc je) {
190             Trc.exception(je);
191             throw WSIFJMSConstants.ToWsifException(je);
192         }
193         Trc.exit();
194     }
195
196     /**
197      * close the destination at finalize.
198      */

199     public void finalize() throws WSIFException {
200         Trc.entry(this);
201         close();
202         Trc.exit();
203     }
204
205     /**
206      * Send a message to the write queue
207      * @param data is the message
208      * @return the id of the message that was sent.
209      */

210     public String JavaDoc send(String JavaDoc data) throws WSIFException {
211         Trc.entry(this, data);
212         String JavaDoc s = send(data, null);
213         Trc.exit(s);
214         return s;
215     }
216
217     /**
218      * Send a message to the write queue
219      * @param data is the message
220      * @param id is the correlation id to set on the message
221      * @return the id of the message that was sent.
222      */

223     public String JavaDoc send(String JavaDoc data, String JavaDoc id) throws WSIFException {
224         Trc.entry(this, data, id);
225         areWeClosed();
226         try {
227             TextMessage JavaDoc msg = session.createTextMessage();
228             msg.setText(data);
229             String JavaDoc s = send(msg, id, true);
230             Trc.exit(s);
231             return s;
232         } catch (JMSException JavaDoc je) {
233             Trc.exception(je);
234             throw WSIFJMSConstants.ToWsifException(je);
235         }
236     }
237
238     /**
239      * Send a message to the write queue
240      * @param data is the message
241      * @return the id of the message that was sent.
242      */

243     public String JavaDoc send(Serializable JavaDoc data) throws WSIFException {
244         Trc.entry(this, data);
245         String JavaDoc s = send(data, null);
246         Trc.exit(s);
247         return s;
248     }
249
250     /**
251      * Send a message to the write queue
252      * @param data is the message
253      * @param id is the correlation id to set on the message
254      * @return the id of the message that was sent.
255      */

256     public String JavaDoc send(Serializable JavaDoc data, String JavaDoc id) throws WSIFException {
257         Trc.entry(this, data, id);
258         areWeClosed();
259
260         try {
261             ObjectMessage JavaDoc msg = session.createObjectMessage();
262             msg.setObject(data);
263             String JavaDoc s = send(msg, id, true);
264             Trc.exit(s);
265             return s;
266         } catch (JMSException JavaDoc je) {
267             Trc.exception(je);
268             throw WSIFJMSConstants.ToWsifException(je);
269         }
270     }
271
272     /**
273      * Sends a message to the write queue.
274      * @param message
275      * @param id Correlation id
276      * @param setReplyTo If true JMSReplyTo is always set. If false JMSReplyTo
277      * is only set if the ReplyTo was explicitly set as a
278      * property.
279      */

280     public String JavaDoc send(Message JavaDoc msg, String JavaDoc id, boolean setReplyTo)
281         throws WSIFException {
282
283         Trc.entry(this, msg, id);
284         areWeClosed();
285
286         String JavaDoc msgId = null;
287         boolean propsSet = true;
288
289         try {
290             if (sender == null)
291                 sender = session.createSender(writeQ);
292
293             // Process replyTo queues separately since they are not
294
// ordinary JMS properties.
295
if (inProps.containsKey(WSIFJMSConstants.REPLY_TO)) {
296                 String JavaDoc rto = (String JavaDoc) inProps.get(WSIFJMSConstants.REPLY_TO);
297                 setReplyToQueue(rto);
298                 inProps.remove(WSIFJMSConstants.REPLY_TO);
299                 msg.setJMSReplyTo(readQ);
300             } else if (setReplyTo) {
301                 setReplyToQueue();
302                 msg.setJMSReplyTo(readQ);
303             }
304
305             if (id != null)
306                 msg.setJMSCorrelationID(id);
307             inProps.set(sender, msg);
308
309             sender.send(msg);
310             msgId = msg.getJMSMessageID();
311
312         } catch (JMSException JavaDoc je) {
313             Trc.exception(je);
314             throw WSIFJMSConstants.ToWsifException(je);
315         } finally {
316             // If properties were set, trash the sender so
317
// we get the default props next time.
318
if (propsSet)
319                 sender = null;
320             inProps.clear();
321         }
322
323         Trc.exit(msgId);
324         return msgId;
325     }
326
327     /**
328      * Blocking receive for the wsif.syncrequest.timeout
329      * @return the received message
330      */

331     public String JavaDoc receive() throws WSIFException {
332         Trc.entry(this);
333         String JavaDoc s = receiveString(null);
334         Trc.exit(s);
335         return s;
336     }
337
338     /**
339      * Blocking receive for the wsif.syncrequest.timeout
340      * @return the received message
341      */

342     public String JavaDoc receiveString(String JavaDoc id) throws WSIFException {
343         Trc.entry(this, id);
344         String JavaDoc s = receiveString( id, timeout );
345         Trc.exit(s);
346         return s;
347     }
348
349     /**
350      * Blocking receive waits for the specified timeout
351      * @return the received message
352      */

353     public String JavaDoc receiveString(String JavaDoc id, long timeout) throws WSIFException {
354         Trc.entry(this, id);
355         Message JavaDoc msg = receive(id, timeout);
356         String JavaDoc s = null;
357         try {
358             if (msg instanceof TextMessage JavaDoc)
359                 s = ((TextMessage JavaDoc) msg).getText();
360             else
361                 throw new WSIFException(
362                     "Reply message was not a TextMessage:msg="
363                         + (msg == null ? "null" : msg.toString()));
364         } catch (JMSException JavaDoc e) {
365             Trc.exception(e);
366             throw WSIFJMSConstants.ToWsifException(e);
367         }
368         Trc.exit(s);
369         return s;
370     }
371
372     /**
373      * Blocking receive waits for a message for the wsif.syncrequest.timeout
374      * @param id is the correlation id that the received message must have
375      * @return the received message
376      */

377     public Message JavaDoc receive(String JavaDoc id) throws WSIFException {
378         Trc.entry(this, id);
379         Message JavaDoc msg = receive(id, timeout);
380         Trc.exit(msg);
381         return msg;
382     }
383
384     /**
385      * Blocking receive waits for a message for the specified timeout
386      * @param id is the correlation id that the received message must have
387      * @param timeout how long in milliseconds to wait
388      * @return the received message
389      */

390     public Message JavaDoc receive(String JavaDoc id, long timeout) throws WSIFException {
391         Trc.entry(this, id);
392         areWeClosed();
393         QueueReceiver JavaDoc rec = null;
394         Message JavaDoc msg = null;
395
396         try {
397             if (id != null)
398                 rec =
399                     session.createReceiver(
400                         readQ,
401                         WSIFJMSConstants.JMS_CORRELATION_ID + "='" + id + "'");
402             else
403                 rec = session.createReceiver(readQ);
404
405             msg = rec.receive(timeout);
406             setLastMessage(msg);
407
408             if (msg == null)
409                 throw new WSIFException(
410                     "Receive timed out on JMS queue "
411                         + readQ.getQueueName()
412                         + ", timeout "
413                         + timeout);
414         } catch (JMSException JavaDoc e) {
415             Trc.exception(e);
416             throw WSIFJMSConstants.ToWsifException(e);
417         } finally {
418             try {
419                 if (rec != null)
420                     rec.close();
421             } catch (Exception JavaDoc ignored) {
422                 Trc.ignoredException(ignored);
423             }
424         }
425
426         Trc.exit(msg);
427         return msg;
428     }
429
430     /**
431      * Set the replyTo queue to a temporary queue.
432      */

433     public void setReplyToQueue() throws WSIFException {
434         Trc.entry(this);
435         areWeClosed();
436
437         Queue JavaDoc tmp;
438         try {
439             if (syncTempQueue == null)
440                 syncTempQueue = session.createTemporaryQueue();
441         } catch (JMSException JavaDoc je) {
442             Trc.exception(je);
443             throw WSIFJMSConstants.ToWsifException(je);
444         }
445
446         // So we don't overwrite readQ if there was an error.
447
readQ = syncTempQueue;
448         replyToName = null;
449         Trc.exit();
450     }
451
452     /**
453      * Set the replyTo queue.
454      * @param replyTo queue name.
455      */

456     public void setReplyToQueue(String JavaDoc replyTo) throws WSIFException {
457         Trc.entry(this, replyTo);
458         areWeClosed();
459
460         if (replyTo == null || replyTo.length() == 0) {
461             setReplyToQueue();
462             Trc.exit();
463             return;
464         }
465
466         // If we're already using this queue, then reuse it.
467
if (replyTo.equals(replyToName)) {
468             Trc.exit();
469             return;
470         }
471
472         readQ = finder.findQueue(replyTo);
473
474         replyToName = replyTo;
475         Trc.exit();
476     }
477
478     /**
479      * Sets if this destination is to be used for asynchronous requests.
480      * If this destination is to be used for asynchronous requests then a
481      * WSIFJMSAsyncListener will be created to listen for the async responses.
482      *
483      * @param b true if this destination is to be used for asynchronous requests,
484      * otherwise false.
485      */

486     public void setAsyncMode(boolean b) throws WSIFException {
487         Trc.entry(this, b);
488         areWeClosed();
489         if (asyncMode != b) {
490             asyncMode = b;
491         }
492         Trc.exit();
493     }
494
495     /**
496      * Sets a JMS property to a value. This property value will be only be used for
497      * the next message that is sent, then the property will be reset.
498      */

499     public void setProperty(String JavaDoc name, Object JavaDoc value) throws WSIFException {
500         Trc.entry(this, name, value);
501         if (name != null && value != null)
502             inProps.put(name, value);
503         Trc.exit();
504     }
505
506     /**
507      * Sets a HashMap of JMS property value pairs. The property values will be only
508      * be used for the next message that is sent, then all the properties will be reset.
509      */

510     public void setProperties(HashMap JavaDoc propMap) {
511         Trc.entry(this, propMap);
512         if (propMap != null && !propMap.isEmpty())
513             inProps.putAll(propMap);
514         Trc.exit();
515     }
516
517     /**
518      * Gets a JMS property from the previous message that was received.
519      */

520     public Object JavaDoc getProperty(String JavaDoc name) throws WSIFException {
521         Trc.entry(this, name);
522         if (lastMessage == null) {
523             Trc.exit(null);
524             return null;
525         }
526
527         if (outProps.isEmpty())
528             outProps.getPropertiesFromMessage(lastMessage);
529
530         Object JavaDoc prop = null;
531         if (name != null)
532             prop = outProps.get(name);
533
534         Trc.exit(prop);
535         return prop;
536     }
537
538     /**
539      * Gets all the JMS properties from the previous message that was received.
540      */

541     public HashMap JavaDoc getProperties() throws WSIFException {
542         Trc.entry(this);
543         if (lastMessage == null) {
544             Trc.exit(null);
545             return null;
546         }
547
548         if (outProps.isEmpty())
549             outProps.getPropertiesFromMessage(lastMessage);
550         if (!outProps.isEmpty()) {
551             Trc.exit(outProps);
552             return outProps;
553         }
554         Trc.exit(null);
555         return null;
556     }
557
558     protected void areWeClosed() throws WSIFException {
559         if (session == null)
560             throw new WSIFException("Cannot use a closed destination");
561     }
562
563     public static Message JavaDoc createMessage(Session JavaDoc session, int msgType)
564         throws WSIFException {
565         Trc.entry(null, session, new Integer JavaDoc(msgType));
566         Message JavaDoc jmsMsg = null;
567
568         try {
569             if (msgType
570                 == org
571                     .apache
572                     .wsif
573                     .wsdl
574                     .extensions
575                     .jms
576                     .JMSConstants
577                     .MESSAGE_TYPE_OBJECTMESSAGE)
578                 jmsMsg = session.createObjectMessage();
579             else if (
580                 msgType
581                     == org
582                         .apache
583                         .wsif
584                         .wsdl
585                         .extensions
586                         .jms
587                         .JMSConstants
588                         .MESSAGE_TYPE_TEXTMESSAGE)
589                 jmsMsg = session.createTextMessage();
590             else
591                 throw new WSIFException("Unable to support message type");
592         } catch (JMSException JavaDoc je) {
593             Trc.exception(je);
594             throw WSIFJMSConstants.ToWsifException(je);
595         }
596         Trc.exit(jmsMsg);
597         return jmsMsg;
598     }
599
600     public Message JavaDoc createMessage(int msgType) throws WSIFException {
601         Trc.entry(this, msgType);
602         Message JavaDoc m = createMessage(session, msgType);
603         Trc.exit(m);
604         return m;
605     }
606
607     /**
608      * The last message is the most recent message that was received by this
609      * WSIFJMSDestination. The getProperty(s) methods return the properties
610      * that are on the lastMessage. The works fine for sync, but for async
611      * user code will have received the message. So the provider must inform
612      * the WSIFJMSDestination about the lastMessage explicitly so it can
613      * inquire correctly about any jms properties on it.
614      */

615     public void setLastMessage(Message JavaDoc msg) {
616         Trc.entry(this, msg);
617         lastMessage = msg;
618         Trc.exit();
619     }
620     
621     public String JavaDoc deep() {
622         String JavaDoc buff = "";
623         try {
624             buff = new String JavaDoc(this.toString() + "\n");
625             buff += "finder: " + finder;
626             buff += " connection: " + connection;
627             buff += " session: " + session;
628             buff += " readQ: " + readQ;
629             buff += " writeQ: " + writeQ;
630             buff += " sender: " + sender;
631             buff += " asyncMode: " + asyncMode;
632             buff += " syncTempQueue: " + syncTempQueue;
633             buff += " inProps: " + inProps;
634             buff += " outProps: " + outProps;
635             buff += " lastMessage: " + lastMessage;
636             buff += " timeout: " + timeout;
637             buff += " replyToName: " + replyToName;
638         } catch (Exception JavaDoc e) {
639             Trc.exceptionInTrace(e);
640         }
641         return buff;
642     }
643 }
Popular Tags