KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > jms > memory > MemoryQueue


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  *
23  * Free Software Foundation, Inc.
24  * 59 Temple Place, Suite 330
25  * Boston, MA 02111-1307 USA
26  *
27  * @author Scott Ferguson
28  */

29
30 package com.caucho.jms.memory;
31
32 import com.caucho.jms.AbstractDestination;
33 import com.caucho.jms.message.MessageImpl;
34 import com.caucho.jms.selector.Selector;
35 import com.caucho.jms.session.SessionImpl;
36 import com.caucho.log.Log;
37 import com.caucho.util.L10N;
38
39 import javax.jms.JMSException JavaDoc;
40 import javax.jms.Message JavaDoc;
41 import javax.jms.MessageConsumer JavaDoc;
42 import javax.jms.Queue JavaDoc;
43 import java.util.ArrayList JavaDoc;
44 import java.util.Enumeration JavaDoc;
45 import java.util.logging.Level JavaDoc;
46 import java.util.logging.Logger JavaDoc;
47
48 /**
49  * A basic queue.
50  */

51 public class MemoryQueue extends AbstractDestination
52   implements Queue JavaDoc {
53   static final Logger JavaDoc log = Log.open(MemoryQueue.class);
54   static final L10N L = new L10N(MemoryQueue.class);
55
56   ArrayList JavaDoc<Item> _queue = new ArrayList JavaDoc<Item>();
57
58   private String JavaDoc _queueName;
59   private Selector _selector;
60
61   private long _queueId;
62
63   private int _consumerId;
64
65   public MemoryQueue()
66   {
67   }
68
69   /**
70    * Returns the queue's name.
71    */

72   public String JavaDoc getQueueName()
73   {
74     return _queueName;
75   }
76
77   /**
78    * Sets the queue's name.
79    */

80   public void setQueueName(String JavaDoc name)
81   {
82     _queueName = name;
83   }
84
85   public void setSelector(Selector selector)
86   {
87     _selector = selector;
88   }
89
90   public Selector getSelector()
91   {
92     return _selector;
93   }
94
95   /**
96    * Generates the next queue id.
97    */

98   public long generateQueueId()
99   {
100     return ++_queueId;
101   }
102
103   /**
104    * Generates the next consumer id.
105    */

106   public int generateConsumerId()
107   {
108     return ++_consumerId;
109   }
110
111   public void send(Message JavaDoc message)
112     throws JMSException JavaDoc
113   {
114     if (_selector != null && ! _selector.isMatch(message))
115       return;
116
117     long sequenceId = nextConsumerSequenceId();
118
119     if (log.isLoggable(Level.FINE))
120       log.fine("MemoryQueue[" + _queueName + "] send " + sequenceId);
121
122     synchronized (_queue) {
123       _queue.add(new Item(generateQueueId(), (MessageImpl) message));
124       _queue.notify();
125     }
126
127     messageAvailable();
128   }
129   
130   /**
131    * Creates a consumer.
132    *
133    * @param session the owning session
134    * @param selector the consumer's selector
135    * @param noLocal true if pub/sub should not send local requests
136    */

137   public MessageConsumer JavaDoc createConsumer(SessionImpl session,
138                     String JavaDoc selector,
139                     boolean noLocal)
140     throws JMSException JavaDoc
141   {
142     return new MemoryQueueConsumer(session, selector, this);
143   }
144
145   /**
146    * Removes the first message matching the selector.
147    */

148   MessageImpl receive(Selector selector, long consumerId, boolean autoAck)
149     throws JMSException JavaDoc
150   {
151     synchronized (_queue) {
152       int i;
153       int size = _queue.size();
154
155       for (i = 0; i < size; i++) {
156     Item item = _queue.get(i);
157
158     if (item.getConsumerId() >= 0)
159       continue;
160     
161     MessageImpl message = item.getMessage();
162
163     if (selector == null || selector.isMatch(message)) {
164       message.setJMSRedelivered(item.getDelivered());
165
166       if (autoAck)
167         _queue.remove(i);
168       else
169         item.setConsumerId(consumerId);
170
171       return message;
172     }
173       }
174     }
175
176     return null;
177   }
178
179   /**
180    * Rolls back messages for the consumer.
181    */

182   void rollback(long consumerId)
183     throws JMSException JavaDoc
184   {
185     synchronized (_queue) {
186       for (int i = _queue.size() -1; i >= 0; i--) {
187     Item item = _queue.get(i);
188
189     if (item.getConsumerId() == consumerId)
190       item.setConsumerId(-1);
191       }
192     }
193   }
194
195   /**
196    * Acknowledge messages for the consumer.
197    */

198   void acknowledge(long consumerId, long messageId)
199     throws JMSException JavaDoc
200   {
201     synchronized (_queue) {
202       for (int i = _queue.size() -1; i >= 0; i--) {
203     Item item = _queue.get(i);
204
205     if (item.getConsumerId() == consumerId)
206       _queue.remove(i);
207       }
208     }
209   }
210
211   /**
212    * Returns a browser.
213    */

214   public MemoryQueueBrowser createBrowser(SessionImpl session,
215                       String JavaDoc selector)
216     throws JMSException JavaDoc
217   {
218     return new MemoryQueueBrowser(session, this, selector);
219   }
220
221   /**
222    * Returns an enumeration of the matching messages.
223    */

224   public Enumeration JavaDoc getEnumeration(Selector selector)
225   {
226     return new BrowserEnumeration(this, selector);
227   }
228
229   /**
230    * Removes the first message matching the selector.
231    */

232   private boolean hasMessage(Selector selector)
233     throws JMSException JavaDoc
234   {
235     synchronized (_queue) {
236       int i;
237       int size = _queue.size();
238
239       for (i = 0; i < size; i++) {
240     Item item = _queue.get(i);
241
242     if (item.getConsumerId() >= 0)
243       continue;
244
245     Message JavaDoc message = item.getMessage();
246
247     if (selector == null || selector.isMatch(message))
248       return true;
249       }
250     }
251
252     return false;
253   }
254
255   /**
256    * Returns the id of the first message matching the selector.
257    */

258   private long nextId(Selector selector, long id)
259     throws JMSException JavaDoc
260   {
261     synchronized (_queue) {
262       int i;
263       int size = _queue.size();
264
265       for (i = 0; i < size; i++) {
266     Item item = _queue.get(i);
267     
268     if (item.getConsumerId() >= 0)
269       continue;
270
271     else if (item.getId() < id)
272       continue;
273
274     Message JavaDoc message = item.getMessage();
275
276     if (selector == null || selector.isMatch(message))
277       return item.getId();
278       }
279     }
280
281     return Long.MAX_VALUE;
282   }
283
284   /**
285    * Returns the id of the first message matching the selector.
286    */

287   private Message JavaDoc nextValue(Selector selector, long id)
288     throws JMSException JavaDoc
289   {
290     synchronized (_queue) {
291       int i;
292       int size = _queue.size();
293
294       for (i = 0; i < size; i++) {
295     Item item = _queue.get(i);
296
297     if (item.getConsumerId() >= 0)
298       continue;
299
300     else if (item.getId() < id)
301       continue;
302
303     Message JavaDoc message = item.getMessage();
304
305     if (selector == null || selector.isMatch(message))
306       return message;
307       }
308     }
309
310     return null;
311   }
312
313   /**
314    * Returns a printable view of the queue.
315    */

316   public String JavaDoc toString()
317   {
318     return "MemoryQueue[" + _queueName + "]";
319   }
320
321   static class BrowserEnumeration implements Enumeration JavaDoc {
322     private MemoryQueue _queue;
323     private Selector _selector;
324     private long _id = -1;
325
326     BrowserEnumeration(MemoryQueue queue, Selector selector)
327     {
328       _queue = queue;
329       _selector = selector;
330     }
331
332     public boolean hasMoreElements()
333     {
334       try {
335     if (_id < 0)
336       _id = _queue.nextId(_selector, _id);
337     
338     return (_id < Long.MAX_VALUE);
339       } catch (Exception JavaDoc e) {
340     throw new RuntimeException JavaDoc(e);
341       }
342     }
343
344     public Object JavaDoc nextElement()
345     {
346       try {
347     // ejb/6110
348
if (_id < 0)
349       _id = _queue.nextId(_selector, _id);
350
351     Object JavaDoc value = _queue.nextValue(_selector, _id);
352
353     _id = _queue.nextId(_selector, _id + 1);
354     
355     return value;
356       } catch (Exception JavaDoc e) {
357     throw new RuntimeException JavaDoc(e);
358       }
359     }
360   }
361
362   static class Item {
363     private MessageImpl _msg;
364     private long _id;
365     private long _consumerId = -1;
366     private boolean _delivered;
367
368     Item(long id, MessageImpl msg)
369     {
370       _id = id;
371       
372       _msg = msg;
373     }
374
375     MessageImpl getMessage()
376     {
377       return _msg;
378     }
379
380     long getId()
381     {
382       return _id;
383     }
384
385     long getConsumerId()
386     {
387       return _consumerId;
388     }
389
390     void setConsumerId(long consumerId)
391     {
392       _consumerId = consumerId;
393       _delivered = true;
394     }
395
396     boolean getDelivered()
397     {
398       return _delivered;
399     }
400   }
401 }
402
403
Popular Tags