KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > jms > session > MessageConsumerImpl


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  * Free SoftwareFoundation, Inc.
23  * 59 Temple Place, Suite 330
24  * Boston, MA 02111-1307 USA
25  *
26  * @author Scott Ferguson
27  */

28
29 package com.caucho.jms.session;
30
31 import com.caucho.jms.AbstractDestination;
32 import com.caucho.jms.message.MessageImpl;
33 import com.caucho.jms.selector.Selector;
34 import com.caucho.jms.selector.SelectorParser;
35 import com.caucho.log.Log;
36 import com.caucho.util.Alarm;
37 import com.caucho.util.L10N;
38
39 import javax.jms.JMSException JavaDoc;
40 import javax.jms.Message JavaDoc;
41 import javax.jms.MessageConsumer JavaDoc;
42 import javax.jms.MessageListener JavaDoc;
43 import javax.jms.Session JavaDoc;
44 import java.util.logging.Logger JavaDoc;
45
46 /**
47  * A basic message consumer.
48  */

49 public class MessageConsumerImpl
50   implements MessageConsumer JavaDoc, MessageAvailableListener {
51   static final Logger JavaDoc log = Log.open(MessageConsumerImpl.class);
52   static final L10N L = new L10N(MessageConsumerImpl.class);
53
54   private final Object JavaDoc _consumerLock = new Object JavaDoc();
55   
56   protected SessionImpl _session;
57   private AbstractDestination _queue;
58   private MessageListener JavaDoc _messageListener;
59   private String JavaDoc _messageSelector;
60   protected Selector _selector;
61   private boolean _noLocal;
62   
63   private volatile boolean _isClosed;
64
65   protected MessageConsumerImpl(SessionImpl session,
66                                 String JavaDoc messageSelector,
67                 AbstractDestination queue,
68                                 boolean noLocal)
69     throws JMSException JavaDoc
70   {
71     _session = session;
72     _queue = queue;
73     _messageSelector = messageSelector;
74     if (_messageSelector != null) {
75       SelectorParser parser = new SelectorParser();
76       _selector = parser.parse(messageSelector);
77     }
78     _noLocal = noLocal;
79
80     _queue.addListener(this);
81   }
82
83   /**
84    * Returns true if local messages are not sent.
85    */

86   public boolean getNoLocal()
87   {
88     return _noLocal;
89   }
90   
91   /**
92    * Returns the message listener
93    */

94   public MessageListener JavaDoc getMessageListener()
95   {
96     return _messageListener;
97   }
98
99   /**
100    * Sets the message listener
101    */

102   public void setMessageListener(MessageListener JavaDoc listener)
103   {
104     _messageListener = listener;
105     _session.setAsynchronous();
106   }
107
108   /**
109    * Returns the message consumer's selector.
110    */

111   public String JavaDoc getMessageSelector()
112   {
113     return _messageSelector;
114   }
115
116   /**
117    * Returns the parsed selector.
118    */

119   public Selector getSelector()
120   {
121     return _selector;
122   }
123
124   /**
125    * Returns true if active
126    */

127   public boolean isActive()
128   {
129     return _session.isActive() && ! _isClosed;
130   }
131
132   /**
133    * Returns true if closed
134    */

135   public boolean isClosed()
136   {
137     return _isClosed;
138   }
139
140   /**
141    * Receives the next message, blocking until a message is available.
142    */

143   public Message JavaDoc receive()
144     throws JMSException JavaDoc
145   {
146     return receive(Long.MAX_VALUE);
147   }
148
149   /**
150    * Called to synchronously receive a message.
151    */

152   public Message JavaDoc receive(long timeout)
153     throws JMSException JavaDoc
154   {
155     _session.checkOpen();
156     
157     if (Long.MAX_VALUE / 2 < timeout || timeout < 0)
158       timeout = Long.MAX_VALUE / 2;
159     
160     long now = Alarm.getCurrentTime();
161     long expireTime = Alarm.getCurrentTime() + timeout;
162     
163     // 4.4.1 user's reponsibility
164
// checkThread();
165

166     while (! isClosed()) {
167       Message JavaDoc msg = receiveNoWait();
168       if (msg != null)
169     return msg;
170       
171       long delta = expireTime - Alarm.getCurrentTime();
172
173       if (delta <= 0 || _isClosed || Alarm.isTest())
174     return null;
175
176       synchronized (_consumerLock) {
177     try {
178       _consumerLock.wait(delta);
179     } catch (Throwable JavaDoc e) {
180     }
181       }
182     }
183
184     return null;
185   }
186
187   /**
188    * Receives a message from the queue.
189    */

190   public Message JavaDoc receiveNoWait()
191     throws JMSException JavaDoc
192   {
193     if (_isClosed)
194       throw new javax.jms.IllegalStateException JavaDoc(L.l("can't receive when consumer is closed"));
195     
196     if (! _session.isActive())
197       return null;
198
199     MessageImpl msg = receiveImpl();
200
201     if (msg == null)
202       return null;
203
204     switch (_session.getAcknowledgeMode()) {
205     case Session.CLIENT_ACKNOWLEDGE:
206       msg.setSession(_session);
207       break;
208     
209     case Session.AUTO_ACKNOWLEDGE:
210     case Session.DUPS_OK_ACKNOWLEDGE:
211       acknowledge();
212       break;
213
214     default:
215       // transacted
216
break;
217     }
218
219     return msg;
220   }
221
222   /**
223    * Receives the next message, if one is available
224    */

225   protected MessageImpl receiveImpl()
226     throws JMSException JavaDoc
227   {
228     throw new UnsupportedOperationException JavaDoc();
229   }
230
231   /**
232    * acknowledge any received messages.
233    */

234   public void acknowledge()
235     throws JMSException JavaDoc
236   {
237   }
238
239   /**
240    * rollback any received messages.
241    */

242   public void rollback()
243     throws JMSException JavaDoc
244   {
245   }
246
247   /**
248    * Closes the consumer.
249    */

250   public void close()
251     throws JMSException JavaDoc
252   {
253     _isClosed = true;
254     _queue.removeListener(this);
255     // XXX: remove session?
256
// _session.removeListener(this);
257
}
258
259   /**
260    * Called when a new message is available.
261    */

262   public void messageAvailable()
263   {
264     _session.notifyListener();
265     synchronized (_consumerLock) {
266       try {
267     _consumerLock.notify();
268       } catch (Throwable JavaDoc e) {
269       }
270     }
271   }
272 }
273
274
Popular Tags