1 17 package org.apache.servicemix.jms.standard; 18 19 import java.io.ByteArrayInputStream ; 20 import java.io.InputStream ; 21 22 import javax.jbi.messaging.DeliveryChannel; 23 import javax.jbi.messaging.ExchangeStatus; 24 import javax.jbi.messaging.InOnly; 25 import javax.jbi.messaging.InOut; 26 import javax.jbi.messaging.MessageExchange; 27 import javax.jbi.messaging.NormalizedMessage; 28 import javax.jbi.messaging.RobustInOnly; 29 import javax.jms.BytesMessage ; 30 import javax.jms.Destination ; 31 import javax.jms.Message ; 32 import javax.jms.MessageConsumer ; 33 import javax.jms.MessageProducer ; 34 import javax.jms.ObjectMessage ; 35 import javax.jms.Queue ; 36 import javax.jms.Session ; 37 import javax.jms.TextMessage ; 38 import javax.naming.InitialContext ; 39 40 import org.apache.servicemix.jms.AbstractJmsProcessor; 41 import org.apache.servicemix.jms.JmsEndpoint; 42 import org.apache.servicemix.soap.marshalers.SoapMessage; 43 44 public class StandardProviderProcessor extends AbstractJmsProcessor { 45 46 protected Destination destination; 47 protected DeliveryChannel channel; 48 49 public StandardProviderProcessor(JmsEndpoint endpoint) { 50 super(endpoint); 51 } 52 53 protected void doStart(InitialContext ctx) throws Exception { 54 channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel(); 55 destination = endpoint.getDestination(); 56 if (destination == null) { 57 if (endpoint.getJndiDestinationName() != null) { 58 destination = (Destination ) ctx.lookup(endpoint.getJndiDestinationName()); 59 } else if (endpoint.getJmsProviderDestinationName() == null) { 60 throw new IllegalStateException ("No destination provided"); 61 } 62 } 63 } 64 65 protected void doStop() throws Exception { 66 destination = null; 67 } 68 69 public void process(MessageExchange exchange) throws Exception { 70 if (exchange.getStatus() == ExchangeStatus.DONE) { 71 return; 72 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 73 return; 74 } 75 Session session = null; 76 try { 77 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 78 if (destination == null) { 79 if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) { 80 destination = session.createQueue(endpoint.getJmsProviderDestinationName()); 81 } else { 82 destination = session.createTopic(endpoint.getJmsProviderDestinationName()); 83 } 84 } 85 MessageProducer producer = session.createProducer(destination); 86 87 TextMessage msg = session.createTextMessage(); 88 NormalizedMessage nm = exchange.getMessage("in"); 89 fromNMS(nm, msg); 90 91 if (exchange instanceof InOnly || exchange instanceof RobustInOnly) { 92 producer.send(msg); 93 } else if (exchange instanceof InOut) { 94 Destination replyToDestination; 95 if (destination instanceof Queue ) { 96 replyToDestination = session.createTemporaryQueue(); 97 } else { 98 replyToDestination = session.createTemporaryTopic(); 99 } 100 MessageConsumer consumer = session.createConsumer(replyToDestination); 101 msg.setJMSCorrelationID(exchange.getExchangeId()); 102 msg.setJMSReplyTo(replyToDestination); 103 producer.send(msg); 104 Message message = consumer.receive(); 105 if (message instanceof ObjectMessage ) { 106 Object o = ((ObjectMessage ) message).getObject(); 107 if (o instanceof Exception ) { 108 exchange.setError((Exception ) o); 109 } else { 110 throw new UnsupportedOperationException ("Can not handle objects of type " + o.getClass().getName()); 111 } 112 } else { 113 InputStream is = null; 114 if (message instanceof TextMessage ) { 115 is = new ByteArrayInputStream (((TextMessage ) message).getText().getBytes()); 116 } else if (message instanceof BytesMessage ) { 117 int length = (int) ((BytesMessage ) message).getBodyLength(); 118 byte[] bytes = new byte[length]; 119 ((BytesMessage ) message).readBytes(bytes); 120 is = new ByteArrayInputStream (bytes); 121 } else { 122 throw new IllegalArgumentException ("JMS message should be a text or bytes message"); 123 } 124 String contentType = message.getStringProperty(CONTENT_TYPE); 125 SoapMessage soap = soapHelper.getSoapMarshaler().createReader().read(is, contentType); 126 NormalizedMessage out = exchange.createMessage(); 127 soapHelper.getJBIMarshaler().toNMS(out, soap); 128 ((InOut) exchange).setOutMessage(out); 129 } 130 channel.send(exchange); 131 } else { 132 throw new IllegalStateException (exchange.getPattern() + " not implemented"); 133 } 134 } finally { 135 if (session != null) { 136 session.close(); 137 } 138 } 139 } 140 141 } 142 | Popular Tags |