KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > server > JMSTopic


/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag. See the copyright.txt in the distribution for a
 * full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */

22 package org.jboss.mq.server;
23
24 import java.util.ArrayList JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.TreeMap JavaDoc;
27
28 import javax.jms.IllegalStateException JavaDoc;
29 import javax.jms.JMSException JavaDoc;
30
31 import org.jboss.mq.DestinationFullException;
32 import org.jboss.mq.DurableSubscriptionID;
33 import org.jboss.mq.SpyDestination;
34 import org.jboss.mq.SpyJMSException;
35 import org.jboss.mq.SpyMessage;
36 import org.jboss.mq.SpyTopic;
37 import org.jboss.mq.Subscription;
38 import org.jboss.mq.pm.NewPersistenceManager;
39 import org.jboss.mq.pm.PersistenceManager;
40 import org.jboss.mq.pm.Tx;
41
42 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
43
44 /**
45  * This class is a message queue which is stored (hashed by Destination) on the
46  * JMS provider
47  *
48  * @author Norbert Lataille (Norbert.Lataille@m4x.org)
49  * @author Hiram Chirino (Cojonudo14@hotmail.com)
50  * @author David Maplesden (David.Maplesden@orion.co.nz)
51  * @author Adrian Brock (adrian@jboss.com)
52  * @created August 16, 2001
53  * @version $Revision: 45317 $
54  */

55 public class JMSTopic extends JMSDestination
56 {
57
58    //Hashmap of ExclusiveQueues
59
ConcurrentReaderHashMap durQueues = new ConcurrentReaderHashMap();
60    ConcurrentReaderHashMap tempQueues = new ConcurrentReaderHashMap();
61
62    public JMSTopic(SpyDestination dest, ClientConsumer temporary, JMSDestinationManager server, BasicQueueParameters parameters) throws JMSException JavaDoc
63    {
64       super(dest, temporary, server, parameters);
65
66       // This is a bit of a hack, for backwards compatibility
67
PersistenceManager pm = server.getPersistenceManager();
68       parameters.lateClone = (pm instanceof NewPersistenceManager);
69    }
70
71    public void addSubscriber(Subscription sub) throws JMSException JavaDoc
72    {
73       SpyTopic topic = (SpyTopic) sub.destination;
74       DurableSubscriptionID id = topic.getDurableSubscriptionID();
75
76       if (id == null)
77       {
78          // create queue
79
ExclusiveQueue q = new ExclusiveQueue(server, destination, sub, parameters);
80
81          // create topic queue message counter
82
q.createMessageCounter(destination.getName(), q.getDescription(), true, false, parameters.messageCounterHistoryDayLimit);
83          
84          tempQueues.put(sub, q);
85          q.addSubscriber(sub);
86       }
87       else
88       {
89          PersistentQueue q = (PersistentQueue) durQueues.get(id);
90
91          // Check for already in use
92
if (q != null && q.isInUse())
93             throw new IllegalStateException JavaDoc("The durable subscription is already in use. " + id);
94          
95          // Check for a changed selector
96
boolean selectorChanged = false;
97          if (q != null)
98          {
99             String JavaDoc newSelector = sub.messageSelector;
100             String JavaDoc oldSelector = null;
101             if (q instanceof SelectorPersistentQueue)
102                oldSelector = ((SelectorPersistentQueue) q).selectorString;
103             if ((newSelector == null && oldSelector != null)
104                || (newSelector != null && newSelector.equals(oldSelector) == false))
105                selectorChanged = true;
106          }
107
108          if (q == null || //Brand new durable subscriber
109
!q.destination.equals(topic) || selectorChanged)
110          {
111             //subscription changed to new topic
112
server.getStateManager().setDurableSubscription(server, id, topic);
113
114             // Pickup the new queue
115
synchronized (durQueues)
116             {
117                q = (PersistentQueue) durQueues.get(id);
118             }
119          }
120          q.addSubscriber(sub);
121       }
122    }
123
124    public void removeSubscriber(Subscription sub) throws JMSException JavaDoc
125    {
126       BasicQueue queue = null;
127       SpyTopic topic = (SpyTopic) sub.destination;
128       DurableSubscriptionID id = topic.getDurableSubscriptionID();
129       if (id == null)
130          queue = (BasicQueue) tempQueues.get(sub);
131       else
132          queue = (BasicQueue) durQueues.get(id);
133       // The queue may be null if the durable subscription
134
// is destroyed before the consumer is unsubscribed!
135
if (queue == null)
136          ((ClientConsumer) sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId);
137       else
138          queue.removeSubscriber(sub);
139    }
140
141    public void nackMessages(Subscription sub) throws JMSException JavaDoc
142    {
143       BasicQueue queue = null;
144       SpyTopic topic = (SpyTopic) sub.destination;
145       DurableSubscriptionID id = topic.getDurableSubscriptionID();
146       if (id == null)
147          queue = (BasicQueue) tempQueues.get(sub);
148       else
149          queue = (BasicQueue) durQueues.get(id);
150       if (queue != null)
151       {
152          queue.nackMessages(sub);
153       }
154    }
155
156    void cleanupSubscription(Subscription sub)
157    {
158       //just try to remove from tempQueues, don't worry if its not there
159
BasicQueue queue = (BasicQueue) tempQueues.remove(sub);
160       try
161       {
162          if (queue != null)
163             queue.removeAllMessages();
164       }
165       catch (JMSException JavaDoc e)
166       {
167          cat.debug("Error removing messages for subscription " + sub, e);
168       }
169    }
170
171    public void addReceiver(Subscription sub) throws JMSException JavaDoc
172    {
173       getQueue(sub).addReceiver(sub);
174    }
175
176    public void removeReceiver(Subscription sub)
177    {
178       try
179       {
180          getQueue(sub).removeReceiver(sub);
181       }
182       catch (JMSException JavaDoc e)
183       {
184          cat.trace("Subscription is not registered: " + sub, e);
185       }
186    }
187
188    public void restoreMessage(MessageReference messageRef)
189    {
190       try
191       {
192          SpyMessage spyMessage = messageRef.getMessage();
193          updateNextMessageId(spyMessage);
194          if (spyMessage.header.durableSubscriberID == null)
195          {
196             cat.debug("Trying to restore message with null durableSubscriberID");
197          }
198          else
199          {
200             BasicQueue queue = ((BasicQueue) durQueues.get(spyMessage.header.durableSubscriberID));
201             messageRef.queue = queue;
202             queue.restoreMessage(messageRef);
203          }
204       }
205       catch (JMSException JavaDoc e)
206       {
207          cat.error("Could not restore message:", e);
208       }
209    }
210
211    public void restoreMessage(SpyMessage message, Tx txid, int type)
212    {
213       try
214       {
215          updateNextMessageId(message);
216          if (message.header.durableSubscriberID == null)
217          {
218             cat.debug("Trying to restore message with null durableSubscriberID");
219          }
220          else
221          {
222             BasicQueue queue = (BasicQueue) durQueues.get(message.header.durableSubscriberID);
223             MessageReference messageRef = server.getMessageCache().add(message, queue, MessageReference.STORED);
224             queue.restoreMessage(messageRef, txid, type);
225          }
226       }
227       catch (JMSException JavaDoc e)
228       {
229          cat.error("Could not restore message:", e);
230       }
231    }
232
233    public void restoreMessage(SpyMessage message, DurableSubscriptionID id)
234    {
235       try
236       {
237          updateNextMessageId(message);
238          if (id == null)
239          {
240             cat.debug("Trying to restore message with null durableSubscriberID");
241          }
242          else
243          {
244             BasicQueue queue = (BasicQueue) durQueues.get(id);
245             MessageReference messageRef = server.getMessageCache().add(message, queue, MessageReference.STORED, id);
246             queue.restoreMessage(messageRef);
247          }
248       }
249       catch (JMSException JavaDoc e)
250       {
251          cat.error("Could not restore message:", e);
252       }
253    }
254
255    //called by state manager when a durable sub is created
256
public void createDurableSubscription(DurableSubscriptionID id) throws JMSException JavaDoc
257    {
258       if (temporaryDestination != null)
259          throw new JMSException JavaDoc("Not a valid operation on a temporary topic");
260
261       SpyTopic dstopic = new SpyTopic((SpyTopic) destination, id);
262
263       Throwable JavaDoc error = null;
264       for (int i = 0; i <= parameters.recoveryRetries; ++i)
265       {
266          // Create a subscription
267
BasicQueue queue;
268          if (id.getSelector() == null)
269             queue = new PersistentQueue(server, dstopic, parameters);
270          else
271             queue = new SelectorPersistentQueue(server, dstopic, id.getSelector(), parameters);
272
273          // create topic queue message counter
274
queue.createMessageCounter(destination.getName(), id.toString(), true, true, parameters.messageCounterHistoryDayLimit);
275
276          durQueues.put(id, queue);
277
278          try
279          {
280             // restore persistent queue data
281
server.getPersistenceManager().restoreQueue(this, dstopic);
282             
283             // done
284
break;
285          }
286          catch (Throwable JavaDoc t)
287          {
288             if (i < parameters.recoveryRetries)
289                cat.warn("Error restoring topic subscription " + queue + " retries=" + i + " of " + parameters.recoveryRetries, t);
290             else
291                error = t;
292             try
293             {
294                queue.stop();
295             }
296             catch (Throwable JavaDoc ignored)
297             {
298                cat.trace("Ignored error stopping topic subscription " + queue, ignored);
299             }
300             finally
301             {
302                durQueues.remove(id);
303                queue = null;
304             }
305          }
306       }
307       
308       if (error != null)
309          SpyJMSException.rethrowAsJMSException("Unable to recover topic subscription " + id + " retries=" + parameters.recoveryRetries, error);
310    }
311
312    //called by JMSServer when a destination is being closed.
313
public void close() throws JMSException JavaDoc
314    {
315       if (temporaryDestination != null)
316          throw new JMSException JavaDoc("Not a valid operation on a temporary topic");
317       
318       Iterator JavaDoc i = tempQueues.values().iterator();
319       while (i.hasNext())
320       {
321          ExclusiveQueue queue = (ExclusiveQueue) i.next();
322          queue.stop();
323       }
324
325       i = durQueues.values().iterator();
326       while (i.hasNext())
327       {
328          PersistentQueue queue = (PersistentQueue) i.next();
329          queue.stop();
330          server.getPersistenceManager().closeQueue(this, queue.getSpyDestination());
331       }
332    }
333
334    //called by state manager when a durable sub is deleted
335
public void destroyDurableSubscription(DurableSubscriptionID id) throws JMSException JavaDoc
336    {
337       BasicQueue queue = (BasicQueue) durQueues.remove(id);
338       queue.removeAllMessages();
339    }
340
341    public SpyMessage receive(Subscription sub, boolean wait) throws javax.jms.JMSException JavaDoc
342    {
343       return getQueue(sub).receive(sub, wait);
344    }
345
346    public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription sub, org.jboss.mq.pm.Tx txId)
347       throws JMSException JavaDoc
348    {
349       getQueue(sub).acknowledge(req, txId);
350    }
351
352    public void addMessage(SpyMessage message, org.jboss.mq.pm.Tx txId) throws JMSException JavaDoc
353    {
354       StringBuffer JavaDoc errorMessage = null;
355
356       // Whether the message was added to a persistence queue
357
boolean added = false;
358
359       //Number the message so that we can preserve order of delivery.
360
long messageId = nextMessageId();
361          
362       if (parameters.lateClone)
363          message.header.messageId = messageId;
364
365       Iterator JavaDoc iter = durQueues.keySet().iterator();
366       while (iter.hasNext())
367       {
368          DurableSubscriptionID id = (DurableSubscriptionID) iter.next();
369          PersistentQueue q = (PersistentQueue) durQueues.get(id);
370          MessageReference ref;
371          if (parameters.lateClone)
372          {
373             ref = server.getMessageCache().add(message, q, MessageReference.NOT_STORED, id);
374          }
375          else
376          {
377             SpyMessage clone = message.myClone();
378             clone.header.durableSubscriberID = id;
379             clone.header.messageId = messageId;
380             clone.setJMSDestination(q.getSpyDestination());
381             ref = server.getMessageCache().add(clone, q, MessageReference.NOT_STORED);
382          }
383          try
384          {
385             // For shared blob write it for the first durable subscription
386
if (added == false && parameters.lateClone && ref.isPersistent())
387             {
388                NewPersistenceManager pm = (NewPersistenceManager) server.getPersistenceManager();
389                pm.addMessage(message);
390                added = true;
391             }
392             q.addMessage(ref, txId);
393          }
394          catch (DestinationFullException e)
395          {
396             if (errorMessage == null)
397                errorMessage = new StringBuffer JavaDoc(e.getText());
398             else
399                errorMessage.append(", ").append(e.getText());
400          }
401       }
402          
403       iter = tempQueues.values().iterator();
404       while (iter.hasNext())
405       {
406          BasicQueue q = (BasicQueue) iter.next();
407          MessageReference ref;
408          if (parameters.lateClone)
409          {
410             ref = server.getMessageCache().add(message, q, MessageReference.NOT_STORED);
411          }
412          else
413          {
414             SpyMessage clone = message.myClone();
415             clone.header.messageId = messageId;
416             ref = server.getMessageCache().add(clone, q, MessageReference.NOT_STORED);
417          }
418          try
419          {
420             q.addMessage(ref, txId);
421          }
422          catch (DestinationFullException e)
423          {
424             if (errorMessage == null)
425                errorMessage = new StringBuffer JavaDoc(e.getText());
426             else
427                errorMessage.append(", ").append(e.getText());
428          }
429       }
430       
431       if (errorMessage != null)
432          throw new DestinationFullException(errorMessage.toString());
433    }
434
435    public int getAllMessageCount()
436    {
437       return calculateMessageCount(getAllQueues());
438    }
439
440    public int getDurableMessageCount()
441    {
442       return calculateMessageCount(getPersistentQueues());
443    }
444
445    public int getNonDurableMessageCount()
446    {
447       return calculateMessageCount(getTemporaryQueues());
448    }
449
450    public ArrayList JavaDoc getAllQueues()
451    {
452       ArrayList JavaDoc result = new ArrayList JavaDoc(getAllSubscriptionsCount());
453       result.addAll(getPersistentQueues());
454       result.addAll(getTemporaryQueues());
455       return result;
456    }
457
458    public ArrayList JavaDoc getTemporaryQueues()
459    {
460       return new ArrayList JavaDoc(tempQueues.values());
461    }
462
463    public ArrayList JavaDoc getPersistentQueues()
464    {
465       return new ArrayList JavaDoc(durQueues.values());
466    }
467
468    public int getAllSubscriptionsCount()
469    {
470       return durQueues.size() + tempQueues.size();
471    }
472
473    public int getDurableSubscriptionsCount()
474    {
475       return durQueues.size();
476    }
477
478    public int getNonDurableSubscriptionsCount()
479    {
480       return tempQueues.size();
481    }
482
483    public ArrayList JavaDoc getAllSubscriptions()
484    {
485       ArrayList JavaDoc result = new ArrayList JavaDoc(getAllSubscriptionsCount());
486       result.addAll(getDurableSubscriptions());
487       result.addAll(getNonDurableSubscriptions());
488       return result;
489    }
490
491    public ArrayList JavaDoc getDurableSubscriptions()
492    {
493       return new ArrayList JavaDoc(durQueues.keySet());
494    }
495
496    public ArrayList JavaDoc getNonDurableSubscriptions()
497    {
498       return new ArrayList JavaDoc(tempQueues.keySet());
499    }
500
501    public PersistentQueue getDurableSubscription(DurableSubscriptionID id)
502    {
503       return (PersistentQueue) durQueues.get(id);
504    }
505
506    public BasicQueue getQueue(Subscription sub) throws JMSException JavaDoc
507    {
508       SpyTopic topic = (SpyTopic) sub.destination;
509       DurableSubscriptionID id = topic.getDurableSubscriptionID();
510       BasicQueue queue = null;
511       if (id != null)
512          queue = getDurableSubscription(id);
513       else
514          queue = (BasicQueue) tempQueues.get(sub);
515       
516       if (queue != null)
517          return queue;
518       else
519          throw new JMSException JavaDoc("Subscription not found: " + sub);
520    }
521
522    // Package protected ---------------------------------------------
523

524    /*
525     * @see JMSDestination#isInUse()
526     */

527    public boolean isInUse()
528    {
529       if (tempQueues.size() > 0)
530          return true;
531       Iterator JavaDoc iter = durQueues.values().iterator();
532       while (iter.hasNext())
533       {
534          PersistentQueue q = (PersistentQueue) iter.next();
535          if (q.isInUse())
536             return true;
537       }
538       return false;
539    }
540    /**
541     * @see JMSDestination#destroy()
542     */

543    public void removeAllMessages() throws JMSException JavaDoc
544    {
545       Iterator JavaDoc i = durQueues.values().iterator();
546       while (i.hasNext())
547       {
548          PersistentQueue queue = (PersistentQueue) i.next();
549          queue.removeAllMessages();
550       }
551    }
552
553    private int calculateMessageCount(ArrayList JavaDoc queues)
554    {
555       int count = 0;
556       for (Iterator JavaDoc i = queues.listIterator(); i.hasNext();)
557       {
558          BasicQueue queue = (BasicQueue) i.next();
559          count += queue.getQueueDepth();
560       }
561       return count;
562    }
563
564    /**
565     * Get message counter of all topic internal queues
566     *
567     * @return MessageCounter[] topic queue message counter array
568     */

569    public MessageCounter[] getMessageCounter()
570    {
571       TreeMap JavaDoc map = new TreeMap JavaDoc();
572
573       Iterator JavaDoc i = durQueues.values().iterator();
574
575       while (i.hasNext())
576       {
577          BasicQueue queue = (BasicQueue) i.next();
578          MessageCounter counter = queue.getMessageCounter();
579
580          if (counter != null)
581          {
582             String JavaDoc key = counter.getDestinationName() + counter.getDestinationSubscription();
583             map.put(key, counter);
584          }
585       }
586
587       i = tempQueues.values().iterator();
588
589       while (i.hasNext())
590       {
591          BasicQueue queue = (BasicQueue) i.next();
592          MessageCounter counter = queue.getMessageCounter();
593
594          if (counter != null)
595          {
596             String JavaDoc key = counter.getDestinationName() + counter.getDestinationSubscription();
597             map.put(key, counter);
598          }
599       }
600
601       return (MessageCounter[]) map.values().toArray(new MessageCounter[0]);
602    }
603 }
604
Popular Tags