package org.exolab.jms.messagemgr;

import java.sql.Connection;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.ArrayList;
import javax.jms.JMSException;

import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.client.JmsTopic;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.persistence.PersistenceException;
import org.exolab.jms.persistence.DatabaseService;


/**
 * An {@link AbstractDestinationCache} for a {@link JmsTopic}.
 * <p>
 * Messages added to this cache are wrapped in a shared handle and offered to
 * every registered consumer endpoint. Persistent messages are additionally
 * recorded for each inactive subscription reported by
 * {@link ConsumerManager}.
 */
class TopicDestinationCache extends AbstractDestinationCache {

    /**
     * Constructs a cache for the supplied topic.
     *
     * @param topic the topic this cache is for
     */
    public TopicDestinationCache(JmsTopic topic) {
        super(topic);
    }

    /**
     * Constructs a cache for the supplied topic, passing the database
     * connection on to the superclass constructor.
     *
     * @param topic      the topic this cache is for
     * @param connection the database connection handed to the superclass
     * @throws JMSException         if thrown by the superclass constructor
     * @throws PersistenceException if thrown by the superclass constructor
     */
    public TopicDestinationCache(JmsTopic topic, Connection connection)
            throws JMSException, PersistenceException {
        super(topic, connection);
    }

    /**
     * Registers a consumer with this cache, but only when the consumer's
     * topic matches this cache's topic according to
     * {@code JmsTopic.match}.
     *
     * @param consumer the consumer to register; its destination is cast to
     *                 {@link JmsTopic}
     * @return the result of the superclass {@code addConsumer} when the
     *         topics match, otherwise {@code false}
     */
    public boolean addConsumer(ConsumerEndpoint consumer) {

        boolean result = false;

        JmsTopic cdest = (JmsTopic) consumer.getDestination();
        JmsTopic ddest = (JmsTopic) getDestination();

        if (cdest.match(ddest)) {
            result = super.addConsumer(consumer);
        }

        return result;
    }

    /**
     * Handles the addition of a message: caches a reference to it and offers
     * a shared handle to every registered consumer.
     * <p>
     * If at least one consumer reports that it processed the message, expiry
     * checking is set up via {@code checkMessageExpiry}; otherwise the
     * reference is destroyed.
     *
     * @param destination the message's destination (not used by this method)
     * @param message     the message that was added
     * @throws JMSException if thrown by any of the invoked operations
     */
    public void messageAdded(JmsDestination destination, MessageImpl message)
            throws JMSException {
        boolean processed = false;
        // NOTE(review): the second argument is false here and true in
        // persistentMessageAdded - presumably a "persistent" flag; confirm
        // against CachedMessageRef.
        MessageRef reference =
                new CachedMessageRef(message, false, getMessageCache());
        addMessage(reference, message);
        MessageHandle handle = new SharedMessageHandle(reference, message);

        ConsumerEndpoint[] consumers = getConsumerArray();
        for (int index = 0; index < consumers.length; index++) {
            ConsumerEndpoint consumer = consumers[index];
            // non-short-circuit OR: every consumer is offered the message
            processed |= consumer.messageAdded(handle, message);
        }

        if (processed) {
            checkMessageExpiry(reference, message);
        } else {
            // no consumer took the message
            reference.destroy();
        }
    }

    /**
     * Handles the addition of a persistent message: caches a reference to
     * it, offers a shared handle to every registered consumer, and adds a
     * {@link TopicConsumerMessageHandle} for each inactive subscription on
     * this topic.
     * <p>
     * The message counts as processed if any consumer reports processing it
     * or if there is at least one inactive subscription. If processed,
     * expiry checking is set up via {@code checkMessageExpiry}; otherwise
     * the shared handle is destroyed using the supplied connection.
     *
     * @param connection  the database connection passed to the consumers and
     *                    handles
     * @param destination the message's destination (not used by this method)
     * @param message     the message that was added
     * @throws JMSException         if thrown by any of the invoked operations
     * @throws PersistenceException if thrown by any of the invoked operations
     */
    public void persistentMessageAdded(Connection connection,
                                       JmsDestination destination,
                                       MessageImpl message)
            throws JMSException, PersistenceException {
        boolean processed = false;
        MessageRef reference =
                new CachedMessageRef(message, true, getMessageCache());
        addMessage(reference, message);
        SharedMessageHandle handle =
                new SharedMessageHandle(reference, message);

        ConsumerEndpoint[] consumers = getConsumerArray();
        for (int index = 0; index < consumers.length; index++) {
            ConsumerEndpoint consumer = consumers[index];
            // non-short-circuit OR: every consumer is offered the message
            processed |= consumer.persistentMessageAdded(handle, message,
                                                         connection);
        }

        // add a handle for every inactive subscription on this topic
        JmsTopic topic = (JmsTopic) getDestination();
        List inactive = ConsumerManager.instance().getInactiveSubscriptions(
                topic);
        if (!inactive.isEmpty()) {
            Iterator iterator = inactive.iterator();
            while (iterator.hasNext()) {
                String name = (String) iterator.next();
                TopicConsumerMessageHandle durable
                        = new TopicConsumerMessageHandle(handle, name);
                durable.add(connection);
            }
            processed = true;
        }

        if (processed) {
            checkMessageExpiry(reference, message);
        } else {
            // neither an active consumer nor an inactive subscription took
            // the message
            handle.destroy(connection);
        }
    }

    /**
     * Returns a message handle to the consumer endpoint identified by the
     * handle's consumer id. Does nothing if no such endpoint is registered.
     *
     * @param handle the handle to return
     */
    public void returnMessageHandle(MessageHandle handle) {
        long consumerId = handle.getConsumerId();
        AbstractTopicConsumerEndpoint endpoint =
                (AbstractTopicConsumerEndpoint) getConsumerEndpoint(consumerId);
        if (endpoint != null) {
            endpoint.returnMessage(handle);
        }
        // NOTE(review): when no endpoint is found the handle is silently
        // dropped here - confirm that this is intended.
    }

    /**
     * Loads the message handles stored for the named subscription on this
     * destination.
     * <p>
     * For each handle obtained from the {@link DatabaseService} adapter, a
     * message reference is looked up in the message cache (and created if
     * absent), added to the cache, and attached to the handle. Expiry
     * checking is then set up from the handle's expiry time.
     *
     * @param name       the subscription name passed to the adapter
     * @param connection the database connection to use
     * @return a list of {@link PersistentMessageHandle} instances; never
     *         {@code null}
     * @throws JMSException         if thrown by any of the invoked operations
     * @throws PersistenceException if thrown by any of the invoked operations
     */
    public List getDurableMessageHandles(String name, Connection connection)
            throws JMSException, PersistenceException {
        Vector handles = DatabaseService.getAdapter().getMessageHandles(
                connection, getDestination(), name);
        List result = new ArrayList(handles.size());

        MessageCache cache = getMessageCache();

        Iterator iterator = handles.iterator();
        while (iterator.hasNext()) {
            PersistentMessageHandle handle =
                    (PersistentMessageHandle) iterator.next();
            String messageId = handle.getMessageId();
            MessageRef reference = cache.getMessageRef(messageId);
            if (reference == null) {
                reference = new CachedMessageRef(messageId, true, cache);
            }
            // invoked for both new and pre-existing references
            cache.addMessageRef(reference);
            handle.reference(reference);
            result.add(handle);

            checkMessageExpiry(reference, handle.getExpiryTime());
        }
        return result;
    }

    /**
     * Initialises the cache. This implementation does nothing.
     *
     * @param connection the database connection (unused)
     * @throws JMSException         declared but not thrown by this
     *                              implementation
     * @throws PersistenceException declared but not thrown by this
     *                              implementation
     */
    protected void init(Connection connection) throws JMSException,
            PersistenceException {
    }

    /**
     * Handles the expiry of a persistent message by notifying every
     * registered consumer via {@code persistentMessageRemoved}.
     *
     * @param reference  the reference to the expired message
     * @param connection the database connection passed to the consumers
     * @throws JMSException         if thrown by any of the invoked operations
     * @throws PersistenceException if thrown by any of the invoked operations
     */
    protected void persistentMessageExpired(MessageRef reference,
                                            Connection connection)
            throws JMSException, PersistenceException {
        String messageId = reference.getMessageId();
        ConsumerEndpoint[] consumers = getConsumerArray();

        for (int i = 0; i < consumers.length; ++i) {
            consumers[i].persistentMessageRemoved(messageId, connection);
        }

        List inactive = ConsumerManager.instance().getInactiveSubscriptions(
                (JmsTopic) getDestination());
        Iterator iterator = inactive.iterator();
        while (iterator.hasNext()) {
            String name = (String) iterator.next();
            // NOTE(review): nothing is done with the subscription name, so
            // handles belonging to inactive subscriptions are not removed
            // here. The loop is retained as found - confirm intended
            // behaviour before implementing or deleting it.
        }
    }

}