1 45 package org.exolab.jms.client; 46 47 import javax.jms.JMSException ; 48 import javax.jms.Message ; 49 import javax.jms.MessageConsumer ; 50 import javax.jms.MessageListener ; 51 import javax.jms.Destination ; 52 53 import org.apache.commons.logging.Log; 54 import org.apache.commons.logging.LogFactory; 55 56 import org.exolab.jms.message.MessageImpl; 57 58 59 67 class JmsMessageConsumer 68 implements MessageListener , MessageConsumer { 69 70 73 private JmsSession _session = null; 74 75 78 private final long _consumerId; 79 80 83 private final Destination _destination; 84 85 89 private MessageListener _messageListener = null; 90 91 94 private String _selector = null; 95 96 99 private volatile boolean _closed = false; 100 101 105 private long _listenerSetTimestamp = 0; 106 107 110 private String _lastMessageDelivered; 111 112 115 private static final Log _log = 116 LogFactory.getLog(JmsMessageConsumer.class); 117 118 119 127 public JmsMessageConsumer(JmsSession session, long consumerId, 128 Destination destination, String selector) { 129 if (session == null) { 130 throw new IllegalArgumentException ("Argument 'session' is null"); 131 } 132 if (destination == null) { 133 throw new IllegalArgumentException ( 134 "Argument 'destination' is null"); 135 } 136 _session = session; 137 _consumerId = consumerId; 138 _destination = destination; 139 _selector = selector; 140 } 141 142 147 public String getMessageSelector() { 148 return _selector; 149 } 150 151 157 public MessageListener getMessageListener() { 158 return _messageListener; 159 } 160 161 168 public void setMessageListener(MessageListener listener) 169 throws JMSException { 170 if (listener != null) { 173 if (_messageListener == null) { 174 _listenerSetTimestamp = System.currentTimeMillis(); 176 _messageListener = listener; 177 _session.setMessageListener(this); 178 } else { 179 _messageListener = listener; 182 } 183 } else { 184 if (_messageListener != null) { 185 _session.removeMessageListener(this); 186 _messageListener = listener; 187 } 188 } 189 190 _lastMessageDelivered = null; 193 } 194 195 204 public Message receive() throws JMSException { 205 return retrieveMessage(0); 206 } 207 208 219 public Message receive(long timeout) throws JMSException { 220 return retrieveMessage(timeout); 221 } 222 223 230 public Message receiveNoWait() throws JMSException { 231 return retrieveMessage(-1); 232 } 233 234 241 public synchronized void close() throws JMSException { 242 if (!_closed) { 243 try { 244 _closed = true; 245 _session.removeConsumer(this); 246 247 notifyAll(); 249 } finally { 250 _messageListener = null; 251 _session= null; 252 _selector = null; 253 } 254 } 255 } 256 257 263 public void onMessage(Message message) { 264 try { 265 if (_messageListener != null) { 266 long rcvd = message.getLongProperty("JMSXRcvTimestamp"); 269 if (rcvd < _listenerSetTimestamp) { 270 return; 271 } 272 273 _lastMessageDelivered = ((MessageImpl) message).getId(); 281 _messageListener.onMessage(message); 282 } 283 } catch (JMSException exception) { 284 _log.error("Error in onMessage", exception); 286 } 287 } 288 289 300 protected Message retrieveMessage(long wait) throws JMSException { 301 if (_messageListener != null) { 302 throw new JMSException ("Can't receive when listener defined"); 304 } 305 306 if (_closed) { 307 throw new JMSException ("Can't receive when session closed"); 309 } 310 311 MessageImpl message = 312 (MessageImpl) _session.retrieveMessage(_consumerId, wait); 313 if (message != null) { 314 _lastMessageDelivered = message.getId(); 315 } 316 317 return message; 318 } 319 320 325 protected String getLastMessageDelivered() { 326 return _lastMessageDelivered; 327 } 328 329 334 protected Destination getDestination() { 335 return _destination; 336 } 337 338 343 protected long getConsumerId() { 344 return _consumerId; 345 } 346 347 352 protected JmsSession getSession() { 353 return _session; 354 } 355 356 } 357 | Popular Tags |