1 package org.objectweb.celtix.bus.transports.jms; 2 3 import java.io.ByteArrayInputStream ; 4 import java.io.ByteArrayOutputStream ; 5 import java.io.IOException ; 6 7 import java.util.Calendar ; 8 import java.util.GregorianCalendar ; 9 import java.util.SimpleTimeZone ; 10 import java.util.TimeZone ; 11 import java.util.concurrent.Executor ; 12 import java.util.concurrent.RejectedExecutionException ; 13 import java.util.logging.Level ; 14 import java.util.logging.Logger ; 15 16 import javax.jms.JMSException ; 17 import javax.jms.Message ; 18 import javax.jms.Queue ; 19 import javax.jms.QueueSender ; 20 import javax.jms.TextMessage ; 21 import javax.naming.NamingException ; 22 import javax.wsdl.WSDLException; 23 import javax.xml.ws.handler.MessageContext; 24 25 import org.objectweb.celtix.Bus; 26 import org.objectweb.celtix.BusEvent; 27 import org.objectweb.celtix.BusEventListener; 28 import org.objectweb.celtix.BusException; 29 import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent; 30 import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent; 31 import org.objectweb.celtix.bus.configuration.ConfigurationEvent; 32 import org.objectweb.celtix.bus.management.counters.TransportServerCounters; 33 import org.objectweb.celtix.common.logging.LogUtils; 34 import org.objectweb.celtix.configuration.Configuration; 35 import org.objectweb.celtix.context.OutputStreamMessageContext; 36 import org.objectweb.celtix.transports.ServerTransport; 37 import org.objectweb.celtix.transports.ServerTransportCallback; 38 import org.objectweb.celtix.transports.jms.JMSServerBehaviorPolicyType; 39 import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType; 40 import org.objectweb.celtix.ws.addressing.EndpointReferenceType; 41 42 43 public class JMSServerTransport extends JMSTransportBase 44 implements ServerTransport, BusEventListener { 45 static final Logger LOG = LogUtils.getL7dLogger(JMSServerTransport.class); 46 private static final String JMS_SERVER_TRANSPORT_MESSAGE = 47 JMSServerTransport.class.getName() + ".IncomingMessage"; 48 49 ServerTransportCallback callback; 50 TransportServerCounters counters; 51 private PooledSession listenerSession; 52 private Thread listenerThread; 53 private JMSServerBehaviorPolicyType serverBehaviourPolicy; 54 55 56 57 public JMSServerTransport(Bus b, EndpointReferenceType address) 58 throws WSDLException { 59 super(b, address, true); 60 serverBehaviourPolicy = getServerPolicy(configuration); 61 counters = new TransportServerCounters("JMSServerTranpsort"); 62 entry("JMSServerTransport Constructor"); 63 bus.sendEvent(new ComponentCreatedEvent(this)); 64 } 65 66 private JMSServerBehaviorPolicyType getServerPolicy(Configuration conf) { 67 JMSServerBehaviorPolicyType pol = conf.getObject(JMSServerBehaviorPolicyType.class, "jmsServer"); 68 if (pol == null) { 69 pol = new JMSServerBehaviorPolicyType(); 70 } 71 return pol; 72 } 73 74 public JMSServerBehaviorPolicyType getJMSServerBehaviourPolicy() { 75 return serverBehaviourPolicy; 76 } 77 78 public void activate(ServerTransportCallback transportCB) throws IOException { 79 entry("JMSServerTransport activate().... "); 80 callback = transportCB; 81 82 try { 83 LOG.log(Level.FINE, "establishing JMS connection"); 84 JMSProviderHub.connect(this); 85 86 listenerSession = sessionFactory.get(targetDestination); 88 listenerThread = new JMSListenerThread(listenerSession, this); 89 listenerThread.start(); 90 } catch (JMSException ex) { 91 LOG.log(Level.FINE, "JMS connect failed with JMSException : ", ex); 92 throw new IOException (ex.getMessage()); 93 } catch (NamingException nex) { 94 LOG.log(Level.FINE, "JMS connect failed with NamingException : ", nex); 95 throw new IOException (nex.getMessage()); 96 } 97 } 98 99 public OutputStreamMessageContext rebase(MessageContext context, 100 EndpointReferenceType decoupledResponseEndpoint) 101 throws IOException { 102 OutputStreamMessageContext octx = new JMSOutputStreamContext(context); 103 104 String replyTo = decoupledResponseEndpoint.getAddress().getValue(); 105 replyTo = replyTo.substring(replyTo.indexOf('#') + 1); 106 octx.put(JMSConstants.JMS_REBASED_REPLY_TO, replyTo); 107 return octx; 108 } 109 110 public OutputStreamMessageContext createOutputStreamContext(MessageContext context) throws IOException { 111 return new JMSOutputStreamContext(context); 112 } 113 114 public void finalPrepareOutputStreamContext(OutputStreamMessageContext context) throws IOException { 115 } 116 117 public void deactivate() throws IOException { 118 try { 119 listenerSession.consumer().close(); 120 if (listenerThread != null) { 121 listenerThread.join(); 122 } 123 sessionFactory.shutdown(); 124 } catch (InterruptedException e) { 125 } catch (JMSException ex) { 127 } 129 } 130 131 public void shutdown() { 132 entry("JMSServerTransport shutdown()"); 133 try { 134 this.deactivate(); 135 } catch (IOException ex) { 136 } 138 bus.sendEvent(new ComponentRemovedEvent(this)); 139 } 140 141 public void postDispatch(MessageContext bindingContext, OutputStreamMessageContext context) 142 throws IOException { 143 144 Message message = (Message )bindingContext.get(JMS_SERVER_TRANSPORT_MESSAGE); 145 PooledSession replySession = null; 146 counters.getRequestTotal().increase(); 148 149 if (!context.isOneWay()) { 150 if (queueDestinationStyle) { 151 try { 152 Queue replyTo = getReplyToDestination(context, message); 154 replySession = sessionFactory.get(false); 155 156 Message reply = marshalResponse(message, context, replySession); 157 setReplyCorrelationID(message, reply); 158 159 QueueSender sender = (QueueSender )replySession.producer(); 160 161 sendResponse(context, message, reply, sender, replyTo); 162 163 } catch (JMSException ex) { 164 LOG.log(Level.WARNING, "Failed in post dispatch ...", ex); 165 counters.getTotalError().increase(); 166 throw new IOException (ex.getMessage()); 167 } catch (NamingException nex) { 168 LOG.log(Level.WARNING, "Failed in post dispatch ...", nex); 169 counters.getTotalError().increase(); 170 throw new IOException (nex.getMessage()); 171 } finally { 172 if (replySession != null) { 174 sessionFactory.recycle(replySession); 175 } 176 } 177 } else { 178 LOG.log(Level.WARNING, 183 "discarding reply for non-oneway invocation ", 184 "with 'topic' destinationStyle"); 185 counters.getTotalError().increase(); 186 } 187 } else { 188 counters.getRequestOneWay().increase(); 190 } 191 } 192 193 public Queue getReplyToDestination(OutputStreamMessageContext context, Message message) 194 throws JMSException , NamingException { 195 Queue replyTo; 196 if (context.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) { 198 replyTo = sessionFactory.getQueueFromInitialContext( 199 (String ) context.get(JMSConstants.JMS_REBASED_REPLY_TO)); 200 } else { 201 replyTo = (null != message.getJMSReplyTo()) 202 ? (Queue )message.getJMSReplyTo() : (Queue )replyDestination; 203 } 204 205 return replyTo; 206 } 207 208 public Message marshalResponse(Message message, 209 OutputStreamMessageContext context, 210 PooledSession replySession) throws JMSException { 211 212 Message reply; 213 boolean textPayload = message instanceof TextMessage 214 ? true : false; 215 if (textPayload) { 216 reply = marshal(context.getOutputStream().toString(), 217 replySession.session(), 218 null, 219 JMSConstants.TEXT_MESSAGE_TYPE); 220 } else { 221 reply = marshal(((ByteArrayOutputStream ) context.getOutputStream()).toByteArray(), 222 replySession.session(), 223 null, 224 JMSConstants.BINARY_MESSAGE_TYPE); 225 } 226 227 return reply; 228 } 229 230 public void setReplyCorrelationID(Message message, Message reply) 231 throws JMSException { 232 String correlationID = message.getJMSCorrelationID(); 233 234 if (correlationID == null 235 || "".equals(correlationID) 236 && serverBehaviourPolicy.isUseMessageIDAsCorrelationID()) { 237 correlationID = message.getJMSMessageID(); 238 } 239 240 if (correlationID != null && !"".equals(correlationID)) { 241 reply.setJMSCorrelationID(correlationID); 242 } 243 } 244 245 246 public void sendResponse(OutputStreamMessageContext context, 247 Message request, 248 Message reply, 249 QueueSender sender, 250 Queue replyTo) 251 throws JMSException { 252 JMSMessageHeadersType headers = 253 (JMSMessageHeadersType) context.get(JMSConstants.JMS_SERVER_HEADERS); 254 255 int deliveryMode = getJMSDeliveryMode(headers); 256 int priority = getJMSPriority(headers); 257 long ttl = getTimeToLive(headers); 258 259 setMessageProperties(headers, reply); 260 261 LOG.log(Level.FINE, "server sending reply: ", reply); 262 263 long timeToLive = 0; 264 if (request.getJMSExpiration() > 0) { 265 TimeZone tz = new SimpleTimeZone (0, "GMT"); 266 Calendar cal = new GregorianCalendar (tz); 267 timeToLive = request.getJMSExpiration() - cal.getTimeInMillis(); 268 } 269 270 if (timeToLive >= 0) { 271 ttl = ttl > 0 ? ttl : timeToLive; 272 sender.send(replyTo, reply, deliveryMode, priority, ttl); 273 } else { 274 LOG.log(Level.INFO, "Message time to live is already expired skipping response."); 275 } 276 } 277 278 279 284 protected void incoming(Message message) throws IOException { 285 try { 286 LOG.log(Level.FINE, "server received request: ", message); 287 288 289 String msgType = message instanceof TextMessage 290 ? JMSConstants.TEXT_MESSAGE_TYPE : JMSConstants.BINARY_MESSAGE_TYPE; 291 Object request = unmarshal(message, msgType); 292 293 byte[] bytes = null; 294 295 if (JMSConstants.TEXT_MESSAGE_TYPE.equals(msgType)) { 296 String requestString = (String )request; 297 LOG.log(Level.FINE, "server received request: ", requestString); 298 bytes = requestString.getBytes(); 299 } else { 300 bytes = (byte[])request; 301 } 302 303 JMSInputStreamContext context = new JMSInputStreamContext(new ByteArrayInputStream (bytes)); 304 populateIncomingContext(message, context, JMSConstants.JMS_SERVER_HEADERS); 305 306 307 context.put(JMS_SERVER_TRANSPORT_MESSAGE, message); 308 callback.dispatch(context, this); 309 310 } catch (JMSException jmsex) { 311 throw new IOException (jmsex.getMessage()); 313 } 314 } 315 316 class JMSListenerThread extends Thread { 317 final JMSServerTransport theTransport; 318 private final PooledSession listenSession; 319 320 public JMSListenerThread(PooledSession session, 321 JMSServerTransport transport) { 322 listenSession = session; 323 theTransport = transport; 324 } 325 326 public void run() { 327 try { 328 while (true) { 329 Message message = listenSession.consumer().receive(); 330 if (message == null) { 331 LOG.log(Level.WARNING, 332 "Null message received from message consumer.", 333 " Exiting ListenerThread::run()."); 334 return; 335 } 336 while (message != null) { 337 Executor executor = theTransport.callback.getExecutor(); 338 if (executor == null) { 339 executor = theTransport.bus 340 .getWorkQueueManager().getAutomaticWorkQueue(); 341 } 342 if (executor != null) { 343 try { 344 executor.execute(new JMSExecutor(theTransport, message)); 345 message = null; 346 } catch (RejectedExecutionException ree) { 347 } 351 } else { 352 try { 354 theTransport.incoming(message); 355 } catch (IOException ex) { 356 LOG.log(Level.WARNING, "Failed to process incoming message : ", ex); 357 } 358 message = null; 359 } 360 } 361 } 362 } catch (JMSException jmsex) { 363 jmsex.printStackTrace(); 364 LOG.log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage()); 365 } catch (Throwable jmsex) { 366 jmsex.printStackTrace(); 367 LOG.log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage()); 368 } 369 } 370 } 371 372 static class JMSExecutor implements Runnable { 373 Message message; 374 JMSServerTransport transport; 375 376 JMSExecutor(JMSServerTransport t, Message m) { 377 message = m; 378 transport = t; 379 } 380 381 public void run() { 382 try { 383 transport.incoming(message); 384 } catch (IOException ex) { 385 LOG.log(Level.WARNING, 387 "Failed to process incoming message : ", ex); 388 } 389 } 390 391 } 392 393 public void processEvent(BusEvent e) throws BusException { 394 if (e.getID().equals(ConfigurationEvent.RECONFIGURED)) { 395 String configName = (String )e.getSource(); 396 reConfigure(configName); 397 } 398 } 399 400 private void reConfigure(String configName) { 401 if ("servicesMonitoring".equals(configName)) { 402 if (bus.getConfiguration().getBoolean("servicesMonitoring")) { 403 counters.resetCounters(); 404 } else { 405 counters.stopCounters(); 406 } 407 } 408 } 409 } 410 | Popular Tags |