1 45 package org.exolab.jms.messagemgr; 46 47 import java.sql.Connection ; 48 import java.util.Collections ; 49 import java.util.LinkedList ; 50 import java.util.List ; 51 import java.util.Iterator ; 52 import javax.jms.JMSException ; 53 54 import org.apache.commons.logging.Log; 55 import org.apache.commons.logging.LogFactory; 56 57 import org.exolab.jms.client.JmsDestination; 58 import org.exolab.jms.client.JmsQueue; 59 import org.exolab.jms.client.JmsTemporaryDestination; 60 import org.exolab.jms.message.MessageImpl; 61 import org.exolab.jms.persistence.DatabaseService; 62 import org.exolab.jms.persistence.PersistenceException; 63 import org.exolab.jms.selector.Selector; 64 import org.exolab.jms.server.JmsServerConnectionManager; 65 66 67 74 public class QueueDestinationCache extends AbstractDestinationCache { 75 76 79 private MessageQueue _handles; 80 81 84 protected List _browsers = Collections.synchronizedList(new LinkedList ()); 85 86 92 private int _lastConsumerIndex = 0; 93 94 97 private static final Log _log = LogFactory.getLog( 98 QueueDestinationCache.class); 99 100 101 107 public QueueDestinationCache(JmsQueue queue) { 108 super(queue); 109 _handles = new MessageQueue(); 110 } 111 112 121 public QueueDestinationCache(JmsQueue queue, Connection connection) 122 throws JMSException , PersistenceException { 123 super(queue, connection); 124 } 125 126 132 public void addQueueListener(QueueBrowserEndpoint listener) { 133 if (!_browsers.contains(listener)) { 135 _browsers.add(listener); 136 } 137 } 138 139 144 public void removeQueueListener(QueueBrowserEndpoint listener) { 145 if (_browsers.contains(listener)) { 147 _browsers.remove(listener); 148 } 149 } 150 151 158 public void messageAdded(JmsDestination destination, MessageImpl message) 159 throws JMSException { 160 MessageRef reference = new CachedMessageRef(message, false, 161 getMessageCache()); 162 MessageHandle shared = new SharedMessageHandle(reference, message); 163 MessageHandle handle = new QueueConsumerMessageHandle(shared); 164 165 addMessage(reference, message, handle); 168 169 QueueConsumerEndpoint endpoint = getEndpointForMessage(message); 172 if (endpoint != null) { 173 endpoint.messageAdded(handle, message); 174 } 175 } 176 177 186 public void persistentMessageAdded(Connection connection, 187 JmsDestination destination, 188 MessageImpl message) 189 throws JMSException , PersistenceException { 190 MessageRef reference = new CachedMessageRef(message, true, 191 getMessageCache()); 192 MessageHandle shared = new SharedMessageHandle(reference, message); 193 MessageHandle handle = new QueueConsumerMessageHandle(shared); 194 handle.add(connection); 195 196 addMessage(reference, message, handle); 197 198 QueueConsumerEndpoint endpoint = getEndpointForMessage(message); 201 if (endpoint != null) { 202 endpoint.persistentMessageAdded(handle, message, connection); 203 } 204 } 205 206 215 public synchronized MessageHandle getMessage(Selector selector) 216 throws JMSException { 217 QueueConsumerMessageHandle handle = null; 218 if (selector == null) { 219 handle = (QueueConsumerMessageHandle) _handles.removeFirst(); 222 } else { 223 MessageHandle[] handles = _handles.toArray(); 225 for (int i = 0; i < handles.length; ++i) { 226 MessageHandle hdl = handles[i]; 227 if (selector.selects(hdl.getMessage())) { 228 handle = (QueueConsumerMessageHandle) hdl; 229 _handles.remove(handle); 230 break; 231 } 232 } 233 } 234 return handle; 235 } 236 237 244 public void playbackMessages(QueueBrowserEndpoint browser) 245 throws JMSException { 246 MessageHandle[] handles = _handles.toArray(); 247 for (int i = 0; i < handles.length; ++i) { 248 MessageHandle handle = handles[i]; 249 MessageImpl message = handle.getMessage(); 250 if (message != null) { 251 browser.messageAdded(handle, message); 252 } 253 } 254 } 255 256 262 public void returnMessageHandle(MessageHandle handle) { 263 _handles.add(handle); 265 266 ConsumerEndpoint[] consumers = getConsumerArray(); 269 final int size = consumers.length; 270 if (size > 0) { 271 if ((_lastConsumerIndex + 1) > size) { 274 _lastConsumerIndex = 0; 275 } 276 277 int index = (_lastConsumerIndex >= size) ? 0 : _lastConsumerIndex; 278 279 do { 280 QueueConsumerEndpoint endpoint 281 = (QueueConsumerEndpoint) consumers[index]; 282 283 if (endpoint.hasMessageListener()) { 284 endpoint.schedule(); 287 _lastConsumerIndex = ++index; 288 break; 289 } else if (endpoint.isWaitingForMessage()) { 290 endpoint.notifyMessageAvailable(); 291 _lastConsumerIndex = ++index; 292 break; 293 } 294 295 if (++index >= size) { 297 index = 0; 298 } 299 } while (index != _lastConsumerIndex); 300 } 301 } 302 303 308 public boolean hasActiveConsumers() { 309 boolean active = super.hasActiveConsumers(); 310 if (!active && !_browsers.isEmpty()) { 311 active = true; 312 } 313 if (_log.isDebugEnabled()) { 314 _log.debug("hasActiveConsumers()[queue=" + getDestination() + "]=" 315 + active); 316 } 317 return active; 318 } 319 320 333 public boolean canDestroy() { 334 boolean destroy = false; 335 if (!hasActiveConsumers()) { 336 JmsDestination queue = getDestination(); 337 if (queue.getPersistent() && getMessageCount() == 0) { 338 destroy = true; 339 } else if (queue.isTemporaryDestination()) { 340 long connectionId = 343 ((JmsTemporaryDestination) queue).getConnectionId(); 344 JmsServerConnectionManager manager = 345 JmsServerConnectionManager.instance(); 346 if (manager.getConnection(connectionId) == null) { 347 destroy = true; 348 } 349 } 350 } 351 return destroy; 352 } 353 354 357 public synchronized void destroy() { 358 super.destroy(); 359 _browsers.clear(); 360 } 361 362 371 protected void init(Connection connection) throws JMSException , PersistenceException { 372 _handles = new MessageQueue(); 373 374 JmsDestination queue = getDestination(); 375 DatabaseService.getAdapter().removeExpiredMessageHandles(connection, 376 queue.getName()); 377 DefaultMessageCache cache = getMessageCache(); 378 List handles = DatabaseService.getAdapter().getMessageHandles( 379 connection, queue, queue.getName()); 380 Iterator iterator = handles.iterator(); 381 while (iterator.hasNext()) { 382 PersistentMessageHandle handle = (PersistentMessageHandle) iterator.next(); 383 String messageId = handle.getMessageId(); 384 MessageRef reference = cache.getMessageRef(messageId); 385 if (reference == null) { 386 reference = new CachedMessageRef(messageId, true, cache); 387 } 388 cache.addMessageRef(reference); 389 handle.reference(reference); 390 _handles.add(new QueueConsumerMessageHandle(handle)); 391 392 checkMessageExpiry(reference, handle.getExpiryTime()); 393 } 394 } 395 396 404 protected void addMessage(MessageRef reference, MessageImpl message, 405 MessageHandle handle) throws JMSException { 406 addMessage(reference, message); 407 _handles.add(handle); 408 409 notifyQueueListeners(handle, message); 411 412 checkMessageExpiry(reference, message); 414 } 415 416 417 424 protected void notifyQueueListeners(MessageHandle handle, 425 MessageImpl message) 426 throws JMSException { 427 QueueBrowserEndpoint[] browsers = 428 (QueueBrowserEndpoint[]) _browsers.toArray( 429 new QueueBrowserEndpoint[0]); 430 431 for (int index = 0; index < browsers.length; ++index) { 432 QueueBrowserEndpoint browser = browsers[index]; 433 browser.messageAdded(handle, message); 434 } 435 } 436 437 443 protected void messageExpired(MessageRef reference) throws JMSException { 444 _handles.remove(reference.getMessageId()); 445 super.messageExpired(reference); 447 } 448 449 458 protected void persistentMessageExpired(MessageRef reference, 459 Connection connection) 460 throws JMSException , PersistenceException { 461 _handles.remove(reference.getMessageId()); 462 super.messageExpired(reference); 464 } 465 466 473 private synchronized QueueConsumerEndpoint getEndpointForMessage( 474 MessageImpl message) { 475 QueueConsumerEndpoint result = null; 476 477 ConsumerEndpoint[] consumers = getConsumerArray(); 478 final int size = consumers.length; 479 if (size > 0) { 480 if ((_lastConsumerIndex + 1) > size) { 483 _lastConsumerIndex = 0; 484 } 485 486 int index = _lastConsumerIndex; 489 do { 490 QueueConsumerEndpoint endpoint = 491 (QueueConsumerEndpoint) consumers[index]; 492 Selector selector = endpoint.getSelector(); 493 494 if (((endpoint.hasMessageListener()) || 499 (endpoint.isWaitingForMessage())) && 500 ((selector == null) || 501 (selector.selects(message)))) { 502 _lastConsumerIndex = ++index; 503 result = endpoint; 504 break; 505 } 506 507 if (++index >= size) { 509 index = 0; 510 } 511 } while (index != _lastConsumerIndex); 512 } 513 514 return result; 515 } 516 517 } 518 | Popular Tags |