1 45 package org.exolab.jms.messagemgr; 46 47 import java.rmi.RemoteException ; 48 import java.sql.Connection ; 49 import javax.jms.InvalidSelectorException ; 50 import javax.jms.JMSException ; 51 52 import org.apache.commons.logging.Log; 53 import org.apache.commons.logging.LogFactory; 54 55 import org.exolab.jms.client.JmsQueue; 56 import org.exolab.jms.message.MessageImpl; 57 import org.exolab.jms.scheduler.Scheduler; 58 import org.exolab.jms.server.JmsServerSession; 59 60 61 69 public class QueueConsumerEndpoint 70 extends AbstractConsumerEndpoint { 71 72 75 private QueueDestinationCache _cache = null; 76 77 81 private final int MAX_MESSAGES = 200; 82 83 86 private static final Log _log = 87 LogFactory.getLog(QueueConsumerEndpoint.class); 88 89 90 101 public QueueConsumerEndpoint(long consumerId, JmsServerSession session, 102 JmsQueue queue, String selector, 103 Scheduler scheduler) 104 throws InvalidSelectorException , JMSException { 105 super(consumerId, session, queue, selector, false, scheduler); 106 107 _cache = (QueueDestinationCache) 110 DestinationManager.instance().getDestinationCache(queue); 111 _cache.addConsumer(this); 112 } 113 114 119 public int getMessageCount() { 120 return _cache.getMessageCount(); 121 } 122 123 public void setMessageListener(ConsumerEndpointListener listener) { 125 if (listener == null) { 130 _cache.removeConsumer(this); 131 } else { 132 _cache.addConsumer(this); 133 } 134 135 super.setMessageListener(listener); 136 } 137 138 150 public MessageHandle receive(long wait) throws JMSException { 151 MessageHandle handle = getMessageFromCache(); 152 if (handle == null && wait >= 0) { 153 setWaitingForMessage(); 156 157 handle = getMessageFromCache(); 160 if (handle != null) { 161 clearWaitingForMessage(); 162 } 163 } 164 165 return handle; 166 } 167 168 174 public boolean hasMessageListener() { 175 return _listener != null; 176 } 177 178 187 public boolean messageAdded(MessageHandle handle, MessageImpl message) { 188 if (_listener != null) { 189 schedule(); 190 } else { 191 notifyMessageAvailable(); 193 } 194 195 return true; 196 } 197 198 207 public boolean persistentMessageAdded(MessageHandle handle, 208 MessageImpl message, 209 Connection connection) { 210 return messageAdded(handle, message); 211 } 212 213 219 public void messageRemoved(String messageId) { 220 } 222 223 230 public void persistentMessageRemoved(String messageId, 231 Connection connection) { 232 } 234 235 240 protected boolean deliverMessages() { 241 boolean reschedule = true; 242 243 for (int index = 0; index < MAX_MESSAGES; index++) { 244 if (stopDelivery()) { 246 reschedule = false; 247 break; 248 } 249 250 MessageHandle handle = null; 251 try { 252 handle = getMessageFromCache(); 253 } catch (Exception exception) { 254 _log.error(exception, exception); 255 } 256 257 if (handle == null) { 260 reschedule = false; 261 break; 262 } 263 264 try { 265 _listener.onMessage(handle); 267 } catch (RemoteException exception) { 268 _log.error(exception, exception); 270 if (handle != null) { 271 _cache.returnMessageHandle(handle); 272 } 273 _listener = null; 274 } catch (Exception exception) { 275 _log.error(exception, exception); 276 if (handle != null) { 277 _cache.returnMessageHandle(handle); 278 } 279 } 280 } 281 return reschedule; 282 } 283 284 287 protected void doClose() { 288 _cache.removeConsumer(this); 290 } 291 292 299 private MessageHandle getMessageFromCache() throws JMSException { 300 MessageHandle handle = _cache.getMessage(getSelector()); 301 if (handle instanceof QueueConsumerMessageHandle) { 302 ((QueueConsumerMessageHandle) handle).setConsumerId(getId()); 304 } 305 306 return handle; 307 } 308 309 } 310 311 | Popular Tags |