KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > server > BasicQueue


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq.server;
23
24 import java.util.ArrayList JavaDoc;
25 import java.util.Date JavaDoc;
26 import java.util.HashMap JavaDoc;
27 import java.util.HashSet JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.LinkedList JavaDoc;
30 import java.util.List JavaDoc;
31 import java.util.Map JavaDoc;
32 import java.util.Set JavaDoc;
33 import java.util.SortedSet JavaDoc;
34 import java.util.TreeSet JavaDoc;
35
36 import javax.jms.IllegalStateException JavaDoc;
37 import javax.jms.JMSException JavaDoc;
38
39 import org.jboss.logging.Logger;
40 import org.jboss.mq.AcknowledgementRequest;
41 import org.jboss.mq.DestinationFullException;
42 import org.jboss.mq.SpyDestination;
43 import org.jboss.mq.SpyJMSException;
44 import org.jboss.mq.SpyMessage;
45 import org.jboss.mq.Subscription;
46 import org.jboss.mq.pm.Tx;
47 import org.jboss.mq.pm.TxManager;
48 import org.jboss.mq.selectors.Selector;
49 import org.jboss.util.NestedRuntimeException;
50 import org.jboss.util.timeout.Timeout;
51 import org.jboss.util.timeout.TimeoutTarget;
52
53 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
54 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
55
56 /**
57  * This class represents a queue which provides its messages exclusively to one
58  * consumer at a time.<p>
59  *
60  * Notes about synchronization: Much of the work is synchronized on
61  * the receivers or messages depending on the work performed.
62  * However, anything to do with unacknowledged messages and removed
63  * subscriptions must be done synchronized on both (receivers first).
64  * This is because there are multiple entry points with the possibility
65  * that a message acknowledgement (or NACK) is being processed at
66  * the same time as a network failure removes the subscription.
67  *
68  *
69  * @author Hiram Chirino (Cojonudo14@hotmail.com)
70  * @author Norbert Lataille (Norbert.Lataille@m4x.org)
71  * @author David Maplesden (David.Maplesden@orion.co.nz)
72  * @author Adrian Brock (Adrian@jboss.org)
73  * @created August 16, 2001
74  * @version $Revision: 57131 $
75  */

76 public class BasicQueue
77 {
78    static final Logger log = Logger.getLogger(BasicQueue.class);
79
80    /** List of messages waiting to be dispatched<p>
81        synchronized access on itself */

82    SortedSet JavaDoc messages = new TreeSet JavaDoc();
83
84    /** Events by message id */
85    ConcurrentHashMap events = new ConcurrentHashMap();
86    
87    /** The scheduled messages */
88    CopyOnWriteArraySet scheduledMessages = new CopyOnWriteArraySet();
89
90    /** The JMSServer object */
91    JMSDestinationManager server;
92
93    /** The subscribers waiting for messages - synchronized access on itself */
94    Receivers receivers;
95
96    /** The description used to seperate persistence for multiple subscriptions to a topic */
97    String JavaDoc description;
98
99    /** Simple Counter for gathering message add statistic data */
100    MessageCounter counter;
101
102    /** Unacknowledged messages AcknowledgementRequest -> UnackedMessageInfo<p>
103        synchronized access on receivers and messages */

104    HashMap JavaDoc unacknowledgedMessages = new HashMap JavaDoc();
105    /** Unacknowledged messages MessageRef -> UnackedMessageInfo <p>
106        synchronized access on receivers and messages */

107    HashMap JavaDoc unackedByMessageRef = new HashMap JavaDoc();
108    /** Unacknowledged messages Subscription -> UnackedMessageInfo <p>
109        synchronized access on receivers and messages */

110    HashMap JavaDoc unackedBySubscription = new HashMap JavaDoc();
111
112    /** Subscribers <p>
113     synchronized access on receivers */

114    HashSet JavaDoc subscribers = new HashSet JavaDoc();
115    
116    /** Removed subscribers <p>
117        synchronized access on receivers and messages */

118    HashSet JavaDoc removedSubscribers = new HashSet JavaDoc();
119    
120    /** The basic queue parameters */
121    BasicQueueParameters parameters;
122
123    /** Have we been stopped */
124    boolean stopped = false;
125
126    /**
127     * Construct a new basic queue
128     *
129     * @param server the destination manager
130     * @param description a description to uniquely identify the queue
131     * @param parameters the basic queue parameters
132     * @throws JMSException for any error
133     */

134    public BasicQueue(JMSDestinationManager server, String JavaDoc description, BasicQueueParameters parameters)
135       throws JMSException JavaDoc
136    {
137       this.server = server;
138       this.description = description;
139       this.parameters = parameters;
140       
141       Class JavaDoc receiversImpl = parameters.receiversImpl;
142       if (receiversImpl == null)
143          receiversImpl = ReceiversImpl.class;
144       
145       try
146       {
147          receivers = (Receivers) receiversImpl.newInstance();
148       }
149       catch (Throwable JavaDoc t)
150       {
151          throw new SpyJMSException("Error instantiating receivers implementation: " + receiversImpl, t);
152       }
153    }
154
155    /**
156     * Retrieve the unique description for this queue
157     *
158     * @return the description
159     */

160    public String JavaDoc getDescription()
161    {
162       return description;
163    }
164
165    /**
166     * Retrieve the number of receivers waiting for a message
167     *
168     * @return the number of receivers
169     */

170    public int getReceiversCount()
171    {
172       return receivers.size();
173    }
174
175    /**
176     * Retrieve the receivers waiting for a message
177     *
178     * @return an array of subscriptions
179     */

180    public ArrayList JavaDoc getReceivers()
181    {
182       synchronized (receivers)
183       {
184          return receivers.listReceivers();
185       }
186    }
187
188    /**
189     * Test whether the queue is in use
190     *
191     * @return true when there are subscribers
192     */

193    public boolean isInUse()
194    {
195       synchronized (receivers)
196       {
197          return subscribers.size() > 0;
198       }
199    }
200
201    /**
202     * Add a receiver to the queue
203     *
204     * @param sub the subscription to add
205     * @throws JMSException for any error
206     */

207    public void addReceiver(Subscription sub) throws JMSException JavaDoc
208    {
209       boolean trace = log.isTraceEnabled();
210       if (trace)
211          log.trace("addReceiver " + sub + " " + this);
212       
213       MessageReference found = null;
214       synchronized (messages)
215       {
216          if (messages.size() != 0)
217          {
218             for (Iterator JavaDoc it = messages.iterator(); it.hasNext();)
219             {
220                MessageReference message = (MessageReference) it.next();
221                try
222                {
223                   if (message.isExpired())
224                   {
225                      it.remove();
226                      expireMessageAsync(message);
227                   }
228                   else if (sub.accepts(message.getHeaders()))
229                   {
230                      //queue message for sending to this sub
231
it.remove();
232                      found = message;
233                      break;
234                   }
235                }
236                catch (JMSException JavaDoc ignore)
237                {
238                   log.info("Caught unusual exception in addToReceivers.", ignore);
239                }
240             }
241          }
242       }
243       if (found != null)
244          queueMessageForSending(sub, found);
245       else
246          addToReceivers(sub);
247    }
248
249    /**
250     * Get the subscribers
251     *
252     * @return the subscribers
253     */

254    public Set JavaDoc getSubscribers()
255    {
256       synchronized (receivers)
257       {
258          return (Set JavaDoc) subscribers.clone();
259       }
260    }
261    
262    /**
263     * Add a subscription from the queue
264     *
265     * @param sub the subscription to add
266     * @throws JMSException for any error
267     */

268    public void addSubscriber(Subscription sub) throws JMSException JavaDoc
269    {
270       boolean trace = log.isTraceEnabled();
271       if (trace)
272          log.trace("addSubscriber " + sub + " " + this);
273       synchronized (receivers)
274       {
275          if (stopped)
276             throw new IllegalStateException JavaDoc("The destination is stopped " + getDescription());
277          subscribers.add(sub);
278       }
279    }
280       
281    /**
282     * Removes a subscription from the queue
283     *
284     * @param sub the subscription to remove
285     */

286    public void removeSubscriber(Subscription sub)
287    {
288       boolean trace = log.isTraceEnabled();
289       if (trace)
290          log.trace("removeSubscriber " + sub + " " + this);
291       synchronized (receivers)
292       {
293          removeReceiver(sub);
294          synchronized (messages)
295          {
296             if (hasUnackedMessages(sub))
297             {
298                if (trace)
299                   log.trace("Delaying removal of subscriber is has unacked messages " + sub);
300                removedSubscribers.add(sub);
301             }
302             else
303             {
304                if (trace)
305                   log.trace("Removing subscriber " + sub);
306                subscribers.remove(sub);
307                ((ClientConsumer) sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId);
308             }
309          }
310       }
311    }
312
313    /**
314     * Retrieve the queue depth
315     *
316     * @return the number of messages in the queue
317     */

318    public int getQueueDepth()
319    {
320       return messages.size();
321    }
322
323    /**
324     * Returns the number of scheduled messages in the queue
325     *
326     * @return the scheduled message count
327     */

328    public int getScheduledMessageCount()
329    {
330       return scheduledMessages.size();
331    }
332
333    /**
334     * Returns the number of in process messages for the queue
335     *
336     * @return the in process count
337     */

338    public int getInProcessMessageCount()
339    {
340       synchronized (messages)
341       {
342          return unacknowledgedMessages.size();
343       }
344    }
345
346    /**
347     * Add a message to the queue
348     *
349     * @param mes the message reference
350     * @param txId the transaction
351     * @throws JMSException for any error
352     */

353    public void addMessage(MessageReference mes, Tx txId) throws JMSException JavaDoc
354    {
355       boolean trace = log.isTraceEnabled();
356       if (trace)
357          log.trace("addMessage " + mes + " " + txId + " " + this);
358
359       try
360       {
361          synchronized (receivers)
362          {
363             if (stopped)
364                throw new IllegalStateException JavaDoc("The destination is stopped " + getDescription());
365          }
366          
367          if (parameters.maxDepth > 0)
368          {
369             synchronized (messages)
370             {
371                if (messages.size() >= parameters.maxDepth)
372                {
373                   dropMessage(mes);
374                   String JavaDoc message = "Maximum size " + parameters.maxDepth +
375                      " exceeded for " + description;
376                   log.warn(message);
377                   throw new DestinationFullException(message);
378                }
379             }
380          }
381
382          performOrPrepareAddMessage(mes, txId);
383       }
384       catch (Throwable JavaDoc t)
385       {
386          String JavaDoc error = "Error in addMessage " + mes;
387          log.trace(error, t);
388          dropMessage(mes, txId);
389          SpyJMSException.rethrowAsJMSException(error, t);
390       }
391    }
392
393    /**
394     * Either perform or prepare the add message
395     *
396     * @param mes the message reference
397     * @param txId the transaction id
398     * @throws Exception for any error
399     */

400    protected void performOrPrepareAddMessage(MessageReference mes, Tx txId) throws Exception JavaDoc
401    {
402       TxManager txManager = server.getPersistenceManager().getTxManager();
403       
404       // The message is removed from the cache on a rollback
405
Runnable JavaDoc task = new AddMessagePostRollBackTask(mes);
406       txManager.addPostRollbackTask(txId, task);
407
408       // The message gets added to the queue after the transaction commits
409
task = new AddMessagePostCommitTask(mes);
410       txManager.addPostCommitTask(txId, task);
411    }
412    
413    /**
414     * Restores a message.
415     *
416     * @param mes the message reference
417     */

418    public void restoreMessage(MessageReference mes)
419    {
420       restoreMessage(mes, null, Tx.UNKNOWN);
421    }
422
423    /**
424     * Restores a message.
425     *
426     * @param mes the message reference
427     * @param txid the transaction id
428     * @param type the type of restoration
429     */

430    public void restoreMessage(MessageReference mes, Tx txid, int type)
431    {
432       boolean trace = log.isTraceEnabled();
433       if (trace)
434          log.trace("restoreMessage " + mes + " " + this + " txid=" + txid + " type=" + type);
435
436       try
437       {
438          if (txid == null)
439          {
440             internalAddMessage(mes);
441          }
442          else if (type == Tx.ADD)
443          {
444             performOrPrepareAddMessage(mes, txid);
445          }
446          else if (type == Tx.REMOVE)
447          {
448             performOrPrepareAcknowledgeMessage(mes, txid);
449          }
450          else
451          {
452             throw new IllegalStateException JavaDoc("Unknown restore type " + type + " for message " + mes + " txid=" + txid);
453          }
454       }
455       catch (RuntimeException JavaDoc e)
456       {
457          throw e;
458       }
459       catch (Exception JavaDoc e)
460       {
461          throw new NestedRuntimeException("Unable to restore message " + mes, e);
462       }
463    }
464
465    /**
466     * Nacks a message.
467     */

468    protected void nackMessage(MessageReference message)
469    {
470       if (log.isTraceEnabled())
471          log.trace("Restoring message: " + message);
472
473       try
474       {
475          message.redelivered();
476          // Set redelivered, vendor-specific flags
477
message.invalidate();
478          // Update the persistent message outside the transaction
479
// We want to know the message might have been delivered regardless
480
if (message.isPersistent())
481             server.getPersistenceManager().update(message, null);
482       }
483       catch (JMSException JavaDoc e)
484       {
485          log.error("Caught unusual exception in nackMessage for " + message, e);
486       }
487
488       internalAddMessage(message);
489    }
490
491    /**
492     * Browse the queue
493     *
494     * @param selector the selector to apply, pass null for
495     * all messages
496     * @return the messages
497     * @throws JMSException for any error
498     */

499    public SpyMessage[] browse(String JavaDoc selector) throws JMSException JavaDoc
500    {
501       if (selector == null)
502       {
503          SpyMessage list[];
504          synchronized (messages)
505          {
506             list = new SpyMessage[messages.size()];
507             Iterator JavaDoc iter = messages.iterator();
508             for (int i = 0; iter.hasNext(); i++)
509                list[i] = ((MessageReference) iter.next()).getMessageForDelivery();
510          }
511          return list;
512       }
513       else
514       {
515          Selector s = new Selector(selector);
516          LinkedList JavaDoc selection = new LinkedList JavaDoc();
517
518          synchronized (messages)
519          {
520             Iterator JavaDoc i = messages.iterator();
521             while (i.hasNext())
522             {
523                MessageReference m = (MessageReference) i.next();
524                if (s.test(m.getHeaders()))
525                   selection.add(m.getMessageForDelivery());
526             }
527          }
528
529          SpyMessage list[];
530          list = new SpyMessage[selection.size()];
531          list = (SpyMessage[]) selection.toArray(list);
532          return list;
533       }
534    }
535
536    /**
537     * Browse the scheduled messages
538     *
539     * @param selector the selector to apply, pass null for
540     * all messages
541     * @return the messages
542     * @throws JMSException for any error
543     */

544    public List JavaDoc browseScheduled(String JavaDoc selector) throws JMSException JavaDoc
545    {
546       if (selector == null)
547       {
548          ArrayList JavaDoc list;
549          synchronized (messages)
550          {
551             list = new ArrayList JavaDoc(scheduledMessages.size());
552             Iterator JavaDoc iter = scheduledMessages.iterator();
553             while (iter.hasNext())
554             {
555                MessageReference ref = (MessageReference) iter.next();
556                list.add(ref.getMessageForDelivery());
557             }
558          }
559          return list;
560       }
561       else
562       {
563          Selector s = new Selector(selector);
564          LinkedList JavaDoc selection = new LinkedList JavaDoc();
565
566          synchronized (messages)
567          {
568             Iterator JavaDoc iter = scheduledMessages.iterator();
569             while (iter.hasNext())
570             {
571                MessageReference ref = (MessageReference) iter.next();
572                if (s.test(ref.getHeaders()))
573                   selection.add(ref.getMessageForDelivery());
574             }
575          }
576          
577          return selection;
578       }
579    }
580
581    /**
582     * Browse the in process messages
583     *
584     * @param selector the selector to apply, pass null for
585     * all messages
586     * @return the messages
587     * @throws JMSException for any error
588     */

589    public List JavaDoc browseInProcess(String JavaDoc selector) throws JMSException JavaDoc
590    {
591       if (selector == null)
592       {
593          ArrayList JavaDoc list;
594          synchronized (messages)
595          {
596             list = new ArrayList JavaDoc(unacknowledgedMessages.size());
597             Iterator JavaDoc iter = unacknowledgedMessages.values().iterator();
598             while (iter.hasNext())
599             {
600                UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next();
601                MessageReference ref = unacked.messageRef;
602                list.add(ref.getMessageForDelivery());
603             }
604          }
605          return list;
606       }
607       else
608       {
609          Selector s = new Selector(selector);
610          LinkedList JavaDoc selection = new LinkedList JavaDoc();
611
612          synchronized (messages)
613          {
614             Iterator JavaDoc iter = unacknowledgedMessages.values().iterator();
615             while (iter.hasNext())
616             {
617                UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next();
618                MessageReference ref = unacked.messageRef;
619                if (s.test(ref.getHeaders()))
620                   selection.add(ref.getMessageForDelivery());
621             }
622          }
623          
624          return selection;
625       }
626    }
627
628    /**
629     * Receive a message from the queue
630     *
631     * @param sub the subscription requiring a message
632     * @param wait whether to wait for a message
633     * @return the message
634     * @throws JMSException for any error
635     */

636    public SpyMessage receive(Subscription sub, boolean wait) throws JMSException JavaDoc
637    {
638       boolean trace = log.isTraceEnabled();
639       if (trace)
640          log.trace("receive " + sub + " wait=" + wait + " " + this);
641
642       MessageReference messageRef = null;
643       synchronized (receivers)
644       {
645          if (stopped)
646             throw new IllegalStateException JavaDoc("The destination is stopped " + getDescription());
647          // If the subscription is not picky, the first message will be it
648
if (sub.getSelector() == null && sub.noLocal == false)
649          {
650             synchronized (messages)
651             {
652                // find a non-expired message
653
while (messages.size() != 0)
654                {
655                   messageRef = (MessageReference) messages.first();
656                   messages.remove(messageRef);
657
658                   if (messageRef.isExpired())
659                   {
660                      expireMessageAsync(messageRef);
661                      messageRef = null;
662                   }
663                   else
664                      break;
665                }
666             }
667          }
668          else
669          {
670             // The subscription is picky, so we have to iterate.
671
synchronized (messages)
672             {
673                Iterator JavaDoc i = messages.iterator();
674                while (i.hasNext())
675                {
676                   MessageReference mr = (MessageReference) i.next();
677                   if (mr.isExpired())
678                   {
679                      i.remove();
680                      expireMessageAsync(mr);
681                   }
682                   else if (sub.accepts(mr.getHeaders()))
683                   {
684                      messageRef = mr;
685                      i.remove();
686                      break;
687                   }
688                }
689             }
690          }
691
692          if (messageRef == null)
693          {
694             if (wait)
695                addToReceivers(sub);
696          }
697          else
698          {
699             setupMessageAcknowledgement(sub, messageRef);
700          }
701       }
702
703       if (messageRef == null)
704          return null;
705       return messageRef.getMessageForDelivery();
706    }
707
708    /**
709     * Acknowledge a message
710     *
711     * @param item the acknowledgement request
712     * @param txId the transaction
713     * @throws JMSException for any error
714     */

715    public void acknowledge(AcknowledgementRequest item, Tx txId) throws JMSException JavaDoc
716    {
717       boolean trace = log.isTraceEnabled();
718       if (trace)
719          log.trace("acknowledge " + item + " " + txId + " " + this);
720
721       UnackedMessageInfo unacked = null;
722       synchronized (messages)
723       {
724          unacked = (UnackedMessageInfo) unacknowledgedMessages.remove(item);
725          if (unacked == null)
726             return;
727          unackedByMessageRef.remove(unacked.messageRef);
728          HashMap JavaDoc map = (HashMap JavaDoc) unackedBySubscription.get(unacked.sub);
729          if (map != null)
730             map.remove(unacked.messageRef);
731          if (map == null || map.isEmpty())
732            unackedBySubscription.remove(unacked.sub);
733       }
734
735       MessageReference m = unacked.messageRef;
736
737       // Was it a negative acknowledge??
738
if (!item.isAck)
739       {
740          Runnable JavaDoc task = new RestoreMessageTask(m);
741          server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task);
742          server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task);
743       }
744       else
745       {
746          try
747          {
748             if (m.isPersistent())
749                server.getPersistenceManager().remove(m, txId);
750          }
751          catch (Throwable JavaDoc t)
752          {
753             // Something is wrong with the persistence manager,
754
// force a NACK with a rollback/error
755
Runnable JavaDoc task = new RestoreMessageTask(m);
756             server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task);
757             server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task);
758             SpyJMSException.rethrowAsJMSException("Error during ACK ref=" + m, t);
759          }
760
761          performOrPrepareAcknowledgeMessage(m, txId);
762       }
763
764       synchronized (receivers)
765       {
766          synchronized (messages)
767          {
768             checkRemovedSubscribers(unacked.sub);
769          }
770       }
771    }
772
773    /**
774     * Either perform or prepare the acknowledge message
775     *
776     * @param mes the message reference
777     * @param txId the transaction id
778     * @throws Exception for any error
779     */

780    protected void performOrPrepareAcknowledgeMessage(MessageReference mes, Tx txId) throws JMSException JavaDoc
781    {
782       TxManager txManager = server.getPersistenceManager().getTxManager();
783       
784       // The message is restored to the queue on a rollback
785
Runnable JavaDoc task = new RestoreMessageTask(mes);
786       txManager.addPostRollbackTask(txId, task);
787
788       // The message is fully removed after the transaction commits
789
task = new RemoveMessageTask(mes);
790       txManager.addPostCommitTask(txId, task);
791    }
792
793    /**
794     * Nack all messages for a subscription
795     *
796     * @param sub the subscription
797     */

798    public void nackMessages(Subscription sub)
799    {
800       boolean trace = log.isTraceEnabled();
801       if (trace)
802          log.trace("nackMessages " + sub + " " + this);
803
804       // Send nacks for unacknowledged messages
805
synchronized (receivers)
806       {
807          synchronized (messages)
808          {
809             int count = 0;
810             HashMap JavaDoc map = (HashMap JavaDoc) unackedBySubscription.get(sub);
811             if (map != null)
812             {
813                Iterator JavaDoc i = ((HashMap JavaDoc) map.clone()).values().iterator();
814                while (i.hasNext())
815                {
816                   AcknowledgementRequest item = (AcknowledgementRequest) i.next();
817                   try
818                   {
819                      acknowledge(item, null);
820                      count++;
821                   }
822                   catch (JMSException JavaDoc ignore)
823                   {
824                      log.debug("Unable to nack message: " + item, ignore);
825                   }
826                }
827                if (log.isDebugEnabled())
828                   log.debug("Nacked " + count + " messages for removed subscription " + sub);
829             }
830          }
831       }
832    }
833
834    public void removeAllMessages() throws JMSException JavaDoc
835    {
836       boolean trace = log.isTraceEnabled();
837       if (trace)
838          log.trace("removeAllMessages " + this);
839
840       // Drop scheduled messages
841
for (Iterator JavaDoc i = events.entrySet().iterator(); i.hasNext();)
842       {
843          Map.Entry JavaDoc entry = (Map.Entry JavaDoc) i.next();
844          MessageReference message = (MessageReference) entry.getKey();
845          Timeout timeout = (Timeout) entry.getValue();
846          if (timeout != null)
847          {
848             timeout.cancel();
849             i.remove();
850             dropMessage(message);
851          }
852       }
853       scheduledMessages.clear();
854
855       synchronized (receivers)
856       {
857          synchronized (messages)
858          {
859             Iterator JavaDoc i = ((HashMap JavaDoc) unacknowledgedMessages.clone()).keySet().iterator();
860             while (i.hasNext())
861             {
862                AcknowledgementRequest item = (AcknowledgementRequest) i.next();
863                try
864                {
865                   acknowledge(item, null);
866                }
867                catch (JMSException JavaDoc ignore)
868                {
869                }
870             }
871
872             // Remove all remaining messages
873
i = messages.iterator();
874             while (i.hasNext())
875