KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > walend > somnifugi > SomniMessageConsumer


1 package net.walend.somnifugi;
2
3 import java.util.Set JavaDoc;
4 import java.util.HashSet JavaDoc;
5 import java.util.Iterator JavaDoc;
6
7 import javax.jms.MessageConsumer JavaDoc;
8 import javax.jms.JMSException JavaDoc;
9 import javax.jms.MessageListener JavaDoc;
10 import javax.jms.Message JavaDoc;
11 import javax.jms.Session JavaDoc;
12
13 import net.walend.somnifugi.channel.Takable;
14
15 /**
16 SomniMessageConsumer is an abstraction for MessageConsumers. It holds a Takable feed, an SomniExceptionListener and some control booleans. If you set a MessageListener, it also holds and controls an SomniMessageListenerRunner.
17
18 @author <a HREF="http://walend.net">David Walend</a> <a HREF="mailto:david@walend.net">david@walend.net</a>
19 @author @pwang@ added support for client acknowledgement.
20  */

21
22 public abstract class SomniMessageConsumer
23     implements MessageConsumer JavaDoc
24 {
25     private Takable<Message JavaDoc> feed;
26     private SomniMessageListenerRunner messageListenerRunner = null;
27     private SomniExceptionListener exceptionListener;
28     private Set JavaDoc<Thread JavaDoc> waitingThreads = new HashSet JavaDoc<Thread JavaDoc>();
29     private SomniMessageSelector messageSelector;
30     
31     private boolean started = false;
32     private boolean closed = false;
33
34     /*
35 Synchronization in SomniMessageConsumer is a little touchy. threadsGuard is currently independant of the other three.
36      */

37     private final Object JavaDoc threadsGuard = new Object JavaDoc(); //guards the waitingThreads Set
38

39     private final Object JavaDoc partsGuard = new Object JavaDoc(); //guards the messageListenerRunner and exceptionListener
40
private final Object JavaDoc feedGuard = new Object JavaDoc(); //guards the feed
41

42     private final Object JavaDoc stateGuard = new Object JavaDoc(); //guards the started and closed variables
43

44     /**
45 To support client acknowledge.
46     */

47     private SomniSession session;
48
49
50     protected SomniMessageConsumer(Takable<Message JavaDoc> feed,SomniExceptionListener somniExceptionListener,SomniSession session)
51     {
52         if(feed==null)
53         {
54             throw new NullPointerException JavaDoc("feed can not be null.");
55         }
56         
57         this.feed = feed;
58         this.exceptionListener = somniExceptionListener;
59         this.session = session;
60     }
61
62     protected SomniMessageConsumer(Takable<Message JavaDoc> feed,SomniExceptionListener somniExceptionListener,SomniMessageSelector messageSelector,SomniSession session)
63     {
64         this(feed,somniExceptionListener,session);
65         this.messageSelector = messageSelector;
66     }
67
68     /**
69 Gets this message consumer's message selector expression.
70  
71 @return this message consumer's message selector, or null if no
72         message selector exists for the message consumer (that is, if
73         the message selector was not set or was set to null or the
74         empty string)
75  
76 @exception JMSException if the JMS provider fails to get the message
77                         selector due to some internal error.
78 @exception UnsupportedOperationException because it isn't implemented.
79       */

80       //todo test
81
public String JavaDoc getMessageSelector()
82         throws JMSException JavaDoc
83     {
84         if(messageSelector == null)
85         {
86             return null;
87         }
88         else
89         {
90             return messageSelector.toString();
91         }
92     }
93
94     /**
95 @return the SomniMessageSelector for this subscriber.
96     */

97     public SomniMessageSelector getSomniMessageSelector()
98     {
99         return messageSelector;
100     }
101     
102     /** Gets the message consumer's <CODE>MessageListener</CODE>.
103  
104 @return the listener for the message consumer, or null if no listener
105 is set
106  
107 @exception JMSException if the JMS provider fails to get the message
108                         listener due to some internal error.
109 @see javax.jms.MessageConsumer#setMessageListener
110       */

111     public MessageListener JavaDoc getMessageListener()
112         throws JMSException JavaDoc
113     {
114         synchronized(partsGuard)
115             {
116                 if(messageListenerRunner==null)
117                     {
118                         return null;
119                     }
120                 else
121                     {
122                         return messageListenerRunner.getMessageListener();
123                     }
124             }
125     }
126
127     /**
128 Sets the message consumer's <CODE>MessageListener</CODE>.
129 <P>
130 Setting the message listener to null is the equivalent of
131 unsetting the message listener for the message consumer.
132 <P>
133 The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
134 while messages are being consumed by an existing listener
135 or the consumer is being used to consume messages synchronously
136 is undefined.
137 <P>
138 I've decided that using setMessageListener and calling receive forces the two
139 threads to compete for the message.
140 <p>
141 <p>
142 If the MessageListener's onMessage() or the MessageConsumer produces a RuntimeException, this SomniMessageListenerRunner will log the error via the ExceptionListener, redeliver the message, and let the message listener's run() method end.
143 <p>
144 @param listener the listener to which the messages are to be
145                 delivered
146  
147 @exception JMSException if the JMS provider fails to set the message
148                         listener due to some internal error.
149 @see javax.jms.MessageConsumer#getMessageListener
150       */

151     public void setMessageListener(MessageListener JavaDoc listener)
152         throws JMSException JavaDoc
153     {
154         synchronized(partsGuard)
155             {
156                 checkClosed();
157                 //stop the old MessageListenerRunner
158
if(messageListenerRunner!=null)
159                     {
160                         messageListenerRunner.close();
161                     }
162                 //Start a new MessageListenerRunner
163
if(listener!=null)
164                     {
165                         SomniLogger.IT.fine(getName()+" MessageListener set to "+listener.toString());
166
167                         messageListenerRunner = new SomniMessageListenerRunner(this,listener,exceptionListener);
168                         Thread JavaDoc thread = new Thread JavaDoc(messageListenerRunner);
169                         thread.start();
170                     }
171             }
172     }
173
174     private static boolean expired(Message JavaDoc message)
175         throws JMSException JavaDoc
176     {
177         if(message==null)
178             {
179                 return false;
180             }
181         if(message.getJMSExpiration()!=0)
182             {
183                 if(message.getJMSExpiration()<System.currentTimeMillis())
184                     {
185                         //todo is logging an Object possible?
186
StringBuffer JavaDoc buffy = new StringBuffer JavaDoc();
187                         buffy.append("Message ");
188                         buffy.append(message.toString());
189                         buffy.append(" received "+(System.currentTimeMillis()-message.getJMSExpiration()));
190                         buffy.append(" ms late. Dropped due to timeout.");
191
192                         SomniLogger.IT.warning(buffy.toString());
193
194                         return true;
195                     }
196                 else
197                     {
198                         return false;
199                     }
200             }
201         else
202             {
203                 return false;
204             }
205     }
206
207     private void logReceived(Message JavaDoc message)
208     {
209         if(message!=null)
210             {
211                 //todo is logging an Object possible?
212
StringBuffer JavaDoc buffy = new StringBuffer JavaDoc();
213                 
214                 buffy.append(getName());
215                 buffy.append(" received ");
216                 buffy.append(message.toString());
217                 
218                 SomniLogger.IT.finest(buffy.toString());
219             }
220     }
221
222     private SomniMessage processReceivedMessage(SomniMessage message)
223         throws JMSException JavaDoc
224     {
225         SomniMessage result = message;
226
227         //only if we're in client ack mode, put the copy in the session's message list and set the consumer
228
if(session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
229         {
230             session.addMessageToAcknowledge(result);
231             result.setConsumer(this);
232         }
233
234         //only if we're in autoack mode, go ahead and acknowledge the message
235
if (session.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE)
236         {
237             result.acknowledge();
238         }
239
240         return result;
241     }
242
243     /**
244 Receives the next message produced for this message consumer.
245  
246 <P>This call blocks indefinitely until a message is produced
247 or until this message consumer is closed.
248
249 <P>If this <CODE>receive</CODE> is done within a transaction, the
250 consumer retains the message until the transaction commits.
251  
252 @return the next message produced for this message consumer, or
253 null if this message consumer is concurrently closed
254  
255 @exception JMSException if the JMS provider fails to receive the next
256                         message due to some internal error.
257
258       */

259     public Message JavaDoc receive()
260         throws JMSException JavaDoc
261     {
262         synchronized(threadsGuard)
263             {
264                 waitingThreads.add(Thread.currentThread());
265             }
266         try
267             {
268                 synchronized(feedGuard)
269                     {
270                         synchronized(stateGuard)
271                             {
272                                 checkClosed();
273                                 if(!isStarted())
274                                     {
275                                         stateGuard.wait();
276                                         checkClosed();
277                                     }
278                             }
279                         //todo check if this type is too strict
280
SomniMessage result = null;
281                         while(result==null)
282                             {
283                                 result = (SomniMessage)feed.take();
284                                 
285                                 if(expired(result))
286                                 {
287                                     result = null;
288                                 }
289                                 else
290                                 {
291                                     result = processReceivedMessage(result);
292                                 }
293                             }
294                         logReceived(result);
295                         return result;
296                     }
297             }
298         catch(InterruptedException JavaDoc ie)
299             {
300                 throw new SomniInterruptedException(ie);
301             }
302         finally
303             {
304                 synchronized(threadsGuard)
305                     {
306                         waitingThreads.remove(Thread.currentThread());
307                     }
308             }
309     }
310
311     /**
312 Receives the next message that arrives within the specified
313 timeout interval.
314  
315 <P>This call blocks until a message arrives, the
316 timeout expires, or this message consumer is closed.
317 A <CODE>timeout</CODE> of zero never expires, and the call blocks
318 indefinitely.
319       *
320 @param timeout the timeout value (in milliseconds)
321       *
322 @return the next message produced for this message consumer, or
323 null if the timeout expires or this message consumer is concurrently
324 closed
325       *
326 @exception JMSException if the JMS provider fails to receive the next
327                         message due to some internal error.
328       */

329     public Message JavaDoc receive(long timeout)
330         throws JMSException JavaDoc
331     {
332         long timesUp = System.currentTimeMillis()+timeout;
333         synchronized(threadsGuard)
334             {
335                 waitingThreads.add(Thread.currentThread());
336             }
337         try
338             {
339                 synchronized(feedGuard)
340                     {
341                         synchronized(stateGuard)
342                             {
343                                 checkClosed();
344                                 if(!isStarted())
345                                     {
346                                         stateGuard.wait(timesUp-System.currentTimeMillis());
347                                         checkClosed();
348                                     }
349                             }
350
351                         SomniLogger.IT.finest("polling for "+(timesUp-System.currentTimeMillis()));
352                         //todo check if this type is too strict
353
SomniMessage result = null;
354                         while(result==null&&timesUp>System.currentTimeMillis())
355                             {
356                                 result = (SomniMessage)feed.poll(timesUp-System.currentTimeMillis());
357                                 if(expired(result))
358                                 {
359                                     result = null;
360                                 }
361                             }
362                         if(result==null)
363                             {
364                                 SomniLogger.IT.finer("returned null after "+timeout+" ms.");
365                             }
366                         else
367                             {
368                                 result = processReceivedMessage(result);
369                                 logReceived(result);
370                             }
371                         return result;
372                     }
373             }
374         catch(InterruptedException JavaDoc ie)
375             {
376                 throw new SomniInterruptedException(ie);
377             }
378         finally
379             {
380                 synchronized(threadsGuard)
381                     {
382                         waitingThreads.remove(Thread.currentThread());
383                     }
384             }
385     }
386
387     /**
388 Receives the next message if one is immediately available.
389       *
390 @return the next message produced for this message consumer, or
391 null if one is not available
392  
393 @exception JMSException if the JMS provider fails to receive the next
394                         message due to some internal error.
395       */

396     public Message JavaDoc receiveNoWait()
397         throws JMSException JavaDoc
398     {
399         synchronized(feedGuard)
400             {
401                 if(!isStarted())
402                     {
403                         SomniLogger.IT.finer("returned null");
404                         return null;
405                     }
406                 checkClosed();
407                 boolean succeeded = false;
408                 //todo check if this type is too strict
409
SomniMessage result = null;
410
411                 //keep going if the result is not null and we did not succeed
412
do
413                     {
414                         result = (SomniMessage)feed.poll();
415                         succeeded = !expired(result);
416                     }
417                 while((!succeeded)&&(result!=null));
418
419                 if(result==null)
420                     {
421                         SomniLogger.IT.finer("returned null");
422                     }
423                 else
424                     {
425                         result = processReceivedMessage(result);
426                         logReceived(result);
427                     }
428                 return result;
429             }
430     }
431
432     /**
433 Closes the message consumer.
434
435 <P>Since a provider may allocate some resources on behalf of a
436 <CODE>MessageConsumer</CODE> outside the Java virtual machine, clients
437 should close them when they
438 are not needed. Relying on garbage collection to eventually reclaim
439 these resources may not be timely enough.
440
441 <P>This call blocks until a <CODE>receive</CODE> or message listener in
442 progress has completed. A blocked message consumer <CODE>receive</CODE>
443 call
444 returns null when this message consumer is closed.
445  
446 @exception JMSException if the JMS provider fails to close the consumer
447                         due to some internal error.
448       */

449     public void close()
450         throws JMSException JavaDoc
451     {
452         if(messageListenerRunner!=null)
453             {
454                 messageListenerRunner.close();
455             }
456
457         synchronized(stateGuard)
458             {
459                 closed = true;
460                 stop();
461             }
462
463         synchronized(threadsGuard)
464             {
465                 Iterator JavaDoc it = waitingThreads.iterator();
466                 while(it.hasNext())
467                     {
468                         Thread JavaDoc thread = (Thread JavaDoc)it.next();
469                         thread.interrupt();
470                         it.remove();
471                     }
472             }
473
474         if(messageListenerRunner!=null)
475             {
476                 messageListenerRunner.joinThread();
477             }
478
479         synchronized(stateGuard)
480             {
481                 stateGuard.notifyAll();
482             }
483         SomniLogger.IT.finer(getName()+" closed");
484     }
485
486     /**
487 Guess the number of Messages waiting to be consumed.
488     */

489     public int guessSize()
490     {
491         return feed.guessSize();
492     }
493
494     /**
495     @return the unix time stamp of the next message if a message is pending, or 0.
496     */

497     public long pendingMessageTimestamp()
498     {
499         Message JavaDoc pendingMessage = feed.peek();
500         
501         if(pendingMessage==null)
502         {
503             return 0;
504         }
505         try
506         {
507             return pendingMessage.getJMSTimestamp();
508         }
509         catch(JMSException JavaDoc jmse)
510         {
511             throw new SomniRuntimeException("Trouble getting the time stamp.",jmse);
512         }
513     }
514   
515     /**
516     Create a small report of how this consumer is doing.
517     */

518     public SomniConsumerReport createSomniConsumerReport()
519     {
520         return new SomniConsumerReport(guessSize(),pendingMessageTimestamp());
521     }
522     
523     protected void checkClosed()
524     {
525         synchronized(stateGuard)
526             {
527                 if(closed)
528                     {
529                         throw new IllegalStateException JavaDoc("This MessageConsumer is closed.");
530                     }
531             }
532     }
533
534     protected void start()
535     {
536         synchronized(stateGuard)
537             {
538                 started = true;
539                 stateGuard.notifyAll();
540             }
541         SomniLogger.IT.finer(getName()+" started");
542     }
543
544     protected void stop()
545     {
546         synchronized(stateGuard)
547             {
548                 started = false;
549             }
550         SomniLogger.IT.finer(getName()+" stopped");
551     }
552
553     protected boolean isStarted()
554     {
555         synchronized(stateGuard)
556             {
557                 return started;
558             }
559     }
560
561     protected abstract String JavaDoc getName();
562
563     protected abstract SomniDestination getDestination();
564
565     protected void redeliver(Message JavaDoc message)
566     {
567         try
568             {
569                 if(message!=null)
570                     {
571                         if(message.getJMSRedelivered())
572                             {
573                                 //todo is there a better way to log messages
574
StringBuffer JavaDoc buffy = new StringBuffer JavaDoc();
575                                 buffy.append(getName());
576                                 buffy.append(" can not deliver ");
577                                 buffy.append(message.toString());
578
579                                 SomniLogger.IT.warning(buffy.toString());
580                             }
581                         else
582                             {
583                                 //todo is there a better way to log messages
584
StringBuffer JavaDoc buffy = new StringBuffer JavaDoc();
585                                 buffy.append(getName());
586                                 buffy.append(" attempting to redeliver ");
587                                 buffy.append(message.toString());
588                                 SomniLogger.IT.warning(buffy.toString());
589
590                                 message.setJMSRedelivered(true);
591                                 try
592                                     {
593                                         feed.pushBack(message);
594                                     }
595                                 catch(InterruptedException JavaDoc ie)
596                                     {
597                                         throw new SomniInterruptedException(ie);
598                                     }
599                             }
600                     }
601             }
602         catch(JMSException JavaDoc jmse)
603             {
604                 synchronized(partsGuard)
605                     {
606                         exceptionListener.onException(jmse);
607                     }
608             }
609     }
610
611     void acknowledge()
612         throws JMSException JavaDoc
613     {
614         session.acknowledge();
615     }
616 }
617
618 /* Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006 David Walend
619 All rights reserved.
620
621 Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
622
623 Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
624
625 Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
626
627 Neither the name of the SomnifugiJMS Project, walend.net, nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission from David Walend.
628
629 Credits in redistributions in source or binary forms must include a link to http://somnifugi.sourceforge.net .
630
631 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
632 The net.walend.somnifugi.sql92 package is modified code from the openmq project, https://mq.dev.java.net/ , Copyright (c) of Sun, and carries the CDDL license, repeated here: You can obtain a copy of the license at https://glassfish.dev.java.net/public/CDDLv1.0.html. See the License for the specific language governing permissions and limitations under the License.
633
634 =================================================================================
635
636 For more information and the latest version of this software, please see http://somnifugi.sourceforge.net and http://walend.net or email <a HREF="mailto:david@walend.net">david@walend.net</a>.
637  */

638
Popular Tags