KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > SpyConnectionConsumer


/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag. See the copyright.txt in the distribution for a
 * full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */

package org.jboss.mq;

import java.util.ArrayList;
import java.util.LinkedList;

import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;

import org.jboss.logging.Logger;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;

37 /**
38  * This class implements javax.jms.ConnectionConsumer
39  *
40  * @author Hiram Chirino (Cojonudo14@hotmail.com)
41  * @author <a HREF="mailto:adrian@jboss.org">Adrian Brock</a>
42  * @version $Revision: 37459 $
43  */

44 public class SpyConnectionConsumer implements ConnectionConsumer JavaDoc, SpyConsumer, Runnable JavaDoc
45 {
46    // Constants -----------------------------------------------------
47

48    /** The log */
49    static Logger log = Logger.getLogger(SpyConnectionConsumer.class);
50
51    /** Whether trace is enabled */
52    static boolean trace = log.isTraceEnabled();
53    
54    // Attributes ----------------------------------------------------
55

56    /** The connection is the consumer was created with */
57    Connection connection;
58    /** The destination this consumer will receive messages from */
59    Destination JavaDoc destination;
60    /** The ServerSessionPool that is implemented by the AS */
61    javax.jms.ServerSessionPool JavaDoc serverSessionPool;
62    /** The maximum number of messages that a single session will be loaded with. */
63    int maxMessages;
64    /** This queue will hold messages until they are dispatched to the
65    MessageListener */

66    LinkedList JavaDoc queue = new LinkedList JavaDoc();
67    /** Is the ConnectionConsumer closed? */
68    boolean closed = false;
69    /** Whether we are waiting for a message */
70    boolean waitingForMessage = false;
71    /** The subscription info the consumer */
72    Subscription subscription = new Subscription();
73    /** The "listening" thread that gets messages from destination and queues
74    them for delivery to sessions */

75    Thread JavaDoc internalThread;
76    /** The thread id */
77    int id;
78    /** The thread id generator */
79    static SynchronizedInt threadId = new SynchronizedInt(0);
80    
81    // Static --------------------------------------------------------
82

83    // Constructors --------------------------------------------------
84

85    /**
86     * SpyConnectionConsumer constructor
87     *
88     * @param connection the connection
89     * @param destination destination
90     * @param messageSelector the message selector
91     * @param serverSessionPool the server session pool
92     * @param maxMessages the maxmimum messages
93     * @exception JMSException for any error
94     */

95    public SpyConnectionConsumer(Connection connection, Destination JavaDoc destination, String JavaDoc messageSelector,
96                                 ServerSessionPool JavaDoc serverSessionPool, int maxMessages) throws JMSException JavaDoc
97    {
98       trace = log.isTraceEnabled();
99       
100       this.connection = connection;
101       this.destination = destination;
102       this.serverSessionPool = serverSessionPool;
103       this.maxMessages = maxMessages;
104       if (this.maxMessages < 1)
105          this.maxMessages = 1;
106
107       subscription.destination = (SpyDestination) destination;
108       subscription.messageSelector = messageSelector;
109       subscription.noLocal = false;
110
111       connection.addConsumer(this);
112       id = threadId.increment();
113       internalThread = new Thread JavaDoc(this, "Connection Consumer for dest " + subscription + " id=" + id);
114       internalThread.start();
115
116       if (trace)
117          log.trace("New " + this);
118    }
119    
120    // Public --------------------------------------------------------
121

122    /**
123     * Get the subscription
124     *
125     * @return the subscription
126     */

127    public Subscription getSubscription()
128    {
129       return subscription;
130    }
131
132    /**
133     * Add a message
134     *
135     * @mes the message
136     * @throws JMSException for any error
137     */

138    public void addMessage(SpyMessage mes) throws JMSException JavaDoc
139    {
140       synchronized (queue)
141       {
142          if (closed)
143          {
144             if (trace)
145                log.trace("Consumer close nacking message=" + mes.header.jmsMessageID + " " + this);
146             log.warn("NACK issued. The connection consumer was closed.");
147             connection.send(mes.getAcknowledgementRequest(false));
148             return;
149          }
150
151          if (trace)
152             log.trace("Add message=" + mes.header.jmsMessageID + " " + this);
153          
154          if (waitingForMessage)
155          {
156             queue.addLast(mes);
157             queue.notifyAll();
158          }
159          else
160          {
161             if (trace)
162                log.trace("Consumer not waiting nacking message=" + mes.header.jmsMessageID + " " + this);
163             connection.send(mes.getAcknowledgementRequest(false));
164          }
165       }
166    }
167    
168    // ConnectionConsumer implementation -----------------------------
169

170    public ServerSessionPool JavaDoc getServerSessionPool() throws JMSException JavaDoc
171    {
172       return serverSessionPool;
173    }
174
175    public void close() throws javax.jms.JMSException JavaDoc
176    {
177       synchronized (queue)
178       {
179          if (closed)
180             return;
181
182          closed = true;
183          queue.notifyAll();
184       }
185
186       if (trace)
187          log.trace("Close " + this);
188
189       if (internalThread != null && !internalThread.equals(Thread.currentThread()))
190       {
191          try
192          {
193
194             if (trace)
195                log.trace("Joining thread " + this);
196             internalThread.join();
197          }
198          catch (InterruptedException JavaDoc e)
199          {
200             if (trace)
201                log.trace("Ignoring interrupting while joining thread " + this);
202          }
203       }
204       synchronized (queue)
205       {
206          if (trace)
207             log.trace("Nacking messages on queue " + this);
208          try
209          {
210             while (queue.isEmpty() == false)
211             {
212                SpyMessage message = (SpyMessage) queue.removeFirst();
213                connection.send(message.getAcknowledgementRequest(false));
214             }
215          }
216          catch (Throwable JavaDoc ignore)
217          {
218             if (trace)
219                log.trace("Ignoring error nacking messages in queue " + this, ignore);
220          }
221          try
222          {
223             connection.removeConsumer(this);
224          }
225          catch (Throwable JavaDoc ignore)
226          {
227             if (trace)
228                log.trace("Ignoring error removing consumer from connection " + this, ignore);
229          }
230       }
231    }
232    
233    // Runnable implementation ---------------------------------------
234

235    public void run()
236    {
237       ArrayList JavaDoc mesList = new ArrayList JavaDoc();
238       try
239       {
240          outer : while (true)
241          {
242             synchronized (queue)
243             {
244                if (closed)
245                {
246                   if (trace)
247                      log.trace("run() closed " + this);
248                   break outer;
249                }
250             }
251
252             for (int i = 0; i < maxMessages; i++)
253             {
254                SpyMessage mes = connection.receive(subscription, -1);
255                if (mes == null)
256                {
257                   if (trace)
258                      log.trace("run() receivedNoWait got no message" + this);
259                   break;
260                }
261                else
262                {
263                   if (trace)
264                      log.trace("run() receivedNoWait message=" + mes.header.jmsMessageID + " " + this);
265                   mesList.add(mes);
266                }
267             }
268
269             if (mesList.isEmpty())
270             {
271                SpyMessage mes = null;
272                synchronized (queue)
273                {
274                   mes = connection.receive(subscription, 0);
275                   if (mes == null)
276                   {
277                      waitingForMessage = true;
278                      while (queue.isEmpty() && !closed)
279                      {
280                         if (trace)
281                            log.trace("run() waiting for message " + this);
282                         try
283                         {
284                            queue.wait();
285                         }
286                         catch (InterruptedException JavaDoc e)
287                         {
288                            if (trace)
289                               log.trace("Ignoring interruption waiting for message " + this, e);
290                         }
291                      }
292                      if (closed)
293                      {
294                         if (trace)
295                            log.trace("run() closed while waiting " + this);
296                         waitingForMessage = false;
297                         break outer;
298                      }
299                      mes = (SpyMessage) queue.removeFirst();
300                      waitingForMessage = false;
301                      if (trace)
302                         log.trace("run() got message message=" + mes.header.jmsMessageID + " " + this);
303                   }
304                }
305                mesList.add(mes);
306             }
307
308             if (trace)
309                log.trace("Waiting for serverSesionPool " + this);
310             ServerSession JavaDoc serverSession = serverSessionPool.getServerSession();
311             SpySession spySession = (SpySession) serverSession.getSession();
312             if (trace)
313                log.trace("Waited serverSesion=" + serverSession + " session=" + spySession + " " + this);
314             
315             if (spySession.sessionConsumer == null)
316             {
317                if (trace)
318                   log.trace("Session did not have a set MessageListner " + spySession + " " + this);
319             }
320             else
321             {
322                spySession.sessionConsumer.subscription = subscription;
323             }
324
325             for (int i = 0; i < mesList.size(); i++)
326                spySession.addMessage((SpyMessage) mesList.get(i));
327
328             if (trace)
329                log.trace(" Starting the ServerSession=" + serverSession + " " + this);
330             serverSession.start();
331             mesList.clear();
332          }
333       }
334       catch (Throwable JavaDoc t)
335       {
336          log.warn("Connection consumer closing due to error in listening thread " + this, t);
337          try
338          {
339             for (int i = 0; i < mesList.size(); i++)
340             {
341                SpyMessage msg = (SpyMessage) mesList.get(i);
342                connection.send(msg.getAcknowledgementRequest(false));
343             }
344          }
345          catch (Throwable JavaDoc ignore)
346          {
347             if (trace)
348                log.trace("Ignoring error nacking message " + this, ignore);
349          }
350          try
351          {
352             close();
353          }
354          catch (Throwable JavaDoc ignore)
355          {
356             if (trace)
357                log.trace("Ignoring error during close " + this, ignore);
358          }
359       }
360    }
361
362    // Object overrides ----------------------------------------------
363

364    public String JavaDoc toString()
365    {
366       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc(100);
367       buffer.append("SpyConnectionConsumer[sub=").append(subscription);
368       if (closed)
369          buffer.append(" CLOSED");
370       buffer.append(" messages=").append(queue.size());
371       buffer.append(" waitingForMessage=").append(waitingForMessage);
372       if (internalThread != null)
373          buffer.append(" internalThread=").append(internalThread);
374       buffer.append(" sessionPool=").append(serverSessionPool);
375       buffer.append(" connection=").append(connection);
376       buffer.append(']');
377       return buffer.toString();
378    }
379    
380    // Package protected ---------------------------------------------
381

382    // Protected -----------------------------------------------------
383

384    // Private -------------------------------------------------------
385

386    // Inner classes -------------------------------------------------
387
}
Popular Tags