KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > MessageMgr


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  */

43 package org.exolab.jms.messagemgr;
44
45 import java.sql.Connection JavaDoc;
46 import java.util.Date JavaDoc;
47 import java.util.HashMap JavaDoc;
48 import javax.jms.InvalidDestinationException JavaDoc;
49 import javax.jms.JMSException JavaDoc;
50 import javax.jms.DeliveryMode JavaDoc;
51 import javax.jms.Destination JavaDoc;
52
53 import org.apache.commons.logging.Log;
54 import org.apache.commons.logging.LogFactory;
55
56 import org.exolab.jms.client.JmsDestination;
57 import org.exolab.jms.message.MessageImpl;
58 import org.exolab.jms.persistence.DatabaseService;
59 import org.exolab.jms.persistence.PersistenceException;
60 import org.exolab.jms.persistence.SQLHelper;
61 import org.exolab.jms.service.BasicService;
62 import org.exolab.jms.service.ServiceException;
63
64
65 /**
66  * This is the active message handling component within the JMS server. Messages
67  * are passed in and added to the appropriate dispatchers for delivery to the
68  * clients.
69  *
70  * @author <a HREF="mailto:mourikis@intalio.com">Jim Mourikis</a>
71  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
72  * @version $Revision: 1.2 $ $Date: 2005/03/18 03:58:39 $
73  */

74 public class MessageMgr extends BasicService {
75
76     /**
77      * The service name of the message manager
78      */

79     private final static String JavaDoc MM_SERVICE_NAME = "MessageManager";
80
81     /**
82      * Caches the singleton instance of the message manager.
83      */

84     private static MessageMgr _instance;
85
86     /**
87      * used to synchronise the singleton construction
88      */

89     private static final Object JavaDoc _block = new Object JavaDoc();
90
91     /**
92      * Maintain a list of registered MessageManagerEventListener objects, that
93      * get notified when certain events occur in the MessageManager
94      */

95     private transient HashMap JavaDoc _listeners = new HashMap JavaDoc(1023);
96
97     /**
98      * The sequence number generator is used to differentiate messages arriving
99      * on the same millisecond
100      */

101     private long _sequenceNumberGenerator = 0;
102
103     /**
104      * The logger
105      */

106     private static final Log _log = LogFactory.getLog(MessageMgr.class);
107
108
109     /**
110      * Create and return an instance of the singleton. If the singleton already
111      * exists then simply return it. If there is a problem creating the
112      * singleton then throw an exception
113      *
114      * @return MessageMgr - the singleton instance
115      */

116     public static MessageMgr createInstance() {
117         if (_instance == null) {
118             synchronized (_block) {
119                 if (_instance == null) {
120                     _instance = new MessageMgr();
121                 }
122             }
123         }
124         return _instance;
125     }
126
127     /**
128      * Return an instance to the MessageMgr singleton. This method assumes that
129      * the singleton has already been created with a call to {@link
130      * #createInstance}
131      *
132      * @return MessageMgr
133      */

134     public static MessageMgr instance() {
135         return _instance;
136     }
137
138     /**
139      * Construct a new <code>MessageMgr</code>
140      */

141     private MessageMgr() {
142         super(MM_SERVICE_NAME);
143     }
144
145     // ovverride BasicService.start
146
public void start() throws ServiceException {
147         try {
148             DestinationManager.createInstance();
149             ConsumerManager.createInstance();
150         } catch (ServiceException exception) {
151             throw exception;
152         } catch (Exception JavaDoc exception) {
153             String JavaDoc msg = "Failed to start MessageMgr";
154             _log.error(msg, exception);
155             throw new ServiceException(msg + ":" + exception);
156         }
157     }
158
159     // implement BasicService.run
160
public void run() {
161         // do nothing
162
}
163
164     // override BasicService.stop
165
public synchronized void stop() throws ServiceException {
166         try {
167             // destroy the consumer manager.
168
ConsumerManager.instance().destroy();
169
170             // destroy the destination manager.
171
DestinationManager.instance().destroy();
172
173             // clear state
174
_listeners.clear();
175         } catch (Exception JavaDoc error) {
176             error.printStackTrace();
177             throw new ServiceException("Failed to stop MessageMgr : " +
178                                        error.toString());
179         }
180
181         // clear the static reference
182
synchronized (_block) {
183             _instance = null;
184
185         }
186     }
187
188     /**
189      * Create the specified destination. The destination is a container for
190      * messages and consumers. Consumers listen for messages posted on a
191      * particular destination.
192      * <p/>
193      * This can be called multiple times without any side effects. If the
194      * destination is null then it throws a JMSException
195      *
196      * @param destination - create this destination
197      * @throws JMSException - if the params is null
198      */

199     public void addDestination(JmsDestination destination)
200             throws JMSException JavaDoc {
201
202         // check the methods preconditions
203
if (destination == null) {
204             throw new JMSException JavaDoc("Call to addDestination with null object");
205         }
206
207         DestinationManager.instance().getDestinationCache(destination);
208     }
209
210     /**
211      * Add a message
212      *
213      * @param message the message to add
214      * @throws JMSException if the message cannot be added
215      */

216     public void add(MessageImpl message) throws JMSException JavaDoc {
217         prepare(message);
218
219         JmsDestination destination =
220                 (JmsDestination) message.getJMSDestination();
221
222         // if the message's delivery mode is PERSISTENT, and the destination
223
// is also persistent, then then process it accordingly, otherwise use
224
// the non-persistent quality of service
225
if (message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
226                 && DestinationManager.instance().isPersistent(destination)) {
227             addPersistentMessage(message);
228         } else {
229             addNonPersistentMessage(message);
230         }
231     }
232
233     /**
234      * Add a message
235      * <p/>
236      * Note that this method is called exclusively by the
237      * {@link ResourceManager} and should not be used for any other purpose.
238      *
239      * @param connection this is the database connection that is used
240      * @param message the message to add
241      * @throws JMSException thrown if there is a problem processing msg
242      */

243     public void add(Connection JavaDoc connection, MessageImpl message)
244             throws JMSException JavaDoc {
245
246         JmsDestination destination =
247                 (JmsDestination) message.getJMSDestination();
248
249         // if the message's delivery mode is PERSISTENT, and the destination
250
// is also persistent, then then process it accordingly, otherwise use
251
// the non-persistent quality of service
252
if (message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
253                 && DestinationManager.instance().isPersistent(destination)) {
254             addPersistentMessage(connection, message);
255         } else {
256             addNonPersistentMessage(message);
257         }
258     }
259
260     /**
261      * Processes a non-persistent message
262      *
263      * @param message - the message to add
264      * @throws JMSException if the message cannot be processed
265      */

266     private void addNonPersistentMessage(MessageImpl message)
267             throws JMSException JavaDoc {
268
269         // notify all registered listeners that a new message has arrived
270
// for the specified destination.
271
JmsDestination destination = (JmsDestination) message.getJMSDestination();
272
273         MessageManagerEventListener listener =
274                 (MessageManagerEventListener) _listeners.get(destination);
275
276         if (listener != null) {
277             // if there is a registered destination cache then let the cache
278
// process it.
279
listener.messageAdded(destination, message);
280         } else {
281             // let the DestinationManager handle the message
282
DestinationManager.instance().messageAdded(destination, message);
283         }
284     }
285
286     /**
287      * Add a persistent message
288      *
289      * @param message the message to add
290      * @throws JMSException if the message cannot be processed
291      */

292     private void addPersistentMessage(MessageImpl message) throws JMSException JavaDoc {
293         JmsDestination destination =
294                 (JmsDestination) message.getJMSDestination();
295
296         Connection JavaDoc connection = null;
297         // do all persistent work in this block
298
try {
299             connection = DatabaseService.getConnection();
300
301             // add the message to the database
302
DatabaseService.getAdapter().addMessage(connection, message);
303
304             // notify all listeners that a persistent message has arrived
305
notifyOnAddPersistentMessage(connection, destination, message);
306
307             // commit the work
308
connection.commit();
309         } catch (Exception JavaDoc exception) {
310             SQLHelper.rollback(connection);
311             _log.error("Failed to make message persistent", exception);
312             throw new JMSException JavaDoc("Failed to make message persistent: " +
313                                    exception.toString());
314         } finally {
315             SQLHelper.close(connection);
316         }
317     }
318
319     /**
320      * This method is used to process persistent messages published through the
321      * resource manager.
322      *
323      * @param connection - the database connection to use.
324      * @param message - the message to process
325      * @throws JMSException - if the message cannot be processed
326      */

327     private void addPersistentMessage(Connection JavaDoc connection,
328                                       MessageImpl message) throws JMSException JavaDoc {
329         JmsDestination destination = (JmsDestination) message.getJMSDestination();
330         try {
331             // notify all listeners that a persistent message has arrived
332
notifyOnAddPersistentMessage(connection, destination, message);
333         } catch (PersistenceException exception) {
334             throw new JMSException JavaDoc("Failed in addPersistentMessage : "
335                                    + exception.toString());
336         } catch (Exception JavaDoc exception) {
337             throw new JMSException JavaDoc("Failed in addPersistentMessage : "
338                                     + exception.toString());
339         }
340     }
341
342     /**
343      * Prepares a message prior to it being passed through the system.
344      * This
345      *
346      * @param message the message
347      * @throws JMSException - if the message is invalid or cannot be prep'ed
348      */

349     public void prepare(MessageImpl message)
350             throws JMSException JavaDoc {
351         if (message == null) {
352             throw new JMSException JavaDoc("Null message");
353         }
354         Destination JavaDoc destination = message.getJMSDestination();
355         if (destination == null) {
356             throw new InvalidDestinationException JavaDoc("Message has no destination");
357         }
358         if (!(destination instanceof JmsDestination)) {
359             throw new InvalidDestinationException JavaDoc(
360                     "Destination not a JmsDestination");
361         }
362
363         // mark the message as accepted and attach a sequence number
364
message.setAcceptedTime((new Date JavaDoc()).getTime());
365         message.setSequenceNumber(++_sequenceNumberGenerator);
366         message.setReadOnly(true);
367     }
368
369     /**
370      * Resolves a destination given its name
371      *
372      * @param name the name of the destination
373      * @return JmsDestination if an active destination exists for the given
374      * name, else it returns <tt>null</tt>
375      */

376     public JmsDestination resolve(String JavaDoc name) {
377         return DestinationManager.instance().getDestination(name);
378     }
379
380     /**
381      * Resolves a consumer given its destination and an identity. Should look
382      * removing t from here.
383      *
384      * @param destination the destination
385      * @param consumerId the consumer identifier
386      * @return ConsumerIfc if an active consumer exists for the given
387      * name, else it returns <tt>null</tt>
388      */

389     public ConsumerEndpoint resolveConsumer(JmsDestination destination,
390                                             String JavaDoc consumerId) {
391         return ConsumerManager.instance().getConsumerEndpoint(consumerId);
392     }
393
394     /**
395      * Stop/start a consumer. When stopped, the consumer will not receive
396      * messages until the consumer is re-started. This is invoked when the
397      * underlying connection is stopped or started
398      *
399      * @param consumer the consumer to stop/start
400      * @param stop when <tt>true</tt> stop the consumer else start it.
401      */

402     public void setStopped(ConsumerEndpoint consumer, boolean stop)
403             throws JMSException JavaDoc {
404         // need to implement this for the consumer
405
}
406
407     /**
408      * Add a message listener for a specific destination to be informed when
409      * messages, for the destination are added or removed from the queue. More
410      * than one listener can be registered per desitnation and the same listener
411      * can be registered for multiple destinations.
412      * <p/>
413      * If a listener is already registered for a particuler destination then it
414      * fails silently.
415      *
416      * @param destination - what messgaes to listen for
417      * @param listener - a JmsMessageListener instance
418      */

419     public void addEventListener(JmsDestination destination,
420                                  MessageManagerEventListener listener) {
421
422         if ((destination != null) &&
423                 (listener != null)) {
424             synchronized (_listeners) {
425                 if (!_listeners.containsKey(destination)) {
426                     _listeners.put(destination, listener);
427                 }
428             }
429         }
430     }
431
432     /**
433      * Remove the listener for the specified destination. If one is not
434      * registered then ignore it.
435      *
436      * @param destination - destination that it listens for
437      * @param listener - listener for that destination.
438      */

439     public void removeEventListener(JmsDestination destination,
440                                     MessageManagerEventListener listener) {
441         if ((destination != null) &&
442                 (listener != null)) {
443             synchronized (_listeners) {
444                 if (_listeners.containsKey(destination)) {
445                     _listeners.remove(destination);
446                 }
447             }
448         }
449     }
450
451     /**
452      * Notify the listeners registered for the destination that a persistent
453      * message has been added to the message manager.
454      *
455      * @param connection the database connection to use.
456      * @param destination the message destination
457      * @param message the message that was added
458      * @throws JMSException is a processing error occured
459      * @throws PersistenceException if a persistence error occured
460      */

461     private void notifyOnAddPersistentMessage(Connection JavaDoc connection,
462                                               JmsDestination destination,
463                                               MessageImpl message)
464             throws JMSException JavaDoc, PersistenceException {
465
466         MessageManagerEventListener listener =
467                 (MessageManagerEventListener) _listeners.get(destination);
468
469         if (listener != null) {
470             // if there is a registered destination cache then let the cache
471
// process it.
472
listener.persistentMessageAdded(connection, destination, message);
473         } else {
474             // let the DestinationManager handle the message
475
DestinationManager.instance().persistentMessageAdded(connection,
476                                                                  destination,
477                                                                  message);
478         }
479     }
480
481 }
482
Popular Tags