1 7 package org.jboss.webservice.transport.jms; 8 9 11 import org.jboss.axis.AxisFault; 12 import org.jboss.axis.MessageContext; 13 import org.jboss.axis.configuration.SimpleProvider; 14 import org.jboss.axis.handlers.soap.SOAPService; 15 import org.jboss.axis.server.AxisServer; 16 import org.jboss.logging.Logger; 17 import org.jboss.util.NestedRuntimeException; 18 import org.jboss.webservice.server.InvokerProviderSimple; 19 20 import javax.ejb.EJBException ; 21 import javax.ejb.MessageDrivenBean ; 22 import javax.ejb.MessageDrivenContext ; 23 import javax.jms.BytesMessage ; 24 import javax.jms.JMSException ; 25 import javax.jms.Message ; 26 import javax.jms.MessageListener ; 27 import javax.jms.Queue ; 28 import javax.jms.QueueConnection ; 29 import javax.jms.QueueConnectionFactory ; 30 import javax.jms.QueueSender ; 31 import javax.jms.QueueSession ; 32 import javax.jms.Session ; 33 import javax.naming.InitialContext ; 34 import javax.xml.soap.SOAPException ; 35 import java.io.ByteArrayInputStream ; 36 import java.io.ByteArrayOutputStream ; 37 import java.io.IOException ; 38 import java.io.InputStream ; 39 40 46 public abstract class JMSTransportSupport implements MessageDrivenBean , MessageListener 47 { 48 49 static final long serialVersionUID = -6224491411234603413L; 50 protected Logger log = Logger.getLogger(JMSTransportSupport.class); 52 53 private QueueConnectionFactory queueFactory; 55 56 59 protected AxisServer getAxisServer() 60 { 61 AxisServer axisServer; 62 try 63 { 64 SimpleProvider config = new SimpleProvider(); 65 SOAPService service = new SOAPService(new InvokerProviderSimple(this)); 66 config.deployService(getServiceName(), service); 67 axisServer = new AxisServer(config); 68 log.debug("got AxisServer: " + axisServer); 69 } 70 catch (Exception e) 71 { 72 throw new RuntimeException ("Cannot obtain axis server", e); 73 } 74 return axisServer; 75 } 76 77 80 protected String getServiceName() 81 { 82 return "jms-service"; 83 } 84 85 88 public void onMessage(Message message) 89 { 90 try 91 { 92 if (message instanceof BytesMessage ) 93 { 94 processSOAPMessage((BytesMessage )message); 95 } 96 else 97 { 98 log.warn("Ingnore message, because it is not a javax.jms.BytesMessage: " + message); 99 } 100 } 101 catch (Exception e) 102 { 103 throw new EJBException (e); 104 } 105 } 106 107 111 protected void processSOAPMessage(BytesMessage message) throws Exception 112 { 113 InputStream in = null; 114 byte[] buffer = new byte[8 * 1024]; 115 ByteArrayOutputStream out = new ByteArrayOutputStream (buffer.length); 116 try 117 { 118 int read = message.readBytes(buffer); 120 while (read != -1) 121 { 122 out.write(buffer, 0, read); 123 read = message.readBytes(buffer); 124 } 125 in = new ByteArrayInputStream (out.toByteArray()); 126 } 127 catch (Exception e) 128 { 129 log.error("Cannot get bytes from message", e); 130 return; 131 } 132 133 log.debug("onMessage: " + new String (out.toByteArray())); 134 135 org.jboss.axis.Message axisRequest = new org.jboss.axis.Message(in); 137 MessageContext msgContext = new MessageContext(getAxisServer()); 138 msgContext.setRequestMessage(axisRequest); 139 msgContext.setTargetService(getServiceName()); 140 141 org.jboss.axis.Message axisResponse = null; 142 try 143 { 144 getAxisServer().invoke(msgContext); 145 axisResponse = msgContext.getResponseMessage(); 146 } 147 catch (AxisFault af) 148 { 149 axisResponse = new org.jboss.axis.Message(af); 150 axisResponse.setMessageContext(msgContext); 151 } 152 catch (Exception e) 153 { 154 axisResponse = new org.jboss.axis.Message(new AxisFault(e.toString())); 155 axisResponse.setMessageContext(msgContext); 156 } 157 158 Queue replyQueue = getReplyQueue(message); 159 if (replyQueue != null) 160 sendResponse(replyQueue, axisResponse); 161 } 162 163 166 protected Queue getReplyQueue(BytesMessage message) 167 throws JMSException 168 { 169 Queue replyQueue = (Queue )message.getJMSReplyTo(); 170 return replyQueue; 171 } 172 173 176 protected void sendResponse(Queue replyQueue, org.jboss.axis.Message axisResponse) 177 throws SOAPException , IOException , JMSException 178 { 179 ByteArrayOutputStream out = new ByteArrayOutputStream (8 * 1024); 180 axisResponse.writeTo(out); 181 182 QueueConnection qc = queueFactory.createQueueConnection(); 183 QueueSession session = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 184 QueueSender sender = null; 185 try 186 { 187 sender = session.createSender(replyQueue); 188 BytesMessage responseMessage = session.createBytesMessage(); 189 responseMessage.writeBytes(out.toByteArray()); 190 sender.send(responseMessage); 191 log.info("Sent response"); 192 } 193 finally 194 { 195 try 196 { 197 sender.close(); 198 } 199 catch (JMSException ignored) 200 { 201 } 202 try 203 { 204 session.close(); 205 } 206 catch (JMSException ignored) 207 { 208 } 209 try 210 { 211 qc.close(); 212 } 213 catch (JMSException ignored) 214 { 215 } 216 } 217 } 218 219 221 public void ejbCreate() 222 { 223 try 224 { 225 InitialContext ctx = new InitialContext (); 226 queueFactory = (QueueConnectionFactory )ctx.lookup("java:/ConnectionFactory"); 227 } 228 catch (Exception e) 229 { 230 throw new NestedRuntimeException(e); 231 } 232 } 233 234 237 public void ejbRemove() throws EJBException 238 { 239 } 240 241 244 public void setMessageDrivenContext(MessageDrivenContext ctx) throws EJBException 245 { 246 } 248 } 249 | Popular Tags |