1 17 package org.apache.servicemix.jbi.nmr.flow.seda; 18 19 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; 20 21 import org.apache.commons.logging.Log; 22 import org.apache.commons.logging.LogFactory; 23 import org.apache.servicemix.jbi.framework.ComponentNameSpace; 24 import org.apache.servicemix.jbi.management.AttributeInfoHelper; 25 import org.apache.servicemix.jbi.management.BaseLifeCycle; 26 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl; 27 import org.apache.servicemix.jbi.util.BoundedLinkedQueue; 28 29 import javax.jbi.JBIException; 30 import javax.jbi.messaging.MessageExchange; 31 import javax.jbi.messaging.MessagingException; 32 import javax.management.JMException ; 33 import javax.management.MBeanAttributeInfo ; 34 import javax.management.ObjectName ; 35 import javax.resource.spi.work.Work ; 36 import javax.resource.spi.work.WorkException ; 37 38 43 public class SedaQueue extends BaseLifeCycle implements Work { 44 45 private static final Log log = LogFactory.getLog(SedaQueue.class); 46 47 protected SedaFlow flow; 48 protected ComponentNameSpace name; 49 protected BoundedLinkedQueue queue; 50 protected AtomicBoolean started = new AtomicBoolean(false); 51 protected AtomicBoolean running = new AtomicBoolean(false); 52 protected ObjectName objectName; 53 protected String subType; 54 protected Thread thread; 55 56 61 public SedaQueue(ComponentNameSpace name) { 62 this.name = name; 63 } 64 65 70 public String getName() { 71 return name.getName(); 72 } 73 74 public String getType() { 75 return "SedaQueue"; 76 } 77 78 81 public ComponentNameSpace getComponentNameSpace() { 82 return this.name; 83 } 84 85 90 public String getDescription() { 91 return "bounded worker Queue for the NMR"; 92 } 93 94 100 public void init(SedaFlow flow, int capacity) { 101 this.flow = flow; 102 this.queue = new BoundedLinkedQueue(capacity); 103 } 104 105 110 public void setCapacity(int value) { 111 int oldValue = queue.capacity(); 112 this.queue.setCapacity(value); 113 super.firePropertyChanged("capacity", new Integer (oldValue), new Integer (value)); 114 } 115 116 119 public int getCapacity() { 120 return this.queue.capacity(); 121 } 122 123 126 public int getSize() { 127 return this.queue.size(); 128 } 129 130 137 public void enqueue(MessageExchange me) throws InterruptedException , MessagingException { 138 queue.put(me); 139 } 140 141 146 public void start() throws JBIException { 147 synchronized (running) { 148 try { 149 started.set(true); 150 flow.getBroker().getContainer().getWorkManager().startWork(this); 151 running.wait(); 152 super.start(); 153 } catch (Exception e) { 154 throw new JBIException("Unable to start queue work", e); 155 } 156 } 157 } 158 159 164 public void stop() throws JBIException { 165 started.set(false); 166 synchronized (running) { 167 if (thread != null && running.get()) { 168 try { 169 thread.interrupt(); 170 running.wait(); 171 } catch (Exception e) { 172 log.warn("Error stopping thread", e); 173 } finally { 174 thread = null; 175 } 176 } 177 } 178 super.stop(); 179 } 180 181 186 public void shutDown() throws JBIException { 187 stop(); 188 super.shutDown(); 189 } 190 191 194 public void release() { 195 log.info("SedaQueue " + name + " asked to be released"); 196 try { 197 shutDown(); 198 } 199 catch (JBIException e) { 200 log.warn("Caught an exception shutting down", e); 201 } 202 flow.release(this); 203 } 204 205 208 public void run() { 209 thread = Thread.currentThread(); 210 synchronized (running) { 211 running.set(true); 212 running.notify(); 213 } 214 while (started.get()) { 215 final MessageExchangeImpl me; 216 try { 217 me = (MessageExchangeImpl) queue.poll(1000); 218 if (me != null) { 219 flow.getBroker().getContainer().getWorkManager().scheduleWork(new Work () { 220 public void release() { 221 } 222 public void run() { 223 try { 224 if (log.isDebugEnabled()) { 225 log.debug(this + " dequeued exchange: " + me); 226 } 227 flow.doRouting(me); 228 } 229 catch (Throwable e) { 230 log.error(this + " got error processing " + me, e); 231 } 232 } 233 234 }); 235 } 236 } 237 catch (InterruptedException e) { 238 if (!started.get()) { 239 break; 240 } 241 log.warn(this + " interrupted", e); 242 } catch (WorkException e) { 243 log.error(this + " got error processing exchange", e); 244 } 245 } 246 synchronized (running) { 247 running.set(false); 248 running.notify(); 249 } 250 } 251 252 255 public String toString() { 256 return "SedaQueue{" + name + "}"; 257 } 258 259 265 public MBeanAttributeInfo [] getAttributeInfos() throws JMException { 266 AttributeInfoHelper helper = new AttributeInfoHelper(); 267 helper.addAttribute(getObjectToManage(), "capacity", "The capacity of the SedaQueue"); 268 helper.addAttribute(getObjectToManage(), "size", "The size (depth) of the SedaQueue"); 269 return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos()); 270 } 271 272 275 public ObjectName getObjectName() { 276 return objectName; 277 } 278 279 282 public void setObjectName(ObjectName objectName) { 283 this.objectName = objectName; 284 } 285 286 } | Popular Tags |