1 7 package org.jboss.webservice.transport.jms; 8 9 11 import org.jboss.axis.AxisFault; 12 import org.jboss.axis.ConfigurationException; 13 import org.jboss.axis.EngineConfiguration; 14 import org.jboss.axis.MessageContext; 15 import org.jboss.axis.description.OperationDesc; 16 import org.jboss.axis.description.ServiceDesc; 17 import org.jboss.axis.server.AxisServer; 18 import org.jboss.logging.Logger; 19 import org.jboss.mx.util.MBeanServerLocator; 20 import org.jboss.util.NestedRuntimeException; 21 import org.jboss.webservice.AxisServiceMBean; 22 23 import javax.ejb.EJBException ; 24 import javax.ejb.MessageDrivenBean ; 25 import javax.ejb.MessageDrivenContext ; 26 import javax.jms.BytesMessage ; 27 import javax.jms.JMSException ; 28 import javax.jms.Message ; 29 import javax.jms.MessageListener ; 30 import javax.jms.Queue ; 31 import javax.jms.QueueConnection ; 32 import javax.jms.QueueConnectionFactory ; 33 import javax.jms.QueueSender ; 34 import javax.jms.QueueSession ; 35 import javax.jms.Session ; 36 import javax.management.MBeanServer ; 37 import javax.naming.InitialContext ; 38 import javax.xml.namespace.QName ; 39 import javax.xml.soap.SOAPElement ; 40 import javax.xml.soap.SOAPException ; 41 import java.io.ByteArrayInputStream ; 42 import java.io.ByteArrayOutputStream ; 43 import java.io.IOException ; 44 import java.io.InputStream ; 45 import java.util.HashMap ; 46 import java.util.Iterator ; 47 48 49 61 public class DelegatingMessageDrivenBean implements MessageDrivenBean , MessageListener 62 { 63 64 static final long serialVersionUID = -2841152046009542652L; 65 66 protected Logger log = Logger.getLogger(DelegatingMessageDrivenBean.class); 68 69 private MessageDrivenContext mdbCtx; 70 private QueueConnectionFactory queueFactory; 71 72 75 protected AxisServer getAxisServer() 76 { 77 AxisServer axisServer; 78 try 79 { 80 MBeanServer mbeanServer = MBeanServerLocator.locateJBoss(); 81 axisServer = (AxisServer)mbeanServer.getAttribute(AxisServiceMBean.OBJECT_NAME, "AxisServer"); 82 log.debug("got AxisServer: " + axisServer); 83 } 84 catch (Exception e) 85 { 86 throw new RuntimeException ("Cannot obtain axis server", e); 87 } 88 return axisServer; 89 } 90 91 94 public void onMessage(Message message) 95 { 96 try 97 { 98 if (message instanceof BytesMessage ) 99 { 100 processSOAPMessage((BytesMessage )message); 101 } 102 else 103 { 104 log.warn("Ingnore message, because it is not a javax.jms.BytesMessage: " + message); 105 } 106 } 107 catch (Exception e) 108 { 109 throw new EJBException (e); 110 } 111 } 112 113 117 protected void processSOAPMessage(BytesMessage message) throws Exception 118 { 119 InputStream in = null; 120 byte[] buffer = new byte[8 * 1024]; 121 ByteArrayOutputStream out = new ByteArrayOutputStream (buffer.length); 122 try 123 { 124 int read = message.readBytes(buffer); 126 while (read != -1) 127 { 128 out.write(buffer, 0, read); 129 read = message.readBytes(buffer); 130 } 131 in = new ByteArrayInputStream (out.toByteArray()); 132 } 133 catch (Exception e) 134 { 135 log.error("Cannot get bytes from message", e); 136 return; 137 } 138 139 log.debug("onMessage: " + new String (out.toByteArray())); 140 141 AxisServer axisServer = getAxisServer(); 142 143 org.jboss.axis.Message axisRequest = new org.jboss.axis.Message(in); 145 MessageContext msgContext = new MessageContext(axisServer); 146 msgContext.setRequestMessage(axisRequest); 147 148 HashMap serviceOperations = getServiceOperationsMap(axisServer); 151 152 String targetService = null; 154 Iterator it = axisRequest.getSOAPEnvelope().getBody().getChildElements(); 155 while (it.hasNext()) 156 { 157 SOAPElement soapElement = (SOAPElement )it.next(); 158 String namespace = soapElement.getElementName().getURI(); 159 String localName = soapElement.getElementName().getLocalName(); 160 QName qname = new QName (namespace, localName); 161 log.debug("maybe operation: " + qname); 162 targetService = (String )serviceOperations.get(qname); 163 } 164 165 if (targetService != null) 166 { 167 log.debug("setTargetService: " + targetService); 168 msgContext.setTargetService(targetService); 169 } 170 171 org.jboss.axis.Message axisResponse = null; 172 try 173 { 174 axisServer.invoke(msgContext); 175 axisResponse = msgContext.getResponseMessage(); 176 } 177 catch (AxisFault af) 178 { 179 axisResponse = new org.jboss.axis.Message(af); 180 axisResponse.setMessageContext(msgContext); 181 } 182 catch (Exception e) 183 { 184 axisResponse = new org.jboss.axis.Message(new AxisFault(e.toString())); 185 axisResponse.setMessageContext(msgContext); 186 } 187 188 Queue replyQueue = getReplyQueue(message); 189 if (replyQueue != null) 190 sendResponse(replyQueue, axisResponse); 191 } 192 193 196 private HashMap getServiceOperationsMap(AxisServer server) 197 { 198 HashMap serviceOperations = new HashMap (); 199 200 try 201 { 202 EngineConfiguration config = server.getConfig(); 203 Iterator it = config.getDeployedServices(); 204 while (it.hasNext()) 205 { 206 ServiceDesc service = (ServiceDesc)it.next(); 207 log.debug("service: [name=" + service.getName() + ",ns=" + service.getDefaultNamespace() + "]"); 208 Iterator opit = service.getOperations().iterator(); 209 while (opit.hasNext()) 210 { 211 OperationDesc operation = (OperationDesc)opit.next(); 212 QName qname = operation.getElementQName(); 213 log.debug(" operation: [qname=" + qname + "]"); 214 serviceOperations.put(qname, service.getName()); 215 } 216 } 217 } 218 catch (ConfigurationException e) 219 { 220 log.error("Cannot map service operations", e); 221 } 222 223 return serviceOperations; 224 } 225 226 229 protected Queue getReplyQueue(BytesMessage message) 230 throws JMSException 231 { 232 Queue replyQueue = (Queue )message.getJMSReplyTo(); 233 return replyQueue; 234 } 235 236 239 protected void sendResponse(Queue replyQueue, org.jboss.axis.Message axisResponse) 240 throws SOAPException , IOException , JMSException 241 { 242 ByteArrayOutputStream out = new ByteArrayOutputStream (8 * 1024); 243 axisResponse.writeTo(out); 244 245 QueueConnection qc = queueFactory.createQueueConnection(); 246 QueueSession session = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 247 QueueSender sender = null; 248 try 249 { 250 sender = session.createSender(replyQueue); 251 BytesMessage responseMessage = session.createBytesMessage(); 252 responseMessage.writeBytes(out.toByteArray()); 253 sender.send(responseMessage); 254 log.info("Sent response"); 255 } 256 finally 257 { 258 try 259 { 260 sender.close(); 261 } 262 catch (JMSException ignored) 263 { 264 } 265 try 266 { 267 session.close(); 268 } 269 catch (JMSException ignored) 270 { 271 } 272 try 273 { 274 qc.close(); 275 } 276 catch (JMSException ignored) 277 { 278 } 279 } 280 } 281 282 284 public void ejbCreate() 285 { 286 try 287 { 288 InitialContext ctx = new InitialContext (); 289 queueFactory = (QueueConnectionFactory )ctx.lookup("java:/ConnectionFactory"); 290 } 291 catch (Exception e) 292 { 293 throw new NestedRuntimeException(e); 294 } 295 } 296 297 300 public void ejbRemove() throws EJBException 301 { 302 } 303 304 307 public void setMessageDrivenContext(MessageDrivenContext ctx) throws EJBException 308 { 309 this.mdbCtx = ctx; 310 } 311 } 312 | Popular Tags |