KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > api > jms > MantaMessageConsumer


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Nimo.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.api.jms;
47
48 import java.io.Serializable JavaDoc;
49 import java.util.Iterator JavaDoc;
50 import java.util.List JavaDoc;
51
52 import javax.jms.Destination JavaDoc;
53 import javax.jms.IllegalStateException JavaDoc;
54 import javax.jms.JMSException JavaDoc;
55 import javax.jms.Message JavaDoc;
56 import javax.jms.MessageConsumer JavaDoc;
57 import javax.jms.MessageListener JavaDoc;
58 import javax.jms.Queue JavaDoc;
59 import javax.jms.QueueReceiver JavaDoc;
60 import javax.jms.Session JavaDoc;
61 import javax.jms.Topic JavaDoc;
62 import javax.jms.TopicSubscriber JavaDoc;
63
64 import org.apache.commons.logging.Log;
65 import org.apache.commons.logging.LogFactory;
66 import org.mr.MantaException;
67 import org.mr.core.protocol.MantaBusMessage;
68 import org.mr.core.protocol.MantaBusMessageConsts;
69 import org.mr.core.util.SynchronizedPriorityQueue;
70 import org.mr.core.util.SynchronizedQueue;
71 import org.mr.core.util.SystemTime;
72 import org.mr.kernel.services.ServiceConsumer;
73
74 /**
75  * This class represents a Message consumer object.
76  * The domain-specific consumers in the Manta system extend this class.
77  * When a user asks for a domain specific consumer, it will not have the
78  * methods declared by the other domain.
79  *
80  * @version 1.0
81  * @since Jan 18, 2004
82  * @author Nimo
83  *
84  */

85 public class MantaMessageConsumer implements MessageConsumer JavaDoc, QueueReceiver JavaDoc,
86         TopicSubscriber JavaDoc, Serializable JavaDoc {
87
88     /**
89      * A constructor for a MessageConsumer object.
90      *
91      * @param clientID - the clientId that the session gives this consumer.
92      * @param sess - the creating session
93      * @param destination - the queue/topic that this consumer is hooked on.
94      * @param messageSelector - a message selector for messages on the system.
95      * @param noLocal - is this topic local?
96      * @throws JMSException - in case the consumer can not be created.
97      */

98     public MantaMessageConsumer(String JavaDoc clientID, MantaSession sess,
99             Destination JavaDoc destination, String JavaDoc messageSelector, boolean noLocal,
100             ServiceConsumer service) throws JMSException JavaDoc {
101
102         if (sess == null)
103             throw new JMSException JavaDoc("MNJMS00060 : CAN NOT CREATE A CONSUMER. SESSION IS NULL.");
104
105         if (messageSelector == null || messageSelector.length()==0)
106             messageSelector = null;
107
108         //A new Selector object is created allowing us to check if the message selector constructs a valid
109
//selector sentence an InvalidSelectorException is thrown otherwise..
110

111         theMessageSelector = messageSelector;
112
113         isClosed = false;
114         theDestination = destination;
115         this.clientID = clientID;
116         this.creatingSession = sess;
117         isNoLocal = noLocal;
118         theService = service;
119         //innerQueue = new SynchronizedQueue();
120
innerQueue = new SynchronizedPriorityQueue(10);
121         log = LogFactory.getLog("MantaMessageConsumer");
122     }//MantaMessageConsumer
123

124     /**
125      * Closes the message consumer.
126      *
127      * <P>This call blocks until a <CODE>receive</CODE> or message listener in
128      * progress has completed. A blocked message consumer <CODE>receive</CODE>
129      * call returns null when this message consumer is closed.
130      *
131      * @exception JMSException if the JMS provider fails to close the consumer
132      * due to some internal error.
133      */

134     public synchronized void close() throws JMSException JavaDoc {
135
136         if (isClosed)
137             return;
138         
139         //nimo - 25-1-2004
140
isClosed = true;
141
142         // wake up all listeners and blocked threads
143
// Don't see anyone licking this object.
144
// Keep this line in the meantime.
145
//notifyAll();
146

147         //tell the creating session to remove this consumer. along with its
148
//service.
149
if (creatingSession != null)
150             creatingSession.removeConsumer(this);
151         
152         creatingSession = null;
153         theMessageSelector = null;
154         theService = null;
155         clientID = null;
156         this.messageListener = null;
157     }
158
159     /**
160      * Gets the message consumer's <CODE>MessageListener</CODE>.
161      *
162      * @return the listener for the message consumer, or null if no listener
163      * is set
164      *
165      * @exception JMSException if the JMS provider fails to get the message
166      * listener due to some internal error.
167      * @see javax.jms.MessageConsumer#setMessageListener
168      */

169     public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc {
170         checkLegalOperation();
171         return this.messageListener;
172     }//getMessageListener
173

174     /**
175      * Gets this message consumer's message selector expression.
176      *
177      * @return this message consumer's message selector, or null if no
178      * message selector exists for the message consumer (that is, if
179      * the message selector was not set or was set to null or the
180      * empty string)
181      *
182      * @exception JMSException if the JMS provider fails to get the message
183      * selector due to some internal error.
184      */

185     public String JavaDoc getMessageSelector() throws JMSException JavaDoc {
186         checkLegalOperation();
187         return theMessageSelector;
188     }//getMessageSelector
189

190     /**
191      * Receives the next message produced for this message consumer.
192      *
193      * <P>This call blocks indefinitely until a message is produced
194      * or until this message consumer is closed.
195      *
196      * <P>If this <CODE>receive</CODE> is done within a transaction, the
197      * consumer retains the message until the transaction commits.
198      *
199      * @return the next message produced for this message consumer, or
200      * null if this message consumer is concurrently closed
201      *
202      * @exception JMSException if the JMS provider fails to receive the next
203      * message due to some internal error.
204      *
205      */

206     public Message JavaDoc receive() throws JMSException JavaDoc {
207
208         return receive(0);
209     }//receive
210

211     /**
212      * Receives the next message that arrives within the specified
213      * timeout interval.
214      *
215      * <P>This call blocks until a message arrives, the
216      * timeout expires, or this message consumer is closed.
217      * A <CODE>timeout</CODE> of zero never expires, and the call blocks
218      * indefinitely.
219      *
220      * @param timeout the timeout value (in milliseconds)
221      *
222      * @return the next message produced for this message consumer, or
223      * null if the timeout expires or this message consumer is concurrently
224      * closed
225      *
226      * @exception JMSException if the JMS provider fails to receive the next
227      * message due to some internal error.
228      */

229     public Message JavaDoc receive(long timeout) throws JMSException JavaDoc {
230         
231         checkLegalOperation();
232         if (timeout < 0)
233             return null;
234
235         if (timeout == 0)
236             timeout = Long.MAX_VALUE;
237
238         if (creatingSession.isStopped) {
239             long startTime = System.currentTimeMillis();
240             try {
241                 synchronized (creatingSession.lockMonitor) {
242                     creatingSession.lockMonitor.wait(timeout);
243                     //if stopped - wait your timeout (block, as per spec).
244
}
245
246             } catch (InterruptedException JavaDoc e) {
247                 if (log.isErrorEnabled())
248                     log.error("Error while waiting for the session to resume. ", e);
249             }
250             //reduce timeout by time passed, more or less.
251
//so - if session was started again- it will have some more
252
//time to receive.
253
timeout = timeout - (System.currentTimeMillis() - startTime);
254             if (timeout < 1000) //not enough for a receiveNoWait even
255
return null;
256         }
257
258         
259         
260         
261         boolean ackOrHold = true;
262         MantaBusMessage cbm = null;
263         MantaMessage jmsMessage = null;
264         //if we are talking about a topic here, we need to only talk
265
//with our inner queue, i believe.
266
if (this.theDestination instanceof Topic JavaDoc) {
267
268             boolean goOn = true;
269             
270             // remove all messages with expired TTL
271
// synchronized (innerQueue) {
272
// List list = innerQueue.getUnderlineList();
273
// Iterator it = list.iterator();
274
// while(it.hasNext()) {
275
// MantaBusMessage candidate = (MantaBusMessage) it.next();
276
// if (candidate.getValidUntil()<SystemTime.gmtCurrentTimeMillis()) {
277
// it.remove();
278
// }
279
// }
280
// }
281

282             // now find the next message to return.
283
// if we use "noLocal" feature that don't pass messages
284
// that were sent from the local machine - just ack them.
285
while (goOn && timeout >= 0) {
286                 goOn = false;
287                 long start = SystemTime.currentTimeMillis();
288                 cbm = (MantaBusMessage) innerQueue.dequeue(timeout);
289                 if (cbm != null &&
290                     cbm.getValidUntil() < SystemTime.gmtCurrentTimeMillis()) {
291                     goOn = true;
292                     continue;
293                 }
294                 if (cbm != null) {
295                     jmsMessage = convertToJMSMessage(cbm,creatingSession);
296                     if (isBreakingNoLocal(cbm, jmsMessage)) {
297                         goOn = true;
298                         long now = SystemTime.currentTimeMillis();
299                         // we need time reduce the time out by the time passed
300
timeout = timeout - (now - start);
301                         ackOrHold=false;
302                     }
303                     else
304                         ackOrHold=true;
305                         //creatingSession.ackOrHold(cbm);
306
}
307             }// while
308

309         } else { //not a topic receive at all
310
try {
311                 //see - if recover and stuff - we may have us a message here.
312

313                 synchronized (innerQueue) {
314                     if (innerQueue.size() > 0)
315                         cbm = (MantaBusMessage) innerQueue.dequeue(timeout);
316                 }
317                 // can replace previous code with the following? same result?
318
//cbm = (MantaBusMessage) innerQueue.dequeueNoBlock();
319

320                 if (cbm == null){
321                     //no innerQueue
322
cbm = creatingSession.receive(this.getService(), timeout);
323                 }
324                 else {
325                     creatingSession.startLocalTransactionIfNeeded();
326                 }
327                     
328                 //receive will ack.
329
if (cbm != null) {
330                     jmsMessage = convertToJMSMessage(cbm, creatingSession);
331                     if (jmsMessage == null) {
332                         //creatingSession.ackOrHold(cbm);
333
ackOrHold = false;
334                     }
335                 }
336             } catch (MantaException me) {
337                 throw new JMSException JavaDoc("MNJMS00062 : METHOD receive() FAILED INTERNALLY. ERROR TEXT : "+me.getMessage());
338             }
339         }
340
341         if (cbm == null) {
342             return null;
343         }
344     
345         jmsMessage.setWriteableState(false);
346         if (creatingSession.sessionAcknowledgementMode == Session.CLIENT_ACKNOWLEDGE ||
347                 creatingSession.getTransacted()) {
348             //creatingSession.sessionAcknowledgementMode == Session.SESSION_TRANSACTED) {
349
//|| creatingSession.sessionTransactedMode) {
350
if (ackOrHold) {
351                 MantaMessage msg = jmsMessage.makeCopy();
352                 cbm.setPayload(msg);
353             }
354         }
355         
356         //check expiration
357
creatingSession.ackOrHold(cbm);
358         return jmsMessage;
359     
360
361     }//receive
362

363     
364     
365     private boolean isBreakingNoLocal(MantaBusMessage cbm,
366             MantaMessage jmsMessage) throws JMSException JavaDoc {
367         
368         String JavaDoc connId = jmsMessage.getConnId();
369         if (isNoLocal && connId != null) {
370             //if the received message was sent from the same connection it was recieved in-
371
//just ack it and don't actually pass it the message
372
if (connId.equals(creatingSession.owningConnection.getClientID())) {
373                 creatingSession.ackOrHold(cbm);//cannot just ack here.
374
return true;
375             }//if
376
}//if
377
return false;
378     }
379
380     /**
381      * Receives the next message if one is immediately available.
382      *
383      * @return the next message produced for this message consumer, or
384      * null if one is not available
385      *
386      * @exception JMSException if the JMS provider fails to receive the next
387      * message due to some internal error.
388      *
389      */

390     public Message JavaDoc receiveNoWait() throws JMSException JavaDoc {
391
392         return receive(3000L);
393
394     }//receiveNoWait
395

396     /**
397      * Sets the message consumer's <CODE>MessageListener</CODE>.
398      *
399      * <P>Setting the message listener to null is the equivalent of
400      * unsetting the message listener for the message consumer.
401      *
402      * <P>The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
403      * while messages are being consumed by an existing listener
404      * or the consumer is being used to consume messages synchronously
405      * is undefined.
406      *
407      * @param listener the listener to which the messages are to be
408      * delivered
409      *
410      * @exception JMSException if the JMS provider fails to set the message
411      * listener due to some internal error.
412      * @see javax.jms.MessageConsumer#getMessageListener
413      */

414     public void setMessageListener(MessageListener JavaDoc listener) throws JMSException JavaDoc {
415         checkLegalOperation();
416         
417         //if we had an earlier listener on a queue - deregister.
418
if (this.messageListener != null && theDestination instanceof Queue JavaDoc)
419             this.creatingSession.deregisterFromQueue(this);
420         
421         //Amir - moved from the end of the method
422
this.messageListener = listener;
423         
424         if (listener != null) {
425             //new listener is not null - empty the inner queue to it.
426
synchronized (innerQueue) {
427                 MantaBusMessage mbm;
428                 synchronized(creatingSession.listenersCount) {
429                     if (creatingSession.isClosed||creatingSession.isClosing)
430                         return;
431                     creatingSession.listenersCount.add();
432                 }
433                 
434                 while (!innerQueue.isEmpty()) {
435                     mbm = (MantaBusMessage) innerQueue.dequeue();
436                     if (mbm.getValidUntil()>SystemTime.gmtCurrentTimeMillis()) {
437                         creatingSession.ackOrHold(mbm);
438                         listener.onMessage(convertToJMSMessage(mbm, creatingSession));
439                     }
440                 }
441                 
442                 synchronized(creatingSession.listenersCount) {
443                     creatingSession.listenersCount.remove();
444                     if (creatingSession.listenersCount.val()==0) {
445                         creatingSession.listenersCount.notifyAll();
446                     }
447                 }
448             
449                 if (theDestination instanceof Queue JavaDoc)
450                     creatingSession.listenToQueue(this);
451             }
452         }
453     }
454
455     /*
456      * used by the MantaSession to feed a message to the listener.
457      * this method makes sure to convert the message to a JMS one.
458      */

459     void feedMessageListener(MantaBusMessage mbm)
460             throws JMSException JavaDoc {
461         
462         checkLegalOperation();
463         MantaMessage message = convertToJMSMessage(mbm, creatingSession);
464         if (messageListener != null) {
465             String JavaDoc connId = message.getConnId();
466             // Topics have a feature that we can ignore messages sent from
467
// the local machine. In this case we just ack the message without
468
// passing it further. This is the place when we check this condition.
469
// isNoLocal has to be true, the destination has to be a topic,
470
// and the connection id has to be the same.
471
if (isNoLocal && connId != null && this.theDestination instanceof Topic JavaDoc) {
472                 if (connId.equals(creatingSession.owningConnection.getClientID())) {
473                     creatingSession.ackMessage(mbm); //only for noLocal got local
474
return;
475                 }//if
476
}//if
477

478             if (creatingSession.sessionAcknowledgementMode==Session.CLIENT_ACKNOWLEDGE||
479                     creatingSession.sessionAcknowledgementMode==Session.SESSION_TRANSACTED) {
480                 MantaMessage copy = message.makeCopy();
481                 mbm.setPayload(copy);
482             }
483             
484             // start local transaction if needed
485
this.creatingSession.startLocalTransactionIfNeeded();
486             
487             // lock resources before passing the message further
488
creatingSession.ackOrHold(mbm);
489
490             // passing the message to the message listener
491
if (this.messageListener != null) {
492                 try {
493                     this.messageListener.onMessage(message);
494                 } catch (Throwable JavaDoc t) {
495                     log.error("Exception in message listener: " +
496                               t.getMessage());
497                 }
498             }
499             else {
500                 log.error("Message arrived to a consumer with no registered listener. MessageID="+message.getJMSMessageID());
501             }
502         }
503         //else - we are talking about a consumer with no listener.
504
else
505             innerQueue.enqueue(mbm);
506     }
507
508     /*
509      * implementation of queue/topic stuff.
510      */

511
512     /**
513      * Get the queue for this consumer
514      */

515     public Queue JavaDoc getQueue() throws JMSException JavaDoc {
516         checkLegalOperation();
517         return (Queue JavaDoc) getDestination();
518     }
519
520     /**
521      * Gets whether this is a local receiver
522      */

523     public boolean getNoLocal() throws JMSException JavaDoc {
524         checkLegalOperation();
525         return isNoLocal;
526     }
527
528     /**
529      * gets the topic for this consumer.
530      */

531
532     public Topic JavaDoc getTopic() throws JMSException JavaDoc {
533         checkLegalOperation();
534         return (Topic JavaDoc) getDestination();
535     }
536
537     private void checkLegalOperation() throws JMSException JavaDoc {
538         if (isClosed)
539             throw new IllegalStateException JavaDoc("MNJMS00061 : OPERATION UNALLOWED. METHOD FAILED: checkLegalOperation(). REASON : CONSUMER IS CLOSED.");
540             
541     }
542
543     /*
544      * This method checks to see if the MantaBusMessage is a valid JMS message
545      * and if so it converts the MantaBusMessage to a JMS message.
546      *
547      * @param message the MantaBusMessage to be converted
548      * @returnthe newly converted JMS message
549      */

550     static MantaMessage convertToJMSMessage(MantaBusMessage message,
551             MantaSession session) throws JMSException JavaDoc {
552         
553         MantaMessage payload = null;
554         if (message != null) {
555             if (message.getHeader(
556                     MantaBusMessageConsts.HEADER_NAME_PAYLOAD_TYPE).equals(
557                     MantaBusMessageConsts.PAYLOAD_TYPE_JMS))
558                 payload = (MantaMessage) (message.getPayload());
559
560         }//if
561
if (payload == null)
562             return null;
563
564         MantaMessage result = null;
565         if (payload.connId != null) {
566             //this messages was genereted in this VM we need to make a copy
567
result = payload.makeCopy();
568         } else {
569             result = payload;
570         }
571
572         result.creatingSession=session;
573         //the message was just received, so its saved props and body are reset.
574
result.flags = result.flags & 0x0FF9FFFF;
575         if (result.getJMSType().equals(MantaMessage.BYT_M)) {
576             MantaBytesMessage r = (MantaBytesMessage)result;
577             r.reset();
578         }
579         else if (result.getJMSType().equals(MantaMessage.STR_M))
580         {
581             MantaStreamMessage r = (MantaStreamMessage)result;
582             r.reset();
583         }
584         
585         result.setWriteableState(true);
586
587         //handle recover. and Redelivered
588
if (message.isRedelivered())
589             result.flags=result.flags|MantaMessage.IS_REDELIVERED;
590         
591         result.setWriteableState(false);
592         return result;
593     }//convertToJMSMessage
594

595     Destination JavaDoc getDestination() throws JMSException JavaDoc {
596         checkLegalOperation();
597         return theDestination;
598     }
599
600     String JavaDoc getClientId() {
601         return clientID;
602     }
603
604     ServiceConsumer getService() {
605         return theService;
606     }
607
608     //The destination object associated with the consumer
609
protected Destination JavaDoc theDestination = null;
610
611     //The message selector object associated with the consumer
612
protected String JavaDoc theMessageSelector = null;
613
614     // if true, and the destination is a topic, inhibits the
615
// delivery of messages published by its own connection. The
616
// behavior for <CODE>NoLocal</CODE> is not specified if the
617
// destination is a queue.
618
protected boolean isNoLocal;
619
620     // The creating session of this Consumer.
621
protected MantaSession creatingSession = null;
622
623     //is this consumer closed ?
624
boolean isClosed;
625
626     //specific client id for this consumer.
627
protected String JavaDoc clientID;
628
629     //The service that this consumer is on - for closing time.
630
protected ServiceConsumer theService;
631
632     //the listener
633
private MessageListener JavaDoc messageListener = null;
634
635     private SynchronizedQueue innerQueue;
636     
637     private Log log;
638     
639 }//MantaMessageConsumer
Popular Tags