KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jms > AbstractJmsProcessor


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jms;
18
19 import java.io.ByteArrayInputStream JavaDoc;
20 import java.io.ByteArrayOutputStream JavaDoc;
21 import java.io.InputStream JavaDoc;
22 import java.util.Date JavaDoc;
23 import java.util.Hashtable JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.Map JavaDoc;
26
27 import javax.jbi.messaging.ExchangeStatus;
28 import javax.jbi.messaging.Fault;
29 import javax.jbi.messaging.MessageExchange;
30 import javax.jbi.messaging.NormalizedMessage;
31 import javax.jms.BytesMessage JavaDoc;
32 import javax.jms.Connection JavaDoc;
33 import javax.jms.ConnectionFactory JavaDoc;
34 import javax.jms.Message JavaDoc;
35 import javax.jms.Session JavaDoc;
36 import javax.jms.TextMessage JavaDoc;
37 import javax.naming.InitialContext JavaDoc;
38 import javax.naming.NamingException JavaDoc;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.servicemix.JbiConstants;
43 import org.apache.servicemix.common.BaseLifeCycle;
44 import org.apache.servicemix.common.ExchangeProcessor;
45 import org.apache.servicemix.soap.Context;
46 import org.apache.servicemix.soap.SoapFault;
47 import org.apache.servicemix.soap.SoapHelper;
48 import org.apache.servicemix.soap.marshalers.SoapMessage;
49 import org.apache.servicemix.soap.marshalers.SoapWriter;
50
51 public abstract class AbstractJmsProcessor implements ExchangeProcessor {
52
53     public static final String JavaDoc STYLE_QUEUE = "queue";
54     public static final String JavaDoc STYLE_TOPIC = "topic";
55     
56     public static final String JavaDoc CONTENT_TYPE = "MimeContentType";
57
58     protected final transient Log log = LogFactory.getLog(getClass());
59     
60     protected JmsEndpoint endpoint;
61     protected Connection JavaDoc connection;
62     protected SoapHelper soapHelper;
63
64     public AbstractJmsProcessor(JmsEndpoint endpoint) {
65         this.endpoint = endpoint;
66         this.soapHelper = new SoapHelper(endpoint);
67     }
68
69     public void start() throws Exception JavaDoc {
70         InitialContext JavaDoc ctx = null;
71         ConnectionFactory JavaDoc connectionFactory = null;
72         try {
73             // First check configured connectionFactory on the endpoint
74
connectionFactory = getConnectionFactory();
75             connection = connectionFactory.createConnection();
76             connection.start();
77             doStart(ctx);
78         } catch (Exception JavaDoc e) {
79             try {
80                 stop();
81             } catch (Exception JavaDoc inner) {
82                 // TODO: log
83
}
84             throw e;
85         } finally {
86             if (ctx != null) {
87                 ctx.close();
88             }
89         }
90     }
91     
92     protected ConnectionFactory JavaDoc getConnectionFactory() throws NamingException JavaDoc {
93         InitialContext JavaDoc ctx = null;
94         // First check configured connectionFactory on the endpoint
95
ConnectionFactory JavaDoc connectionFactory = endpoint.getConnectionFactory();
96         // Then, check for jndi connection factory name on the endpoint
97
if (connectionFactory == null && endpoint.getJndiConnectionFactoryName() != null) {
98             ctx = getInitialContext();
99             connectionFactory = (ConnectionFactory JavaDoc) ctx.lookup(endpoint.getJndiConnectionFactoryName());
100         }
101         // Check for a configured connectionFactory on the configuration
102
if (connectionFactory == null && endpoint.getConfiguration().getConnectionFactory() != null) {
103             connectionFactory = endpoint.getConfiguration().getConnectionFactory();
104         }
105         // Check for jndi connection factory name on the configuration
106
if (connectionFactory == null && endpoint.getConfiguration().getJndiConnectionFactoryName() != null) {
107             ctx = getInitialContext();
108             connectionFactory = (ConnectionFactory JavaDoc) ctx.lookup(endpoint.getConfiguration().getJndiConnectionFactoryName());
109         }
110         return connectionFactory;
111     }
112
113     protected InitialContext JavaDoc getInitialContext() throws NamingException JavaDoc {
114         Hashtable JavaDoc props = new Hashtable JavaDoc();
115         if (endpoint.getInitialContextFactory() != null && endpoint.getJndiProviderURL() != null) {
116             props.put(InitialContext.INITIAL_CONTEXT_FACTORY, endpoint.getInitialContextFactory());
117             props.put(InitialContext.PROVIDER_URL, endpoint.getJndiProviderURL());
118             return new InitialContext JavaDoc(props);
119         } else if (endpoint.getConfiguration().getJndiInitialContextFactory() != null &&
120                    endpoint.getConfiguration().getJndiProviderUrl() != null) {
121             props.put(InitialContext.INITIAL_CONTEXT_FACTORY, endpoint.getConfiguration().getJndiInitialContextFactory());
122             props.put(InitialContext.PROVIDER_URL, endpoint.getConfiguration().getJndiProviderUrl());
123             return new InitialContext JavaDoc(props);
124         } else {
125             BaseLifeCycle lf = (BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
126             return lf.getContext().getNamingContext();
127         }
128     }
129
130     protected void doStart(InitialContext JavaDoc ctx) throws Exception JavaDoc {
131     }
132
133     public void stop() throws Exception JavaDoc {
134         try {
135             doStop();
136             if (connection != null) {
137                 connection.close();
138             }
139         } finally {
140             connection = null;
141         }
142     }
143
144     protected void doStop() throws Exception JavaDoc {
145     }
146     
147     protected void fromNMS(NormalizedMessage nm, TextMessage JavaDoc msg) throws Exception JavaDoc {
148         Map JavaDoc headers = (Map JavaDoc) nm.getProperty(JbiConstants.PROTOCOL_HEADERS);
149         SoapMessage soap = new SoapMessage();
150         soapHelper.getJBIMarshaler().fromNMS(soap, nm);
151         fromNMS(soap, msg, headers);
152     }
153     
154     protected void fromNMS(SoapMessage soap, TextMessage JavaDoc msg, Map JavaDoc headers) throws Exception JavaDoc {
155         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
156         SoapWriter writer = soapHelper.getSoapMarshaler().createWriter(soap);
157         writer.write(baos);
158         msg.setText(baos.toString());
159         if (headers != null) {
160             for (Iterator JavaDoc it = headers.keySet().iterator(); it.hasNext();) {
161                 String JavaDoc name = (String JavaDoc) it.next();
162                 Object JavaDoc value = headers.get(name);
163                 if (shouldIncludeHeader(name, value)) {
164                     msg.setObjectProperty(name, value);
165                 }
166             }
167         }
168         // overwrite whatever content-type was passed on to us with the one
169
// the SoapWriter constructed
170
msg.setStringProperty(CONTENT_TYPE, writer.getContentType());
171     }
172     
173     protected Context JavaDoc createContext() {
174         return soapHelper.createContext();
175     }
176     
177     protected MessageExchange toNMS(Message message, Context JavaDoc context) throws Exception JavaDoc {
178         InputStream JavaDoc is = null;
179         if (message instanceof TextMessage JavaDoc) {
180             is = new ByteArrayInputStream JavaDoc(((TextMessage JavaDoc) message).getText().getBytes());
181         } else if (message instanceof BytesMessage JavaDoc) {
182             int length = (int) ((BytesMessage JavaDoc) message).getBodyLength();
183             byte[] bytes = new byte[length];
184             ((BytesMessage JavaDoc) message).readBytes(bytes);
185             is = new ByteArrayInputStream JavaDoc(bytes);
186         } else {
187             throw new IllegalArgumentException JavaDoc("JMS message should be a text or bytes message");
188         }
189         String JavaDoc contentType = message.getStringProperty(CONTENT_TYPE);
190         SoapMessage soap = soapHelper.getSoapMarshaler().createReader().read(is, contentType);
191         context.setInMessage(soap);
192         context.setProperty(Message.class.getName(), message);
193         MessageExchange exchange = soapHelper.onReceive(context);
194         // TODO: copy protocol messages
195
//inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(message));
196
return exchange;
197     }
198     
199     protected Message fromNMSResponse(MessageExchange exchange, Context JavaDoc context, Session JavaDoc session) throws Exception JavaDoc {
200         Message response = null;
201         if (exchange.getStatus() == ExchangeStatus.ERROR) {
202             Exception JavaDoc e = exchange.getError();
203             if (e == null) {
204                 e = new Exception JavaDoc("Unkown error");
205             }
206             response = session.createObjectMessage(e);
207         } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
208             Fault jbiFault = exchange.getFault();
209             if (jbiFault != null) {
210                 SoapFault fault = new SoapFault(SoapFault.RECEIVER, null, null, null, jbiFault.getContent());
211                 SoapMessage soapFault = soapHelper.onFault(context, fault);
212                 TextMessage JavaDoc txt = session.createTextMessage();
213                 fromNMS(soapFault, txt, (Map JavaDoc) jbiFault.getProperty(JbiConstants.PROTOCOL_HEADERS));
214                 response = txt;
215             } else {
216                 NormalizedMessage outMsg = exchange.getMessage("out");
217                 if (outMsg != null) {
218                     SoapMessage out = soapHelper.onReply(context, outMsg);
219                     TextMessage JavaDoc txt = session.createTextMessage();
220                     fromNMS(out, txt, (Map JavaDoc) outMsg.getProperty(JbiConstants.PROTOCOL_HEADERS));
221                     response = txt;
222                 }
223             }
224         }
225         return response;
226     }
227
228     private boolean shouldIncludeHeader(String JavaDoc name, Object JavaDoc value) {
229         return (value instanceof String JavaDoc || value instanceof Number JavaDoc || value instanceof Date JavaDoc)
230                         && (!endpoint.isNeedJavaIdentifiers() || isJavaIdentifier(name));
231     }
232
233     private static boolean isJavaIdentifier(String JavaDoc s) {
234         int n = s.length();
235         if (n == 0)
236             return false;
237         if (!Character.isJavaIdentifierStart(s.charAt(0)))
238             return false;
239         for (int i = 1; i < n; i++)
240             if (!Character.isJavaIdentifierPart(s.charAt(i)))
241                 return false;
242         return true;
243     }
244     
245 }
246
Popular Tags