KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > walend > somnifugi > juc > MessageSelectingPriorityChannel


package net.walend.somnifugi.juc;

import java.util.Comparator;
import java.util.Enumeration;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

import javax.jms.Message;
import javax.jms.JMSException;

import net.walend.somnifugi.SomniRuntimeException;
import net.walend.somnifugi.SomniMessage;
import net.walend.somnifugi.SomniMessageSelector;
import net.walend.somnifugi.SomniMessageSelectorException;

import net.walend.somnifugi.channel.Channel;
import net.walend.somnifugi.channel.Puttable;
import net.walend.somnifugi.channel.Takable;
24
25 /**
26 A Channel built on MessageSelectingPriorityBlockingQueue for JMS Queues with message selectors.
27
28 @author <a HREF="http://walend.net">David Walend</a> <a HREF="mailto:david@walend.net">david@walend.net</a>
29  */

30
31 public class MessageSelectingPriorityChannel
32     implements Channel<Message JavaDoc>,Puttable<Message JavaDoc>
33 {
34     private final MessageSelectingPriorityBlockingQueue queue = new MessageSelectingPriorityBlockingQueue(11,new MessageComparator());
35     private final Takable<Message JavaDoc> takable = new MessageSelectingTakable(SomniMessageSelector.ALLMESSAGESELECTOR,queue);
36
37     public MessageSelectingPriorityChannel()
38     {
39     }
40
41     //Puttable interface
42
/**
43 Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to become available.
44
45 @return true if the element was added.
46     */

47     public boolean offer(Message JavaDoc elem,long timeout)
48         throws InterruptedException JavaDoc
49     {
50         return queue.offer(elem,timeout,TimeUnit.MILLISECONDS);
51     }
52
53     /**
54 Adds the specified element to this queue, waiting if necessary for space to become available.
55     */

56     public void put(Message JavaDoc elem)
57         throws InterruptedException JavaDoc
58     {
59         queue.put(elem);
60     }
61
62     //Channel methods
63
public boolean hasRealPushback()
64     {
65         return true;
66     }
67
68     public boolean supportsPriorities()
69     {
70         return true;
71     }
72     
73     public boolean supportsMessageSelectors()
74     {
75         return true;
76     }
77     
78     public Puttable<Message JavaDoc> getPuttable()
79     {
80         return this;
81     }
82     
83     public Takable<Message JavaDoc> getTakable()
84     {
85         return takable;
86     }
87
88     public Takable<Message JavaDoc> getTakable(SomniMessageSelector messageSelector)
89     {
90         return new MessageSelectingTakable(messageSelector,queue);
91     }
92     
93     private static final class MessageSelectingTakable
94         implements Takable<Message JavaDoc>
95     {
96         private final SomniMessageSelector messageSelector;
97         private final MessageSelectingPriorityBlockingQueue queue;
98         
99         MessageSelectingTakable(SomniMessageSelector messageSelector,MessageSelectingPriorityBlockingQueue queue)
100         {
101             this.messageSelector = messageSelector;
102             this.queue = queue;
103         }
104         
105             //Takable interface
106
/**
107     Retrieves and removes the head of this queue.
108         */

109         public Message JavaDoc poll()
110         {
111             return queue.poll(messageSelector);
112         }
113     
114         /**
115     Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements are present on this queue.
116         */

117         public Message JavaDoc poll(long timeout)
118             throws InterruptedException JavaDoc
119         {
120             return queue.poll(timeout,TimeUnit.MILLISECONDS,messageSelector);
121         }
122     
123         /**
124     Retrieves and removes the head of this queue, waiting if no elements are present on this queue.
125         */

126         public Message JavaDoc take()
127             throws InterruptedException JavaDoc
128         {
129             return queue.take(messageSelector);
130         }
131     
132         /**
133     Pushes an element back onto the head of a queue.
134         */

135         public void pushBack(Message JavaDoc elem)
136             throws InterruptedException JavaDoc
137         {
138             queue.put(elem);
139         }
140         
141         public Message JavaDoc peek()
142         {
143             return queue.peek();
144         }
145         
146         public Enumeration JavaDoc<Message JavaDoc> snapShot()
147         {
148             List JavaDoc<Message JavaDoc> snap = queue.snapMasterQueueContents();
149             
150             return new EnumerationBridge<Message JavaDoc>(snap.iterator());
151         }
152
153         public Enumeration JavaDoc snapShot(SomniMessageSelector messageSelector)
154             throws SomniMessageSelectorException
155         {
156             List JavaDoc<Message JavaDoc> snap = queue.snapMasterQueueContents();
157             List JavaDoc<Message JavaDoc> result = new ArrayList JavaDoc<Message JavaDoc>(snap.size());
158             
159             for(Message JavaDoc message : snap)
160             {
161                 if(messageSelector.matches(message))
162                 {
163                     result.add(message);
164                 }
165             }
166             return new EnumerationBridge<Message JavaDoc>(result.iterator());
167         }
168         
169         public int guessSize()
170         {
171             return queue.size();
172         }
173     }
174 }
175
/* Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006 David Walend
All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.

Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

Neither the name of the SomnifugiJMS Project, walend.net, nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission from David Walend.

Credits in redistributions in source or binary forms must include a link to http://somnifugi.sourceforge.net .

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The net.walend.somnifugi.sql92 package is modified code from the openmq project, https://mq.dev.java.net/ , Copyright (c) of Sun, and carries the CDDL license, repeated here: You can obtain a copy of the license at https://glassfish.dev.java.net/public/CDDLv1.0.html. See the License for the specific language governing permissions and limitations under the License.

=================================================================================

For more information and the latest version of this software, please see http://somnifugi.sourceforge.net and http://walend.net or email <a HREF="mailto:david@walend.net">david@walend.net</a>.
 */

196
Popular Tags