1 45 46 package org.exolab.jms.messagemgr; 47 48 import java.rmi.RemoteException ; 49 import java.sql.Connection ; 50 import java.util.Collections ; 51 import java.util.HashMap ; 52 import java.util.Iterator ; 53 import java.util.Map ; 54 import javax.jms.InvalidSelectorException ; 55 import javax.jms.JMSException ; 56 57 import org.apache.commons.logging.Log; 58 import org.apache.commons.logging.LogFactory; 59 60 import org.exolab.jms.client.JmsDestination; 61 import org.exolab.jms.client.JmsTopic; 62 import org.exolab.jms.message.MessageImpl; 63 import org.exolab.jms.persistence.PersistenceException; 64 import org.exolab.jms.scheduler.Scheduler; 65 import org.exolab.jms.selector.Selector; 66 import org.exolab.jms.server.JmsServerSession; 67 68 69 75 abstract class AbstractTopicConsumerEndpoint extends AbstractConsumerEndpoint 76 implements DestinationEventListener { 77 78 81 private MessageQueue _handles = new MessageQueue(); 82 83 88 protected Map _caches = Collections.synchronizedMap(new HashMap ()); 89 90 94 private final int MAX_MESSAGES = 200; 95 96 99 private static final Log _log = 100 LogFactory.getLog(AbstractTopicConsumerEndpoint.class); 101 102 120 public AbstractTopicConsumerEndpoint(long consumerId, 121 JmsServerSession session, 122 JmsTopic topic, 123 String selector, boolean noLocal, 124 Scheduler scheduler) 125 throws JMSException { 126 super(consumerId, session, topic, selector, noLocal, scheduler); 127 } 128 129 141 public MessageHandle receive(long wait) throws JMSException { 142 MessageHandle handle = receiveNoWait(); 143 if ((handle == null) && (wait >= 0)) { 144 setWaitingForMessage(); 148 } 149 150 return handle; 151 } 152 153 156 public MessageHandle receiveNoWait() throws JMSException { 157 MessageHandle handle = null; 158 while ((handle = _handles.removeFirst()) != null) { 159 MessageImpl message = handle.getMessage(); 161 if (message != null) { 162 if (selects(message)) { 163 break; 165 } else { 166 handle.destroy(); 169 } 170 } 171 handle = null; 172 } 173 return handle; 174 } 175 176 181 public void returnMessage(MessageHandle handle) { 182 addMessage(handle); 183 184 schedule(); 186 } 187 188 193 public int getMessageCount() { 194 return _handles.size(); 195 } 196 197 207 public boolean messageAdded(MessageHandle handle, MessageImpl message) 208 throws JMSException { 209 boolean accepted = true; 210 211 if (getNoLocal() && message.getConnectionId() == getConnectionId()) { 214 accepted = false; 215 } else { 216 handle = new TopicConsumerMessageHandle(handle, this); 218 219 if (!_handles.contains(handle)) { 220 addMessage(handle); 222 schedule(); 223 } else { 224 accepted = false; 225 _log.warn("Endpoint=" + this + " already has message cached: " + 226 handle); 227 } 228 } 229 return accepted; 230 } 231 232 239 public void messageRemoved(String messageId) throws JMSException { 240 MessageHandle handle = _handles.remove(messageId); 241 if (handle != null) { 242 handle.destroy(); 243 } 244 } 245 246 257 public boolean persistentMessageAdded(MessageHandle handle, 258 MessageImpl message, 259 Connection connection) 260 throws JMSException , PersistenceException { 261 boolean accepted = true; 262 263 if (getNoLocal() && message.getConnectionId() == getConnectionId()) { 266 accepted = false; 267 } else { 268 handle = new TopicConsumerMessageHandle(handle, this); 270 if (isPersistent()) { 271 handle.add(connection); 273 } 274 275 accepted = _handles.add(handle); 276 if (accepted) { 277 addMessage(handle); 278 schedule(); 279 } else { 280 accepted = false; 281 _log.warn("Endpoint=" + this + " already has message cached: " + 282 handle); 283 } 284 } 285 return accepted; 286 } 287 288 297 public void persistentMessageRemoved(String messageId, 298 Connection connection) 299 throws JMSException , PersistenceException { 300 MessageHandle handle = _handles.remove(messageId); 301 if (handle != null) { 302 handle.destroy(connection); 303 } 304 } 305 306 313 public void destinationAdded(JmsDestination destination, 314 DestinationCache cache) { 315 if (destination instanceof JmsTopic) { 316 JmsTopic myTopic = (JmsTopic) getDestination(); 317 JmsTopic topic = (JmsTopic) destination; 318 if (myTopic.match(topic) && !_caches.containsKey(topic)) { 319 _caches.put(topic, cache); 320 cache.addConsumer(this); 321 } 322 } 323 } 324 325 332 public void destinationRemoved(JmsDestination destination, 333 DestinationCache cache) { 334 if (destination instanceof JmsTopic) { 335 _caches.remove(destination); 336 } 337 } 338 339 344 protected boolean deliverMessages() { 345 boolean reschedule = true; 346 347 for (int index = 0; index < MAX_MESSAGES;) { 348 349 if (stopDelivery()) { 351 reschedule = false; 352 break; 353 } 354 355 MessageHandle handle = _handles.removeFirst(); 357 try { 358 Selector selector = getSelector(); 359 if (selector != null) { 360 MessageImpl m = handle.getMessage(); 361 if ((m != null) && selector.selects(m)) { 362 _listener.onMessage(handle); 364 index++; 365 } else { 366 handle.destroy(); 368 } 369 } else { 370 _listener.onMessage(handle); 372 index++; 373 } 374 } catch (RemoteException exception) { 375 _listener = null; 376 returnMessage(handle); 377 } catch (JMSException exception) { 378 _log.error(exception, exception); 379 returnMessage(handle); 380 } catch (Exception exception) { 381 _log.error(exception, exception); 382 returnMessage(handle); 383 } 384 } 385 return reschedule; 386 } 387 388 394 protected void init() throws JMSException { 395 JmsTopic topic = (JmsTopic) getDestination(); 396 397 DestinationManager destmgr = DestinationManager.instance(); 399 if (topic.isWildCard()) { 400 _caches = destmgr.getTopicDestinationCaches(topic); 403 destmgr.addDestinationEventListener(this); 407 Iterator iterator = _caches.values().iterator(); 408 while (iterator.hasNext()) { 409 DestinationCache cache = (DestinationCache) iterator.next(); 410 cache.addConsumer(this); 411 } 412 } else { 413 DestinationCache cache = destmgr.getDestinationCache(topic); 417 _caches.put(topic, cache); 418 cache.addConsumer(this); 419 } 420 } 421 422 427 protected void addMessage(MessageHandle handle) { 428 _handles.add(handle); 429 notifyMessageAvailable(); 430 } 431 432 435 protected void doClose() { 436 DestinationManager.instance().removeDestinationEventListener(this); 438 DestinationCache[] caches = (DestinationCache[]) 440 _caches.values().toArray(new DestinationCache[0]); 441 for (int i = 0; i < caches.length; ++i) { 442 caches[i].removeConsumer(this); 443 } 444 _caches.clear(); 445 446 if (!isPersistent()) { 447 MessageHandle[] handles = _handles.toArray(); 450 for (int i = 0; i < handles.length; ++i) { 451 MessageHandle handle = handles[i]; 452 try { 453 handle.destroy(); 454 } catch (JMSException exception) { 455 _log.error(exception, exception); 456 } 457 } 458 } 459 } 460 } 461 | Popular Tags |