1 17 package org.apache.servicemix.jms; 18 19 import java.io.ByteArrayInputStream ; 20 import java.io.ByteArrayOutputStream ; 21 import java.io.InputStream ; 22 import java.util.Date ; 23 import java.util.Hashtable ; 24 import java.util.Iterator ; 25 import java.util.Map ; 26 27 import javax.jbi.messaging.ExchangeStatus; 28 import javax.jbi.messaging.Fault; 29 import javax.jbi.messaging.MessageExchange; 30 import javax.jbi.messaging.NormalizedMessage; 31 import javax.jms.BytesMessage ; 32 import javax.jms.Connection ; 33 import javax.jms.ConnectionFactory ; 34 import javax.jms.Message ; 35 import javax.jms.Session ; 36 import javax.jms.TextMessage ; 37 import javax.naming.InitialContext ; 38 import javax.naming.NamingException ; 39 40 import org.apache.commons.logging.Log; 41 import org.apache.commons.logging.LogFactory; 42 import org.apache.servicemix.JbiConstants; 43 import org.apache.servicemix.common.BaseLifeCycle; 44 import org.apache.servicemix.common.ExchangeProcessor; 45 import org.apache.servicemix.soap.Context; 46 import org.apache.servicemix.soap.SoapFault; 47 import org.apache.servicemix.soap.SoapHelper; 48 import org.apache.servicemix.soap.marshalers.SoapMessage; 49 import org.apache.servicemix.soap.marshalers.SoapWriter; 50 51 public abstract class AbstractJmsProcessor implements ExchangeProcessor { 52 53 public static final String STYLE_QUEUE = "queue"; 54 public static final String STYLE_TOPIC = "topic"; 55 56 public static final String CONTENT_TYPE = "MimeContentType"; 57 58 protected final transient Log log = LogFactory.getLog(getClass()); 59 60 protected JmsEndpoint endpoint; 61 protected Connection connection; 62 protected SoapHelper soapHelper; 63 64 public AbstractJmsProcessor(JmsEndpoint endpoint) { 65 this.endpoint = endpoint; 66 this.soapHelper = new SoapHelper(endpoint); 67 } 68 69 public void start() throws Exception { 70 InitialContext ctx = null; 71 ConnectionFactory connectionFactory = null; 72 try { 73 connectionFactory = getConnectionFactory(); 75 connection = connectionFactory.createConnection(); 76 connection.start(); 77 doStart(ctx); 78 } catch (Exception e) { 79 try { 80 stop(); 81 } catch (Exception inner) { 82 } 84 throw e; 85 } finally { 86 if (ctx != null) { 87 ctx.close(); 88 } 89 } 90 } 91 92 protected ConnectionFactory getConnectionFactory() throws NamingException { 93 InitialContext ctx = null; 94 ConnectionFactory connectionFactory = endpoint.getConnectionFactory(); 96 if (connectionFactory == null && endpoint.getJndiConnectionFactoryName() != null) { 98 ctx = getInitialContext(); 99 connectionFactory = (ConnectionFactory ) ctx.lookup(endpoint.getJndiConnectionFactoryName()); 100 } 101 if (connectionFactory == null && endpoint.getConfiguration().getConnectionFactory() != null) { 103 connectionFactory = endpoint.getConfiguration().getConnectionFactory(); 104 } 105 if (connectionFactory == null && endpoint.getConfiguration().getJndiConnectionFactoryName() != null) { 107 ctx = getInitialContext(); 108 connectionFactory = (ConnectionFactory ) ctx.lookup(endpoint.getConfiguration().getJndiConnectionFactoryName()); 109 } 110 return connectionFactory; 111 } 112 113 protected InitialContext getInitialContext() throws NamingException { 114 Hashtable props = new Hashtable (); 115 if (endpoint.getInitialContextFactory() != null && endpoint.getJndiProviderURL() != null) { 116 props.put(InitialContext.INITIAL_CONTEXT_FACTORY, endpoint.getInitialContextFactory()); 117 props.put(InitialContext.PROVIDER_URL, endpoint.getJndiProviderURL()); 118 return new InitialContext (props); 119 } else if (endpoint.getConfiguration().getJndiInitialContextFactory() != null && 120 endpoint.getConfiguration().getJndiProviderUrl() != null) { 121 props.put(InitialContext.INITIAL_CONTEXT_FACTORY, endpoint.getConfiguration().getJndiInitialContextFactory()); 122 props.put(InitialContext.PROVIDER_URL, endpoint.getConfiguration().getJndiProviderUrl()); 123 return new InitialContext (props); 124 } else { 125 BaseLifeCycle lf = (BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle(); 126 return lf.getContext().getNamingContext(); 127 } 128 } 129 130 protected void doStart(InitialContext ctx) throws Exception { 131 } 132 133 public void stop() throws Exception { 134 try { 135 doStop(); 136 if (connection != null) { 137 connection.close(); 138 } 139 } finally { 140 connection = null; 141 } 142 } 143 144 protected void doStop() throws Exception { 145 } 146 147 protected void fromNMS(NormalizedMessage nm, TextMessage msg) throws Exception { 148 Map headers = (Map ) nm.getProperty(JbiConstants.PROTOCOL_HEADERS); 149 SoapMessage soap = new SoapMessage(); 150 soapHelper.getJBIMarshaler().fromNMS(soap, nm); 151 fromNMS(soap, msg, headers); 152 } 153 154 protected void fromNMS(SoapMessage soap, TextMessage msg, Map headers) throws Exception { 155 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 156 SoapWriter writer = soapHelper.getSoapMarshaler().createWriter(soap); 157 writer.write(baos); 158 msg.setText(baos.toString()); 159 if (headers != null) { 160 for (Iterator it = headers.keySet().iterator(); it.hasNext();) { 161 String name = (String ) it.next(); 162 Object value = headers.get(name); 163 if (shouldIncludeHeader(name, value)) { 164 msg.setObjectProperty(name, value); 165 } 166 } 167 } 168 msg.setStringProperty(CONTENT_TYPE, writer.getContentType()); 171 } 172 173 protected Context createContext() { 174 return soapHelper.createContext(); 175 } 176 177 protected MessageExchange toNMS(Message message, Context context) throws Exception { 178 InputStream is = null; 179 if (message instanceof TextMessage ) { 180 is = new ByteArrayInputStream (((TextMessage ) message).getText().getBytes()); 181 } else if (message instanceof BytesMessage ) { 182 int length = (int) ((BytesMessage ) message).getBodyLength(); 183 byte[] bytes = new byte[length]; 184 ((BytesMessage ) message).readBytes(bytes); 185 is = new ByteArrayInputStream (bytes); 186 } else { 187 throw new IllegalArgumentException ("JMS message should be a text or bytes message"); 188 } 189 String contentType = message.getStringProperty(CONTENT_TYPE); 190 SoapMessage soap = soapHelper.getSoapMarshaler().createReader().read(is, contentType); 191 context.setInMessage(soap); 192 context.setProperty(Message.class.getName(), message); 193 MessageExchange exchange = soapHelper.onReceive(context); 194 return exchange; 197 } 198 199 protected Message fromNMSResponse(MessageExchange exchange, Context context, Session session) throws Exception { 200 Message response = null; 201 if (exchange.getStatus() == ExchangeStatus.ERROR) { 202 Exception e = exchange.getError(); 203 if (e == null) { 204 e = new Exception ("Unkown error"); 205 } 206 response = session.createObjectMessage(e); 207 } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 208 Fault jbiFault = exchange.getFault(); 209 if (jbiFault != null) { 210 SoapFault fault = new SoapFault(SoapFault.RECEIVER, null, null, null, jbiFault.getContent()); 211 SoapMessage soapFault = soapHelper.onFault(context, fault); 212 TextMessage txt = session.createTextMessage(); 213 fromNMS(soapFault, txt, (Map ) jbiFault.getProperty(JbiConstants.PROTOCOL_HEADERS)); 214 response = txt; 215 } else { 216 NormalizedMessage outMsg = exchange.getMessage("out"); 217 if (outMsg != null) { 218 SoapMessage out = soapHelper.onReply(context, outMsg); 219 TextMessage txt = session.createTextMessage(); 220 fromNMS(out, txt, (Map ) outMsg.getProperty(JbiConstants.PROTOCOL_HEADERS)); 221 response = txt; 222 } 223 } 224 } 225 return response; 226 } 227 228 private boolean shouldIncludeHeader(String name, Object value) { 229 return (value instanceof String || value instanceof Number || value instanceof Date ) 230 && (!endpoint.isNeedJavaIdentifiers() || isJavaIdentifier(name)); 231 } 232 233 private static boolean isJavaIdentifier(String s) { 234 int n = s.length(); 235 if (n == 0) 236 return false; 237 if (!Character.isJavaIdentifierStart(s.charAt(0))) 238 return false; 239 for (int i = 1; i < n; i++) 240 if (!Character.isJavaIdentifierPart(s.charAt(i))) 241 return false; 242 return true; 243 } 244 245 } 246 | Popular Tags |