KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > server > JMSDestinationManager


1 /*
2  * JBoss, Home of Professional Open Source
3  * Copyright 2005, JBoss Inc., and individual contributors as indicated
4  * by the @authors tag. See the copyright.txt in the distribution for a
5  * full listing of individual contributors.
6  *
7  * This is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as
9  * published by the Free Software Foundation; either version 2.1 of
10  * the License, or (at your option) any later version.
11  *
12  * This software is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this software; if not, write to the Free
19  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21  */

22 package org.jboss.mq.server;
23
24 import java.util.Collection JavaDoc;
25 import java.util.HashMap JavaDoc;
26 import java.util.Iterator JavaDoc;
27 import java.util.Map JavaDoc;
28 import java.util.TreeMap JavaDoc;
29
30 import javax.jms.Destination JavaDoc;
31 import javax.jms.InvalidDestinationException JavaDoc;
32 import javax.jms.JMSException JavaDoc;
33 import javax.jms.Queue JavaDoc;
34 import javax.jms.TemporaryQueue JavaDoc;
35 import javax.jms.TemporaryTopic JavaDoc;
36 import javax.jms.Topic JavaDoc;
37 import javax.transaction.xa.Xid JavaDoc;
38
39 import org.jboss.mq.AcknowledgementRequest;
40 import org.jboss.mq.ConnectionToken;
41 import org.jboss.mq.DurableSubscriptionID;
42 import org.jboss.mq.SpyDestination;
43 import org.jboss.mq.SpyJMSException;
44 import org.jboss.mq.SpyMessage;
45 import org.jboss.mq.SpyQueue;
46 import org.jboss.mq.SpyTemporaryQueue;
47 import org.jboss.mq.SpyTemporaryTopic;
48 import org.jboss.mq.SpyTopic;
49 import org.jboss.mq.SpyTransactionRolledBackException;
50 import org.jboss.mq.Subscription;
51 import org.jboss.mq.TransactionRequest;
52 import org.jboss.mq.pm.PersistenceManager;
53 import org.jboss.mq.pm.Tx;
54 import org.jboss.mq.pm.TxManager;
55 import org.jboss.mq.sm.StateManager;
56 import org.jboss.util.threadpool.ThreadPool;
57 import org.jboss.util.timeout.TimeoutFactory;
58
59 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
60
61 /**
62  * This class implements the JMS provider
63  *
64  * @author Norbert Lataille (Norbert.Lataille@m4x.org)
65  * @author Hiram Chirino (Cojonudo14@hotmail.com)
66  * @author David Maplesden (David.Maplesden@orion.co.nz)
67  * @author <a HREF="mailto:pra@tim.se">Peter Antman</a>
68  * @version $Revision: 55999 $
69  */

70 public class JMSDestinationManager extends JMSServerInterceptorSupport
71 {
72    /** The version */
73    public final static String JavaDoc JBOSS_VERSION = "JBossMQ Version 4.0";
74
75    /** Destinations SpyDestination -> JMSDestination */
76    public Map JavaDoc destinations = new ConcurrentReaderHashMap();
77
78    /** Destinations being closed SpyDestination -> JMSDestination */
79    public Map JavaDoc closingDestinations = new ConcurrentReaderHashMap();
80
81    /** Thread pool */
82    public ThreadPool threadPool;
83
84    /** Thread group */
85    public ThreadGroup JavaDoc threadGroup;
86
87    /** Timeout factory */
88    public TimeoutFactory timeoutFactory;
89
90    /** The list of ClientConsumers hased by ConnectionTokens */
91    Map JavaDoc clientConsumers = new ConcurrentReaderHashMap();
92
93    /** last id given to a client */
94    private int lastID = 1;
95
96    /** last id given to a temporary topic */
97    private int lastTemporaryTopic = 1;
98
99    private Object JavaDoc lastTemporaryTopicLock = new Object JavaDoc();
100
101    /** last id given to a temporary queue */
102    private int lastTemporaryQueue = 1;
103
104    private Object JavaDoc lastTemporaryQueueLock = new Object JavaDoc();
105
106    /** The security manager */
107    private StateManager stateManager;
108
109    /** The persistence manager */
110    private PersistenceManager persistenceManager;
111
112    /** The Cache Used to hold messages */
113    private MessageCache messageCache;
114
115    private Object JavaDoc stateLock = new Object JavaDoc();
116
117    private Object JavaDoc idLock = new Object JavaDoc();
118
119    /**
120     * Because there can be a delay between killing the JMS service and the
121     * service actually dying, this field is used to tell external classes that
122     * that server has actually stopped.
123     */

124    private boolean stopped = true;
125
126    /** Temporary queue/topic parameters */
127    BasicQueueParameters parameters;
128
129    /**
130     * Constructor for the JMSServer object
131     */

132    public JMSDestinationManager(BasicQueueParameters parameters)
133    {
134       this.parameters = parameters;
135    }
136
137    /**
138     * Sets the Enabled attribute of the JMSServer object
139     *
140     * @param dc The new Enabled value
141     * @param enabled The new Enabled value
142     * @exception JMSException Description of Exception
143     */

144    public void setEnabled(ConnectionToken dc, boolean enabled) throws JMSException JavaDoc
145    {
146       ClientConsumer ClientConsumer = getClientConsumer(dc);
147       ClientConsumer.setEnabled(enabled);
148    }
149
150    /**
151     * Sets the StateManager attribute of the JMSServer object
152     *
153     * @param newStateManager The new StateManager value
154     */

155    public void setStateManager(StateManager newStateManager)
156    {
157       stateManager = newStateManager;
158    }
159
160    /**
161     * Sets the PersistenceManager attribute of the JMSServer object
162     *
163     * @param newPersistenceManager The new PersistenceManager value
164     */

165    public void setPersistenceManager(org.jboss.mq.pm.PersistenceManager newPersistenceManager)
166    {
167       persistenceManager = newPersistenceManager;
168    }
169
170    /**
171     * Returns <code>false</code> if the JMS server is currently running and
172     * handling requests, <code>true</code> otherwise.
173     *
174     * @return <code>false</code> if the JMS server is currently running and
175     * handling requests, <code>true</code> otherwise.
176     */

177    public boolean isStopped()
178    {
179       synchronized (stateLock)
180       {
181          return this.stopped;
182       }
183    }
184
185    protected void checkStopped() throws IllegalStateException JavaDoc
186    {
187       if (isStopped())
188          throw new IllegalStateException JavaDoc("Server is stopped.");
189    }
190    
191    /**
192     *
193     * @return the current client count
194     */

195    public int getClientCount()
196    {
197       return clientConsumers.size();
198    }
199
200    /**
201     * Obtain a copy of the current clients
202     *
203     * @return a HashMap<ConnectionToken, ClientConsumer> of current clients
204     */

205    public HashMap JavaDoc getClients()
206    {
207       return new HashMap JavaDoc(clientConsumers);
208    }
209
210    public void setThreadPool(ThreadPool threadPool)
211    {
212       this.threadPool = threadPool;
213    }
214
215    public ThreadPool getThreadPool()
216    {
217       return threadPool;
218    }
219
220    public void setThreadGroup(ThreadGroup JavaDoc threadGroup)
221    {
222       this.threadGroup = threadGroup;
223    }
224
225    public ThreadGroup JavaDoc getThreadGroup()
226    {
227       return threadGroup;
228    }
229
230    public TimeoutFactory getTimeoutFactory()
231    {
232       return timeoutFactory;
233    }
234
235    /**
236     * Gets the ID attribute of the JMSServer object
237     *
238     * @return The ID value
239     */

240    public String JavaDoc getID()
241    {
242       String JavaDoc ID = null;
243
244       while (isStopped() == false)
245       {
246          if (stateManager == null)
247             throw new IllegalStateException JavaDoc("No statemanager");
248          try
249          {
250             synchronized (idLock)
251             {
252                ID = "ID:" + (new Integer JavaDoc(lastID++).toString());
253             }
254             stateManager.addLoggedOnClientId(ID);
255             break;
256          }
257          catch (Exception JavaDoc e)
258          {
259          }
260       }
261
262       checkStopped();
263       
264       return ID;
265    }
266
267    public TemporaryTopic JavaDoc getTemporaryTopic(ConnectionToken dc) throws JMSException JavaDoc
268    {
269       checkStopped();
270       
271       String JavaDoc topicName;
272       synchronized (lastTemporaryTopicLock)
273       {
274          topicName = "JMS_TT" + (new Integer JavaDoc(lastTemporaryTopic++).toString());
275       }
276       SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, dc);
277
278       ClientConsumer ClientConsumer = getClientConsumer(dc);
279       JMSDestination queue = new JMSTopic(topic, ClientConsumer, this, parameters);
280       destinations.put(topic, queue);
281
282       return topic;
283    }
284
285    public TemporaryQueue JavaDoc getTemporaryQueue(ConnectionToken dc) throws JMSException JavaDoc
286    {
287       checkStopped();
288       
289       String JavaDoc queueName;
290       synchronized (lastTemporaryQueueLock)
291       {
292          queueName = "JMS_TQ" + (new Integer JavaDoc(lastTemporaryQueue++).toString());
293       }
294       SpyTemporaryQueue newQueue = new SpyTemporaryQueue(queueName, dc);
295
296       ClientConsumer ClientConsumer = getClientConsumer(dc);
297       JMSDestination queue = new JMSQueue(newQueue, ClientConsumer, this, parameters);
298       destinations.put(newQueue, queue);
299
300       return newQueue;
301    }
302
303    public ClientConsumer getClientConsumer(ConnectionToken dc) throws JMSException JavaDoc
304    {
305       ClientConsumer cq = (ClientConsumer) clientConsumers.get(dc);
306       if (cq == null)
307       {
308          cq = new ClientConsumer(this, dc);
309          clientConsumers.put(dc, cq);
310       }
311       return cq;
312    }
313
314    public JMSDestination getJMSDestination(SpyDestination dest)
315    {
316       return (JMSDestination) destinations.get(dest);
317    }
318
319    /**
320     * Gets the JMSDestination attribute of the JMSServer object
321     * which might be being closed
322     *
323     * @param dest Description of Parameter
324     * @return The JMSDestination value
325     */

326    protected JMSDestination getPossiblyClosingJMSDestination(SpyDestination dest)
327    {
328       JMSDestination result = (JMSDestination) destinations.get(dest);
329       if (result == null)
330          result = (JMSDestination) closingDestinations.get(dest);
331       return result;
332    }
333
334    /**
335     * Gets the StateManager attribute of the JMSServer object
336     *
337     * @return The StateManager value
338     */

339    public StateManager getStateManager()
340    {
341       return stateManager;
342    }
343
344    /**
345     * Gets the PersistenceManager attribute of the JMSServer object
346     *
347     * @return The PersistenceManager value
348     */

349    public PersistenceManager getPersistenceManager()
350    {
351       return persistenceManager;
352    }
353
354    /**
355     * Start the server
356     */

357    public void startServer()
358    {
359       synchronized (stateLock)
360       {
361          this.stopped = false;
362          this.timeoutFactory = new TimeoutFactory(this.threadPool);
363       }
364    }
365
366    /**
367     * Stop the server
368     */

369    public void stopServer()
370    {
371       synchronized (stateLock)
372       {
373          this.stopped = true;
374          this.timeoutFactory.cancel();
375          
376          for (Iterator JavaDoc i = clientConsumers.keySet().iterator(); i.hasNext();)
377          {
378             ConnectionToken token = (ConnectionToken) i.next();
379             try
380             {
381                connectionClosing(token);
382             }
383             catch (Throwable JavaDoc t)
384             {
385                log.trace("Ignored error closing client connection " + token, t);
386             }
387          }
388       }
389    }
390
391    public void checkID(String JavaDoc ID) throws JMSException JavaDoc
392    {
393       checkStopped();
394       stateManager.addLoggedOnClientId(ID);
395    }
396
397    public void addMessage(ConnectionToken dc, SpyMessage val) throws JMSException JavaDoc
398    {
399       addMessage(dc, val, null);
400    }
401
402    public void addMessage(ConnectionToken dc, SpyMessage val, Tx txId) throws JMSException JavaDoc
403    {
404       checkStopped();
405       JMSDestination queue = (JMSDestination) destinations.get(val.getJMSDestination());
406       if (queue == null)
407          throw new InvalidDestinationException JavaDoc("This destination does not exist! " + val.getJMSDestination());
408
409       // Reset any redelivered information
410
val.setJMSRedelivered(false);
411       val.header.jmsProperties.remove(SpyMessage.PROPERTY_REDELIVERY_COUNT);
412
413       //Add the message to the queue
414
val.setReadOnlyMode();
415       queue.addMessage(val, txId);
416    }
417
418    public void transact(ConnectionToken dc, TransactionRequest t) throws JMSException JavaDoc
419    {
420       checkStopped();
421       boolean trace = log.isTraceEnabled();
422       TxManager txManager = persistenceManager.getTxManager();
423       if (t.requestType == TransactionRequest.ONE_PHASE_COMMIT_REQUEST)
424       {
425          Tx txId = txManager.createTx();
426          if (trace)
427             log.trace(dc + " 1PC " + t.xid + " txId=" + txId.longValue());
428          try
429          {
430             if (t.messages != null)
431             {
432                for (int i = 0; i < t.messages.length; i++)
433                {
434                   addMessage(dc, t.messages[i], txId);
435                }
436             }
437             if (t.acks != null)
438             {
439                for (int i = 0; i < t.acks.length; i++)
440                {
441                   acknowledge(dc, t.acks[i], txId);
442                }
443             }
444             txManager.commitTx(txId);
445          }
446          catch (JMSException JavaDoc e)
447          {
448             log.debug("Exception occured, rolling back transaction: ", e);
449             txManager.rollbackTx(txId);
450             throw new SpyTransactionRolledBackException("Transaction was rolled back.", e);
451          }
452       }
453       else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_PREPARE_REQUEST)
454       {
455          Tx txId = txManager.createTx(dc, t.xid);
456          if (trace)
457             log.trace(dc + " 2PC PREPARE " + t.xid + " txId=" + txId.longValue());
458          try
459          {
460             if (t.messages != null)
461             {
462                for (int i = 0; i < t.messages.length; i++)
463                {
464                   addMessage(dc, t.messages[i], txId);
465                }
466             }
467             if (t.acks != null)
468             {
469                for (int i = 0; i < t.acks.length; i++)
470                {
471                   acknowledge(dc, t.acks[i], txId);
472                }
473             }
474             
475             txManager.markPrepared(dc, t.xid, txId);
476          }
477          catch (JMSException JavaDoc e)
478          {
479             log.debug("Exception occured, rolling back transaction: ", e);
480             txManager.rollbackTx(txId);
481             throw new SpyTransactionRolledBackException("Transaction was rolled back.", e);
482          }
483       }
484       else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_ROLLBACK_REQUEST)
485       {
486          if (trace)
487             log.trace(dc + " 2PC ROLLBACK " + t.xid);
488          txManager.rollbackTx(dc, t.xid);
489       }
490       else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_COMMIT_REQUEST)
491       {
492          if (trace)
493             log.trace(dc + " 2PC COMMIT " + t.xid);
494          txManager.commitTx(dc, t.xid);
495       }
496    }
497
498    public Xid JavaDoc[] recover(ConnectionToken dc, int flags) throws Exception JavaDoc
499    {
500       checkStopped();
501       TxManager txManager = persistenceManager.getTxManager();
502       return txManager.recover(dc, flags);
503    }
504
505    public void acknowledge(ConnectionToken dc, AcknowledgementRequest item) throws JMSException JavaDoc
506    {
507       acknowledge(dc, item, null);
508    }
509
510    public void acknowledge(ConnectionToken dc, AcknowledgementRequest item, Tx txId) throws JMSException JavaDoc
511    {
512       checkStopped();
513       ClientConsumer cc = getClientConsumer(dc);
514       cc.acknowledge(item, txId);
515    }
516
517    public void connectionClosing(ConnectionToken dc) throws JMSException JavaDoc
518    {
519       if (dc == null)
520          return;
521
522       // Close it's ClientConsumer
523
ClientConsumer cq = (ClientConsumer) clientConsumers.remove(dc);
524       if (cq != null)
525          cq.close();
526
527       //unregister its clientID
528
if (dc.getClientID() != null)
529          stateManager.removeLoggedOnClientId(dc.getClientID());
530
531       //Remove any temporary destinations the consumer may have created.
532
Iterator JavaDoc i = destinations.entrySet().iterator();
533       while (i.hasNext())
534       {
535          Map.Entry JavaDoc entry = (Map.Entry JavaDoc) i.next();
536          JMSDestination sq = (JMSDestination) entry.getValue();
537          if (sq != null)
538          {
539             ClientConsumer cc = sq.temporaryDestination;
540             if (cc != null && dc.equals(cc.connectionToken))
541             {
542                i.remove();
543                deleteTemporaryDestination(dc, sq);
544             }
545          }
546       }
547       // Close the clientIL
548
try
549       {
550          if (dc.clientIL != null)
551             dc.clientIL.close();
552       }
553       catch (Exception JavaDoc ex)
554       {
555          // We skip warning, to often the client will always
556
// have gone when we get here
557
//log.warn("Could not close clientIL: " +ex,ex);
558
}
559    }
560
561    public void connectionFailure(ConnectionToken dc) throws JMSException JavaDoc
562    {
563       //We should try again :) This behavior should under control of a Failure-Plugin
564
log.error("The connection to client " + dc.getClientID() + " failed.");
565       connectionClosing(dc);
566    }
567
568    public void subscribe(ConnectionToken dc, Subscription sub) throws JMSException JavaDoc
569    {
570       checkStopped();
571       ClientConsumer clientConsumer = getClientConsumer(dc);
572       clientConsumer.addSubscription(sub);
573    }
574
575    public void unsubscribe(ConnectionToken dc, int subscriptionId) throws JMSException JavaDoc
576    {
577       checkStopped();
578       ClientConsumer clientConsumer = getClientConsumer(dc);
579       clientConsumer.removeSubscription(subscriptionId);
580    }
581
582    public void destroySubscription(ConnectionToken dc, DurableSubscriptionID id) throws JMSException JavaDoc
583    {
584       checkStopped();
585       getStateManager().setDurableSubscription(this, id, null);
586    }
587
588    public SpyMessage[] browse(ConnectionToken dc, Destination JavaDoc dest, String JavaDoc selector) throws JMSException JavaDoc
589    {
590       checkStopped();
591       JMSDestination queue = (JMSDestination) destinations.get(dest);
592       if (queue == null)
593          throw new InvalidDestinationException JavaDoc("That destination does not exist! " + dest);
594       if (!(queue instanceof JMSQueue))
595          throw new JMSException JavaDoc("That destination is not a queue");
596
597       return ((JMSQueue) queue).browse(selector);
598    }
599
600    public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait) throws JMSException JavaDoc
601    {
602       checkStopped();
603       ClientConsumer clientConsumer = getClientConsumer(dc);
604       SpyMessage msg = clientConsumer.receive(subscriberId, wait);
605       return msg;
606    }
607
608    public Queue JavaDoc createQueue(ConnectionToken dc, String JavaDoc name) throws JMSException JavaDoc
609    {
610       checkStopped();
611       SpyQueue newQueue = new SpyQueue(name);
612       if (!destinations.containsKey(newQueue))
613          throw new JMSException JavaDoc("This destination does not exist !" + newQueue);
614       return newQueue;
615    }
616
617    public Topic JavaDoc createTopic(ConnectionToken dc, String JavaDoc name) throws JMSException JavaDoc
618    {
619       checkStopped();
620       SpyTopic newTopic = new SpyTopic(name);
621       if (!destinations.containsKey(newTopic))
622          throw new JMSException JavaDoc("This destination does not exist !" + newTopic);
623       return newTopic;
624    }
625    
626    public Queue JavaDoc createQueue(String JavaDoc queueName) throws JMSException JavaDoc
627    {
628       checkStopped();
629       
630       SpyTemporaryQueue newQueue = new SpyTemporaryQueue(queueName, null);
631
632       JMSDestination queue = new JMSQueue(newQueue, null, this, parameters);
633       destinations.put(newQueue, queue);
634
635       return newQueue;
636    }
637    
638    public Topic JavaDoc createTopic(String JavaDoc topicName) throws JMSException JavaDoc
639    {
640       checkStopped();
641       
642       SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, null);
643
644       JMSDestination queue = new JMSTopic(topic, null, this, parameters);
645       destinations.put(topic, queue);
646
647       return topic;
648    }
649
650    public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest) throws JMSException JavaDoc
651    {
652       checkStopped();
653       JMSDestination destination = (JMSDestination) destinations.get(dest);
654       if (destination == null)
655          throw new InvalidDestinationException JavaDoc("That destination does not exist! " + destination);
656
657       if (destination.isInUse())
658          throw new JMSException JavaDoc("Cannot delete temporary queue, it is in use.");
659
660       destinations.remove(dest);
661       deleteTemporaryDestination(dc, destination);
662    }
663
664    protected void deleteTemporaryDestination(ConnectionToken dc, JMSDestination destination) throws JMSException JavaDoc
665    {
666       try
667       {
668          destination.removeAllMessages();
669       }
670       catch (Exception JavaDoc e)
671       {
672          log.error("An exception happened while removing all messages from temporary destination "
673                + destination.getSpyDestination().getName(), e);
674       }
675
676    }
677
678    public String JavaDoc checkUser(String JavaDoc userName, String JavaDoc password) throws JMSException JavaDoc
679    {
680       checkStopped();
681       return stateManager.checkUser(userName, password);
682    }
683
684    public String JavaDoc authenticate(String JavaDoc id, String JavaDoc password) throws JMSException JavaDoc
685    {
686       checkStopped();
687       // do nothing
688
return null;
689    }
690
691    public void addDestination(JMSDestination destination) throws JMSException JavaDoc
692    {
693       if (destinations.containsKey(destination.getSpyDestination()))
694          throw new JMSException JavaDoc("This destination has already been added to the server!");
695
696       //Add this new destination to the list
697
destinations.put(destination.getSpyDestination(), destination);
698
699       // Restore the messages
700
if (destination instanceof JMSTopic)
701       {
702          Collection JavaDoc durableSubs = getStateManager().getDurableSubscriptionIdsForTopic((SpyTopic) destination.getSpyDestination());
703          for (Iterator JavaDoc i = durableSubs.iterator(); i.hasNext();)
704          {
705             DurableSubscriptionID sub = (DurableSubscriptionID) i.next();
706             log.debug("creating the durable subscription for :" + sub);
707             ((JMSTopic) destination).createDurableSubscription(sub);
708          }
709       }
710    }
711
712    /**
713     * Closed a destination that was opened previously
714     *
715     * @param dest the destionation to close
716     * @exception JMSException Description of Exception
717     */

718    public void closeDestination(SpyDestination dest) throws JMSException JavaDoc
719    {
720       JMSDestination destination = (JMSDestination) destinations.remove(dest);
721       if (destination == null)
722          throw new InvalidDestinationException JavaDoc("This destination is not open! " + dest);
723
724       log.debug("Closing destination " + dest);
725
726       // Add it to the closing list
727
closingDestinations.put(dest, destination);
728       try
729       {
730          destination.close();
731       }
732       finally
733       {
734          closingDestinations.remove(dest);
735       }
736    }
737
738    public String JavaDoc toString()
739    {
740       return JBOSS_VERSION;
741    }
742
743    public void ping(ConnectionToken dc, long clientTime) throws JMSException JavaDoc
744    {
745       checkStopped();
746       try
747       {
748          dc.clientIL.pong(System.currentTimeMillis());
749       }
750       catch (Exception JavaDoc e)
751       {
752          throw new SpyJMSException("Could not pong", e);
753       }
754    }
755
756    /**
757     * Gets the messageCache
758     * @return Returns a MessageCache
759     */

760    public MessageCache getMessageCache()
761    {
762       return messageCache;
763    }
764
765    /**
766     * Sets the messageCache
767     * @param messageCache The messageCache to set
768     */

769    public void setMessageCache(MessageCache messageCache)
770    {
771       this.messageCache = messageCache;
772    }
773
774    public SpyTopic getDurableTopic(DurableSubscriptionID sub) throws JMSException JavaDoc
775    {
776       checkStopped();
777       return getStateManager().getDurableTopic(sub);
778    }
779
780    public Subscription getSubscription(ConnectionToken dc, int subscriberId) throws JMSException JavaDoc
781    {
782       checkStopped();
783       ClientConsumer clientConsumer = getClientConsumer(dc);
784       return clientConsumer.getSubscription(subscriberId);
785    }
786
787    /**
788     * Gets message counters of all configured destinations
789     *
790     * @return MessageCounter[] message counter array sorted by name
791     */

792    public MessageCounter[] getMessageCounter()
793    {
794       TreeMap JavaDoc map = new TreeMap JavaDoc(); // for sorting
795

796       Iterator JavaDoc i = destinations.values().iterator();
797
798       while (i.hasNext())
799       {
800          JMSDestination dest = (JMSDestination) i.next();
801
802          MessageCounter[] counter = dest.getMessageCounter();
803
804          for (int j = 0; j < counter.length; j++)
805          {
806             // sorting order name + subscription + type
807
String JavaDoc key = counter[j].getDestinationName() + "-" + counter[j].getDestinationSubscription() + "-"
808                   + (counter[j].getDestinationTopic() ? "Topic" : "Queue");
809
810             map.put(key, counter[j]);
811          }
812       }
813
814       return (MessageCounter[]) map.values().toArray(new MessageCounter[0]);
815    }
816
817    /**
818     * Resets message counters of all configured destinations
819     */

820    public void resetMessageCounter()
821    {
822       Iterator JavaDoc i = destinations.values().iterator();
823
824       while (i.hasNext())
825       {
826          JMSDestination dest = (JMSDestination) i.next();
827
828          MessageCounter[] counter = dest.getMessageCounter();
829
830          for (int j = 0; j < counter.length; j++)
831          {
832             counter[j].resetCounter();
833          }
834       }
835    }
836
837    public BasicQueueParameters getParameters()
838    {
839       return parameters;
840    }
841 }
842
Popular Tags