package org.apache.servicemix.jms.jca;

import java.util.Map;

import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.TransactionManager;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.common.AsyncBaseLifeCycle;
import org.apache.servicemix.common.BaseLifeCycle;
import org.apache.servicemix.jms.AbstractJmsProcessor;
import org.apache.servicemix.jms.JmsEndpoint;
import org.apache.servicemix.soap.Context;
import org.jencks.SingletonEndpointFactory;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;

/**
 * JMS {@link MessageListener} that is activated through a JCA resource adapter
 * and turns every received JMS message into a JBI message exchange.
 * <p>
 * {@link #start()} wires this listener into the resource adapter through a
 * {@link SingletonEndpointFactory}; {@link #onMessage(Message)} converts the
 * message and sends the exchange; {@link #process(MessageExchange)} handles
 * the exchange when it comes back. Only the InOnly exchange pattern is
 * accepted.
 */
public class JcaConsumerProcessor extends AbstractJmsProcessor implements MessageListener {

    private static final Log log = LogFactory.getLog(JcaConsumerProcessor.class);

    /** Exchange id -> {@link Context} of the JMS message that created the exchange. */
    protected Map pendingMessages = new ConcurrentHashMap();
    /** Delivery channel obtained from the component life cycle in {@link #start()}. */
    protected DeliveryChannel channel;
    /** Resource adapter this listener is activated on. */
    protected ResourceAdapter resourceAdapter;
    /** Endpoint factory handed to the resource adapter; wraps this listener. */
    protected MessageEndpointFactory endpointFactory;
    /** Activation spec taken from the endpoint configuration. */
    protected ActivationSpec activationSpec;
    /** Bootstrap context used to start the resource adapter. */
    protected BootstrapContext bootstrapContext;
    /** Transaction manager obtained from the component life cycle. */
    protected TransactionManager transactionManager;
    /** Connection factory used by {@link #process(MessageExchange)} to send replies. */
    protected ConnectionFactory connectionFactory;

    /**
     * Creates a processor bound to the given endpoint.
     *
     * @param endpoint the JMS endpoint this processor serves
     */
    public JcaConsumerProcessor(JmsEndpoint endpoint) {
        super(endpoint);
    }

    /**
     * Reads the required collaborators from the endpoint, starts the resource
     * adapter and activates this listener on it.
     *
     * @throws IllegalArgumentException if the bootstrap context, connection
     *         factory, activation spec or resource adapter is not configured,
     *         or if the endpoint and the activation spec reference two
     *         different resource adapters
     * @throws Exception if the resource adapter fails to start or to activate
     *         the endpoint
     */
    public void start() throws Exception {
        AsyncBaseLifeCycle lf = (AsyncBaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
        channel = lf.getContext().getDeliveryChannel();
        transactionManager = (TransactionManager) lf.getContext().getTransactionManager();
        endpointFactory = new SingletonEndpointFactory(this, transactionManager);
        bootstrapContext = endpoint.getBootstrapContext();
        if (bootstrapContext == null) {
            throw new IllegalArgumentException("bootstrapContext not set");
        }
        connectionFactory = endpoint.getConnectionFactory();
        if (connectionFactory == null) {
            throw new IllegalArgumentException("connectionFactory not set");
        }
        activationSpec = endpoint.getActivationSpec();
        if (activationSpec == null) {
            throw new IllegalArgumentException("activationSpec not set");
        }
        resourceAdapter = resolveResourceAdapter();
        resourceAdapter.start(bootstrapContext);
        resourceAdapter.endpointActivation(endpointFactory, activationSpec);
    }

    /**
     * Determines the resource adapter to use from the endpoint and the
     * activation spec, and associates it with the activation spec when the
     * spec has none yet.
     *
     * @return the resource adapter, never <code>null</code>
     * @throws IllegalArgumentException if no adapter is configured anywhere, or
     *         if the endpoint and the activation spec hold different adapters
     * @throws Exception if the activation spec rejects the association
     */
    private ResourceAdapter resolveResourceAdapter() throws Exception {
        ResourceAdapter fromEndpoint = endpoint.getResourceAdapter();
        ResourceAdapter fromSpec = activationSpec.getResourceAdapter();
        if (fromEndpoint == null) {
            // Previously a missing adapter on both sides fell through and
            // surfaced as a bare NullPointerException on start().
            if (fromSpec == null) {
                throw new IllegalArgumentException("resourceAdapter not set");
            }
            return fromSpec;
        }
        if (fromSpec == null) {
            activationSpec.setResourceAdapter(fromEndpoint);
            return fromEndpoint;
        }
        if (fromSpec != fromEndpoint) {
            // Both sides are configured but disagree; refuse to guess which one wins.
            throw new IllegalArgumentException(
                    "resourceAdapter set on both the endpoint and the activationSpec, but to different instances");
        }
        return fromEndpoint;
    }

    /**
     * Deactivates this listener on the resource adapter and forgets all
     * pending message contexts.
     *
     * @throws Exception if deactivation fails
     */
    public void stop() throws Exception {
        resourceAdapter.endpointDeactivation(endpointFactory, activationSpec);
        pendingMessages.clear();
    }

    /**
     * Converts the received JMS message into a message exchange, attaches the
     * current transaction to it and sends it, either synchronously or through
     * the component life cycle depending on the endpoint configuration.
     * <p>
     * Failures are logged and not rethrown. If a failure occurs after the
     * context was registered in {@link #pendingMessages}, the registration is
     * removed so that it does not stay in the map forever.
     *
     * @param message the JMS message delivered by the resource adapter
     */
    public void onMessage(final Message message) {
        String pendingId = null;
        try {
            if (log.isDebugEnabled()) {
                log.debug("Received jms message " + message);
            }
            Context context = createContext();
            MessageExchange exchange = toNMS(message, context);
            if (!(exchange instanceof InOnly)) {
                throw new UnsupportedOperationException("JCA consumer endpoints can only use InOnly MEP");
            }
            exchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, transactionManager.getTransaction());
            pendingId = exchange.getExchangeId();
            pendingMessages.put(pendingId, context);
            if (endpoint.isSynchronous()) {
                channel.sendSync(exchange);
                process(exchange);
            } else {
                BaseLifeCycle lf = (BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
                lf.sendConsumerExchange(exchange, endpoint);
            }
        } catch (Throwable e) {
            if (pendingId != null) {
                // No-op when process() already consumed the entry.
                // NOTE(review): assumes a failed send means the exchange will
                // not come back to process() later -- confirm.
                pendingMessages.remove(pendingId);
            }
            log.error("Error while handling jms message", e);
        }
    }

    /**
     * Handles an exchange that was previously sent by
     * {@link #onMessage(Message)}. Nothing is done for an exchange in DONE
     * status; otherwise a response is built and, when there is one, sent to
     * the reply destination of the originating JMS message. An exchange that
     * is still ACTIVE afterwards is marked DONE and sent back.
     *
     * @param exchange the exchange to process
     * @throws IllegalStateException if no pending context is registered for
     *         the exchange (for example after {@link #stop()} cleared them)
     * @throws Exception if building or sending the response fails
     */
    public void process(MessageExchange exchange) throws Exception {
        Context context = (Context) pendingMessages.remove(exchange.getExchangeId());
        if (context == null) {
            throw new IllegalStateException(
                    "No pending JMS message registered for exchange " + exchange.getExchangeId());
        }
        Connection connection = null;
        try {
            if (exchange.getStatus() == ExchangeStatus.DONE) {
                return;
            }
            // The originating message is only needed to address the reply.
            Message message = (Message) context.getProperty(Message.class.getName());
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Message response = fromNMSResponse(exchange, context, session);
            if (response != null) {
                MessageProducer producer = session.createProducer(message.getJMSReplyTo());
                response.setJMSCorrelationID(message.getJMSCorrelationID());
                producer.send(response);
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
                exchange.setStatus(ExchangeStatus.DONE);
                channel.send(exchange);
            }
        }
    }

}