KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > server > ClientConsumer


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq.server;
23
24 import java.util.HashMap JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.LinkedList JavaDoc;
27
28 import javax.jms.InvalidDestinationException JavaDoc;
29 import javax.jms.JMSException JavaDoc;
30
31 import org.jboss.logging.Logger;
32 import org.jboss.util.threadpool.ThreadPool;
33 import org.jboss.mq.AcknowledgementRequest;
34 import org.jboss.mq.ConnectionToken;
35 import org.jboss.mq.ReceiveRequest;
36 import org.jboss.mq.SpyMessage;
37 import org.jboss.mq.Subscription;
38
39 /**
40  * This represent the clients queue which consumes messages from the
41  * destinations on the provider.
42  *
43  * @author Hiram Chirino (Cojonudo14@hotmail.com)
44  * @author <a HREF="mailto:pra@tim.se">Peter Antman</a>
45  * @created August 16, 2001
46  * @version $Revision: 37459 $
47  */

48 public class ClientConsumer implements Runnable JavaDoc
49 {
50    private static Logger log = Logger.getLogger(ClientConsumer.class);
51    //The JMSServer object
52
JMSDestinationManager server;
53    //The connection this queue will send messages over
54
ConnectionToken connectionToken;
55    //Is this connection enabled (Can we transmit to the receiver)
56
boolean enabled;
57    //Has this connection been closed?
58
boolean closed = false;
59    //Maps a subscription id to a Subscription
60
HashMap JavaDoc subscriptions = new HashMap JavaDoc();
61    //Maps a subscription id to a Subscription for subscriptions that have finished receiving
62
HashMap JavaDoc removedSubscriptions = new HashMap JavaDoc();
63
64    LinkedList JavaDoc blockedSubscriptions = new LinkedList JavaDoc();
65
66    //List of messages waiting to be transmitted to the client
67
private LinkedList JavaDoc messages = new LinkedList JavaDoc();
68
69    /**
70     * Flags that I am enqueued as work on my thread pool.
71     */

72    private boolean enqueued = false;
73
74    // Static ---------------------------------------------------
75

76    /**
77     * The {@link org.jboss.util.threadpool.ThreadPool} that
78     * does the actual message pushing for us.
79     */

80    private ThreadPool threadPool = null;
81
82    // Constructor ---------------------------------------------------
83

84    public ClientConsumer(JMSDestinationManager server, ConnectionToken connectionToken) throws JMSException JavaDoc
85    {
86       this.server = server;
87       this.connectionToken = connectionToken;
88       this.threadPool = server.getThreadPool();
89    }
90
91    public void setEnabled(boolean enabled) throws JMSException JavaDoc
92    {
93       if (log.isTraceEnabled())
94          log.trace("" + this +"->setEnabled(enabled=" + enabled + ")");
95
96       // queues might be waiting for messages.
97
synchronized (blockedSubscriptions)
98       {
99          this.enabled = enabled;
100          if (enabled)
101          {
102             for (Iterator JavaDoc it = blockedSubscriptions.iterator(); it.hasNext();)
103             {
104                Subscription sub = (Subscription) it.next();
105                JMSDestination dest = server.getJMSDestination(sub.destination);
106                if (dest != null)
107                   dest.addReceiver(sub);
108             }
109             blockedSubscriptions.clear();
110          }
111       }
112    }
113
114    public void queueMessageForSending(RoutedMessage r)
115    {
116
117       synchronized (messages)
118       {
119          if (closed)
120             return; // Wouldn't be delivered anyway
121

122          messages.add(r);
123          if (!enqueued)
124          {
125             threadPool.run(this);
126             enqueued = true;
127          }
128       }
129    }
130
131    public void addSubscription(Subscription req) throws JMSException JavaDoc
132    {
133       if (log.isTraceEnabled())
134          log.trace("Adding subscription for: " + req);
135       req.connectionToken = connectionToken;
136       req.clientConsumer = this;
137
138       JMSDestination jmsdest = server.getJMSDestination(req.destination);
139       if (jmsdest == null)
140          throw new InvalidDestinationException JavaDoc("The destination " + req.destination + " does not exist !");
141
142       jmsdest.addSubscriber(req);
143
144       synchronized (subscriptions)
145       {
146          subscriptions.put(new Integer JavaDoc(req.subscriptionId), req);
147       }
148    }
149
150    public void close()
151    {
152       boolean trace = log.isTraceEnabled();
153       if (trace)
154          log.trace("" + this +"->close()");
155
156       synchronized (messages)
157       {
158          closed = true;
159          if (enqueued)
160          {
161             enqueued = false;
162          }
163          messages.clear();
164       }
165
166       // Remove all the subscriptions for this client
167
HashMap JavaDoc subscriptionsClone = null;
168       synchronized (subscriptions)
169       {
170          subscriptionsClone = (HashMap JavaDoc) subscriptions.clone();
171       }
172       Iterator JavaDoc i = subscriptionsClone.keySet().iterator();
173       while (i.hasNext())
174       {
175          Integer JavaDoc subscriptionId = (Integer JavaDoc) i.next();
176          try
177          {
178             removeSubscription(subscriptionId.intValue());
179          }
180          catch (JMSException JavaDoc ignore)
181          {
182          }
183       }
184
185       // Nack the removed subscriptions, the connection is gone
186
HashMap JavaDoc removedSubsClone = null;
187       synchronized (subscriptions)
188       {
189          removedSubsClone = (HashMap JavaDoc) removedSubscriptions.clone();
190       }
191       i = removedSubsClone.values().iterator();
192       while (i.hasNext())
193       {
194          Subscription removed = (Subscription) i.next();
195          JMSDestination queue = server.getJMSDestination(removed.destination);
196          if (queue == null)
197             log.warn("The subscription was registered with a destination that does not exist: " + removed);
198          else
199          {
200             try
201             {
202                queue.nackMessages(removed);
203             }
204             catch (JMSException JavaDoc e)
205             {
206                log.warn("Unable to nack removed subscription: " + removed, e);
207             }
208          }
209          // Make sure the subscription is gone
210
removeRemovedSubscription(removed.subscriptionId);
211       }
212    }
213
214    public SpyMessage receive(int subscriberId, long wait) throws JMSException JavaDoc
215    {
216       Subscription req = getSubscription(subscriberId);
217       if (req == null)
218       {
219          throw new JMSException JavaDoc("The provided subscription does not exist");
220       }
221
222       JMSDestination queue = server.getJMSDestination(req.destination);
223       if (queue == null)
224          throw new InvalidDestinationException JavaDoc("The subscription's destination " + req.destination + " does not exist");
225
226       // Block the receiver if we are not enabled and it is not noWait, otherwise receive a message
227
if (addBlockedSubscription(req, wait))
228          return queue.receive(req, (wait != -1));
229
230       return null;
231    }
232
233    public void removeSubscription(int subscriptionId) throws JMSException JavaDoc
234    {
235       if (log.isTraceEnabled())
236          log.trace("" + this +"->removeSubscription(subscriberId=" + subscriptionId + ")");
237
238       Integer JavaDoc subId = new Integer JavaDoc(subscriptionId);
239       Subscription req;
240       synchronized (subscriptions)
241       {
242          req = (Subscription) subscriptions.remove(subId);
243          if (req != null)
244             removedSubscriptions.put(subId, req);
245       }
246
247       if (req == null)
248          throw new JMSException JavaDoc("The subscription had not been previously registered");
249
250       JMSDestination queue = server.getPossiblyClosingJMSDestination(req.destination);
251       if (queue == null)
252          throw new InvalidDestinationException JavaDoc("The subscription was registered with a destination that does not exist !");
253
254       queue.removeSubscriber(req);
255
256    }
257
258    /**
259     * Push some messages.
260     */

261    public void run()
262    {
263       try
264       {
265
266          ReceiveRequest[] job;
267
268          synchronized (messages)
269          {
270             if (closed)
271                return;
272
273             job = new ReceiveRequest[messages.size()];
274             Iterator JavaDoc iter = messages.iterator();
275             for (int i = 0; iter.hasNext(); i++)
276             {
277                RoutedMessage rm = (RoutedMessage) iter.next();
278                job[i] = rm.toReceiveRequest();
279                iter.remove();
280             }
281             enqueued = false;
282          }
283
284          connectionToken.clientIL.receive(job);
285
286       }
287       catch (Throwable JavaDoc t)
288       {
289          synchronized (messages)
290          {
291             if (closed)
292                log.warn("Could not send messages to a receiver.", t);
293             else
294                log.trace("Could not send messages to a receiver. It is closed.", t);
295          }
296          try
297          {
298             server.connectionFailure(connectionToken);
299          }
300          catch (Throwable JavaDoc ignore)
301          {
302             log.warn("Could not close the client connection..", ignore);
303          }
304       }
305    }
306
307    public String JavaDoc toString()
308    {
309       return "ClientConsumer:" + connectionToken.getClientID();
310    }
311
312    public void acknowledge(AcknowledgementRequest request, org.jboss.mq.pm.Tx txId) throws JMSException JavaDoc
313    {
314       Subscription sub = retrieveSubscription(request.subscriberId);
315
316       if (sub == null)
317       {
318          //might be in removed subscriptions
319
synchronized (subscriptions)
320          {
321             sub = (Subscription) removedSubscriptions.get(new Integer JavaDoc(request.subscriberId));
322          }
323       }
324
325       if (sub == null)
326       {
327          throw new JMSException JavaDoc("The provided subscription does not exist");
328       }
329
330       JMSDestination queue = server.getJMSDestination(sub.destination);
331       if (queue == null)
332          throw new InvalidDestinationException JavaDoc("The subscription's destination " + sub.destination + " does not exist");
333
334       queue.acknowledge(request, sub, txId);
335    }
336
337    boolean addBlockedSubscription(Subscription sub, long wait)
338    {
339       synchronized (blockedSubscriptions)
340       {
341          if (enabled == false && wait != -1)
342             blockedSubscriptions.add(sub);
343          return enabled;
344       }
345    }
346
347    void removeRemovedSubscription(int subId)
348    {
349       Subscription sub = null;
350       synchronized (subscriptions)
351       {
352          sub = (Subscription) removedSubscriptions.remove(new Integer JavaDoc(subId));
353       }
354       if (sub != null)
355       {
356          JMSDestination topic = server.getPossiblyClosingJMSDestination(sub.destination);
357          if (topic != null && topic instanceof JMSTopic)
358              ((JMSTopic) topic).cleanupSubscription(sub);
359       }
360    }
361
362    /**
363     * Get a subscription for the subscriberid
364     *
365     * @exception JMSException if it can not find the subscription.
366     */

367    public Subscription getSubscription(int subscriberId) throws JMSException JavaDoc
368    {
369       Subscription req = retrieveSubscription(subscriberId);
370       if (req == null)
371          throw new JMSException JavaDoc("The provided subscription does not exist");
372
373       return req;
374    }
375
376    /**
377     * Retrieve a subscription for the subscriberid
378     */

379    private Subscription retrieveSubscription(int subscriberId) throws JMSException JavaDoc
380    {
381       Integer JavaDoc id = new Integer JavaDoc(subscriberId);
382       synchronized (subscriptions)
383       {
384          return (Subscription) subscriptions.get(id);
385       }
386    }
387 }
388
Popular Tags