1 45 package org.exolab.jms.messagemgr; 46 47 import java.util.Vector ; 48 import java.sql.Connection ; 49 import javax.jms.InvalidSelectorException ; 50 import javax.jms.JMSException ; 51 52 import org.apache.commons.logging.Log; 53 import org.apache.commons.logging.LogFactory; 54 55 import org.exolab.jms.client.JmsQueue; 56 import org.exolab.jms.message.MessageImpl; 57 import org.exolab.jms.scheduler.Scheduler; 58 import org.exolab.jms.server.JmsServerSession; 59 60 61 70 public class QueueBrowserEndpoint extends AbstractConsumerEndpoint { 71 72 75 private MessageQueue _handles = new MessageQueue(); 76 77 80 private QueueDestinationCache _cache; 81 82 85 private static final Log _log = 86 LogFactory.getLog(QueueBrowserEndpoint.class); 87 88 89 101 public QueueBrowserEndpoint(long consumerId, JmsServerSession session, 102 JmsQueue queue, String selector, 103 Scheduler scheduler) 104 throws InvalidSelectorException , JMSException { 105 super(consumerId, session, queue, selector, false, scheduler); 106 107 _cache = (QueueDestinationCache) 108 DestinationManager.instance().getDestinationCache(queue); 109 110 _cache.addQueueListener(this); 113 _cache.playbackMessages(this); 114 } 115 116 125 public boolean messageAdded(MessageHandle handle, MessageImpl message) { 126 addMessage(handle); 127 return true; 128 } 129 130 139 public boolean persistentMessageAdded(MessageHandle handle, 140 MessageImpl message, 141 Connection connection) { 142 return messageAdded(handle, message); 143 } 144 145 151 public void messageRemoved(String messageId) { 152 _handles.remove(messageId); 153 } 154 155 162 public void persistentMessageRemoved(String messageId, 163 Connection connection) { 164 messageRemoved(messageId); 165 } 166 167 176 public MessageHandle receive(long wait) throws JMSException { 177 throw new JMSException ("Cannot call receive for QueueBrowser"); 178 } 179 180 186 public Vector receiveMessages(int count) { 187 Vector messages = new Vector (); 188 int index = 0; 189 while (index < count) { 190 191 if (isStopped() || getMessageCount() == 0) { 193 break; 194 } 195 196 try { 201 MessageHandle handle = removeFirstMessage(); 202 if (handle != null) { 203 MessageImpl m = handle.getMessage(); 204 if (m != null) { 205 if (selects(m)) { 209 messages.addElement(handle); 210 ++index; 211 } else { 212 } 214 } else { 215 } 217 } 218 } catch (Exception exception) { 219 _log.error(exception, exception); 220 } 221 } 222 223 return messages; 224 } 225 226 233 public void setMessageListener(ConsumerEndpointListener listener) { 234 _log.error("QueueBrowserEndpoint.setMessageListener " 235 + "should never be called"); 236 } 237 238 243 public int getMessageCount() { 244 return _handles.size(); 245 } 246 247 252 protected void addMessage(MessageHandle handle) { 253 _handles.add(handle); 254 255 notifyMessageAvailable(); 257 } 258 259 265 protected boolean deliverMessages() { 266 _log.error( 267 "QueueBrowserEndpoint.deliverMessages() should never be called", 268 new Exception ()); 269 return false; 270 } 271 272 278 protected MessageHandle removeMessage(MessageHandle handle) { 279 return _handles.remove(handle); 280 } 281 282 287 protected MessageHandle removeFirstMessage() { 288 return _handles.removeFirst(); 289 } 290 291 294 protected void doClose() { 295 _cache.removeQueueListener(this); 297 } 298 299 } 300 | Popular Tags |