1 package org.jbpm.bpel.service.def; 2 3 import java.io.Serializable ; 4 import java.util.Enumeration ; 5 import java.util.Iterator ; 6 import java.util.Map ; 7 8 import javax.jms.*; 9 import javax.naming.Context ; 10 import javax.naming.InitialContext ; 11 import javax.naming.NamingException ; 12 import javax.wsdl.Operation; 13 import javax.wsdl.OperationType; 14 import javax.xml.namespace.QName ; 15 import javax.xml.rpc.JAXRPCException ; 16 import javax.xml.rpc.handler.Handler ; 17 import javax.xml.rpc.handler.HandlerInfo ; 18 import javax.xml.rpc.handler.MessageContext ; 19 import javax.xml.rpc.handler.soap.SOAPMessageContext ; 20 import javax.xml.rpc.soap.SOAPFaultException ; 21 import javax.xml.soap.*; 22 23 import org.apache.commons.logging.Log; 24 import org.apache.commons.logging.LogFactory; 25 import org.w3c.dom.Element ; 26 import org.w3c.dom.Node ; 27 28 import com.ibm.wsdl.util.xml.DOMUtils; 29 30 import org.jbpm.bpel.data.def.MessageTypeInfo; 31 import org.jbpm.bpel.data.exe.MessageVariableInstance; 32 import org.jbpm.bpel.def.BpelDefinition; 33 import org.jbpm.bpel.par.JndiProcessDeployer; 34 import org.jbpm.bpel.wsdl.def.Property; 35 import org.jbpm.bpel.wsdl.def.PropertyAlias; 36 import org.jbpm.bpel.xml.util.NodeUtil; 37 38 42 public class BpelEndpointHandler implements Handler { 43 44 private QName [] headers; 45 private BpelEndpointInfo endpointInfo; 46 private long responseTimeout; 47 private long oneWayTimeout; 48 49 private BpelDefinition process; 50 private PartnerLinkDefinition partnerLink; 51 52 private Destination destination; 53 private Connection connection; 54 55 56 public static final String PROCESS_NAME_PARAM = "processName"; 57 58 public static final String PARTNER_LINK_NAME_PARAM = "partnerLinkName"; 59 60 public static final String RESPONSE_TIMEOUT_PARAM = "responseTimeout"; 61 62 public static final String ONE_WAY_TIMEOUT_PARAM = "oneWayTimeout"; 63 64 65 public static final QName PROCESS_FAULTCODE = new QName (SOAPConstants.URI_NS_SOAP_ENVELOPE, "Server.BusinessProcess", "soapenv"); 66 67 public static final String PROCESS_FAULTSTRING = "Business Process Fault"; 68 69 70 public static final QName TIMEOUT_FAULTCODE = new QName (SOAPConstants.URI_NS_SOAP_ENVELOPE, "Server.Timeout"); 71 72 public static final String TIMEOUT_FAULTSTRING = "Response Timeout"; 73 74 75 public static final String PARTNER_LINK_ID_PROP = "_$partnerLinkId"; 76 77 public static final String OPERATION_NAME_PROP = "_$operationName"; 78 79 public static final String FAULT_NAME_PROP = "_$faultName"; 80 81 82 static final String MESSAGE_OBJECT_PROP = "_$messageObject"; 83 84 private static final Log log = LogFactory.getLog(BpelEndpointHandler.class); 85 86 87 public void init(HandlerInfo handlerInfo) throws JAXRPCException { 88 headers = handlerInfo.getHeaders(); 90 Map handlerConfig = handlerInfo.getHandlerConfig(); 92 String processName = (String ) handlerConfig.get(PROCESS_NAME_PARAM); 94 if (processName == null) { 95 throw new JAXRPCException ("Parameter '" + PROCESS_NAME_PARAM + 96 "' is mandatory in the handler configuration"); 97 } 98 String partnerLinkAlias = (String ) handlerConfig.get(PARTNER_LINK_NAME_PARAM); 100 if (partnerLinkAlias == null) { 101 throw new JAXRPCException ("Parameter '" + PARTNER_LINK_NAME_PARAM + 102 "' is mandatory in the handler configuration"); 103 } 104 String receiveTimeoutText = (String ) handlerConfig.get(RESPONSE_TIMEOUT_PARAM); 106 if (receiveTimeoutText != null) { 107 try { 108 responseTimeout = Long.parseLong(receiveTimeoutText); 109 } 110 catch (NumberFormatException e) { 111 throw new JAXRPCException ("Parameter '" + RESPONSE_TIMEOUT_PARAM + 112 "' does not contain a parsable long", e); 113 } 114 } 115 String oneWayTimeoutText = (String ) handlerConfig.get(ONE_WAY_TIMEOUT_PARAM); 117 if (oneWayTimeoutText != null) { 118 try { 119 oneWayTimeout = Long.parseLong(oneWayTimeoutText); 120 } 121 catch (NumberFormatException e) { 122 throw new JAXRPCException ("Parameter '" + ONE_WAY_TIMEOUT_PARAM + 123 "' does not contain a parsable long", e); 124 } 125 } 126 Context initialContext = null; 127 try { 128 initialContext = new InitialContext (); 129 JndiProcessDeployer deployer = new JndiProcessDeployer(initialContext); 131 process = (BpelDefinition) deployer.findProcessDefinition(processName); 132 log.debug("retrieved process definition: " + processName); 133 endpointInfo = deployer.findEndpointInfo(processName, partnerLinkAlias); 135 log.debug("retrieved endpoint info: " + partnerLinkAlias); 136 PartnerLinkFinder finder = new PartnerLinkFinder(endpointInfo.getPartnerLinkId()); 138 finder.visit(process); 139 partnerLink = finder.getPartnerLink(); 140 destination = (Destination) initialContext.lookup(endpointInfo.getDestinationName()); 142 ConnectionFactory factory = (ConnectionFactory) initialContext.lookup(endpointInfo.getConnectionFactoryName()); 144 connection = factory.createConnection(); 146 connection.start(); 147 } 148 catch (NamingException e) { 149 log.error(e); 150 throw new JAXRPCException ("could not lookup object", e); 151 } 152 catch (JMSException e) { 153 log.error(e); 154 throw new JAXRPCException ("could not create jms connection", e); 155 } 156 finally { 157 if (initialContext != null) { 158 try { 159 initialContext.close(); 160 } 161 catch (NamingException e) { 162 log.warn("could not close naming context", e); 163 } 164 } 165 } 166 } 167 168 169 public void destroy() { 170 if (connection != null) { 171 try { 172 connection.close(); 173 } 174 catch (JMSException e) { 175 log.warn("could not close jms connection", e); 176 } 177 } 178 } 179 180 181 public QName [] getHeaders() { 182 return headers; 183 } 184 185 186 public boolean handleRequest(MessageContext context) throws JAXRPCException , SOAPFaultException { 187 if (!(context instanceof SOAPMessageContext )) 188 return true; 189 190 SOAPMessageContext soapContext = (SOAPMessageContext ) context; 191 Session session = null; 192 try { 193 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 194 TemporaryQueue replyTo = sendRequest(soapContext, session); 195 if (replyTo != null) { 196 receiveResponse(soapContext, session, replyTo); 197 } 198 return true; 199 } 200 catch (JMSException e) { 201 throw new JAXRPCException ("could not complete jms operation", e); 202 } 203 catch (SOAPException e) { 204 throw new JAXRPCException ("could not complete saaj operation", e); 205 } 206 finally { 207 if (session != null) { 208 try { 209 session.close(); 210 } 211 catch (JMSException e) { 212 log.warn("could not close jms session", e); 213 } 214 } 215 } 216 } 217 218 219 public boolean handleResponse(MessageContext context) throws JAXRPCException { 220 if (context instanceof SOAPMessageContext && context.containsProperty(MESSAGE_OBJECT_PROP)) { 221 SOAPMessageContext soapContext = (SOAPMessageContext ) context; 222 if (soapContext.containsProperty(FAULT_NAME_PROP)) 223 processFault(soapContext); 224 else 225 processResponse(soapContext); 226 } 227 return true; 228 } 229 230 231 public boolean handleFault(MessageContext context) throws JAXRPCException { 232 return true; 233 } 234 235 public BpelEndpointInfo getEndpointInfo() { 236 return endpointInfo; 237 } 238 239 public long getResponseTimeout() { 240 return responseTimeout; 241 } 242 243 public long getOneWayTimeout() { 244 return oneWayTimeout; 245 } 246 247 protected Destination getDestination() { 248 return destination; 249 } 250 251 protected Connection getConnection() { 252 return connection; 253 } 254 255 protected TemporaryQueue sendRequest(SOAPMessageContext soapContext, Session session) 256 throws JMSException, SOAPException { 257 ObjectMessage message = session.createObjectMessage(); 258 message.setLongProperty(PARTNER_LINK_ID_PROP, endpointInfo.getPartnerLinkId()); 260 Element operationElem = DOMUtils.getFirstChildElement(soapContext.getMessage().getSOAPBody()); 262 String operationName = operationElem.getLocalName(); 263 message.setStringProperty(OPERATION_NAME_PROP, operationName); 264 Operation operation = partnerLink.getMyRole().getPortType().getOperation(operationName, null, null); 266 MessageTypeInfo inputTypeInfo = process.getImports().getMessageTypeInfo(operation.getInput().getMessage().getQName()); 267 Map aliases = inputTypeInfo.getPropertyAliases(); 268 if (aliases != null) { 269 Iterator aliasIt = aliases.values().iterator(); 270 while (aliasIt.hasNext()) { 271 PropertyAlias alias = (PropertyAlias) aliasIt.next(); 272 Property property = alias.getProperty(); 273 Node value = (Node ) alias.getQuery().getScript().evaluate(operationElem); 274 message.setObjectProperty(property.getQName().getLocalPart(), NodeUtil.getValue(value)); 276 } 277 } 278 MessageProducer producer = session.createProducer(destination); 280 producer.setDisableMessageTimestamp(true); 281 TemporaryQueue replyTo = null; 283 if (operation.getStyle().equals(OperationType.REQUEST_RESPONSE)) { 284 replyTo = session.createTemporaryQueue(); 285 message.setJMSReplyTo(replyTo); 286 producer.setTimeToLive(responseTimeout); 288 } 289 else { 290 producer.setTimeToLive(oneWayTimeout); 291 } 292 Element messageElement = MessageVariableInstance.createMessageElement(); 294 NodeUtil.copy(messageElement, operationElem); 295 message.setObject((Serializable ) messageElement); 296 producer.send(message); 298 299 log.debug("sent request: " + messageToString(message)); 300 producer.close(); 302 return replyTo; 303 } 304 305 protected void receiveResponse(SOAPMessageContext soapContext, Session session, TemporaryQueue replyTo) 306 throws JMSException, SOAPFaultException { 307 MessageConsumer consumer = null; 308 try { 309 consumer = session.createConsumer(replyTo); 310 log.debug("listening for response: " + replyTo.getQueueName()); 312 ObjectMessage message = (ObjectMessage) consumer.receive(responseTimeout); 313 if (message != null) { 315 log.debug("received response: " + messageToString(message)); 316 Element messageElement = (Element ) message.getObject(); 318 soapContext.setProperty(MESSAGE_OBJECT_PROP, messageElement); 319 String faultName = message.getStringProperty(FAULT_NAME_PROP); 320 message.acknowledge(); 321 if (faultName != null) { 323 soapContext.setProperty(FAULT_NAME_PROP, QName.valueOf(faultName)); 324 325 } 326 } 327 else { 328 log.debug("response timeout expired: " + replyTo.getQueueName()); 329 soapContext.setProperty(FAULT_NAME_PROP, TIMEOUT_FAULTCODE); 330 331 } 332 } 333 finally { 334 try { 335 if (consumer != null) { 336 consumer.close(); 338 } 339 replyTo.delete(); 340 } 341 catch (JMSException e) { 342 log.warn(e); 343 } 344 } 345 } 346 347 protected void processResponse(SOAPMessageContext soapContext) throws JAXRPCException { 348 try { 349 SOAPEnvelope envelope = soapContext.getMessage().getSOAPPart().getEnvelope(); 350 SOAPBody body = envelope.getBody(); 352 Name responseName = ((SOAPElement) body.getChildElements().next()).getElementName(); 353 body.detachNode(); 354 body = envelope.addBody(); 356 SOAPBodyElement responseElement = body.addBodyElement(responseName); 357 Element messageElement = (Element ) soapContext.getProperty(MESSAGE_OBJECT_PROP); 359 NodeUtil.copy(responseElement, messageElement); 360 } 361 catch (SOAPException e) { 362 throw new JAXRPCException ("could not complete saaj operation", e); 363 } 364 } 365 366 protected void processFault(SOAPMessageContext soapContext) throws JAXRPCException { 367 try { 368 SOAPEnvelope envelope = soapContext.getMessage().getSOAPPart().getEnvelope(); 369 SOAPElement body = envelope.getBody(); 370 body.detachNode(); 372 String prefix = envelope.getPrefix(); 374 body = envelope.addChildElement("Body", prefix); 375 SOAPElement fault = body.addChildElement("Fault", prefix); 377 QName faultName = (QName ) soapContext.getProperty(FAULT_NAME_PROP); 379 String faultString; 380 if (faultName.equals(TIMEOUT_FAULTCODE)) { 381 faultString = TIMEOUT_FAULTSTRING; 382 } 383 else { 384 faultName = PROCESS_FAULTCODE; 386 faultString = PROCESS_FAULTSTRING; 387 } 388 SOAPElement faultcode = fault.addChildElement("faultcode", prefix); 389 faultcode.setValue(prefix + ':' + faultName.getLocalPart()); 390 fault.addChildElement("faultstring", prefix).setValue(faultString); 391 SOAPElement detail = fault.addChildElement("detail", prefix); 393 Element messageElement = (Element ) soapContext.getProperty(MESSAGE_OBJECT_PROP); 395 SOAPElement detailEntry = detail.addChildElement(envelope.createName(messageElement.getLocalName(), 396 messageElement.getPrefix(), messageElement.getNamespaceURI())); 397 NodeUtil.copy(detailEntry, messageElement); 398 } 399 catch (SOAPException e) { 400 throw new JAXRPCException ("could not access soap message", e); 401 } 402 } 403 404 public static String messageToString(Message message) throws JMSException { 405 StringBuffer result = new StringBuffer (); 406 result.append("id=").append(message.getJMSMessageID()); 408 result.append(", destination=").append(message.getJMSDestination()); 409 Destination replyTo = message.getJMSReplyTo(); 411 if (replyTo != null) { 412 result.append(", replyTo=").append(replyTo); 413 result.append(", correlationId=").append(message.getJMSCorrelationID()); 414 } 415 Enumeration propertyNames = message.getPropertyNames(); 417 while (propertyNames.hasMoreElements()) { 418 String propertyName = (String ) propertyNames.nextElement(); 419 result.append(", ").append(propertyName).append('=').append(message.getObjectProperty(propertyName)); 420 } 421 return result.toString(); 422 } 423 } 424 | Popular Tags |