KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > api > jms > MantaRequestor


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is lital kasif.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  *
46  * Created on Mar 22, 2005
47  *
48  * Coridan LTD
49  */

50 package org.mr.api.jms;
51
52 import java.util.ArrayList JavaDoc;
53 import java.util.List JavaDoc;
54
55 import javax.jms.*;
56 /**
57  * MantaRequestor is a manta extension to the two JMS classes QueueRequestor and TopicRequestor.
58  * The mantaRequestor constructor is given a non-transacted Session and a Destination. It creates
59  * a TemporaryQueue for the responses and provides a request method that sends the request message
60  * and waits for its reply message.
61  * MantaRequestor is using JMS 1.1, and adds request options such as a time out period of
62  * waiting for responses and the ability to receive a number of response messages to one request.
63  *
64  * @author lital kasif
65  */

66 public class MantaRequestor {
67     //the correlation ID of the last request made
68
private long IDcounter;
69     //counts the number of suitable messages recieved, if cse of multi replies option
70

71     private Session sess;
72     private Destination dest;
73     private TemporaryQueue tempQueue;
74     //the default time out for waiting for a result after a request was made
75
private long defaultTimeOut;
76     private MessageProducer producer;
77     private MessageConsumer consumer;
78     private boolean noTimeout;//will be set to true if the timeout<=0, then the request should be blocking
79

80     /**
81      * Constructor for the mantaRequestor class.
82      * It assumes the session parameter to be non-transacted, with a delivery mode of either
83      * AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE.
84      * @param session - the Session that will send request. The destination must be defined on
85      * this session.
86      * @param destination - the Destination on which
87      * @exception JMSException - in case the Requestor creation failed due to some internal error.
88      */

89      public MantaRequestor(Session session, Destination destination) throws JMSException {
90          sess=session;
91          dest=destination;
92          tempQueue = sess.createTemporaryQueue();
93          defaultTimeOut=0;
94          producer=sess.createProducer(dest);
95          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
96          consumer=sess.createConsumer(tempQueue);
97          IDcounter=0;
98      }
99   
100
101      /**
102       * This method is used to send a request and to wait for a reply.
103       * The temporary queue is used for JMSReplyTo destination, and only one reply per request
104       * is expected. If the defauly time out passes and no suitable reply is made, a null will be
105       * returned. A suitable reply should hold the same JMSCorrelationID the request message
106       * holds.
107       * @param message the message to be sent
108       * @return the reply message received
109       * @throws JMSException - in case the request was failed to complete.
110       */

111      public final Message request(Message message) throws JMSException {
112          return request(message,defaultTimeOut);
113      } //request
114

115   
116      /**
117       * This method is used to send a request and to wait for a reply. The temporary queue is
118       * used for JMSReplyTo destination, and only one reply per request is expected.
119       * If the inserted time out passes and no suitable reply is made, a null will be
120       * returned. A suitable reply should hold the same JMSCorrelationID the request message
121       * holds.
122       * @param message the message to send
123       * @return the reply message
124       * @throws JMSException - in case the request was failed to complete.
125       */

126      public final synchronized Message request(Message message, long timeout) throws JMSException{
127         sendRequest(message,IDcounter);
128         Message receivedMsg = getResponse(timeout,IDcounter);
129         IDcounter++;
130         return receivedMsg;
131      }//request
132

133      
134      /**
135       * This method is used to send a request and to wait for multiple replies. The temporary
136       * queue is used for JMSReplyTo destination.
137       * The requestor will return when the time out has passed or numberOfReplies has
138       * been reached by suitable replies. A suitable reply holds the same JMSCorellationID as
139       * the request message holds.
140       * @param message the message to send
141       * @param timeout the timeout the requestor will wait for replay message. 0 value will
142       * wait until a replay is received. The timeout can not be negative.
143       * @return Set of the the reply messages
144       * @throws JMSException - in case the request was failed to complete.
145      **/

146      public final synchronized List JavaDoc request(Message message, long timeout, int numberOfReplies) throws JMSException {
147         int counter =0;//counts the number of suitable reply messages received for the request
148
//timeout that is samller than 0 is illegal and will return no messages
149
if (timeout<0){
150             return null;
151         }
152         if (numberOfReplies<1){
153              throw new JMSException("request(): number of relpies must be greater than 0");
154          }
155         long startTime = System.currentTimeMillis();
156         if (timeout==0){
157             noTimeout=true;
158         }else{
159             noTimeout=false;
160         }
161         sendRequest(message,IDcounter);
162         List JavaDoc messages = new ArrayList JavaDoc();
163         while(((noTimeout)||timeout>1)&& counter<numberOfReplies){
164              Message receivedMsg=getResponse(timeout,IDcounter);
165              if (receivedMsg!=null){
166                     messages.add(receivedMsg);
167                     counter++;
168              }
169              // reduce the timeout by the time that had passed. so that if a message
170
//with wrong correlation ID was received, we would continue trying to
171
//receive until time out passes
172
timeout = timeout - (System.currentTimeMillis()-startTime);
173         }//while
174
IDcounter++;
175         return messages;
176      }//request
177

178      /**
179       * Closes the mantaRequestor and the session passed in its constructor. Since there might be
180       * resources allocated on behalf of a mantaRequestor outside the Java virtual machine,
181       * clients should close them when they are not needed. Relying on garbage collection to
182       * eventually reclaim these resources may not be timely enough.
183       * @throws JMSException - in case closing the mantaRequestor failed.
184       */

185      public final void close() throws JMSException {
186          sess.close();
187          tempQueue.delete();
188      }
189      
190      /**
191       * @return the default time out period waiting for a response after a requesty was made.
192       * the time out is in milliseconds.
193       */

194      public final synchronized long getDefaultTimeout() {
195          return defaultTimeOut;
196      }
197
198      /**
199       * Sets the default timeout period used while calling the request method.
200      * If timeout <= 0 the timeout is ignored and the method is blocking.
201       * @param timeout the timeout to wait in milliseconds.
202       */

203      public final synchronized void setDefaultTimeout(long timeout) {
204          defaultTimeOut=timeout;
205      }
206      
207      /**
208       * Sends a request message to the Destianton that belons to this Requestor.
209       * The JMSReplyTo of the request message will be set to be the Temporary Queue
210       * of this Reuquestor and the JMSCoreelationID to the Id inputed to the method.
211       * @param message - the request message to be sent.
212       * @param CorrelationIdcounter - the CorrelationId to be set to the request message.
213       * @throws JMSException - if the request message could not get sent.
214       */

215      private final synchronized void sendRequest(Message message, long CorrelationIdcounter) throws JMSException{
216         message.setJMSReplyTo(tempQueue);
217         message.setJMSCorrelationID(String.valueOf(CorrelationIdcounter));
218         producer.send(message);
219      }
220      
221      /**
222       * Receives reply messages on the Temporary Queue of this Requestor. If the time out period
223       * passes and no suitable reply is received, will return a null.
224       * @param timeout
225       * @param CorrelationIdcounter
226       * @return
227       * @throws JMSException
228       */

229      private final synchronized Message getResponse(long timeout, long CorrelationIdcounter) throws JMSException {
230         Message receivedMsg = null;
231         if (timeout <= 0) {
232             timeout = 0;
233             noTimeout=true;
234         }
235         else {
236             noTimeout=false;
237         }
238         long startTime = System.currentTimeMillis();
239         while((noTimeout || timeout>1) && receivedMsg == null ){
240             receivedMsg=consumer.receive(timeout);
241             // check if this message is not a responce to the request
242
if (receivedMsg != null && !receivedMsg.getJMSCorrelationID().equals(String.valueOf(CorrelationIdcounter))){
243                 receivedMsg=null;
244             }
245             // reduce the timeout by the time that had passed. so that if a message
246
//with wrong correlation ID was received, we would continue trying to
247
//receive until time out passes
248
if (!noTimeout)
249                 timeout = timeout - (System.currentTimeMillis()-startTime);
250         }
251         return receivedMsg;
252     }//request
253

254 }//mantaRequestor
255
Popular Tags