KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > nmr > flow > seda > SedaQueue


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.nmr.flow.seda;
18
19 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.servicemix.jbi.framework.ComponentNameSpace;
24 import org.apache.servicemix.jbi.management.AttributeInfoHelper;
25 import org.apache.servicemix.jbi.management.BaseLifeCycle;
26 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
27 import org.apache.servicemix.jbi.util.BoundedLinkedQueue;
28
29 import javax.jbi.JBIException;
30 import javax.jbi.messaging.MessageExchange;
31 import javax.jbi.messaging.MessagingException;
32 import javax.management.JMException JavaDoc;
33 import javax.management.MBeanAttributeInfo JavaDoc;
34 import javax.management.ObjectName JavaDoc;
35 import javax.resource.spi.work.Work JavaDoc;
36 import javax.resource.spi.work.WorkException JavaDoc;
37
38 /**
39  * A simple Straight through flow
40  *
41  * @version $Revision: 429628 $
42  */

43 public class SedaQueue extends BaseLifeCycle implements Work JavaDoc {
44     
45     private static final Log log = LogFactory.getLog(SedaQueue.class);
46     
47     protected SedaFlow flow;
48     protected ComponentNameSpace name;
49     protected BoundedLinkedQueue queue;
50     protected AtomicBoolean started = new AtomicBoolean(false);
51     protected AtomicBoolean running = new AtomicBoolean(false);
52     protected ObjectName JavaDoc objectName;
53     protected String JavaDoc subType;
54     protected Thread JavaDoc thread;
55
56     /**
57      * SedaQueue name
58      *
59      * @param name
60      */

61     public SedaQueue(ComponentNameSpace name) {
62         this.name = name;
63     }
64
65     /**
66      * Get the name
67      *
68      * @return name
69      */

70     public String JavaDoc getName() {
71         return name.getName();
72     }
73
74     public String JavaDoc getType() {
75         return "SedaQueue";
76     }
77
78     /**
79      * @return Return the name
80      */

81     public ComponentNameSpace getComponentNameSpace() {
82         return this.name;
83     }
84
85     /**
86      * Get the description
87      *
88      * @return description
89      */

90     public String JavaDoc getDescription() {
91         return "bounded worker Queue for the NMR";
92     }
93
94     /**
95      * Initialize the Region
96      *
97      * @param flow
98      * @param capacity
99      */

100     public void init(SedaFlow flow, int capacity) {
101         this.flow = flow;
102         this.queue = new BoundedLinkedQueue(capacity);
103     }
104
105     /**
106      * Set the capacity of the Queue
107      *
108      * @param value
109      */

110     public void setCapacity(int value) {
111         int oldValue = queue.capacity();
112         this.queue.setCapacity(value);
113         super.firePropertyChanged("capacity", new Integer JavaDoc(oldValue), new Integer JavaDoc(value));
114     }
115
116     /**
117      * @return the capacity of the Queue
118      */

119     public int getCapacity() {
120         return this.queue.capacity();
121     }
122
123     /**
124      * @return size of the Queue
125      */

126     public int getSize() {
127         return this.queue.size();
128     }
129
130     /**
131      * Enqueue a Packet for processing
132      *
133      * @param packet
134      * @throws InterruptedException
135      * @throws MessagingException
136      */

137     public void enqueue(MessageExchange me) throws InterruptedException JavaDoc, MessagingException {
138         queue.put(me);
139     }
140
141     /**
142      * start processing
143      *
144      * @throws JBIException
145      */

146     public void start() throws JBIException {
147         synchronized (running) {
148             try {
149                 started.set(true);
150                 flow.getBroker().getContainer().getWorkManager().startWork(this);
151                 running.wait();
152                 super.start();
153             } catch (Exception JavaDoc e) {
154                 throw new JBIException("Unable to start queue work", e);
155             }
156         }
157     }
158
159     /**
160      * stop processing
161      *
162      * @throws JBIException
163      */

164     public void stop() throws JBIException {
165         started.set(false);
166         synchronized (running) {
167             if (thread != null && running.get()) {
168                 try {
169                     thread.interrupt();
170                     running.wait();
171                 } catch (Exception JavaDoc e) {
172                     log.warn("Error stopping thread", e);
173                 } finally {
174                     thread = null;
175                 }
176             }
177         }
178         super.stop();
179     }
180
181     /**
182      * shutDown the Queue
183      *
184      * @throws JBIException
185      */

186     public void shutDown() throws JBIException {
187         stop();
188         super.shutDown();
189     }
190
191     /**
192      * Asked by the WorkManager to give up
193      */

194     public void release() {
195         log.info("SedaQueue " + name + " asked to be released");
196         try {
197             shutDown();
198         }
199         catch (JBIException e) {
200             log.warn("Caught an exception shutting down", e);
201         }
202         flow.release(this);
203     }
204     
205     /**
206      * do processing
207      */

208     public void run() {
209         thread = Thread.currentThread();
210         synchronized (running) {
211             running.set(true);
212             running.notify();
213         }
214         while (started.get()) {
215             final MessageExchangeImpl me;
216             try {
217                 me = (MessageExchangeImpl) queue.poll(1000);
218                 if (me != null) {
219                     flow.getBroker().getContainer().getWorkManager().scheduleWork(new Work JavaDoc() {
220                         public void release() {
221                         }
222                         public void run() {
223                             try {
224                                 if (log.isDebugEnabled()) {
225                                     log.debug(this + " dequeued exchange: " + me);
226                                 }
227                                 flow.doRouting(me);
228                             }
229                             catch (Throwable JavaDoc e) {
230                                 log.error(this + " got error processing " + me, e);
231                             }
232                         }
233                         
234                     });
235                 }
236             }
237             catch (InterruptedException JavaDoc e) {
238                 if (!started.get()) {
239                     break;
240                 }
241                 log.warn(this + " interrupted", e);
242             } catch (WorkException JavaDoc e) {
243                 log.error(this + " got error processing exchange", e);
244             }
245         }
246         synchronized (running) {
247             running.set(false);
248             running.notify();
249         }
250     }
251
252     /**
253      * @return pretty print
254      */

255     public String JavaDoc toString() {
256         return "SedaQueue{" + name + "}";
257     }
258
259     /**
260      * Get an array of MBeanAttributeInfo
261      *
262      * @return array of AttributeInfos
263      * @throws JMException
264      */

265     public MBeanAttributeInfo JavaDoc[] getAttributeInfos() throws JMException JavaDoc {
266         AttributeInfoHelper helper = new AttributeInfoHelper();
267         helper.addAttribute(getObjectToManage(), "capacity", "The capacity of the SedaQueue");
268         helper.addAttribute(getObjectToManage(), "size", "The size (depth) of the SedaQueue");
269         return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos());
270     }
271
272     /**
273      * @return Returns the objectName.
274      */

275     public ObjectName JavaDoc getObjectName() {
276         return objectName;
277     }
278
279     /**
280      * @param objectName The objectName to set.
281      */

282     public void setObjectName(ObjectName JavaDoc objectName) {
283         this.objectName = objectName;
284     }
285
286 }
Popular Tags