KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jms > standard > StandardProviderProcessor


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jms.standard;
18
19 import java.io.ByteArrayInputStream JavaDoc;
20 import java.io.InputStream JavaDoc;
21
22 import javax.jbi.messaging.DeliveryChannel;
23 import javax.jbi.messaging.ExchangeStatus;
24 import javax.jbi.messaging.InOnly;
25 import javax.jbi.messaging.InOut;
26 import javax.jbi.messaging.MessageExchange;
27 import javax.jbi.messaging.NormalizedMessage;
28 import javax.jbi.messaging.RobustInOnly;
29 import javax.jms.BytesMessage JavaDoc;
30 import javax.jms.Destination JavaDoc;
31 import javax.jms.Message JavaDoc;
32 import javax.jms.MessageConsumer JavaDoc;
33 import javax.jms.MessageProducer JavaDoc;
34 import javax.jms.ObjectMessage JavaDoc;
35 import javax.jms.Queue JavaDoc;
36 import javax.jms.Session JavaDoc;
37 import javax.jms.TextMessage JavaDoc;
38 import javax.naming.InitialContext JavaDoc;
39
40 import org.apache.servicemix.jms.AbstractJmsProcessor;
41 import org.apache.servicemix.jms.JmsEndpoint;
42 import org.apache.servicemix.soap.marshalers.SoapMessage;
43
44 public class StandardProviderProcessor extends AbstractJmsProcessor {
45
46     protected Destination JavaDoc destination;
47     protected DeliveryChannel channel;
48     
49     public StandardProviderProcessor(JmsEndpoint endpoint) {
50         super(endpoint);
51     }
52
53     protected void doStart(InitialContext JavaDoc ctx) throws Exception JavaDoc {
54         channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
55         destination = endpoint.getDestination();
56         if (destination == null) {
57             if (endpoint.getJndiDestinationName() != null) {
58                 destination = (Destination JavaDoc) ctx.lookup(endpoint.getJndiDestinationName());
59             } else if (endpoint.getJmsProviderDestinationName() == null) {
60                 throw new IllegalStateException JavaDoc("No destination provided");
61             }
62         }
63     }
64
65     protected void doStop() throws Exception JavaDoc {
66         destination = null;
67     }
68
69     public void process(MessageExchange exchange) throws Exception JavaDoc {
70         if (exchange.getStatus() == ExchangeStatus.DONE) {
71             return;
72         } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
73             return;
74         }
75         Session JavaDoc session = null;
76         try {
77             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
78             if (destination == null) {
79                 if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) {
80                     destination = session.createQueue(endpoint.getJmsProviderDestinationName());
81                 } else {
82                     destination = session.createTopic(endpoint.getJmsProviderDestinationName());
83                 }
84             }
85             MessageProducer JavaDoc producer = session.createProducer(destination);
86             
87             TextMessage JavaDoc msg = session.createTextMessage();
88             NormalizedMessage nm = exchange.getMessage("in");
89             fromNMS(nm, msg);
90     
91             if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
92                 producer.send(msg);
93             } else if (exchange instanceof InOut) {
94                 Destination JavaDoc replyToDestination;
95                 if (destination instanceof Queue JavaDoc) {
96                     replyToDestination = session.createTemporaryQueue();
97                 } else {
98                     replyToDestination = session.createTemporaryTopic();
99                 }
100                 MessageConsumer JavaDoc consumer = session.createConsumer(replyToDestination);
101                 msg.setJMSCorrelationID(exchange.getExchangeId());
102                 msg.setJMSReplyTo(replyToDestination);
103                 producer.send(msg);
104                 Message message = consumer.receive();
105                 if (message instanceof ObjectMessage JavaDoc) {
106                     Object JavaDoc o = ((ObjectMessage JavaDoc) message).getObject();
107                     if (o instanceof Exception JavaDoc) {
108                         exchange.setError((Exception JavaDoc) o);
109                     } else {
110                         throw new UnsupportedOperationException JavaDoc("Can not handle objects of type " + o.getClass().getName());
111                     }
112                 } else {
113                     InputStream JavaDoc is = null;
114                     if (message instanceof TextMessage JavaDoc) {
115                         is = new ByteArrayInputStream JavaDoc(((TextMessage JavaDoc) message).getText().getBytes());
116                     } else if (message instanceof BytesMessage JavaDoc) {
117                         int length = (int) ((BytesMessage JavaDoc) message).getBodyLength();
118                         byte[] bytes = new byte[length];
119                         ((BytesMessage JavaDoc) message).readBytes(bytes);
120                         is = new ByteArrayInputStream JavaDoc(bytes);
121                     } else {
122                         throw new IllegalArgumentException JavaDoc("JMS message should be a text or bytes message");
123                     }
124                     String JavaDoc contentType = message.getStringProperty(CONTENT_TYPE);
125                     SoapMessage soap = soapHelper.getSoapMarshaler().createReader().read(is, contentType);
126                     NormalizedMessage out = exchange.createMessage();
127                     soapHelper.getJBIMarshaler().toNMS(out, soap);
128                     ((InOut) exchange).setOutMessage(out);
129                 }
130                 channel.send(exchange);
131             } else {
132                 throw new IllegalStateException JavaDoc(exchange.getPattern() + " not implemented");
133             }
134         } finally {
135             if (session != null) {
136                 session.close();
137             }
138         }
139     }
140
141 }
142
Popular Tags