KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > server > BasicQueue


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq.server;
23
24 import java.util.ArrayList JavaDoc;
25 import java.util.Date JavaDoc;
26 import java.util.HashMap JavaDoc;
27 import java.util.HashSet JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.LinkedList JavaDoc;
30 import java.util.List JavaDoc;
31 import java.util.Map JavaDoc;
32 import java.util.Set JavaDoc;
33 import java.util.SortedSet JavaDoc;
34 import java.util.TreeSet JavaDoc;
35
36 import javax.jms.IllegalStateException JavaDoc;
37 import javax.jms.JMSException JavaDoc;
38
39 import org.jboss.logging.Logger;
40 import org.jboss.mq.AcknowledgementRequest;
41 import org.jboss.mq.DestinationFullException;
42 import org.jboss.mq.SpyDestination;
43 import org.jboss.mq.SpyJMSException;
44 import org.jboss.mq.SpyMessage;
45 import org.jboss.mq.Subscription;
46 import org.jboss.mq.pm.Tx;
47 import org.jboss.mq.pm.TxManager;
48 import org.jboss.mq.selectors.Selector;
49 import org.jboss.util.NestedRuntimeException;
50 import org.jboss.util.timeout.Timeout;
51 import org.jboss.util.timeout.TimeoutTarget;
52
53 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
54 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
55
56 /**
57  * This class represents a queue which provides its messages exclusively to one
58  * consumer at a time.<p>
59  *
60  * Notes about synchronization: Much of the work is synchronized on
61  * the receivers or messages depending on the work performed.
62  * However, anything to do with unacknowledged messages and removed
63  * subscriptions must be done synchronized on both (receivers first).
64  * This is because there are multiple entry points with the possibility
65  * that a message acknowledgement (or NACK) is being processed at
66  * the same time as a network failure removes the subscription.
67  *
68  *
69  * @author Hiram Chirino (Cojonudo14@hotmail.com)
70  * @author Norbert Lataille (Norbert.Lataille@m4x.org)
71  * @author David Maplesden (David.Maplesden@orion.co.nz)
72  * @author Adrian Brock (Adrian@jboss.org)
73  * @created August 16, 2001
74  * @version $Revision: 57131 $
75  */

76 public class BasicQueue
77 {
78    static final Logger log = Logger.getLogger(BasicQueue.class);
79
80    /** List of messages waiting to be dispatched<p>
81        synchronized access on itself */

82    SortedSet JavaDoc messages = new TreeSet JavaDoc();
83
84    /** Pending timeouts keyed by MessageReference; the values are Timeout objects
       (this is how removeAllMessages and clearEvent read the map) */
85    ConcurrentHashMap events = new ConcurrentHashMap();
86    
87    /** The scheduled messages */
88    CopyOnWriteArraySet scheduledMessages = new CopyOnWriteArraySet();
89
90    /** The JMSServer object */
91    JMSDestinationManager server;
92
93    /** The subscribers waiting for messages - synchronized access on itself */
94    Receivers receivers;
95
96    /** The description used to separate persistence for multiple subscriptions to a topic */
97    String JavaDoc description;
98
99    /** Simple Counter for gathering message add statistic data */
100    MessageCounter counter;
101
102    /** Unacknowledged messages AcknowledgementRequest -> UnackedMessageInfo<p>
103        synchronized access on receivers and messages */

104    HashMap JavaDoc unacknowledgedMessages = new HashMap JavaDoc();
105    /** Unacknowledged messages MessageRef -> UnackedMessageInfo <p>
106        synchronized access on receivers and messages */

107    HashMap JavaDoc unackedByMessageRef = new HashMap JavaDoc();
108    /** Unacknowledged messages Subscription -> HashMap (MessageRef -> AcknowledgementRequest) <p>
109        synchronized access on receivers and messages */

110    HashMap JavaDoc unackedBySubscription = new HashMap JavaDoc();
111
112    /** Subscribers <p>
113     synchronized access on receivers */

114    HashSet JavaDoc subscribers = new HashSet JavaDoc();
115    
116    /** Removed subscribers that still have unacknowledged messages <p>
117        synchronized access on receivers and messages */

118    HashSet JavaDoc removedSubscribers = new HashSet JavaDoc();
119    
120    /** The basic queue parameters */
121    BasicQueueParameters parameters;
122
123    /** Have we been stopped */
124    boolean stopped = false;
125
126    /**
127     * Construct a new basic queue
128     *
129     * @param server the destination manager
130     * @param description a description to uniquely identify the queue
131     * @param parameters the basic queue parameters
132     * @throws JMSException for any error
133     */

134    public BasicQueue(JMSDestinationManager server, String JavaDoc description, BasicQueueParameters parameters)
135       throws JMSException JavaDoc
136    {
137       this.server = server;
138       this.description = description;
139       this.parameters = parameters;
140       
141       Class JavaDoc receiversImpl = parameters.receiversImpl;
142       if (receiversImpl == null)
143          receiversImpl = ReceiversImpl.class;
144       
145       try
146       {
147          receivers = (Receivers) receiversImpl.newInstance();
148       }
149       catch (Throwable JavaDoc t)
150       {
151          throw new SpyJMSException("Error instantiating receivers implementation: " + receiversImpl, t);
152       }
153    }
154
155    /**
156     * Retrieve the unique description for this queue
157     *
158     * @return the description
159     */

160    public String JavaDoc getDescription()
161    {
162       return description;
163    }
164
165    /**
166     * Retrieve the number of receivers waiting for a message
167     *
168     * @return the number of receivers
169     */

170    public int getReceiversCount()
171    {
172       return receivers.size();
173    }
174
175    /**
176     * Retrieve the receivers waiting for a message
177     *
178     * @return an array of subscriptions
179     */

180    public ArrayList JavaDoc getReceivers()
181    {
182       synchronized (receivers)
183       {
184          return receivers.listReceivers();
185       }
186    }
187
188    /**
189     * Test whether the queue is in use
190     *
191     * @return true when there are subscribers
192     */

193    public boolean isInUse()
194    {
195       synchronized (receivers)
196       {
197          return subscribers.size() > 0;
198       }
199    }
200
201    /**
202     * Add a receiver to the queue
203     *
204     * @param sub the subscription to add
205     * @throws JMSException for any error
206     */

207    public void addReceiver(Subscription sub) throws JMSException JavaDoc
208    {
209       boolean trace = log.isTraceEnabled();
210       if (trace)
211          log.trace("addReceiver " + sub + " " + this);
212       
213       MessageReference found = null;
214       synchronized (messages)
215       {
216          if (messages.size() != 0)
217          {
218             for (Iterator JavaDoc it = messages.iterator(); it.hasNext();)
219             {
220                MessageReference message = (MessageReference) it.next();
221                try
222                {
223                   if (message.isExpired())
224                   {
225                      it.remove();
226                      expireMessageAsync(message);
227                   }
228                   else if (sub.accepts(message.getHeaders()))
229                   {
230                      //queue message for sending to this sub
231
it.remove();
232                      found = message;
233                      break;
234                   }
235                }
236                catch (JMSException JavaDoc ignore)
237                {
238                   log.info("Caught unusual exception in addToReceivers.", ignore);
239                }
240             }
241          }
242       }
243       if (found != null)
244          queueMessageForSending(sub, found);
245       else
246          addToReceivers(sub);
247    }
248
249    /**
250     * Get the subscribers
251     *
252     * @return the subscribers
253     */

254    public Set JavaDoc getSubscribers()
255    {
256       synchronized (receivers)
257       {
258          return (Set JavaDoc) subscribers.clone();
259       }
260    }
261    
262    /**
263     * Add a subscription from the queue
264     *
265     * @param sub the subscription to add
266     * @throws JMSException for any error
267     */

268    public void addSubscriber(Subscription sub) throws JMSException JavaDoc
269    {
270       boolean trace = log.isTraceEnabled();
271       if (trace)
272          log.trace("addSubscriber " + sub + " " + this);
273       synchronized (receivers)
274       {
275          if (stopped)
276             throw new IllegalStateException JavaDoc("The destination is stopped " + getDescription());
277          subscribers.add(sub);
278       }
279    }
280       
281    /**
282     * Removes a subscription from the queue
283     *
284     * @param sub the subscription to remove
285     */

286    public void removeSubscriber(Subscription sub)
287    {
288       boolean trace = log.isTraceEnabled();
289       if (trace)
290          log.trace("removeSubscriber " + sub + " " + this);
291       synchronized (receivers)
292       {
293          removeReceiver(sub);
294          synchronized (messages)
295          {
296             if (hasUnackedMessages(sub))
297             {
298                if (trace)
299                   log.trace("Delaying removal of subscriber is has unacked messages " + sub);
300                removedSubscribers.add(sub);
301             }
302             else
303             {
304                if (trace)
305                   log.trace("Removing subscriber " + sub);
306                subscribers.remove(sub);
307                ((ClientConsumer) sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId);
308             }
309          }
310       }
311    }
312
313    /**
314     * Retrieve the queue depth
315     *
316     * @return the number of messages in the queue
317     */

318    public int getQueueDepth()
319    {
320       return messages.size();
321    }
322
323    /**
324     * Returns the number of scheduled messages in the queue
325     *
326     * @return the scheduled message count
327     */

328    public int getScheduledMessageCount()
329    {
330       return scheduledMessages.size();
331    }
332
333    /**
334     * Returns the number of in process messages for the queue
335     *
336     * @return the in process count
337     */

338    public int getInProcessMessageCount()
339    {
340       synchronized (messages)
341       {
342          return unacknowledgedMessages.size();
343       }
344    }
345
346    /**
347     * Add a message to the queue
348     *
349     * @param mes the message reference
350     * @param txId the transaction
351     * @throws JMSException for any error
352     */

353    public void addMessage(MessageReference mes, Tx txId) throws JMSException JavaDoc
354    {
355       boolean trace = log.isTraceEnabled();
356       if (trace)
357          log.trace("addMessage " + mes + " " + txId + " " + this);
358
359       try
360       {
361          synchronized (receivers)
362          {
363             if (stopped)
364                throw new IllegalStateException JavaDoc("The destination is stopped " + getDescription());
365          }
366          
367          if (parameters.maxDepth > 0)
368          {
369             synchronized (messages)
370             {
371                if (messages.size() >= parameters.maxDepth)
372                {
373                   dropMessage(mes);
374                   String JavaDoc message = "Maximum size " + parameters.maxDepth +
375                      " exceeded for " + description;
376                   log.warn(message);
377                   throw new DestinationFullException(message);
378                }
379             }
380          }
381
382          performOrPrepareAddMessage(mes, txId);
383       }
384       catch (Throwable JavaDoc t)
385       {
386          String JavaDoc error = "Error in addMessage " + mes;
387          log.trace(error, t);
388          dropMessage(mes, txId);
389          SpyJMSException.rethrowAsJMSException(error, t);
390       }
391    }
392
393    /**
394     * Either perform or prepare the add message
395     *
396     * @param mes the message reference
397     * @param txId the transaction id
398     * @throws Exception for any error
399     */

400    protected void performOrPrepareAddMessage(MessageReference mes, Tx txId) throws Exception JavaDoc
401    {
402       TxManager txManager = server.getPersistenceManager().getTxManager();
403       
404       // The message is removed from the cache on a rollback
405
Runnable JavaDoc task = new AddMessagePostRollBackTask(mes);
406       txManager.addPostRollbackTask(txId, task);
407
408       // The message gets added to the queue after the transaction commits
409
task = new AddMessagePostCommitTask(mes);
410       txManager.addPostCommitTask(txId, task);
411    }
412    
413    /**
414     * Restores a message.
415     *
416     * @param mes the message reference
417     */

418    public void restoreMessage(MessageReference mes)
419    {
420       restoreMessage(mes, null, Tx.UNKNOWN);
421    }
422
423    /**
424     * Restores a message.
425     *
426     * @param mes the message reference
427     * @param txid the transaction id
428     * @param type the type of restoration
429     */

430    public void restoreMessage(MessageReference mes, Tx txid, int type)
431    {
432       boolean trace = log.isTraceEnabled();
433       if (trace)
434          log.trace("restoreMessage " + mes + " " + this + " txid=" + txid + " type=" + type);
435
436       try
437       {
438          if (txid == null)
439          {
440             internalAddMessage(mes);
441          }
442          else if (type == Tx.ADD)
443          {
444             performOrPrepareAddMessage(mes, txid);
445          }
446          else if (type == Tx.REMOVE)
447          {
448             performOrPrepareAcknowledgeMessage(mes, txid);
449          }
450          else
451          {
452             throw new IllegalStateException JavaDoc("Unknown restore type " + type + " for message " + mes + " txid=" + txid);
453          }
454       }
455       catch (RuntimeException JavaDoc e)
456       {
457          throw e;
458       }
459       catch (Exception JavaDoc e)
460       {
461          throw new NestedRuntimeException("Unable to restore message " + mes, e);
462       }
463    }
464
465    /**
466     * Nacks a message.
467     */

468    protected void nackMessage(MessageReference message)
469    {
470       if (log.isTraceEnabled())
471          log.trace("Restoring message: " + message);
472
473       try
474       {
475          message.redelivered();
476          // Set redelivered, vendor-specific flags
477
message.invalidate();
478          // Update the persistent message outside the transaction
479
// We want to know the message might have been delivered regardless
480
if (message.isPersistent())
481             server.getPersistenceManager().update(message, null);
482       }
483       catch (JMSException JavaDoc e)
484       {
485          log.error("Caught unusual exception in nackMessage for " + message, e);
486       }
487
488       internalAddMessage(message);
489    }
490
491    /**
492     * Browse the queue
493     *
494     * @param selector the selector to apply, pass null for
495     * all messages
496     * @return the messages
497     * @throws JMSException for any error
498     */

499    public SpyMessage[] browse(String JavaDoc selector) throws JMSException JavaDoc
500    {
501       if (selector == null)
502       {
503          SpyMessage list[];
504          synchronized (messages)
505          {
506             list = new SpyMessage[messages.size()];
507             Iterator JavaDoc iter = messages.iterator();
508             for (int i = 0; iter.hasNext(); i++)
509                list[i] = ((MessageReference) iter.next()).getMessageForDelivery();
510          }
511          return list;
512       }
513       else
514       {
515          Selector s = new Selector(selector);
516          LinkedList JavaDoc selection = new LinkedList JavaDoc();
517
518          synchronized (messages)
519          {
520             Iterator JavaDoc i = messages.iterator();
521             while (i.hasNext())
522             {
523                MessageReference m = (MessageReference) i.next();
524                if (s.test(m.getHeaders()))
525                   selection.add(m.getMessageForDelivery());
526             }
527          }
528
529          SpyMessage list[];
530          list = new SpyMessage[selection.size()];
531          list = (SpyMessage[]) selection.toArray(list);
532          return list;
533       }
534    }
535
536    /**
537     * Browse the scheduled messages
538     *
539     * @param selector the selector to apply, pass null for
540     * all messages
541     * @return the messages
542     * @throws JMSException for any error
543     */

544    public List JavaDoc browseScheduled(String JavaDoc selector) throws JMSException JavaDoc
545    {
546       if (selector == null)
547       {
548          ArrayList JavaDoc list;
549          synchronized (messages)
550          {
551             list = new ArrayList JavaDoc(scheduledMessages.size());
552             Iterator JavaDoc iter = scheduledMessages.iterator();
553             while (iter.hasNext())
554             {
555                MessageReference ref = (MessageReference) iter.next();
556                list.add(ref.getMessageForDelivery());
557             }
558          }
559          return list;
560       }
561       else
562       {
563          Selector s = new Selector(selector);
564          LinkedList JavaDoc selection = new LinkedList JavaDoc();
565
566          synchronized (messages)
567          {
568             Iterator JavaDoc iter = scheduledMessages.iterator();
569             while (iter.hasNext())
570             {
571                MessageReference ref = (MessageReference) iter.next();
572                if (s.test(ref.getHeaders()))
573                   selection.add(ref.getMessageForDelivery());
574             }
575          }
576          
577          return selection;
578       }
579    }
580
581    /**
582     * Browse the in process messages
583     *
584     * @param selector the selector to apply, pass null for
585     * all messages
586     * @return the messages
587     * @throws JMSException for any error
588     */

589    public List JavaDoc browseInProcess(String JavaDoc selector) throws JMSException JavaDoc
590    {
591       if (selector == null)
592       {
593          ArrayList JavaDoc list;
594          synchronized (messages)
595          {
596             list = new ArrayList JavaDoc(unacknowledgedMessages.size());
597             Iterator JavaDoc iter = unacknowledgedMessages.values().iterator();
598             while (iter.hasNext())
599             {
600                UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next();
601                MessageReference ref = unacked.messageRef;
602                list.add(ref.getMessageForDelivery());
603             }
604          }
605          return list;
606       }
607       else
608       {
609          Selector s = new Selector(selector);
610          LinkedList JavaDoc selection = new LinkedList JavaDoc();
611
612          synchronized (messages)
613          {
614             Iterator JavaDoc iter = unacknowledgedMessages.values().iterator();
615             while (iter.hasNext())
616             {
617                UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next();
618                MessageReference ref = unacked.messageRef;
619                if (s.test(ref.getHeaders()))
620                   selection.add(ref.getMessageForDelivery());
621             }
622          }
623          
624          return selection;
625       }
626    }
627
628    /**
629     * Receive a message from the queue.
        * <p>
        * Removes the first unexpired message the subscription accepts and sets up
        * its acknowledgement. Expired messages met on the way are removed and
        * passed to expireMessageAsync. When nothing matches and <code>wait</code>
        * is true, the subscription is passed to addToReceivers.
630     *
631     * @param sub the subscription requiring a message
632     * @param wait whether to wait for a message
633     * @return the message, or null when no message was found
634     * @throws JMSException for any error, including when the destination is stopped
635     */

636    public SpyMessage receive(Subscription sub, boolean wait) throws JMSException JavaDoc
637    {
638       boolean trace = log.isTraceEnabled();
639       if (trace)
640          log.trace("receive " + sub + " wait=" + wait + " " + this);
641
642       MessageReference messageRef = null;
         // The receivers monitor is held for the whole lookup; messages is nested inside it
643       synchronized (receivers)
644       {
645          if (stopped)
646             throw new IllegalStateException JavaDoc("The destination is stopped " + getDescription());
647          // If the subscription is not picky (no selector, noLocal off), the first message will be it
648
if (sub.getSelector() == null && sub.noLocal == false)
649          {
650             synchronized (messages)
651             {
652                // find a non-expired message, discarding expired ones from the head
653
while (messages.size() != 0)
654                {
655                   messageRef = (MessageReference) messages.first();
656                   messages.remove(messageRef);
657
658                   if (messageRef.isExpired())
659                   {
660                      expireMessageAsync(messageRef);
                        // reset so an expired message is never returned if the queue runs dry
661                      messageRef = null;
662                   }
663                   else
664                      break;
665                }
666             }
667          }
668          else
669          {
670             // The subscription is picky, so we have to iterate.
671
synchronized (messages)
672             {
673                Iterator JavaDoc i = messages.iterator();
674                while (i.hasNext())
675                {
676                   MessageReference mr = (MessageReference) i.next();
677                   if (mr.isExpired())
678                   {
679                      i.remove();
680                      expireMessageAsync(mr);
681                   }
682                   else if (sub.accepts(mr.getHeaders()))
683                   {
684                      messageRef = mr;
685                      i.remove();
686                      break;
687                   }
688                }
689             }
690          }
691
692          if (messageRef == null)
693          {
               // Nothing available: register as a waiting receiver only when asked to wait
694             if (wait)
695                addToReceivers(sub);
696          }
697          else
698          {
699             setupMessageAcknowledgement(sub, messageRef);
700          }
701       }
702
703       if (messageRef == null)
704          return null;
705       return messageRef.getMessageForDelivery();
706    }
707
708    /**
709     * Acknowledge a message.
        * <p>
        * Removes the request from the unacknowledged bookkeeping and returns
        * silently when the request is not known. A negative acknowledgement
        * registers a RestoreMessageTask for both commit and rollback. A positive
        * one removes a persistent message through the persistence manager and
        * prepares the acknowledgement. Finally checkRemovedSubscribers is called
        * for the subscription.
710     *
711     * @param item the acknowledgement request
712     * @param txId the transaction
713     * @throws JMSException for any error
714     */

715    public void acknowledge(AcknowledgementRequest item, Tx txId) throws JMSException JavaDoc
716    {
717       boolean trace = log.isTraceEnabled();
718       if (trace)
719          log.trace("acknowledge " + item + " " + txId + " " + this);
720
721       UnackedMessageInfo unacked = null;
722       synchronized (messages)
723       {
724          unacked = (UnackedMessageInfo) unacknowledgedMessages.remove(item);
            // Unknown request: nothing to do
725          if (unacked == null)
726             return;
727          unackedByMessageRef.remove(unacked.messageRef);
728          HashMap JavaDoc map = (HashMap JavaDoc) unackedBySubscription.get(unacked.sub);
729          if (map != null)
730             map.remove(unacked.messageRef);
            // Drop the per-subscription map once it holds no more messages
731          if (map == null || map.isEmpty())
732            unackedBySubscription.remove(unacked.sub);
733       }
734
735       MessageReference m = unacked.messageRef;
736
737       // Was it a negative acknowledge??
738
if (!item.isAck)
739       {
            // The same restore task runs whether the transaction commits or rolls back
740          Runnable JavaDoc task = new RestoreMessageTask(m);
741          server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task);
742          server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task);
743       }
744       else
745       {
746          try
747          {
748             if (m.isPersistent())
749                server.getPersistenceManager().remove(m, txId);
750          }
751          catch (Throwable JavaDoc t)
752          {
753             // Something is wrong with the persistence manager,
754
// force a NACK with a rollback/error
755
Runnable JavaDoc task = new RestoreMessageTask(m);
756             server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task);
757             server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task);
               // NOTE(review): presumably this always throws, in which case
               // checkRemovedSubscribers below is skipped on this path - confirm
758             SpyJMSException.rethrowAsJMSException("Error during ACK ref=" + m, t);
759          }
760
761          performOrPrepareAcknowledgeMessage(m, txId);
762       }
763
         // Both monitors, receivers first, as for all removed-subscription handling
764       synchronized (receivers)
765       {
766          synchronized (messages)
767          {
768             checkRemovedSubscribers(unacked.sub);
769          }
770       }
771    }
772
773    /**
774     * Either perform or prepare the acknowledge message
775     *
776     * @param mes the message reference
777     * @param txId the transaction id
778     * @throws Exception for any error
779     */

780    protected void performOrPrepareAcknowledgeMessage(MessageReference mes, Tx txId) throws JMSException JavaDoc
781    {
782       TxManager txManager = server.getPersistenceManager().getTxManager();
783       
784       // The message is restored to the queue on a rollback
785
Runnable JavaDoc task = new RestoreMessageTask(mes);
786       txManager.addPostRollbackTask(txId, task);
787
788       // The message is fully removed after the transaction commits
789
task = new RemoveMessageTask(mes);
790       txManager.addPostCommitTask(txId, task);
791    }
792
793    /**
794     * Nack all messages for a subscription
795     *
796     * @param sub the subscription
797     */

798    public void nackMessages(Subscription sub)
799    {
800       boolean trace = log.isTraceEnabled();
801       if (trace)
802          log.trace("nackMessages " + sub + " " + this);
803
804       // Send nacks for unacknowledged messages
805
synchronized (receivers)
806       {
807          synchronized (messages)
808          {
809             int count = 0;
810             HashMap JavaDoc map = (HashMap JavaDoc) unackedBySubscription.get(sub);
811             if (map != null)
812             {
813                Iterator JavaDoc i = ((HashMap JavaDoc) map.clone()).values().iterator();
814                while (i.hasNext())
815                {
816                   AcknowledgementRequest item = (AcknowledgementRequest) i.next();
817                   try
818                   {
819                      acknowledge(item, null);
820                      count++;
821                   }
822                   catch (JMSException JavaDoc ignore)
823                   {
824                      log.debug("Unable to nack message: " + item, ignore);
825                   }
826                }
827                if (log.isDebugEnabled())
828                   log.debug("Nacked " + count + " messages for removed subscription " + sub);
829             }
830          }
831       }
832    }
833
834    public void removeAllMessages() throws JMSException JavaDoc
835    {
836       boolean trace = log.isTraceEnabled();
837       if (trace)
838          log.trace("removeAllMessages " + this);
839
840       // Drop scheduled messages
841
for (Iterator JavaDoc i = events.entrySet().iterator(); i.hasNext();)
842       {
843          Map.Entry JavaDoc entry = (Map.Entry JavaDoc) i.next();
844          MessageReference message = (MessageReference) entry.getKey();
845          Timeout timeout = (Timeout) entry.getValue();
846          if (timeout != null)
847          {
848             timeout.cancel();
849             i.remove();
850             dropMessage(message);
851          }
852       }
853       scheduledMessages.clear();
854
855       synchronized (receivers)
856       {
857          synchronized (messages)
858          {
859             Iterator JavaDoc i = ((HashMap JavaDoc) unacknowledgedMessages.clone()).keySet().iterator();
860             while (i.hasNext())
861             {
862                AcknowledgementRequest item = (AcknowledgementRequest) i.next();
863                try
864                {
865                   acknowledge(item, null);
866                }
867                catch (JMSException JavaDoc ignore)
868                {
869                }
870             }
871
872             // Remove all remaining messages
873
i = messages.iterator();
874             while (i.hasNext())
875             {
876                MessageReference message = (MessageReference) i.next();
877                i.remove();
878                dropMessage(message);
879             }
880          }
881       }
882    }
883    
884    public void stop()
885    {
886       HashSet JavaDoc subs;
887       synchronized (receivers)
888       {
889          stopped = true;
890          subs = new HashSet JavaDoc(subscribers);
891          if (log.isTraceEnabled())
892             log.trace("Stopping " + this + " with subscribers " + subs);
893          clearEvents();
894       }
895       
896       for (Iterator JavaDoc i = subs.iterator(); i.hasNext();)
897       {
898          Subscription sub = (Subscription) i.next();
899          ClientConsumer consumer = (ClientConsumer) sub.clientConsumer;
900          try
901          {
902             consumer.removeSubscription(sub.subscriptionId);
903          }
904          catch (Throwable JavaDoc t)
905          {
906             log.warn("Error during stop - removing subscriber " + sub, t);
907          }
908          nackMessages(sub);
909       }
910       
911       MessageCache cache = server.getMessageCache();
912       synchronized (messages)
913       {
914          for (Iterator JavaDoc i = messages.iterator(); i.hasNext();)
915          {
916             MessageReference message = (MessageReference) i.next();
917             try
918             {
919                cache.remove(message);
920             }
921             catch (JMSException JavaDoc ignored)
922             {
923                log.trace("Ignored error removing message from cache", ignored);
924             }
925          }
926       }
927       
928       // Help the garbage collector
929
messages.clear();
930       unacknowledgedMessages.clear();
931       unackedByMessageRef.clear();
932       unackedBySubscription.clear();
933       subscribers.clear();
934       removedSubscribers.clear();
935    }
936    
937    /**
938     * Create message counter object
939     *
940     * @param name topic/queue name
941     * @param subscription topic subscription
942     * @param topic topic flag
943     * @param durable durable subscription flag
944     * @param daycountmax message history day count limit
945     * 0: disabled,
946     * >0: max day count,
947     * <0: unlimited
948     */

949    public void createMessageCounter(String JavaDoc name, String JavaDoc subscription, boolean topic, boolean durable, int daycountmax)
950    {
951       // create message counter object
952
counter = new MessageCounter(name, subscription, this, topic, durable, daycountmax);
953    }
954
955    /**
956     * Get message counter object
957     *
958     * @return MessageCounter message counter object or null
959     */

960    public MessageCounter getMessageCounter()
961    {
962       return counter;
963    }
964    
965    public String JavaDoc toString()
966    {
967       return super.toString() + "{id=" + description + '}';
968    }
969
970    /**
971     * Clear all the events
972     */

973    protected void clearEvents()
974    {
975       for (Iterator JavaDoc i = events.entrySet().iterator(); i.hasNext();)
976       {
977          Map.Entry JavaDoc entry = (Map.Entry JavaDoc) i.next();
978          Timeout timeout = (Timeout) entry.getValue();
979          if (timeout != null)
980          {
981             timeout.cancel();
982             i.remove();
983          }
984       }
985       scheduledMessages.clear();
986    }
987    
988    /**
989     * Clear the event for a message
990     *
991     * @param message the message reference
992     */

993    protected void clearEvent(MessageReference message)
994    {
995       Timeout timeout = (Timeout) events.remove(message);
996       if (timeout != null)
997          timeout.cancel();
998       scheduledMessages.remove(message);
999    }
1000   
1001   /**
1002    * Add a receiver
1003    *
1004    * @param sub the receiver to add
1005    */

1006   protected void addToReceivers(Subscription sub) throws JMSException JavaDoc
1007   {
1008      boolean trace = log.isTraceEnabled();
1009      if (trace)
1010         log.trace("addReceiver " + " " + sub + " " + this);
1011
1012      synchronized (receivers)
1013      {
1014         if (stopped)
1015            throw new IllegalStateException JavaDoc("The destination is stopped " + getDescription());
1016         receivers.add(sub);
1017      }
1018   }
1019
1020   /**
1021    * Remove a receiver
1022    *
1023    * @param sub the receiver to remove
1024    */

1025   protected void removeReceiver(Subscription sub)
1026   {
1027      boolean trace = log.isTraceEnabled();
1028      if (trace)
1029         log.trace("removeReceiver " + " " + sub + " " + this);
1030
1031      synchronized (receivers)
1032      {
1033         receivers.remove(sub);
1034      }
1035   }
1036
1037   private void addTimeout(MessageReference message, TimeoutTarget t, long ts)
1038   {
1039      Timeout timeout = server.getTimeoutFactory().schedule(ts, t);
1040      events.put(message, timeout);
1041   }
1042
1043   /**
1044    * Add a message
1045    *
1046    * @param message the message to add
1047    */

1048   private void internalAddMessage(MessageReference message)
1049   {
1050      boolean trace = log.isTraceEnabled();
1051      if (trace)
1052         log.trace("internalAddMessage " + " " + message + " " + this);
1053
1054      // If scheduled, put in timer queue
1055
long ts = message.messageScheduledDelivery;
1056      if (ts > 0 && ts > System.currentTimeMillis())
1057      {
1058         scheduledMessages.add(message);
1059         addTimeout(message, new EnqueueMessageTask(message), ts);
1060         if (trace)
1061            log.trace("scheduled message at " + new Date JavaDoc(ts) + ": " + message);
1062         // Can't deliver now
1063
return;
1064      }
1065
1066      // Don't bother with expired messages
1067
if (message.isExpired())
1068      {
1069         expireMessageAsync(message);
1070         return;
1071      }
1072
1073      try
1074      {
1075         Subscription found = null;
1076         synchronized (receivers)
1077         {
1078            if (receivers.size() != 0)
1079            {
1080               for (Iterator JavaDoc it = receivers.iterator(); it.hasNext();)
1081               {
1082                  Subscription sub = (Subscription) it.next();
1083                  if (sub.accepts(message.getHeaders()))
1084                  {
1085                     it.remove();
1086                     found = sub;
1087                     break;
1088                  }
1089               }
1090            }
1091
1092            if (found == null)
1093            {
1094               synchronized (messages)
1095               {
1096                  messages.add(message);
1097
1098                  // If a message is set to expire, and nobody wants it, put its reaper in
1099
// the timer queue
1100
if (message.messageExpiration > 0)
1101                  {
1102                     addTimeout(message, new ExpireMessageTask(message), message.messageExpiration);
1103                  }
1104               }
1105            }
1106         }
1107         
1108         // Queue to the receiver
1109
if (found != null)
1110            queueMessageForSending(found, message);
1111      }
1112      catch (JMSException JavaDoc e)
1113      {
1114         // Could happen at the accepts() calls
1115
log.error("Caught unusual exception in internalAddMessage.", e);
1116         // And drop the message, otherwise we have a leak in the cache
1117
dropMessage(message);
1118      }
1119   }
1120
1121   /**
1122    * Queue a message for sending through the client consumer
1123    *
1124    * @param sub the subscirption to receive the message
1125    * @param message the message reference to queue
1126    */

1127   protected void queueMessageForSending(Subscription sub, MessageReference message)
1128   {
1129      boolean trace = log.isTraceEnabled();
1130      if (trace)
1131         log.trace("queueMessageForSending " + " " + sub + " " + message + " " + this);
1132
1133      try
1134      {
1135         setupMessageAcknowledgement(sub, message);
1136         RoutedMessage r = new RoutedMessage();
1137         r.message = message;
1138         r.subscriptionId = new Integer JavaDoc(sub.subscriptionId);
1139         ((ClientConsumer) sub.clientConsumer).queueMessageForSending(r);
1140      }
1141      catch (Throwable JavaDoc t)
1142      {
1143         log.warn("Caught unusual exception sending message to receiver.", t);
1144      }
1145   }
1146
1147   /**
1148    * Setup a message acknowledgement
1149    *
1150    * @param sub the subscription receiving the message
1151    * @param messageRef the message to be acknowledged
1152    * @throws JMSException for any error
1153    */

1154   protected void setupMessageAcknowledgement(Subscription sub, MessageReference messageRef) throws JMSException JavaDoc
1155   {
1156      SpyMessage message = messageRef.getMessage();
1157      AcknowledgementRequest nack = new AcknowledgementRequest(false);
1158      nack.destination = message.getJMSDestination();
1159      nack.messageID = message.getJMSMessageID();
1160      nack.subscriberId = sub.subscriptionId;
1161
1162      synchronized (messages)
1163      {
1164         UnackedMessageInfo unacked = new UnackedMessageInfo(messageRef, sub);
1165         unacknowledgedMessages.put(nack, unacked);
1166         unackedByMessageRef.put(messageRef, nack);
1167         HashMap JavaDoc map = (HashMap JavaDoc) unackedBySubscription.get(sub);
1168         if (map == null)
1169         {
1170            map = new HashMap JavaDoc();
1171            unackedBySubscription.put(sub, map);
1172         }
1173         map.put(messageRef, nack);
1174      }
1175   }
1176
1177   /**
1178    * Remove a message
1179    *
1180    * @param message the message to remove
1181    */

1182   protected void dropMessage(MessageReference message)
1183   {
1184      dropMessage(message, null);
1185   }
1186   
1187   /**
1188    * Remove a message
1189    *
1190    * @param message the message to remove
1191    * @param txid the transaction context for the removal
1192    */

1193   protected void dropMessage(MessageReference message, Tx txid)
1194   {
1195      boolean trace = log.isTraceEnabled();
1196      if (trace)
1197         log.trace("dropMessage " + this + " txid=" + txid);
1198
1199      clearEvent(message);
1200      try
1201      {
1202         if (message.isPersistent())
1203         {
1204            try
1205            {
1206               server.getPersistenceManager().remove(message, txid);
1207            }
1208            catch (JMSException JavaDoc e)
1209            {
1210               try
1211               {
1212                  log.warn("Message removed from queue, but not from the persistent store: " + message.getMessage(), e);
1213               }
1214               catch (JMSException JavaDoc x)
1215               {
1216                  log.warn("Message removed from queue, but not from the persistent store: " + message, e);
1217               }
1218            }
1219         }
1220         server.getMessageCache().remove(message);
1221      }
1222      catch (JMSException JavaDoc e)
1223      {
1224         log.warn("Error dropping message " + message, e);
1225      }
1226   }
1227
1228   /**
1229    * Expire a message asynchronously.
1230    *
1231    * @param messageRef the message to remove
1232    */

1233   protected void expireMessageAsync(MessageReference messageRef)
1234   {
1235      server.getThreadPool().run(new ExpireMessageTask(messageRef));
1236   }
1237   
1238   /**
1239    * Expire a message
1240    *
1241    * @param messageRef the message to remove
1242    */

1243   protected void expireMessage(MessageReference messageRef)
1244   {
1245      boolean trace = log.isTraceEnabled();
1246      if (trace)
1247         log.trace("message expired: " + messageRef);
1248
1249      SpyDestination ed = parameters.expiryDestination;
1250      if (ed == null)
1251      {
1252         dropMessage(messageRef);
1253         return;
1254      }
1255
1256      if (trace)
1257         log.trace("sending to: " + ed);
1258
1259      try
1260      {
1261         SpyMessage orig = messageRef.getMessage();
1262         SpyMessage copy = orig.myClone();
1263         copy.header.jmsPropertiesReadWrite = true;
1264         copy.setJMSExpiration(0);
1265         copy.setJMSDestination(ed);
1266         copy.setLongProperty(SpyMessage.PROPERTY_ORIG_EXPIRATION, orig.getJMSExpiration());
1267         copy.setStringProperty(SpyMessage.PROPERTY_ORIG_DESTINATION, orig.getJMSDestination().toString());
1268         TxManager tm = server.getPersistenceManager().getTxManager();
1269         Tx tx = tm.createTx();
1270         try
1271         {
1272            server.addMessage(null, copy, tx);
1273            dropMessage(messageRef, tx);
1274            tm.commitTx(tx);
1275         }
1276         catch (JMSException JavaDoc e)
1277         {
1278            tm.rollbackTx(tx);
1279            throw e;
1280         }
1281      }
1282      catch (JMSException JavaDoc e)
1283      {
1284         log.error("Could not move expired message: " + messageRef, e);
1285      }
1286   }
1287
1288   /**
1289    * Check whether a removed subscription can be permenantly removed.
1290    * This method is private because it assumes external synchronization
1291    *
1292    * @param the subscription to check
1293    */

1294   private void checkRemovedSubscribers(Subscription sub)
1295   {
1296      boolean trace = log.isTraceEnabled();
1297      if (removedSubscribers.contains(sub) && hasUnackedMessages(sub) == false)
1298      {
1299         if (trace)
1300            log.trace("Removing subscriber " + sub);
1301         removedSubscribers.remove(sub);
1302         subscribers.remove(sub);
1303         ((ClientConsumer) sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId);
1304      }
1305   }
1306
1307   /**
1308    * Check whether a subscription has unacknowledged messages.
1309    * This method is private because it assumes external synchronization
1310    *
1311    * @param sub the subscription to check
1312    * @return true when it has unacknowledged messages
1313    */

1314   private boolean hasUnackedMessages(Subscription sub)
1315   {
1316      return unackedBySubscription.containsKey(sub);
1317   }
1318
1319   /**
1320    * Rollback an add message
1321    */

1322   class AddMessagePostRollBackTask implements Runnable JavaDoc
1323   {
1324      MessageReference message;
1325
1326      AddMessagePostRollBackTask(MessageReference m)
1327      {
1328         message = m;
1329      }
1330
1331      public void run()
1332      {
1333         try
1334         {
1335            server.getMessageCache().remove(message);
1336         }
1337         catch (JMSException JavaDoc e)
1338         {
1339            log.error("Could not remove message from the message cache after an add rollback: ", e);
1340         }
1341      }
1342   }
1343
1344   /**
1345    * Add a message to the queue
1346    */

1347   class AddMessagePostCommitTask implements Runnable JavaDoc
1348   {
1349      MessageReference message;
1350
1351      AddMessagePostCommitTask(MessageReference m)
1352      {
1353         message = m;
1354      }
1355
1356      public void run()
1357      {
1358         internalAddMessage(message);
1359
1360         // update message counter
1361
if (counter != null)
1362         {
1363            counter.incrementCounter();
1364         }
1365      }
1366   }
1367
1368   /**
1369    * Restore a message to the queue
1370    */

1371   class RestoreMessageTask implements Runnable JavaDoc
1372   {
1373      MessageReference message;
1374
1375      RestoreMessageTask(MessageReference m)
1376      {
1377         message = m;
1378      }
1379
1380      public void run()
1381      {
1382         nackMessage(message);
1383      }
1384   }
1385
1386   /**
1387    * Remove a message
1388    */

1389   class RemoveMessageTask implements Runnable JavaDoc
1390   {
1391      MessageReference message;
1392
1393      RemoveMessageTask(MessageReference m)
1394      {
1395         message = m;
1396      }
1397
1398      public void run()
1399      {
1400         try
1401         {
1402            clearEvent(message);
1403            server.getMessageCache().remove(message);
1404         }
1405         catch (JMSException JavaDoc e)
1406         {
1407            log.error("Could not remove an acknowleged message from the message cache: ", e);
1408         }
1409      }
1410   }
1411
1412   /**
1413    * Schedele message delivery
1414    */

1415   private class EnqueueMessageTask implements TimeoutTarget
1416   {
1417      private MessageReference messageRef;
1418
1419      public EnqueueMessageTask(MessageReference messageRef)
1420      {
1421         this.messageRef = messageRef;
1422      }
1423
1424      public void timedOut(Timeout timeout)
1425      {
1426         if (log.isTraceEnabled())
1427            log.trace("scheduled message delivery: " + messageRef);
1428         events.remove(messageRef);
1429         scheduledMessages.remove(messageRef);
1430         internalAddMessage(messageRef);
1431      }
1432   }
1433
1434   /**
1435    * Drop a message when it expires
1436    */

1437   private class ExpireMessageTask implements TimeoutTarget, Runnable JavaDoc
1438   {
1439      private MessageReference messageRef;
1440
1441      public ExpireMessageTask(MessageReference messageRef)
1442      {
1443         this.messageRef = messageRef;
1444      }
1445
1446      public void timedOut(Timeout timout)
1447      {
1448         events.remove(messageRef);
1449         scheduledMessages.remove(messageRef);
1450         synchronized (messages)
1451         {
1452            // If the message was already sent, then do nothing
1453
// (This probably happens more than not)
1454
if (messages.remove(messageRef) == false)
1455               return;
1456         }
1457         expireMessage(messageRef);
1458      }
1459
1460      public void run()
1461      {
1462         expireMessage(messageRef);
1463      }
1464   }
1465
1466   /**
1467    * Information about unacknowledged messages
1468    */

1469   private static class UnackedMessageInfo
1470   {
1471      public MessageReference messageRef;
1472      public Subscription sub;
1473      public UnackedMessageInfo(MessageReference messageRef, Subscription sub)
1474      {
1475         this.messageRef = messageRef;
1476         this.sub = sub;
1477      }
1478   }
1479}
1480
Popular Tags