1 45 package org.exolab.jms.messagemgr; 46 47 import java.sql.Connection ; 48 import java.util.Collections ; 49 import java.util.HashMap ; 50 import java.util.Map ; 51 import javax.jms.JMSException ; 52 53 import org.apache.commons.logging.Log; 54 import org.apache.commons.logging.LogFactory; 55 56 import org.exolab.jms.client.JmsDestination; 57 import org.exolab.jms.lease.LeaseEventListenerIfc; 58 import org.exolab.jms.lease.LeaseManager; 59 import org.exolab.jms.message.MessageImpl; 60 import org.exolab.jms.persistence.PersistenceException; 61 import org.exolab.jms.persistence.DatabaseService; 62 import org.exolab.jms.persistence.SQLHelper; 63 64 65 71 public abstract class AbstractDestinationCache implements DestinationCache, 72 LeaseEventListenerIfc { 73 74 77 private final JmsDestination _destination; 78 79 82 private DefaultMessageCache _cache = new DefaultMessageCache(); 83 84 87 private Map _consumers = Collections.synchronizedMap(new HashMap ()); 88 89 93 private HashMap _leases = new HashMap (); 94 95 98 private static final Log _log = LogFactory.getLog( 99 AbstractDestinationCache.class); 100 101 102 108 public AbstractDestinationCache(JmsDestination destination) { 109 if (destination == null) { 110 throw new IllegalArgumentException ( 111 "Argument 'destination' is null"); 112 } 113 if (destination.getPersistent()) { 114 throw new IllegalArgumentException ( 115 "Argument 'destination' refers to a persistent destination"); 116 } 117 _destination = destination; 118 119 MessageMgr.instance().addEventListener(getDestination(), this); 121 } 122 123 133 public AbstractDestinationCache(JmsDestination destination, 134 Connection connection) 135 throws JMSException , PersistenceException { 136 if (destination == null) { 137 throw new IllegalArgumentException ( 138 "Argument 'destination' is null"); 139 } 140 if (!destination.getPersistent()) { 141 throw new IllegalArgumentException ( 142 "Argument 'destination' refers to a non-persistent destination"); 143 } 144 _destination = destination; 145 146 init(connection); 147 148 MessageMgr.instance().addEventListener(getDestination(), this); 149 } 150 151 156 public JmsDestination getDestination() { 157 return _destination; 158 } 159 160 166 public boolean addConsumer(ConsumerEndpoint consumer) { 167 boolean result = false; 168 169 if (consumer.getDestination().equals(getDestination())) { 172 Long key = new Long (consumer.getId()); 173 if (!_consumers.containsKey(key)) { 174 _consumers.put(key, consumer); 175 result = true; 176 } 177 } 178 179 return result; 180 } 181 182 187 public void removeConsumer(ConsumerEndpoint consumer) { 188 Long key = new Long (consumer.getId()); 189 _consumers.remove(key); 190 } 191 192 197 public int getMessageCount() { 198 return _cache.getMessageCount(); 199 } 200 201 208 public boolean canDestroy() { 209 return !hasActiveConsumers(); 210 } 211 212 215 public synchronized void destroy() { 216 _cache.clear(); 218 219 _consumers.clear(); 221 222 MessageMgr.instance().removeEventListener(getDestination(), this); 224 225 MessageLease[] leases = null; 227 synchronized (_leases) { 228 leases = (MessageLease[]) _leases.values().toArray( 229 new MessageLease[0]); 230 _leases.clear(); 231 } 232 233 for (int i = 0; i < leases.length; ++i) { 234 MessageLease lease = leases[i]; 235 LeaseManager.instance().removeLease(lease); 236 } 237 } 238 239 244 public void onLeaseExpired(Object object) { 245 MessageRef reference = (MessageRef) object; 246 String messageId = ((MessageRef) reference).getMessageId(); 247 synchronized (_leases) { 248 _leases.remove(messageId); 249 } 250 251 try { 254 if (reference.isPersistent()) { 255 Connection connection = null; 256 try { 257 connection = DatabaseService.getConnection(); 258 persistentMessageExpired(reference, connection); 259 reference.destroy(connection); 260 connection.commit(); 261 } catch (JMSException exception) { 262 SQLHelper.rollback(connection); 263 throw exception; 264 } catch (Exception exception) { 265 SQLHelper.rollback(connection); 266 _log.error("Failed to expire message", exception); 267 throw new JMSException (exception.getMessage()); 268 } finally { 269 SQLHelper.close(connection); 270 } 271 272 } else { 273 messageExpired(reference); 274 reference.destroy(); 275 } 276 } catch (JMSException exception) { 277 _log.error("Failed to expire message", exception); 278 } 279 } 280 281 public void collectGarbage(boolean aggressive) { 282 if (aggressive) { 283 _cache.clearPersistentMessages(); 285 if (_log.isDebugEnabled()) { 286 _log.debug("Evicted all persistent messages from cache " 287 + getDestination().getName()); 288 } 289 } 290 291 if (_log.isDebugEnabled()) { 292 _log.debug("DESTCACHE -" + getDestination().getName() 293 + " Messages: P[" + _cache.getPersistentCount() 294 + "] T[" + _cache.getTransientCount() + "] Total: [" 295 + _cache.getMessageCount() + "]"); 296 } 297 } 298 299 306 protected abstract void init(Connection connection) throws JMSException , 307 PersistenceException; 308 309 315 protected void addMessage(MessageRef reference, MessageImpl message) { 316 _cache.addMessage(reference, message); 317 } 318 319 324 protected DefaultMessageCache getMessageCache() { 325 return _cache; 326 } 327 328 333 protected boolean hasActiveConsumers() { 334 return !_consumers.isEmpty(); 335 } 336 337 344 protected ConsumerEndpoint getConsumerEndpoint(long consumerId) { 345 return (ConsumerEndpoint) _consumers.get(new Long (consumerId)); 346 } 347 348 353 protected ConsumerEndpoint[] getConsumerArray() { 354 ConsumerEndpoint[] result = 355 (ConsumerEndpoint[]) _consumers.values().toArray( 356 new ConsumerEndpoint[0]); 357 return result; 358 } 359 360 366 protected void messageExpired(MessageRef reference) 367 throws JMSException { 368 String messageId = reference.getMessageId(); 370 ConsumerEndpoint[] consumers = getConsumerArray(); 371 for (int i = 0; i < consumers.length; ++i) { 372 consumers[i].messageRemoved(messageId); 373 } 374 } 375 376 384 protected void persistentMessageExpired(MessageRef reference, 385 Connection connection) 386 throws JMSException , PersistenceException { 387 String messageId = reference.getMessageId(); 389 ConsumerEndpoint[] consumers = getConsumerArray(); 390 391 for (int i = 0; i < consumers.length; ++i) { 392 consumers[i].persistentMessageRemoved(messageId, connection); 393 } 394 } 395 396 404 protected void checkMessageExpiry(MessageRef reference, 405 MessageImpl message) throws JMSException { 406 checkMessageExpiry(reference, message.getJMSExpiration()); 407 } 408 409 416 protected void checkMessageExpiry(MessageRef reference, 417 long expiryTime) { 418 if (expiryTime != 0) { 419 synchronized (_leases) { 420 if (!_leases.containsKey(reference.getMessageId())) { 422 long duration = expiryTime - System.currentTimeMillis(); 423 if (duration <= 0) { 424 duration = 1; 425 } 426 MessageLease lease = new MessageLease(reference, duration, 427 this); 428 LeaseManager.instance().addLease(lease); 429 _leases.put(reference.getMessageId(), lease); 430 } 431 } 432 } 433 } 434 435 } 436 | Popular Tags |