package org.mr.api.jms;

import java.util.ArrayList;
import java.util.List;

import javax.jms.*;

/**
 * Request/reply helper built on top of a JMS {@link Session}.
 *
 * <p>Each request is sent to the destination given at construction time with
 * its {@code JMSReplyTo} header set to a private {@link TemporaryQueue} and
 * its {@code JMSCorrelationID} set to the decimal value of an internal
 * counter. Replies are read from the temporary queue; messages whose
 * correlation ID does not match the current request are discarded.
 *
 * <p>The request methods are {@code synchronized}, so only one request is in
 * flight per requestor at a time.
 */
public class MantaRequestor {

    /** Value used as the correlation ID of the next request; starts at 0. */
    private long idCounter;

    private final Session sess;
    private final Destination dest;
    private final TemporaryQueue tempQueue;
    private final MessageProducer producer;
    private final MessageConsumer consumer;

    /** Timeout in milliseconds used by {@link #request(Message)}; 0 by default. */
    private long defaultTimeOut;

    /**
     * Creates a requestor that sends to {@code destination} using
     * {@code session}. A temporary queue, a non-persistent producer for the
     * destination and a consumer on the temporary queue are created from the
     * session.
     *
     * @param session     the session used to create the queue, producer and consumer
     * @param destination the destination requests are sent to
     * @throws JMSException if the session fails to create any of the resources
     */
    public MantaRequestor(Session session, Destination destination) throws JMSException {
        sess = session;
        dest = destination;
        tempQueue = sess.createTemporaryQueue();
        defaultTimeOut = 0;
        producer = sess.createProducer(dest);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        consumer = sess.createConsumer(tempQueue);
        idCounter = 0;
    }

    /**
     * Sends {@code message} and waits for a single reply using the default
     * timeout (see {@link #setDefaultTimeout(long)}).
     *
     * @param message the request to send
     * @return the matching reply, or {@code null} if none arrived in time
     * @throws JMSException if sending or receiving fails
     */
    public final Message request(Message message) throws JMSException {
        // Read through the synchronized getter so the value is read under
        // the same lock the setter writes it under.
        return request(message, getDefaultTimeout());
    }

    /**
     * Sends {@code message} and waits for a single reply.
     *
     * @param message the request to send
     * @param timeout maximum wait in milliseconds; a value of 0 or less
     *                waits without a time limit
     * @return the matching reply, or {@code null} if the timeout expired or
     *         the consumer returned {@code null} while waiting
     * @throws JMSException if sending or receiving fails
     */
    public final synchronized Message request(Message message, long timeout) throws JMSException {
        // Claim the ID before sending so that a failure below can never make
        // the next request reuse the same correlation ID.
        final long correlationId = idCounter++;
        sendRequest(message, correlationId);
        return getResponse(timeout, correlationId);
    }

    /**
     * Sends {@code message} and collects up to {@code numberOfReplies}
     * replies.
     *
     * @param message         the request to send
     * @param timeout         overall wait in milliseconds for all replies;
     *                        0 waits without a time limit
     * @param numberOfReplies number of replies to wait for; must be at least 1
     * @return the replies received (possibly fewer than requested when the
     *         timeout expires), or {@code null} when {@code timeout} is
     *         negative, in which case nothing is sent
     * @throws JMSException if {@code numberOfReplies} is less than 1, or if
     *                      sending or receiving fails
     */
    public final synchronized List request(Message message, long timeout, int numberOfReplies)
            throws JMSException {
        if (timeout < 0) {
            return null;
        }
        if (numberOfReplies < 1) {
            throw new JMSException("request(): number of relpies must be greater than 0");
        }

        final boolean waitForever = (timeout == 0);
        final long correlationId = idCounter++;
        final long startTime = System.currentTimeMillis();
        sendRequest(message, correlationId);

        List messages = new ArrayList();
        while (messages.size() < numberOfReplies) {
            long remaining = 0;
            if (!waitForever) {
                // Always derive the remaining time from the original timeout;
                // subtracting total elapsed time from an already reduced
                // value would count earlier waits more than once.
                remaining = timeout - (System.currentTimeMillis() - startTime);
                if (remaining <= 0) {
                    break;
                }
            }
            Message reply = getResponse(remaining, correlationId);
            if (reply == null) {
                // Timed out, or the consumer stopped delivering messages.
                break;
            }
            messages.add(reply);
        }
        return messages;
    }

    /**
     * Closes the session passed to the constructor and then deletes the
     * temporary reply queue.
     *
     * <p>This method is not {@code synchronized}, so it does not wait for a
     * request that is currently blocked in another thread.
     *
     * @throws JMSException if closing the session or deleting the queue fails
     */
    public final void close() throws JMSException {
        sess.close();
        tempQueue.delete();
    }

    /**
     * Returns the timeout used by {@link #request(Message)}.
     *
     * @return the default timeout in milliseconds
     */
    public final synchronized long getDefaultTimeout() {
        return defaultTimeOut;
    }

    /**
     * Sets the timeout used by {@link #request(Message)}.
     *
     * @param timeout the default timeout in milliseconds
     */
    public final synchronized void setDefaultTimeout(long timeout) {
        defaultTimeOut = timeout;
    }

    /**
     * Stamps {@code message} with the reply queue and correlation ID and
     * sends it through the producer.
     *
     * @param message              the request to send
     * @param correlationIdCounter number whose decimal string becomes the
     *                             message's {@code JMSCorrelationID}
     * @throws JMSException if setting the headers or sending fails
     */
    private synchronized void sendRequest(Message message, long correlationIdCounter)
            throws JMSException {
        message.setJMSReplyTo(tempQueue);
        message.setJMSCorrelationID(String.valueOf(correlationIdCounter));
        producer.send(message);
    }

    /**
     * Waits on the temporary queue for a message whose correlation ID equals
     * the decimal string of {@code correlationIdCounter}. Messages with any
     * other (or no) correlation ID are consumed and dropped.
     *
     * @param timeout              maximum wait in milliseconds; 0 or less
     *                             waits without a time limit
     * @param correlationIdCounter number identifying the request being answered
     * @return the matching message, or {@code null} if the timeout expired
     *         or the consumer returned {@code null}
     * @throws JMSException if receiving fails
     */
    private synchronized Message getResponse(long timeout, long correlationIdCounter)
            throws JMSException {
        final String expectedId = String.valueOf(correlationIdCounter);
        final boolean waitForever = (timeout <= 0);
        final long startTime = System.currentTimeMillis();

        // receive(0) blocks without a time limit, which is what we want
        // when waitForever is set.
        long remaining = waitForever ? 0 : timeout;
        while (waitForever || remaining > 0) {
            Message received = consumer.receive(remaining);
            if (received == null) {
                // Either the wait timed out or the consumer was closed;
                // retrying would only spin, so give up.
                return null;
            }
            // Compare from the known non-null side: a reply may carry no
            // correlation ID at all.
            if (expectedId.equals(received.getJMSCorrelationID())) {
                return received;
            }
            if (!waitForever) {
                // Recompute from the original timeout so elapsed time is
                // counted exactly once.
                remaining = timeout - (System.currentTimeMillis() - startTime);
            }
        }
        return null;
    }
}