1 17 package org.apache.servicemix.components.util; 18 19 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; 20 21 import org.apache.commons.logging.Log; 22 import org.apache.commons.logging.LogFactory; 23 import org.apache.servicemix.MessageExchangeListener; 24 25 import javax.jbi.JBIException; 26 import javax.jbi.messaging.DeliveryChannel; 27 import javax.jbi.messaging.ExchangeStatus; 28 import javax.jbi.messaging.MessageExchange; 29 import javax.jbi.messaging.MessagingException; 30 import javax.jbi.messaging.NormalizedMessage; 31 32 37 public abstract class OutBinding extends ComponentSupport implements Runnable , MessageExchangeListener { 38 private static final Log log = LogFactory.getLog(OutBinding.class); 39 private AtomicBoolean stop = new AtomicBoolean(false); 40 private Thread runnable; 41 42 public OutBinding() { 43 } 44 45 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 46 if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 47 try { 48 NormalizedMessage message = getInMessage(exchange); 49 process(exchange, message); 50 } 51 catch (Exception e) { 52 if (log.isDebugEnabled()) { 53 log.debug("Exchange failed", e); 54 } 55 fail(exchange, e); 56 } 57 } 58 } 59 60 63 public void run() { 64 try { 65 DeliveryChannel deliveryChannel = getDeliveryChannel(); 66 while (!stop.get()) { 67 MessageExchange exchange = deliveryChannel.accept(); 68 if (exchange != null) { 69 try { 70 onMessageExchange(exchange); 71 } catch (MessagingException e) { 72 log.error("MessageExchange processing failed", e); 73 } 74 } 75 } 76 } 77 catch (MessagingException e) { 78 log.error("run failed", e); 79 } 80 } 81 82 87 public void shutDown() throws JBIException { 88 } 89 90 95 public void stop() throws JBIException { 96 stop.compareAndSet(true, false); 97 if (runnable != null) { 98 runnable.interrupt(); 99 try { 100 runnable.join(); 101 } catch (InterruptedException e) { 102 log.warn("Unable to stop component polling thread", e); 103 } 104 runnable = null; 105 } 106 } 107 108 111 public void start() throws JBIException { 112 if (stop.compareAndSet(false, true)) { 113 runnable = new Thread (this); 114 runnable.setDaemon(true); 115 runnable.start(); 116 } 117 } 118 119 129 protected abstract void process(MessageExchange messageExchange, NormalizedMessage message) throws Exception ; 130 } 131 | Popular Tags |