KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > server > JMSQueue


/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag. See the copyright.txt in the distribution for a
 * full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */

package org.jboss.mq.server;

import java.util.ArrayList;

import javax.jms.JMSException;

import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyJMSException;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.Subscription;
import org.jboss.mq.pm.Tx;

34 /**
35  * This class is a message queue which is stored (hashed by Destination) on the
36  * JMS provider
37  *
38  * @author Norbert Lataille (Norbert.Lataille@m4x.org)
39  * @author Hiram Chirino (Cojonudo14@hotmail.com)
40  * @author David Maplesden (David.Maplesden@orion.co.nz)
41  * @author Adrian Brock (adrian@jboss.com)
42  * @created August 16, 2001
43  * @version $Revision: 45317 $
44  */

45 public class JMSQueue extends JMSDestination
46 {
47
48    public BasicQueue queue;
49
50    public JMSQueue(SpyDestination dest, ClientConsumer temporary, JMSDestinationManager server, BasicQueueParameters parameters) throws JMSException JavaDoc
51    {
52       super(dest, temporary, server, parameters);
53
54       // If this is a non-temp queue, then we should persist data
55
if (temporaryDestination == null)
56       {
57          Throwable JavaDoc error = null;
58          for (int i = 0; i <= parameters.recoveryRetries; ++i)
59          {
60             // create queue
61
queue = new PersistentQueue(server, dest, parameters);
62
63             try
64             {
65                // restore persistent queue data
66
server.getPersistenceManager().restoreQueue(this, dest);
67                
68                // done
69
break;
70             }
71             catch (Throwable JavaDoc t)
72             {
73                if (i < parameters.recoveryRetries)
74                   cat.warn("Error restoring queue " + queue + " retries=" + i + " of " + parameters.recoveryRetries, t);
75                else
76                   error = t;
77                try
78                {
79                   queue.stop();
80                }
81                catch (Throwable JavaDoc ignored)
82                {
83                   cat.trace("Ignored error stopping queue " + queue, ignored);
84                }
85                finally
86                {
87                   queue = null;
88                }
89             }
90          }
91          
92          if (error != null)
93             SpyJMSException.rethrowAsJMSException("Unable to recover queue " + dest + " retries=" + parameters.recoveryRetries, error);
94       }
95       else
96       {
97          // create queue
98
queue = new BasicQueue(server, destination.toString(), parameters);
99       }
100
101       // create queue message counter
102
queue.createMessageCounter(dest.getName(), null, false, false, parameters.messageCounterHistoryDayLimit);
103    }
104
105    public void addSubscriber(Subscription sub) throws JMSException JavaDoc
106    {
107       queue.addSubscriber(sub);
108    }
109
110    public void removeSubscriber(Subscription sub)
111    {
112       queue.removeSubscriber(sub);
113    }
114
115    public void nackMessages(Subscription sub)
116    {
117       queue.nackMessages(sub);
118    }
119
120    public void addReceiver(Subscription sub) throws JMSException JavaDoc
121    {
122       queue.addReceiver(sub);
123    }
124
125    public void removeReceiver(Subscription sub)
126    {
127       queue.removeReceiver(sub);
128    }
129
130    public void restoreMessage(MessageReference messageRef)
131    {
132       try
133       {
134          SpyMessage spyMessage = messageRef.getMessage();
135          updateNextMessageId(spyMessage);
136          messageRef.queue = queue;
137          queue.restoreMessage(messageRef);
138       }
139       catch (JMSException JavaDoc e)
140       {
141          cat.error("Could not restore message:", e);
142       }
143    }
144
145    public void restoreMessage(SpyMessage message, Tx txid, int type)
146    {
147       try
148       {
149          updateNextMessageId(message);
150          MessageReference messageRef = server.getMessageCache().add(message, queue, MessageReference.STORED);
151          queue.restoreMessage(messageRef, txid, type);
152       }
153       catch (JMSException JavaDoc e)
154       {
155          cat.error("Could not restore message:", e);
156       }
157    }
158
159    public SpyMessage[] browse(String JavaDoc selector) throws JMSException JavaDoc
160    {
161       return queue.browse(selector);
162    }
163
164    public String JavaDoc toString()
165    {
166       return "JMSDestination:" + destination;
167    }
168
169    public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription sub, org.jboss.mq.pm.Tx txId)
170       throws JMSException JavaDoc
171    {
172       queue.acknowledge(req, txId);
173    }
174
175    public void addMessage(SpyMessage mes, org.jboss.mq.pm.Tx txId) throws JMSException JavaDoc
176    {
177       //Number the message so that we can preserve order of delivery.
178
mes.header.messageId = nextMessageId();
179       MessageReference message = server.getMessageCache().add(mes, queue, MessageReference.NOT_STORED);
180       queue.addMessage(message, txId);
181    }
182
183    public org.jboss.mq.SpyMessage receive(org.jboss.mq.Subscription sub, boolean wait) throws javax.jms.JMSException JavaDoc
184    {
185       return queue.receive(sub, wait);
186    }
187
188    /*
189     * @see JMSDestination#isInUse()
190     */

191    public boolean isInUse()
192    {
193       return queue.isInUse();
194    }
195
196    /*
197     * @see JMSDestination#close()
198     */

199    public void close() throws JMSException JavaDoc
200    {
201       queue.stop();
202       server.getPersistenceManager().closeQueue(this, getSpyDestination());
203    }
204
205    /**
206     * @see JMSDestination#destroy()
207     */

208    public void removeAllMessages() throws JMSException JavaDoc
209    {
210       queue.removeAllMessages();
211    }
212
213    /**
214     * Get message counter of internal queue
215     *
216     * @return MessageCounter[] internal queue message counter
217     */

218    public MessageCounter[] getMessageCounter()
219    {
220       ArrayList JavaDoc array = new ArrayList JavaDoc();
221
222       MessageCounter counter = queue.getMessageCounter();
223
224       if (counter != null)
225          array.add(counter);
226
227       return (MessageCounter[]) array.toArray(new MessageCounter[0]);
228    }
229 }
230
Popular Tags