KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jms > multiplexing > MultiplexingProviderProcessor


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jms.multiplexing;
18
19 import java.io.ByteArrayInputStream JavaDoc;
20 import java.io.InputStream JavaDoc;
21 import java.util.Map JavaDoc;
22
23 import javax.jbi.messaging.DeliveryChannel;
24 import javax.jbi.messaging.ExchangeStatus;
25 import javax.jbi.messaging.InOnly;
26 import javax.jbi.messaging.InOut;
27 import javax.jbi.messaging.MessageExchange;
28 import javax.jbi.messaging.NormalizedMessage;
29 import javax.jbi.messaging.RobustInOnly;
30 import javax.jms.BytesMessage JavaDoc;
31 import javax.jms.Destination JavaDoc;
32 import javax.jms.Message JavaDoc;
33 import javax.jms.MessageConsumer JavaDoc;
34 import javax.jms.MessageListener JavaDoc;
35 import javax.jms.MessageProducer JavaDoc;
36 import javax.jms.ObjectMessage JavaDoc;
37 import javax.jms.Queue JavaDoc;
38 import javax.jms.Session JavaDoc;
39 import javax.jms.TextMessage JavaDoc;
40 import javax.naming.InitialContext JavaDoc;
41 import javax.resource.spi.work.Work JavaDoc;
42 import javax.resource.spi.work.WorkException JavaDoc;
43
44 import org.apache.servicemix.jms.AbstractJmsProcessor;
45 import org.apache.servicemix.jms.JmsEndpoint;
46 import org.apache.servicemix.soap.marshalers.SoapMessage;
47
48 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
49
50 public class MultiplexingProviderProcessor extends AbstractJmsProcessor implements MessageListener JavaDoc {
51
52     protected Session JavaDoc session;
53     protected Destination JavaDoc destination;
54     protected Destination JavaDoc replyToDestination;
55     protected MessageConsumer JavaDoc consumer;
56     protected MessageProducer JavaDoc producer;
57     protected Map JavaDoc pendingExchanges = new ConcurrentHashMap();
58     protected DeliveryChannel channel;
59
60     public MultiplexingProviderProcessor(JmsEndpoint endpoint) {
61         super(endpoint);
62     }
63
64     protected void doStart(InitialContext JavaDoc ctx) throws Exception JavaDoc {
65         channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
66         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
67         destination = endpoint.getDestination();
68         if (destination == null) {
69             if (endpoint.getJndiDestinationName() != null) {
70                 destination = (Destination JavaDoc) ctx.lookup(endpoint.getJndiDestinationName());
71             } else if (endpoint.getJmsProviderDestinationName() != null) {
72                 if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) {
73                     destination = session.createQueue(endpoint.getJmsProviderDestinationName());
74                 } else {
75                     destination = session.createTopic(endpoint.getJmsProviderDestinationName());
76                 }
77             } else {
78                 throw new IllegalStateException JavaDoc("No destination provided");
79             }
80         }
81         if (destination instanceof Queue JavaDoc) {
82             replyToDestination = session.createTemporaryQueue();
83         } else {
84             replyToDestination = session.createTemporaryTopic();
85         }
86         producer = session.createProducer(destination);
87         consumer = session.createConsumer(replyToDestination);
88         consumer.setMessageListener(this);
89     }
90
91     protected void doStop() throws Exception JavaDoc {
92         session = null;
93         destination = null;
94         consumer = null;
95         producer = null;
96         replyToDestination = null;
97     }
98
99     public void onMessage(final Message message) {
100         try {
101             if (log.isDebugEnabled()) {
102                 log.debug("Received jms message " + message);
103             }
104             endpoint.getServiceUnit().getComponent().getWorkManager().scheduleWork(new Work JavaDoc() {
105                 public void release() {
106                 }
107                 public void run() {
108                     try {
109                         if (log.isDebugEnabled()) {
110                             log.debug("Handling jms message " + message);
111                         }
112                         InOut exchange = (InOut) pendingExchanges.remove(message.getJMSCorrelationID());
113                         if (exchange == null) {
114                             throw new IllegalStateException JavaDoc("Could not find exchange " + message.getJMSCorrelationID());
115                         }
116                         if (message instanceof ObjectMessage JavaDoc) {
117                             Object JavaDoc o = ((ObjectMessage JavaDoc) message).getObject();
118                             if (o instanceof Exception JavaDoc) {
119                                 exchange.setError((Exception JavaDoc) o);
120                             } else {
121                                 throw new UnsupportedOperationException JavaDoc("Can not handle objects of type " + o.getClass().getName());
122                             }
123                         } else {
124                             InputStream JavaDoc is = null;
125                             if (message instanceof TextMessage JavaDoc) {
126                                 is = new ByteArrayInputStream JavaDoc(((TextMessage JavaDoc) message).getText().getBytes());
127                             } else if (message instanceof BytesMessage JavaDoc) {
128                                 int length = (int) ((BytesMessage JavaDoc) message).getBodyLength();
129                                 byte[] bytes = new byte[length];
130                                 ((BytesMessage JavaDoc) message).readBytes(bytes);
131                                 is = new ByteArrayInputStream JavaDoc(bytes);
132                             } else {
133                                 throw new IllegalArgumentException JavaDoc("JMS message should be a text or bytes message");
134                             }
135                             String JavaDoc contentType = message.getStringProperty(CONTENT_TYPE);
136                             SoapMessage soap = soapHelper.getSoapMarshaler().createReader().read(is, contentType);
137                             NormalizedMessage out = exchange.createMessage();
138                             soapHelper.getJBIMarshaler().toNMS(out, soap);
139                             ((InOut) exchange).setOutMessage(out);
140                         }
141                         channel.send(exchange);
142                     } catch (Throwable JavaDoc e) {
143                         log.error("Error while handling jms message", e);
144                     }
145                 }
146             });
147         } catch (WorkException JavaDoc e) {
148             log.error("Error while handling jms message", e);
149         }
150     }
151
152     public void process(MessageExchange exchange) throws Exception JavaDoc {
153         if (exchange.getStatus() == ExchangeStatus.DONE) {
154             return;
155         } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
156             return;
157         }
158         TextMessage JavaDoc msg = session.createTextMessage();
159         NormalizedMessage nm = exchange.getMessage("in");
160         fromNMS(nm, msg);
161
162         if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
163             synchronized (producer) {
164                 producer.send(msg);
165             }
166             exchange.setStatus(ExchangeStatus.DONE);
167             channel.send(exchange);
168         } else if (exchange instanceof InOut) {
169             msg.setJMSCorrelationID(exchange.getExchangeId());
170             msg.setJMSReplyTo(replyToDestination);
171             pendingExchanges.put(exchange.getExchangeId(), exchange);
172             try {
173                 synchronized (producer) {
174                     producer.send(msg);
175                 }
176             } catch (Exception JavaDoc e) {
177                 pendingExchanges.remove(exchange.getExchangeId());
178                 throw e;
179             }
180         } else {
181             throw new IllegalStateException JavaDoc(exchange.getPattern() + " not implemented");
182         }
183     }
184
185 }
186
Popular Tags