1 17 package org.apache.servicemix.jms.standard; 18 19 import javax.jbi.messaging.DeliveryChannel; 20 import javax.jbi.messaging.ExchangeStatus; 21 import javax.jbi.messaging.MessageExchange; 22 import javax.jms.Destination ; 23 import javax.jms.Message ; 24 import javax.jms.MessageConsumer ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.Session ; 27 import javax.naming.InitialContext ; 28 import javax.resource.spi.work.Work ; 29 30 import org.apache.servicemix.jms.AbstractJmsProcessor; 31 import org.apache.servicemix.jms.JmsEndpoint; 32 import org.apache.servicemix.soap.Context; 33 34 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; 35 36 public class StandardConsumerProcessor extends AbstractJmsProcessor { 37 38 protected Session session; 39 protected Destination destination; 40 protected DeliveryChannel channel; 41 protected AtomicBoolean running = new AtomicBoolean(false); 42 43 public StandardConsumerProcessor(JmsEndpoint endpoint) { 44 super(endpoint); 45 } 46 47 protected void doStart(InitialContext ctx) throws Exception { 48 destination = endpoint.getDestination(); 49 if (destination == null) { 50 if (endpoint.getJndiDestinationName() != null) { 51 destination = (Destination ) ctx.lookup(endpoint.getJndiDestinationName()); 52 } else if (endpoint.getJmsProviderDestinationName() == null) { 53 throw new IllegalStateException ("No destination provided"); 54 } 55 } 56 channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel(); 57 synchronized (running) { 58 endpoint.getServiceUnit().getComponent().getWorkManager().startWork(new Work () { 59 public void release() { 60 } 61 public void run() { 62 StandardConsumerProcessor.this.poll(); 63 } 64 }); 65 running.wait(); 66 } 67 } 68 69 protected void doStop() throws Exception { 70 if (running.get()) { 71 synchronized (running) { 72 if (session != null) { 73 session.close(); 74 } 75 running.wait(); 76 } 77 } 78 session = null; 79 destination = null; 80 } 81 82 protected void poll() { 83 synchronized (running) { 84 running.set(true); 85 running.notify(); 86 } 87 try { 88 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 89 if (destination == null) { 90 if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) { 91 destination = session.createQueue(endpoint.getJmsProviderDestinationName()); 92 } else { 93 destination = session.createTopic(endpoint.getJmsProviderDestinationName()); 94 } 95 } 96 MessageConsumer consumer = session.createConsumer(destination); 97 while (running.get()) { 98 Message message = consumer.receive(); 99 if (message != null) { 100 onMessage(message); 101 } 102 } 103 } catch (Exception e) { 104 log.error("", e); 105 } finally { 106 synchronized (running) { 107 running.set(false); 108 running.notify(); 109 } 110 } 111 } 112 113 public void onMessage(final Message message) { 114 try { 115 if (log.isDebugEnabled()) { 116 log.debug("Received jms message " + message); 117 } 118 Context context = createContext(); 119 MessageExchange exchange = toNMS(message, context); 120 if (!channel.sendSync(exchange)) { 121 throw new IllegalStateException ("Exchange has been aborted"); 122 } 123 MessageProducer producer = null; 124 Message response = null; 125 try { 126 response = fromNMSResponse(exchange, context, session); 127 if (response != null) { 128 producer = session.createProducer(message.getJMSReplyTo()); 129 response.setJMSCorrelationID(message.getJMSCorrelationID()); 130 producer.send(response); 131 } 132 } finally { 133 if (producer != null) { 134 producer.close(); 135 } 136 if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 137 exchange.setStatus(ExchangeStatus.DONE); 138 channel.send(exchange); 139 } 140 } 141 } catch (Throwable e) { 142 log.error("Error while handling jms message", e); 143 } 144 } 145 146 public void process(MessageExchange exchange) throws Exception { 147 throw new IllegalStateException (); 148 } 149 150 } 151 | Popular Tags |