1 43 package org.exolab.jms.messagemgr; 44 45 import java.sql.Connection ; 46 import java.util.Date ; 47 import java.util.HashMap ; 48 import javax.jms.InvalidDestinationException ; 49 import javax.jms.JMSException ; 50 import javax.jms.DeliveryMode ; 51 import javax.jms.Destination ; 52 53 import org.apache.commons.logging.Log; 54 import org.apache.commons.logging.LogFactory; 55 56 import org.exolab.jms.client.JmsDestination; 57 import org.exolab.jms.message.MessageImpl; 58 import org.exolab.jms.persistence.DatabaseService; 59 import org.exolab.jms.persistence.PersistenceException; 60 import org.exolab.jms.persistence.SQLHelper; 61 import org.exolab.jms.service.BasicService; 62 import org.exolab.jms.service.ServiceException; 63 64 65 74 public class MessageMgr extends BasicService { 75 76 79 private final static String MM_SERVICE_NAME = "MessageManager"; 80 81 84 private static MessageMgr _instance; 85 86 89 private static final Object _block = new Object (); 90 91 95 private transient HashMap _listeners = new HashMap (1023); 96 97 101 private long _sequenceNumberGenerator = 0; 102 103 106 private static final Log _log = LogFactory.getLog(MessageMgr.class); 107 108 109 116 public static MessageMgr createInstance() { 117 if (_instance == null) { 118 synchronized (_block) { 119 if (_instance == null) { 120 _instance = new MessageMgr(); 121 } 122 } 123 } 124 return _instance; 125 } 126 127 134 public static MessageMgr instance() { 135 return _instance; 136 } 137 138 141 private MessageMgr() { 142 super(MM_SERVICE_NAME); 143 } 144 145 public void start() throws ServiceException { 147 try { 148 DestinationManager.createInstance(); 149 ConsumerManager.createInstance(); 150 } catch (ServiceException exception) { 151 throw exception; 152 } catch (Exception exception) { 153 String msg = "Failed to start MessageMgr"; 154 _log.error(msg, exception); 155 throw new ServiceException(msg + ":" + exception); 156 } 157 } 158 159 public void run() { 161 } 163 164 public synchronized void stop() throws ServiceException { 166 try { 167 ConsumerManager.instance().destroy(); 169 170 DestinationManager.instance().destroy(); 172 173 _listeners.clear(); 175 } catch (Exception error) { 176 error.printStackTrace(); 177 throw new ServiceException("Failed to stop MessageMgr : " + 178 error.toString()); 179 } 180 181 synchronized (_block) { 183 _instance = null; 184 185 } 186 } 187 188 199 public void addDestination(JmsDestination destination) 200 throws JMSException { 201 202 if (destination == null) { 204 throw new JMSException ("Call to addDestination with null object"); 205 } 206 207 DestinationManager.instance().getDestinationCache(destination); 208 } 209 210 216 public void add(MessageImpl message) throws JMSException { 217 prepare(message); 218 219 JmsDestination destination = 220 (JmsDestination) message.getJMSDestination(); 221 222 if (message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT 226 && DestinationManager.instance().isPersistent(destination)) { 227 addPersistentMessage(message); 228 } else { 229 addNonPersistentMessage(message); 230 } 231 } 232 233 243 public void add(Connection connection, MessageImpl message) 244 throws JMSException { 245 246 JmsDestination destination = 247 (JmsDestination) message.getJMSDestination(); 248 249 if (message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT 253 && DestinationManager.instance().isPersistent(destination)) { 254 addPersistentMessage(connection, message); 255 } else { 256 addNonPersistentMessage(message); 257 } 258 } 259 260 266 private void addNonPersistentMessage(MessageImpl message) 267 throws JMSException { 268 269 JmsDestination destination = (JmsDestination) message.getJMSDestination(); 272 273 MessageManagerEventListener listener = 274 (MessageManagerEventListener) _listeners.get(destination); 275 276 if (listener != null) { 277 listener.messageAdded(destination, message); 280 } else { 281 DestinationManager.instance().messageAdded(destination, message); 283 } 284 } 285 286 292 private void addPersistentMessage(MessageImpl message) throws JMSException { 293 JmsDestination destination = 294 (JmsDestination) message.getJMSDestination(); 295 296 Connection connection = null; 297 try { 299 connection = DatabaseService.getConnection(); 300 301 DatabaseService.getAdapter().addMessage(connection, message); 303 304 notifyOnAddPersistentMessage(connection, destination, message); 306 307 connection.commit(); 309 } catch (Exception exception) { 310 SQLHelper.rollback(connection); 311 _log.error("Failed to make message persistent", exception); 312 throw new JMSException ("Failed to make message persistent: " + 313 exception.toString()); 314 } finally { 315 SQLHelper.close(connection); 316 } 317 } 318 319 327 private void addPersistentMessage(Connection connection, 328 MessageImpl message) throws JMSException { 329 JmsDestination destination = (JmsDestination) message.getJMSDestination(); 330 try { 331 notifyOnAddPersistentMessage(connection, destination, message); 333 } catch (PersistenceException exception) { 334 throw new JMSException ("Failed in addPersistentMessage : " 335 + exception.toString()); 336 } catch (Exception exception) { 337 throw new JMSException ("Failed in addPersistentMessage : " 338 + exception.toString()); 339 } 340 } 341 342 349 public void prepare(MessageImpl message) 350 throws JMSException { 351 if (message == null) { 352 throw new JMSException ("Null message"); 353 } 354 Destination destination = message.getJMSDestination(); 355 if (destination == null) { 356 throw new InvalidDestinationException ("Message has no destination"); 357 } 358 if (!(destination instanceof JmsDestination)) { 359 throw new InvalidDestinationException ( 360 "Destination not a JmsDestination"); 361 } 362 363 message.setAcceptedTime((new Date ()).getTime()); 365 message.setSequenceNumber(++_sequenceNumberGenerator); 366 message.setReadOnly(true); 367 } 368 369 376 public JmsDestination resolve(String name) { 377 return DestinationManager.instance().getDestination(name); 378 } 379 380 389 public ConsumerEndpoint resolveConsumer(JmsDestination destination, 390 String consumerId) { 391 return ConsumerManager.instance().getConsumerEndpoint(consumerId); 392 } 393 394 402 public void setStopped(ConsumerEndpoint consumer, boolean stop) 403 throws JMSException { 404 } 406 407 419 public void addEventListener(JmsDestination destination, 420 MessageManagerEventListener listener) { 421 422 if ((destination != null) && 423 (listener != null)) { 424 synchronized (_listeners) { 425 if (!_listeners.containsKey(destination)) { 426 _listeners.put(destination, listener); 427 } 428 } 429 } 430 } 431 432 439 public void removeEventListener(JmsDestination destination, 440 MessageManagerEventListener listener) { 441 if ((destination != null) && 442 (listener != null)) { 443 synchronized (_listeners) { 444 if (_listeners.containsKey(destination)) { 445 _listeners.remove(destination); 446 } 447 } 448 } 449 } 450 451 461 private void notifyOnAddPersistentMessage(Connection connection, 462 JmsDestination destination, 463 MessageImpl message) 464 throws JMSException , PersistenceException { 465 466 MessageManagerEventListener listener = 467 (MessageManagerEventListener) _listeners.get(destination); 468 469 if (listener != null) { 470 listener.persistentMessageAdded(connection, destination, message); 473 } else { 474 DestinationManager.instance().persistentMessageAdded(connection, 476 destination, 477 message); 478 } 479 } 480 481 } 482 | Popular Tags |