1 17 package org.apache.servicemix.jms.multiplexing; 18 19 import java.io.ByteArrayInputStream ; 20 import java.io.InputStream ; 21 import java.util.Map ; 22 23 import javax.jbi.messaging.DeliveryChannel; 24 import javax.jbi.messaging.ExchangeStatus; 25 import javax.jbi.messaging.InOnly; 26 import javax.jbi.messaging.InOut; 27 import javax.jbi.messaging.MessageExchange; 28 import javax.jbi.messaging.NormalizedMessage; 29 import javax.jbi.messaging.RobustInOnly; 30 import javax.jms.BytesMessage ; 31 import javax.jms.Destination ; 32 import javax.jms.Message ; 33 import javax.jms.MessageConsumer ; 34 import javax.jms.MessageListener ; 35 import javax.jms.MessageProducer ; 36 import javax.jms.ObjectMessage ; 37 import javax.jms.Queue ; 38 import javax.jms.Session ; 39 import javax.jms.TextMessage ; 40 import javax.naming.InitialContext ; 41 import javax.resource.spi.work.Work ; 42 import javax.resource.spi.work.WorkException ; 43 44 import org.apache.servicemix.jms.AbstractJmsProcessor; 45 import org.apache.servicemix.jms.JmsEndpoint; 46 import org.apache.servicemix.soap.marshalers.SoapMessage; 47 48 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 49 50 public class MultiplexingProviderProcessor extends AbstractJmsProcessor implements MessageListener { 51 52 protected Session session; 53 protected Destination destination; 54 protected Destination replyToDestination; 55 protected MessageConsumer consumer; 56 protected MessageProducer producer; 57 protected Map pendingExchanges = new ConcurrentHashMap(); 58 protected DeliveryChannel channel; 59 60 public MultiplexingProviderProcessor(JmsEndpoint endpoint) { 61 super(endpoint); 62 } 63 64 protected void doStart(InitialContext ctx) throws Exception { 65 channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel(); 66 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 67 destination = endpoint.getDestination(); 68 if (destination == null) { 69 if (endpoint.getJndiDestinationName() != null) { 70 destination = (Destination ) ctx.lookup(endpoint.getJndiDestinationName()); 71 } else if (endpoint.getJmsProviderDestinationName() != null) { 72 if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) { 73 destination = session.createQueue(endpoint.getJmsProviderDestinationName()); 74 } else { 75 destination = session.createTopic(endpoint.getJmsProviderDestinationName()); 76 } 77 } else { 78 throw new IllegalStateException ("No destination provided"); 79 } 80 } 81 if (destination instanceof Queue ) { 82 replyToDestination = session.createTemporaryQueue(); 83 } else { 84 replyToDestination = session.createTemporaryTopic(); 85 } 86 producer = session.createProducer(destination); 87 consumer = session.createConsumer(replyToDestination); 88 consumer.setMessageListener(this); 89 } 90 91 protected void doStop() throws Exception { 92 session = null; 93 destination = null; 94 consumer = null; 95 producer = null; 96 replyToDestination = null; 97 } 98 99 public void onMessage(final Message message) { 100 try { 101 if (log.isDebugEnabled()) { 102 log.debug("Received jms message " + message); 103 } 104 endpoint.getServiceUnit().getComponent().getWorkManager().scheduleWork(new Work () { 105 public void release() { 106 } 107 public void run() { 108 try { 109 if (log.isDebugEnabled()) { 110 log.debug("Handling jms message " + message); 111 } 112 InOut exchange = (InOut) pendingExchanges.remove(message.getJMSCorrelationID()); 113 if (exchange == null) { 114 throw new IllegalStateException ("Could not find exchange " + message.getJMSCorrelationID()); 115 } 116 if (message instanceof ObjectMessage ) { 117 Object o = ((ObjectMessage ) message).getObject(); 118 if (o instanceof Exception ) { 119 exchange.setError((Exception ) o); 120 } else { 121 throw new UnsupportedOperationException ("Can not handle objects of type " + o.getClass().getName()); 122 } 123 } else { 124 InputStream is = null; 125 if (message instanceof TextMessage ) { 126 is = new ByteArrayInputStream (((TextMessage ) message).getText().getBytes()); 127 } else if (message instanceof BytesMessage ) { 128 int length = (int) ((BytesMessage ) message).getBodyLength(); 129 byte[] bytes = new byte[length]; 130 ((BytesMessage ) message).readBytes(bytes); 131 is = new ByteArrayInputStream (bytes); 132 } else { 133 throw new IllegalArgumentException ("JMS message should be a text or bytes message"); 134 } 135 String contentType = message.getStringProperty(CONTENT_TYPE); 136 SoapMessage soap = soapHelper.getSoapMarshaler().createReader().read(is, contentType); 137 NormalizedMessage out = exchange.createMessage(); 138 soapHelper.getJBIMarshaler().toNMS(out, soap); 139 ((InOut) exchange).setOutMessage(out); 140 } 141 channel.send(exchange); 142 } catch (Throwable e) { 143 log.error("Error while handling jms message", e); 144 } 145 } 146 }); 147 } catch (WorkException e) { 148 log.error("Error while handling jms message", e); 149 } 150 } 151 152 public void process(MessageExchange exchange) throws Exception { 153 if (exchange.getStatus() == ExchangeStatus.DONE) { 154 return; 155 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 156 return; 157 } 158 TextMessage msg = session.createTextMessage(); 159 NormalizedMessage nm = exchange.getMessage("in"); 160 fromNMS(nm, msg); 161 162 if (exchange instanceof InOnly || exchange instanceof RobustInOnly) { 163 synchronized (producer) { 164 producer.send(msg); 165 } 166 exchange.setStatus(ExchangeStatus.DONE); 167 channel.send(exchange); 168 } else if (exchange instanceof InOut) { 169 msg.setJMSCorrelationID(exchange.getExchangeId()); 170 msg.setJMSReplyTo(replyToDestination); 171 pendingExchanges.put(exchange.getExchangeId(), exchange); 172 try { 173 synchronized (producer) { 174 producer.send(msg); 175 } 176 } catch (Exception e) { 177 pendingExchanges.remove(exchange.getExchangeId()); 178 throw e; 179 } 180 } else { 181 throw new IllegalStateException (exchange.getPattern() + " not implemented"); 182 } 183 } 184 185 } 186 | Popular Tags |