KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > persistence > Messages


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  */

43 package org.exolab.jms.persistence;
44
45 import java.io.ByteArrayInputStream JavaDoc;
46 import java.io.ByteArrayOutputStream JavaDoc;
47 import java.io.ObjectInputStream JavaDoc;
48 import java.io.ObjectOutputStream JavaDoc;
49 import java.sql.Connection JavaDoc;
50 import java.sql.PreparedStatement JavaDoc;
51 import java.sql.ResultSet JavaDoc;
52 import java.sql.SQLException JavaDoc;
53 import java.util.HashMap JavaDoc;
54 import java.util.Vector JavaDoc;
55
56 import javax.jms.JMSException JavaDoc;
57
58 import org.apache.commons.logging.Log;
59 import org.apache.commons.logging.LogFactory;
60
61 import org.exolab.jms.client.JmsDestination;
62 import org.exolab.jms.client.JmsTopic;
63 import org.exolab.jms.message.MessageImpl;
64 import org.exolab.jms.messagemgr.PersistentMessageHandle;
65
66
67 /**
68  * This class manages the persistence of message objects.
69  *
70  * @version $Revision: 1.3 $ $Date: 2005/06/09 14:39:52 $
71  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
72  */

73 public class Messages {
74
75     /**
76      * Caches the singleton instance of this class
77      */

78     private static Messages _instance;
79
80     /**
81      * Monitor used to synchronize the initialization of this
82      * class
83      */

84     private static final Object JavaDoc _block = new Object JavaDoc();
85
86     /**
87      * The logger
88      */

89     private static final Log _log = LogFactory.getLog(Messages.class);
90
91
92     /**
93      * Returns a reference to the singleton instance.
94      * <p>
95      * Note that initialise() must have been invoked first for this
96      * to return a valid instance.
97      *
98      * @return Messages the singleton instance
99      */

100     public static Messages instance() {
101         return _instance;
102     }
103
104     /**
105      * Create an initialise the singleton istance of this class.
106      *
107      * @return Messages the singleton instance
108      */

109     public static Messages initialise() {
110         if (_instance == null) {
111             synchronized (_block) {
112                 if (_instance == null) {
113                     _instance = new Messages();
114                 }
115             }
116         }
117         return _instance;
118     }
119
120     /**
121      * Add a message to the database, in the context of the specified
122      * transaction and connection.
123      *
124      * @param connection - execute on this connection
125      * @param message - the message to add
126      * @throws PersistenceException - an sql related error
127      */

128     public void add(Connection JavaDoc connection, MessageImpl message)
129         throws PersistenceException {
130
131         PreparedStatement JavaDoc insert = null;
132
133         // extract the identity of the message
134
String JavaDoc messageId = message.getMessageId().getId();
135
136         // check that the destination is actually registered
137
// and map the name to the corresponding id
138
String JavaDoc name;
139         try {
140             name = ((JmsDestination) message.getJMSDestination()).getName();
141         } catch (JMSException JavaDoc exception) {
142             throw new PersistenceException(
143                 "Failed to get destination for message=" +
144                 message.getMessageId(), exception);
145         }
146
147         long destinationId = Destinations.instance().getId(name);
148         if (destinationId == 0) {
149             throw new PersistenceException(
150                 "Cannot add message=" + message.getMessageId() +
151                 ", destination=" + name + " (" + destinationId +
152                 "): destination does not exist");
153         }
154
155         try {
156             // create, populate and execute the insert
157
insert = connection.prepareStatement(
158                 "insert into messages (messageid, destinationid, priority, "
159                 + "createtime, expirytime, processed, messageblob) values "
160                 + "(?,?,?,?,?,?,?)");
161             insert.setString(1, messageId);
162             insert.setLong(2, destinationId);
163             insert.setInt(3, message.getJMSPriority());
164             insert.setLong(4, message.getAcceptedTime());
165             insert.setLong(5, message.getJMSExpiration());
166             insert.setInt(6, (message.getProcessed()) ? 1 : 0);
167
168             // serialize the message
169
byte[] bytes = serialize(message);
170             insert.setBinaryStream(7, new ByteArrayInputStream JavaDoc(bytes),
171                 bytes.length);
172             //insert.setBytes(8, bytes);
173

174             // execute the insert
175
if (insert.executeUpdate() != 1) {
176                 throw new PersistenceException(
177                     "Failed to add message=" + message.getMessageId() +
178                     ", destination=" + name + " (" + destinationId + ")");
179             }
180         } catch (PersistenceException exception) {
181             throw exception;
182         } catch (Exception JavaDoc exception) {
183             throw new PersistenceException(
184                 "Failed to add message=" + message.getMessageId() +
185                 ", destination=" + name + " (" + destinationId + ")",
186                 exception);
187         } finally {
188             SQLHelper.close(insert);
189         }
190     }
191
192     /**
193      * Update the message state in the database. This will be called to set
194      * the message state to processed by the provider
195      *
196      * @param connection - execute on this connection
197      * @param message - the message to update
198      * @throws PersistenceException - an sql related error
199      */

200     public void update(Connection JavaDoc connection, MessageImpl message)
201         throws PersistenceException {
202
203         PreparedStatement JavaDoc update = null;
204
205         // extract the identity of the message
206
String JavaDoc messageId = message.getMessageId().getId();
207
208         try {
209             update = connection.prepareStatement(
210                 "update messages set processed=? where messageId=?");
211             update.setInt(1, message.getProcessed() ? 1 : 0);
212             update.setString(2, messageId);
213
214             // execute the update
215
if (update.executeUpdate() != 1) {
216                 _log.error("Cannot update message=" + messageId);
217             }
218         } catch (SQLException JavaDoc exception) {
219             throw new PersistenceException(
220                 "Failed to update message, id=" + messageId, exception);
221         } finally {
222             SQLHelper.close(update);
223         }
224     }
225
226     /**
227      * Remove a message with the specified identity from the database
228      *
229      * @param connection - execute on this connection
230      * @param messageId - the message id of the message to remove
231      * @throws PersistenceException - an sql related error
232      */

233     public void remove(Connection JavaDoc connection, String JavaDoc messageId)
234         throws PersistenceException {
235
236         PreparedStatement JavaDoc delete = null;
237         try {
238             delete = connection.prepareStatement(
239                 "delete from messages where messageId=?");
240             delete.setString(1, messageId);
241
242             // execute the delete
243
if (delete.executeUpdate() != 1) {
244                 _log.error("Cannot remove message=" + messageId);
245             }
246         } catch (SQLException JavaDoc exception) {
247             throw new PersistenceException(
248                 "Failed to remove message, id=" + messageId, exception);
249         } finally {
250             SQLHelper.close(delete);
251         }
252     }
253
254     /**
255      * Return the message identified by the message Id
256      *
257      * @param connection - execute on this connection
258      * @param messageId - id of message to retrieve
259      * @return MessageImpl - the associated message
260      * @throws PersistenceException - an sql related error
261      */

262     public MessageImpl get(Connection JavaDoc connection, String JavaDoc messageId)
263         throws PersistenceException {
264
265         MessageImpl result = null;
266         PreparedStatement JavaDoc select = null;
267         ResultSet JavaDoc set = null;
268         try {
269             select = connection.prepareStatement(
270                 "select messageBlob, processed from messages where messageId=?");
271
272             select.setString(1, messageId);
273             set = select.executeQuery();
274             if (set.next()) {
275                 result = deserialize(set.getBytes(1));
276                 result.setProcessed((set.getInt(2) == 1 ? true : false));
277             }
278         } catch (SQLException JavaDoc exception) {
279             throw new PersistenceException(
280                 "Failed to retrieve message, id=" + messageId, exception);
281         } finally {
282             SQLHelper.close(set);
283             SQLHelper.close(select);
284         }
285
286         return result;
287     }
288
289     /**
290      * Delete all messages for the given destination
291      *
292      * @param connection - execute on this connection
293      * @param destination the destination to remove messages for
294      * @return int - the number of messages purged
295      * @throws PersistenceException - an sql related error
296      */

297     public int removeMessages(Connection JavaDoc connection, String JavaDoc destination)
298         throws PersistenceException {
299
300         int result = 0;
301         PreparedStatement JavaDoc delete = null;
302
303         // map the destination name to an id
304
long destinationId = Destinations.instance().getId(destination);
305         if (destinationId == 0) {
306             throw new PersistenceException("Cannot delete messages for " +
307                 "destination=" + destination +
308                 ": destination does not exist");
309         }
310
311         try {
312             delete = connection.prepareStatement(
313                 "delete from messages where destinationId = ?");
314             delete.setLong(1, destinationId);
315             result = delete.executeUpdate();
316         } catch (SQLException JavaDoc exception) {
317             throw new PersistenceException(
318                 "Failed to remove messages for destination=" + destination,
319                 exception);
320         } finally {
321             SQLHelper.close(delete);
322         }
323
324         return result;
325     }
326
327     /**
328      * Retrieve the next set of messages for the specified destination with
329      * an acceptance time greater or equal to that specified. It will retrieve
330      * around 200 or so messages depending on what is available.
331      *
332      * @param connection - execute on this connection
333      * @param destination - the destination
334      * @param priority - the priority of the messages
335      * @param time - with timestamp greater or equal to this
336      * @return Vector - one or more MessageImpl objects
337      * @throws PersistenceException - if an SQL error occurs
338      */

339     public Vector JavaDoc getMessages(Connection JavaDoc connection, String JavaDoc destination,
340                               int priority, long time)
341         throws PersistenceException {
342
343         PreparedStatement JavaDoc select = null;
344         ResultSet JavaDoc set = null;
345         Vector JavaDoc messages = new Vector JavaDoc();
346
347         try {
348             JmsDestination dest = Destinations.instance().get(destination);
349             if (dest == null) {
350                 throw new PersistenceException(
351                     "Cannot getMessages for destination=" + destination
352                     + ": destination does not exist");
353             }
354
355             long destinationId = Destinations.instance().getId(destination);
356             if (destinationId == 0) {
357                 throw new PersistenceException(
358                     "Cannot getMessages for destination=" + destination
359                     + ": destination does not exist");
360             }
361
362             if ((dest instanceof JmsTopic) &&
363                 (((JmsTopic) dest).isWildCard())) {
364                 // if the destination is a wildcard then we can't only select
365
// on timestamp. This will fault in any message greater than
366
// or equal to the specified timestamp.
367
select = connection.prepareStatement(
368                     "select createtime,processed,messageblob from messages "
369                     + "where priority=? and createTime>=? "
370                     + "order by createTime asc");
371                 select.setInt(1, priority);
372                 select.setLong(2, time);
373             } else {
374                 // if the destination is more specific then we can execute a
375
// more specialized query and fault in other messages for
376
// the same destination.
377
select = connection.prepareStatement(
378                     "select createtime,processed,messageblob from messages "
379                     + "where destinationId=? and priority=? and createTime>=? "
380                     + "order by createTime asc");
381                 select.setLong(1, destinationId);
382                 select.setInt(2, priority);
383                 select.setLong(3, time);
384             }
385             set = select.executeQuery();
386
387             // now iterate through the result set
388
int count = 0;
389             long lastTimeStamp = time;
390             while (set.next()) {
391                 MessageImpl m = deserialize(set.getBytes(3));
392                 m.setProcessed((set.getInt(2) == 1 ? true : false));
393                 messages.add(m);
394                 if (++count > 200) {
395                     // if there are more than two hundred rows then exist
396
// the loop after 200 messages have been retrieved
397
// and the timestamp has changed.
398
if (set.getLong(1) > lastTimeStamp) {
399                         break;
400                     }
401                 } else {
402                     lastTimeStamp = set.getLong(1);
403                 }
404             }
405         } catch (SQLException JavaDoc exception) {
406             throw new PersistenceException(
407                 "Failed to retrieve messages", exception);
408         } finally {
409             SQLHelper.close(set);
410             SQLHelper.close(select);
411         }
412
413         return messages;
414     }
415
416     /**
417      * Retrieve the specified number of message ids from the database with a
418      * time greater than that specified. The number of items to retrieve
419      * is only a hint and does not reflect the number of messages actually
420      * returned.
421      *
422      * @param connection - execute on this connection
423      * @param time - with timestamp greater than
424      * @param hint - an indication of the number of messages to return.
425      * @return a map of messageId Strings to their creation time
426      * @throws PersistenceException - if an SQL error occurs
427      */

428     public HashMap JavaDoc getMessageIds(Connection JavaDoc connection, long time, int hint)
429         throws PersistenceException {
430
431         PreparedStatement JavaDoc select = null;
432         ResultSet JavaDoc set = null;
433         HashMap JavaDoc messages = new HashMap JavaDoc();
434
435         try {
436             select = connection.prepareStatement(
437                 "select messageId,createTime from messages where createTime>? "
438                 + "order by createTime asc");
439             select.setLong(1, time);
440             set = select.executeQuery();
441
442             // now iterate through the result set
443
int count = 0;
444             long lastTimeStamp = time;
445             while (set.next()) {
446                 messages.put(set.getString(1), new Long JavaDoc(set.getLong(2)));
447                 if (++count > hint) {
448                     if (set.getLong(2) > lastTimeStamp) {
449                         break;
450                     }
451                 } else {
452                     lastTimeStamp = set.getLong("createTime");
453                 }
454
455             }
456         } catch (SQLException JavaDoc exception) {
457             throw new PersistenceException(
458                 "Failed to retrieve message identifiers", exception);
459         } finally {
460             SQLHelper.close(set);
461             SQLHelper.close(select);
462         }
463
464         return messages;
465     }
466
467     /**
468      * Retrieve a list of unprocessed messages and return them to the client.
469      * An unprocessed message has been accepted by the system but not
470      * processed.
471      *
472      * @param connection - execute on this connection
473      * @return Vector - one or more MessageImpl objects
474      * @throws PersistenceException - if an SQL error occurs
475      */

476     public Vector JavaDoc getUnprocessedMessages(Connection JavaDoc connection)
477         throws PersistenceException {
478
479         PreparedStatement JavaDoc select = null;
480         ResultSet JavaDoc set = null;
481         Vector JavaDoc messages = new Vector JavaDoc();
482
483         try {
484             select = connection.prepareStatement(
485                 "select messageblob from messages where processed=0");
486             set = select.executeQuery();
487             // now iterate through the result set
488
while (set.next()) {
489                 MessageImpl m = deserialize(set.getBytes(1));
490                 m.setProcessed(false);
491                 messages.add(m);
492             }
493         } catch (SQLException JavaDoc exception) {
494             throw new PersistenceException(
495                 "Failed to retrieve unprocessed messages", exception);
496         } finally {
497             SQLHelper.close(set);
498             SQLHelper.close(select);
499         }
500
501         return messages;
502     }
503
504     /**
505      * Retrieve the message handle for all unexpired messages
506      *
507      * @param connection - execute on this connection
508      * @param destination - the destination in question
509      * @return Vector - collection of PersistentMessageHandle objects
510      * @throws PersistenceException - sql releated exception
511      */

512     public Vector JavaDoc getNonExpiredMessages(Connection JavaDoc connection,
513                                         JmsDestination destination)
514         throws PersistenceException {
515
516         Vector JavaDoc result = new Vector JavaDoc();
517         PreparedStatement JavaDoc select = null;
518         ResultSet JavaDoc set = null;
519
520         try {
521             long destinationId = Destinations.instance().getId(
522                 destination.getName());
523
524             if (destinationId == 0) {
525                 throw new PersistenceException(
526                     "Cannot getMessages for destination=" + destination
527                     + ": destination does not exist");
528             }
529
530             select = connection.prepareStatement(
531                 "select messageId,destinationId,priority,createTime,"
532                 + "sequenceNumber,expiryTime "
533                 + "from messages "
534                 + "where expiryTime>0 and destinationId=? "
535                 + "order by expiryTime asc");
536             select.setLong(1, destinationId);
537             set = select.executeQuery();
538
539             while (set.next()) {
540                 String JavaDoc messageId = set.getString(1);
541                 int priority = set.getInt(3);
542                 long acceptedTime = set.getLong(4);
543                 long sequenceNumber = set.getLong(5);
544                 long expiryTime = set.getLong(6);
545                 PersistentMessageHandle handle = new PersistentMessageHandle(
546                         messageId, priority, acceptedTime, sequenceNumber,
547                         expiryTime, destination);
548                 result.add(handle);
549             }
550         } catch (SQLException JavaDoc exception) {
551             throw new PersistenceException(
552                 "Failed to retrieve non-expired messages", exception);
553         } finally {
554             SQLHelper.close(set);
555             SQLHelper.close(select);
556         }
557
558         return result;
559     }
560
561     /**
562      * Delete all expired messages and associated message handles.
563      *
564      * @param connection - execute on this connection
565      * @throws PersistenceException - if an SQL error occurs
566      */

567     public void removeExpiredMessages(Connection JavaDoc connection)
568         throws PersistenceException {
569
570         PreparedStatement JavaDoc delete = null;
571         try {
572             long time = System.currentTimeMillis();
573
574             // delete from the messages
575
delete = connection.prepareStatement(
576                 "delete from messages where expiryTime > 0 and expiryTime < ?");
577             delete.setLong(1, time);
578             delete.executeUpdate();
579             delete.close();
580
581             // delete the message handles
582
delete = connection.prepareStatement(
583                 "delete from message_handles where expiryTime > 0 and expiryTime < ?");
584             delete.setLong(1, time);
585             delete.executeUpdate();
586         } catch (SQLException JavaDoc exception) {
587             throw new PersistenceException(
588                 "Failed to remove expired messages", exception);
589         } finally {
590             SQLHelper.close(delete);
591         }
592     }
593
594     /**
595      * Reset the instance. We need to deprecate this method since this
596      * class does not contain state information
597      */

598     public void close() {
599         _instance = null;
600     }
601
602     /**
603      * Default constructor does nothing at the moment.
604      */

605     protected Messages() {
606     }
607
608     /**
609      * Get the message as a serialized blob
610      *
611      * @param message the message to serialize
612      * @return byte[] the serialized message
613      */

614     public byte[] serialize(MessageImpl message)
615         throws PersistenceException {
616
617         byte[] result = null;
618         ObjectOutputStream JavaDoc ostream = null;
619         try {
620             ByteArrayOutputStream JavaDoc bstream = new ByteArrayOutputStream JavaDoc();
621             ostream = new ObjectOutputStream JavaDoc(bstream);
622             ostream.writeObject(message);
623             result = bstream.toByteArray();
624         } catch (Exception JavaDoc exception) {
625             throw new PersistenceException("Failed to serialize message",
626                 exception);
627         } finally {
628             SQLHelper.close(ostream);
629         }
630
631         return result;
632     }
633
634     /**
635      * Set the message from a serialized blob
636      *
637      * @param blob the serialized message
638      * @return the re-constructed message
639      */

640     public MessageImpl deserialize(byte[] blob) throws PersistenceException {
641         MessageImpl message = null;
642
643         if (blob != null) {
644             ObjectInputStream JavaDoc istream = null;
645             try {
646                 ByteArrayInputStream JavaDoc bstream = new ByteArrayInputStream JavaDoc(blob);
647                 istream = new ObjectInputStream JavaDoc(bstream);
648                 message = (MessageImpl) istream.readObject();
649             } catch (Exception JavaDoc exception) {
650                 throw new PersistenceException(
651                     "Failed to de-serialize message", exception);
652             } finally {
653                 SQLHelper.close(istream);
654             }
655         } else {
656             throw new PersistenceException(
657                 "Cannot de-serialize null message blob");
658         }
659
660         return message;
661     }
662
663 }
664
Popular Tags