1 17 package org.apache.servicemix.jms.multiplexing; 18 19 import java.util.Map ; 20 21 import javax.jbi.messaging.DeliveryChannel; 22 import javax.jbi.messaging.ExchangeStatus; 23 import javax.jbi.messaging.MessageExchange; 24 import javax.jms.Destination ; 25 import javax.jms.Message ; 26 import javax.jms.MessageConsumer ; 27 import javax.jms.MessageListener ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Session ; 30 import javax.naming.InitialContext ; 31 import javax.resource.spi.work.Work ; 32 import javax.resource.spi.work.WorkException ; 33 34 import org.apache.servicemix.common.BaseLifeCycle; 35 import org.apache.servicemix.jms.AbstractJmsProcessor; 36 import org.apache.servicemix.jms.JmsEndpoint; 37 import org.apache.servicemix.soap.Context; 38 39 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 40 41 public class MultiplexingConsumerProcessor extends AbstractJmsProcessor implements MessageListener { 42 43 protected Session session; 44 protected Destination destination; 45 protected MessageConsumer consumer; 46 protected Map pendingMessages = new ConcurrentHashMap(); 47 protected DeliveryChannel channel; 48 49 public MultiplexingConsumerProcessor(JmsEndpoint endpoint) { 50 super(endpoint); 51 } 52 53 protected void doStart(InitialContext ctx) throws Exception { 54 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 55 destination = endpoint.getDestination(); 56 if (destination == null) { 57 if (endpoint.getJndiDestinationName() != null) { 58 destination = (Destination ) ctx.lookup(endpoint.getJndiDestinationName()); 59 } else if (endpoint.getJmsProviderDestinationName() != null) { 60 if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) { 61 destination = session.createQueue(endpoint.getJmsProviderDestinationName()); 62 } else { 63 destination = session.createTopic(endpoint.getJmsProviderDestinationName()); 64 } 65 } else { 66 throw new IllegalStateException ("No destination provided"); 67 } 68 } 69 consumer = session.createConsumer(destination); 70 consumer.setMessageListener(this); 71 channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel(); 72 } 73 74 protected void doStop() throws Exception { 75 session = null; 76 destination = null; 77 consumer = null; 78 pendingMessages.clear(); 79 } 80 81 public void onMessage(final Message message) { 82 try { 83 if (log.isDebugEnabled()) { 84 log.debug("Received jms message " + message); 85 } 86 endpoint.getServiceUnit().getComponent().getWorkManager().scheduleWork(new Work () { 87 public void release() { 88 } 89 public void run() { 90 try { 91 if (log.isDebugEnabled()) { 92 log.debug("Handling jms message " + message); 93 } 94 Context context = createContext(); 95 MessageExchange exchange = toNMS(message, context); 96 pendingMessages.put(exchange.getExchangeId(), context); 99 BaseLifeCycle lf = (BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle(); 100 lf.sendConsumerExchange(exchange, MultiplexingConsumerProcessor.this.endpoint); 101 } catch (Throwable e) { 102 log.error("Error while handling jms message", e); 103 } 104 } 105 }); 106 } catch (WorkException e) { 107 log.error("Error while handling jms message", e); 108 } 109 } 110 111 public void process(MessageExchange exchange) throws Exception { 112 Context context = (Context ) pendingMessages.remove(exchange.getExchangeId()); 113 Message message = (Message ) context.getProperty(Message .class.getName()); 114 MessageProducer producer = null; 115 Message response = null; 116 try { 117 response = fromNMSResponse(exchange, context, session); 118 if (response != null) { 119 producer = session.createProducer(message.getJMSReplyTo()); 120 response.setJMSCorrelationID(message.getJMSCorrelationID()); 121 producer.send(response); 122 } 123 } finally { 124 if (producer != null) { 125 producer.close(); 126 } 127 if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 128 exchange.setStatus(ExchangeStatus.DONE); 129 channel.send(exchange); 130 } 131 } 132 } 133 134 } 135 | Popular Tags |