1 17 package org.apache.servicemix.jms.jca; 18 19 import javax.jbi.messaging.DeliveryChannel; 20 import javax.jbi.messaging.ExchangeStatus; 21 import javax.jbi.messaging.InOnly; 22 import javax.jbi.messaging.MessageExchange; 23 import javax.jbi.messaging.NormalizedMessage; 24 import javax.jbi.messaging.RobustInOnly; 25 import javax.jms.Connection ; 26 import javax.jms.ConnectionFactory ; 27 import javax.jms.Destination ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Session ; 30 import javax.jms.TextMessage ; 31 import javax.naming.InitialContext ; 32 33 import org.apache.servicemix.jms.AbstractJmsProcessor; 34 import org.apache.servicemix.jms.JmsEndpoint; 35 36 40 public class JcaProviderProcessor extends AbstractJmsProcessor { 41 42 protected Destination destination; 43 protected Destination replyToDestination; 44 protected DeliveryChannel channel; 45 protected ConnectionFactory connectionFactory; 46 47 public JcaProviderProcessor(JmsEndpoint endpoint) { 48 super(endpoint); 49 } 50 51 public void start() throws Exception { 52 connectionFactory = getConnectionFactory(); 53 channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel(); 54 destination = endpoint.getDestination(); 55 if (destination == null) { 56 if (endpoint.getJndiDestinationName() != null) { 57 InitialContext ctx = getInitialContext(); 58 destination = (Destination ) ctx.lookup(endpoint.getJndiDestinationName()); 59 } else if (endpoint.getJmsProviderDestinationName() == null) { 60 throw new IllegalStateException ("No destination provided"); 61 } 62 } 63 } 64 65 66 public void stop() throws Exception { 67 destination = null; 68 replyToDestination = null; 69 } 70 71 public void process(MessageExchange exchange) throws Exception { 72 if (exchange.getStatus() == ExchangeStatus.DONE) { 73 return; 74 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 75 return; 76 } 77 if (exchange instanceof InOnly == false && 78 exchange instanceof RobustInOnly == false) { 79 exchange.setError(new UnsupportedOperationException ("Use an InOnly or RobustInOnly MEP")); 80 channel.send(exchange); 81 return; 82 } 83 Connection connection = null; 84 try { 85 connection = connectionFactory.createConnection(); 86 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 87 if (destination == null) { 88 if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) { 89 destination = session.createQueue(endpoint.getJmsProviderDestinationName()); 90 } else { 91 destination = session.createTopic(endpoint.getJmsProviderDestinationName()); 92 } 93 } 94 MessageProducer producer = session.createProducer(destination); 95 96 TextMessage msg = session.createTextMessage(); 97 NormalizedMessage nm = exchange.getMessage("in"); 98 fromNMS(nm, msg); 99 producer.send(msg); 100 exchange.setStatus(ExchangeStatus.DONE); 101 channel.send(exchange); 102 } finally { 103 if (connection != null) { 104 connection.close(); 105 } 106 } 107 } 108 109 } 110 | Popular Tags |