KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > walend > somnifugi > juc > MessageSelectingPriorityBlockingQueue


1 package net.walend.somnifugi.juc;
2
3 import java.util.Comparator JavaDoc;
4 import java.util.Map JavaDoc;
5 import java.util.HashMap JavaDoc;
6 import java.util.List JavaDoc;
7 import java.util.ArrayList JavaDoc;
8 import java.util.Collections JavaDoc;
9
10 import java.util.concurrent.TimeUnit JavaDoc;
11 import java.util.concurrent.PriorityBlockingQueue JavaDoc;
12
13 import java.util.concurrent.locks.ReentrantLock JavaDoc;
14 import java.util.concurrent.locks.Condition JavaDoc;
15
16 import java.util.concurrent.atomic.AtomicMarkableReference JavaDoc;
17
18 import javax.jms.Message JavaDoc;
19 import javax.jms.JMSException JavaDoc;
20
21 import net.walend.somnifugi.SomniMessageSelector;
22 import net.walend.somnifugi.SomniMessageSelectorException;
23 import net.walend.somnifugi.SomniRuntimeException;
24
25 /**
26 A BlockingQueue-like class that lets Message Selectors work.
27
28 @author <a HREF="http://walend.net">David Walend</a> <a HREF="mailto:david@walend.net">david@walend.net</a>
29  */

30
31 class MessageSelectingPriorityBlockingQueue
32 {
33
34     private static final int INITIALCAPACITY = 5;
35     
36     private final PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> masterQueue;
37     
38     //used to guard the map of selectors to queues.
39
private final ReentrantLock JavaDoc lock = new ReentrantLock JavaDoc(true);
40     
41     private final Comparator JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> refComparator;
42     
43     private final Map JavaDoc<SomniMessageSelector,PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>>> messageSelectorsToQueues;
44     
45     MessageSelectingPriorityBlockingQueue(int initialCapacity,Comparator JavaDoc<Message JavaDoc> comparator)
46     {
47         refComparator = new RefComparator(comparator);
48         
49         messageSelectorsToQueues = new HashMap JavaDoc<SomniMessageSelector,PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>>>();
50         
51         masterQueue = getQueueForMessageSelector(SomniMessageSelector.ALLMESSAGESELECTOR);
52     }
53     
54     private PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> getQueueForMessageSelector(SomniMessageSelector messageSelector)
55     {
56         try
57         {
58             lock.lock();
59             
60             PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> queue = messageSelectorsToQueues.get(messageSelector);
61             if(queue == null)
62             {
63                 queue = new PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>>(INITIALCAPACITY,refComparator);
64                 messageSelectorsToQueues.put(messageSelector,queue);
65                                 
66                 if(messageSelector != SomniMessageSelector.ALLMESSAGESELECTOR)
67                 {
68                     //put all the pending messages that match the selector into the new queue
69
List JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> list = new ArrayList JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>>(masterQueue);
70     
71                     for(AtomicMarkableReference JavaDoc<Message JavaDoc> ref : list)
72                     {
73                         if(messageSelector.matches(ref.getReference()))
74                         {
75                             queue.add(ref);
76                         }
77                     }
78                 }
79             }
80             
81             return queue;
82         }
83         catch(SomniMessageSelectorException smse)
84         {
85             throw new SomniRuntimeException("While using "+messageSelector,smse);
86         }
87         finally
88         {
89             lock.unlock();
90         }
91     }
92     
93     public void removeMessageSelector(SomniMessageSelector messageSelector)
94     {
95         try
96         {
97             lock.lock();
98             messageSelectorsToQueues.remove(messageSelector);
99         }
100         finally
101         {
102             lock.unlock();
103         }
104     }
105     
106     public void put(Message JavaDoc message)
107     {
108         if (message == null)
109         {
110             throw new NullPointerException JavaDoc("message can not be null.");
111         }
112         AtomicMarkableReference JavaDoc<Message JavaDoc> ref = new AtomicMarkableReference JavaDoc<Message JavaDoc>(message,false);
113             
114         try
115         {
116             lock.lock();
117             
118             //loop through SomniMessageSelectors for matches. If a match, then signal that condition.
119
for(SomniMessageSelector messageSelector : messageSelectorsToQueues.keySet())
120             {
121                 if(messageSelector.matches(message))
122                 {
123                     PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> queue = getQueueForMessageSelector(messageSelector);
124                     queue.put(ref);
125                 }
126             }
127         }
128         catch(SomniMessageSelectorException smse)
129         {
130             throw new SomniRuntimeException("While offering "+message,smse);
131         }
132         finally
133         {
134             lock.unlock();
135         }
136     }
137
138     /**
139     An interesting compromize on the timeout. The method offers to the masterQueue with the timeout, then offers to every other queue with a matching message selector. So the method can take more than timeout milliseconds.
140     
141     @param timeout
142     */

143     public boolean offer(Message JavaDoc message,long timeout,TimeUnit JavaDoc timeUnit)
144     {
145         if (message == null)
146         {
147             throw new NullPointerException JavaDoc("message can not be null.");
148         }
149         AtomicMarkableReference JavaDoc<Message JavaDoc> ref = new AtomicMarkableReference JavaDoc<Message JavaDoc>(message,false);
150             
151         try
152         {
153             lock.lock();
154             boolean ok = masterQueue.offer(ref,timeout,timeUnit);
155             
156             if(ok)
157             {
158                 //loop through SomniMessageSelectors for matches. If a match, then signal that condition.
159
for(SomniMessageSelector messageSelector : messageSelectorsToQueues.keySet())
160                 {
161                     if(messageSelector.matches(message))
162                     {
163                         PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> queue = getQueueForMessageSelector(messageSelector);
164                         if(queue!=masterQueue)
165                         {
166                             queue.put(ref);
167                         }
168                     }
169                 }
170             }
171             return ok;
172         }
173         catch(SomniMessageSelectorException smse)
174         {
175             throw new SomniRuntimeException("While offering "+message,smse);
176         }
177         finally
178         {
179             lock.unlock();
180         }
181     }
182     
183     public Message JavaDoc take(SomniMessageSelector messageSelector)
184         throws InterruptedException JavaDoc
185     {
186         PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> queue = getQueueForMessageSelector(messageSelector);
187
188         //try taking messages until this method gets one that nothing else has gotten
189

190         AtomicMarkableReference JavaDoc<Message JavaDoc> ref = null;
191         
192         do
193         {
194             ref = queue.take();
195         }
196         while(!ref.compareAndSet(ref.getReference(),ref.getReference(),false,true));
197
198         if(SomniMessageSelector.ALLMESSAGESELECTOR!=messageSelector)
199         {
200             masterQueue.remove(ref);
201         }
202         
203         return ref.getReference();
204     }
205
206     public Message JavaDoc poll(SomniMessageSelector messageSelector)
207     {
208         PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> queue = getQueueForMessageSelector(messageSelector);
209
210         AtomicMarkableReference JavaDoc<Message JavaDoc> ref = null;
211         
212         //try polling messages until this gets one that nothing else has gotten
213
do
214         {
215             ref = queue.poll();
216         }
217         while((ref!=null)&&(!ref.compareAndSet(ref.getReference(),ref.getReference(),false,true)));
218
219         if(ref == null)
220         {
221             return null;
222         }
223
224         if(SomniMessageSelector.ALLMESSAGESELECTOR!=messageSelector)
225         {
226             masterQueue.remove(ref);
227         }
228         
229         return ref.getReference();
230     }
231
232     public Message JavaDoc poll(long timeout,TimeUnit JavaDoc unit,SomniMessageSelector messageSelector) throws InterruptedException JavaDoc
233     {
234         PriorityBlockingQueue JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> queue = getQueueForMessageSelector(messageSelector);
235
236         long nanos = unit.toNanos(timeout);
237         //todo -- anything for wrap-around?
238
long now = System.nanoTime();
239         final long timeOutTime = now + nanos;
240         long timeLeft;
241         
242         AtomicMarkableReference JavaDoc<Message JavaDoc> ref = null;
243         
244         //try polling messages until this gets one that nothing else has gotten
245
do
246         {
247         //todo -- anything for wrap-around?
248
now = System.nanoTime();
249             timeLeft = timeOutTime - now;
250             ref = queue.poll(timeLeft,TimeUnit.NANOSECONDS);
251         }
252         while((timeLeft>0)&&(ref!=null)&&(!ref.compareAndSet(ref.getReference(),ref.getReference(),false,true)));
253
254         if(ref == null)
255         {
256             return null;
257         }
258         
259         if(timeLeft <= 0)
260         {
261             return null;
262         }
263
264         if(SomniMessageSelector.ALLMESSAGESELECTOR!=messageSelector)
265         {
266             masterQueue.remove(ref);
267         }
268         
269         return ref.getReference();
270     }
271
272     public Message JavaDoc peek()
273     {
274         AtomicMarkableReference JavaDoc<Message JavaDoc> ref = masterQueue.peek();
275         if(ref == null)
276         {
277             return null;
278         }
279         return masterQueue.peek().getReference();
280     }
281     
282     public int size()
283     {
284         return masterQueue.size();
285     }
286
287     List JavaDoc<Message JavaDoc> snapMasterQueueContents()
288     {
289         List JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>> refs = new ArrayList JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>>(masterQueue);
290         Collections.sort(refs,refComparator);
291         List JavaDoc<Message JavaDoc> messages = new ArrayList JavaDoc<Message JavaDoc>(refs.size());
292         for(AtomicMarkableReference JavaDoc<Message JavaDoc> ref : refs)
293         {
294             messages.add(ref.getReference());
295         }
296         return messages;
297     }
298
299     public String JavaDoc toString()
300     {
301         return masterQueue.toString();
302     }
303
304     private static final class RefComparator
305         implements Comparator JavaDoc<AtomicMarkableReference JavaDoc<Message JavaDoc>>
306     {
307         private Comparator JavaDoc<Message JavaDoc> comparator;
308         
309         private RefComparator(Comparator JavaDoc<Message JavaDoc> comparator)
310         {
311             this.comparator = comparator;
312         }
313         
314         public int compare(AtomicMarkableReference JavaDoc<Message JavaDoc> ref1,AtomicMarkableReference JavaDoc<Message JavaDoc> ref2)
315         {
316             return comparator.compare(ref1.getReference(),ref2.getReference());
317         }
318     }
319 }
320
321 /* Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006 David Walend
322 All rights reserved.
323
324 Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
325
326 Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
327
328 Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
329
330 Neither the name of the SomnifugiJMS Project, walend.net, nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission from David Walend.
331
332 Credits in redistributions in source or binary forms must include a link to http://somnifugi.sourceforge.net .
333
334 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
335 The net.walend.somnifugi.sql92 package is modified code from the openmq project, https://mq.dev.java.net/ , Copyright (c) of Sun, and carries the CDDL license, repeated here: You can obtain a copy of the license at https://glassfish.dev.java.net/public/CDDLv1.0.html. See the License for the specific language governing permissions and limitations under the License.
336
337 =================================================================================
338
339 For more information and the latest version of this software, please see http://somnifugi.sourceforge.net and http://walend.net or email <a HREF="mailto:david@walend.net">david@walend.net</a>.
340  */

341
Popular Tags