|                                                                                                              1   package org.jbpm.bpel.service.def;
 2
 3   import java.io.Serializable
  ; 4   import java.util.Enumeration
  ; 5   import java.util.Iterator
  ; 6   import java.util.Map
  ; 7
 8   import javax.jms.*;
 9   import javax.naming.Context
  ; 10  import javax.naming.InitialContext
  ; 11  import javax.naming.NamingException
  ; 12  import javax.wsdl.Operation;
 13  import javax.wsdl.OperationType;
 14  import javax.xml.namespace.QName
  ; 15  import javax.xml.rpc.JAXRPCException
  ; 16  import javax.xml.rpc.handler.Handler
  ; 17  import javax.xml.rpc.handler.HandlerInfo
  ; 18  import javax.xml.rpc.handler.MessageContext
  ; 19  import javax.xml.rpc.handler.soap.SOAPMessageContext
  ; 20  import javax.xml.rpc.soap.SOAPFaultException
  ; 21  import javax.xml.soap.*;
 22
 23  import org.apache.commons.logging.Log;
 24  import org.apache.commons.logging.LogFactory;
 25  import org.w3c.dom.Element
  ; 26  import org.w3c.dom.Node
  ; 27
 28  import com.ibm.wsdl.util.xml.DOMUtils;
 29
 30  import org.jbpm.bpel.data.def.MessageTypeInfo;
 31  import org.jbpm.bpel.data.exe.MessageVariableInstance;
 32  import org.jbpm.bpel.def.BpelDefinition;
 33  import org.jbpm.bpel.par.JndiProcessDeployer;
 34  import org.jbpm.bpel.wsdl.def.Property;
 35  import org.jbpm.bpel.wsdl.def.PropertyAlias;
 36  import org.jbpm.bpel.xml.util.NodeUtil;
 37
 38
 42  public class BpelEndpointHandler implements Handler
  { 43
 44    private QName
  [] headers; 45    private BpelEndpointInfo endpointInfo;
 46    private long responseTimeout;
 47    private long oneWayTimeout;
 48
 49    private BpelDefinition process;
 50    private PartnerLinkDefinition partnerLink;
 51
 52    private Destination destination;
 53    private Connection connection;
 54
 55
 56    public static final String
  PROCESS_NAME_PARAM = "processName"; 57
 58    public static final String
  PARTNER_LINK_NAME_PARAM = "partnerLinkName"; 59
 60    public static final String
  RESPONSE_TIMEOUT_PARAM = "responseTimeout"; 61
 62    public static final String
  ONE_WAY_TIMEOUT_PARAM = "oneWayTimeout"; 63
 64
 65    public static final QName
  PROCESS_FAULTCODE = new QName  (SOAPConstants.URI_NS_SOAP_ENVELOPE, "Server.BusinessProcess", "soapenv"); 66
 67    public static final String
  PROCESS_FAULTSTRING = "Business Process Fault"; 68
 69
 70    public static final QName
  TIMEOUT_FAULTCODE = new QName  (SOAPConstants.URI_NS_SOAP_ENVELOPE, "Server.Timeout"); 71
 72    public static final String
  TIMEOUT_FAULTSTRING = "Response Timeout"; 73
 74
 75    public static final String
  PARTNER_LINK_ID_PROP = "_$partnerLinkId"; 76
 77    public static final String
  OPERATION_NAME_PROP = "_$operationName"; 78
 79    public static final String
  FAULT_NAME_PROP = "_$faultName"; 80
 81
 82    static final String
  MESSAGE_OBJECT_PROP = "_$messageObject"; 83
 84    private static final Log log = LogFactory.getLog(BpelEndpointHandler.class);
 85
 86
 87    public void init(HandlerInfo
  handlerInfo) throws JAXRPCException  { 88          headers = handlerInfo.getHeaders();
 90          Map
  handlerConfig = handlerInfo.getHandlerConfig(); 92          String
  processName = (String  ) handlerConfig.get(PROCESS_NAME_PARAM); 94      if (processName == null) {
 95        throw new JAXRPCException
  ("Parameter '" + PROCESS_NAME_PARAM + 96            "' is mandatory in the handler configuration");
 97      }
 98          String
  partnerLinkAlias = (String  ) handlerConfig.get(PARTNER_LINK_NAME_PARAM); 100     if (partnerLinkAlias == null) {
 101       throw new JAXRPCException
  ("Parameter '" + PARTNER_LINK_NAME_PARAM + 102           "' is mandatory in the handler configuration");
 103     }
 104         String
  receiveTimeoutText = (String  ) handlerConfig.get(RESPONSE_TIMEOUT_PARAM); 106     if (receiveTimeoutText != null) {
 107       try {
 108         responseTimeout = Long.parseLong(receiveTimeoutText);
 109       }
 110       catch (NumberFormatException
  e) { 111         throw new JAXRPCException
  ("Parameter '" + RESPONSE_TIMEOUT_PARAM + 112             "' does not contain a parsable long", e);
 113       }
 114     }
 115         String
  oneWayTimeoutText = (String  ) handlerConfig.get(ONE_WAY_TIMEOUT_PARAM); 117     if (oneWayTimeoutText != null) {
 118       try {
 119         oneWayTimeout = Long.parseLong(oneWayTimeoutText);
 120       }
 121       catch (NumberFormatException
  e) { 122         throw new JAXRPCException
  ("Parameter '" + ONE_WAY_TIMEOUT_PARAM + 123             "' does not contain a parsable long", e);
 124       }
 125     }
 126     Context
  initialContext = null; 127     try {
 128       initialContext = new InitialContext
  (); 129             JndiProcessDeployer deployer = new JndiProcessDeployer(initialContext);
 131       process = (BpelDefinition) deployer.findProcessDefinition(processName);
 132       log.debug("retrieved process definition: " + processName);
 133             endpointInfo = deployer.findEndpointInfo(processName, partnerLinkAlias);
 135       log.debug("retrieved endpoint info: " + partnerLinkAlias);
 136             PartnerLinkFinder finder = new PartnerLinkFinder(endpointInfo.getPartnerLinkId());
 138       finder.visit(process);
 139       partnerLink = finder.getPartnerLink();
 140             destination = (Destination) initialContext.lookup(endpointInfo.getDestinationName());
 142             ConnectionFactory factory = (ConnectionFactory) initialContext.lookup(endpointInfo.getConnectionFactoryName());
 144             connection = factory.createConnection();
 146       connection.start();
 147     }
 148     catch (NamingException
  e) { 149       log.error(e);
 150       throw new JAXRPCException
  ("could not lookup object", e); 151     }
 152     catch (JMSException e) {
 153       log.error(e);
 154       throw new JAXRPCException
  ("could not create jms connection", e); 155     }
 156     finally {
 157       if (initialContext != null) {
 158         try {
 159           initialContext.close();
 160         }
 161         catch (NamingException
  e) { 162           log.warn("could not close naming context", e);
 163         }
 164       }
 165     }
 166   }
 167
 168
 169   public void destroy() {
 170     if (connection != null) {
 171       try {
 172         connection.close();
 173       }
 174       catch (JMSException e) {
 175         log.warn("could not close jms connection", e);
 176       }
 177     }
 178   }
 179
 180
 181   public QName
  [] getHeaders() { 182     return headers;
 183   }
 184
 185
 186   public boolean handleRequest(MessageContext
  context) throws JAXRPCException  , SOAPFaultException  { 187     if (!(context instanceof SOAPMessageContext
  )) 188       return true;
 189
 190     SOAPMessageContext
  soapContext = (SOAPMessageContext  ) context; 191     Session session = null;
 192     try {
 193       session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 194       TemporaryQueue replyTo = sendRequest(soapContext, session);
 195       if (replyTo != null) {
 196         receiveResponse(soapContext, session, replyTo);
 197       }
 198       return true;
 199     }
 200     catch (JMSException e) {
 201       throw new JAXRPCException
  ("could not complete jms operation", e); 202     }
 203     catch (SOAPException e) {
 204       throw new JAXRPCException
  ("could not complete saaj operation", e); 205     }
 206     finally {
 207       if (session != null) {
 208         try {
 209           session.close();
 210         }
 211         catch (JMSException e) {
 212           log.warn("could not close jms session", e);
 213         }
 214       }
 215     }
 216   }
 217
 218
 219   public boolean handleResponse(MessageContext
  context) throws JAXRPCException  { 220     if (context instanceof SOAPMessageContext
  && context.containsProperty(MESSAGE_OBJECT_PROP)) { 221       SOAPMessageContext
  soapContext = (SOAPMessageContext  ) context; 222       if (soapContext.containsProperty(FAULT_NAME_PROP))
 223         processFault(soapContext);
 224       else
 225         processResponse(soapContext);
 226     }
 227     return true;
 228   }
 229
 230
 231   public boolean handleFault(MessageContext
  context) throws JAXRPCException  { 232     return true;
 233   }
 234
 235   public BpelEndpointInfo getEndpointInfo() {
 236     return endpointInfo;
 237   }
 238
 239   public long getResponseTimeout() {
 240     return responseTimeout;
 241   }
 242
 243   public long getOneWayTimeout() {
 244     return oneWayTimeout;
 245   }
 246
 247   protected Destination getDestination() {
 248     return destination;
 249   }
 250
 251   protected Connection getConnection() {
 252     return connection;
 253   }
 254
 255   protected TemporaryQueue sendRequest(SOAPMessageContext
  soapContext, Session session) 256   throws JMSException, SOAPException {
 257     ObjectMessage message = session.createObjectMessage();
 258         message.setLongProperty(PARTNER_LINK_ID_PROP, endpointInfo.getPartnerLinkId());
 260         Element
  operationElem = DOMUtils.getFirstChildElement(soapContext.getMessage().getSOAPBody()); 262     String
  operationName = operationElem.getLocalName(); 263     message.setStringProperty(OPERATION_NAME_PROP, operationName);
 264         Operation operation = partnerLink.getMyRole().getPortType().getOperation(operationName, null, null);
 266     MessageTypeInfo inputTypeInfo = process.getImports().getMessageTypeInfo(operation.getInput().getMessage().getQName());
 267     Map
  aliases = inputTypeInfo.getPropertyAliases(); 268     if (aliases != null) {
 269       Iterator
  aliasIt = aliases.values().iterator(); 270       while (aliasIt.hasNext()) {
 271         PropertyAlias alias = (PropertyAlias) aliasIt.next();
 272         Property property = alias.getProperty();
 273         Node
  value = (Node  ) alias.getQuery().getScript().evaluate(operationElem); 274                 message.setObjectProperty(property.getQName().getLocalPart(), NodeUtil.getValue(value));
 276       }
 277     }
 278         MessageProducer producer = session.createProducer(destination);
 280     producer.setDisableMessageTimestamp(true);
 281         TemporaryQueue replyTo = null;
 283     if (operation.getStyle().equals(OperationType.REQUEST_RESPONSE)) {
 284       replyTo = session.createTemporaryQueue();
 285       message.setJMSReplyTo(replyTo);
 286             producer.setTimeToLive(responseTimeout);
 288     }
 289     else {
 290       producer.setTimeToLive(oneWayTimeout);
 291     }
 292         Element
  messageElement = MessageVariableInstance.createMessageElement(); 294     NodeUtil.copy(messageElement, operationElem);
 295     message.setObject((Serializable
  ) messageElement); 296         producer.send(message);
 298
 299     log.debug("sent request: " + messageToString(message));
 300         producer.close();
 302     return replyTo;
 303   }
 304
 305   protected void receiveResponse(SOAPMessageContext
  soapContext, Session session, TemporaryQueue replyTo) 306   throws JMSException, SOAPFaultException
  { 307     MessageConsumer consumer = null;
 308     try {
 309       consumer = session.createConsumer(replyTo);
 310             log.debug("listening for response: " + replyTo.getQueueName());
 312       ObjectMessage message = (ObjectMessage) consumer.receive(responseTimeout);
 313             if (message != null) {
 315         log.debug("received response: " + messageToString(message));
 316                 Element
  messageElement = (Element  ) message.getObject(); 318         soapContext.setProperty(MESSAGE_OBJECT_PROP, messageElement);
 319         String
  faultName = message.getStringProperty(FAULT_NAME_PROP); 320         message.acknowledge();
 321                 if (faultName != null) {
 323           soapContext.setProperty(FAULT_NAME_PROP, QName.valueOf(faultName));
 324
 325         }
 326       }
 327       else {
 328         log.debug("response timeout expired: " + replyTo.getQueueName());
 329         soapContext.setProperty(FAULT_NAME_PROP, TIMEOUT_FAULTCODE);
 330
 331       }
 332     }
 333     finally {
 334       try {
 335         if (consumer != null) {
 336                     consumer.close();
 338         }
 339         replyTo.delete();
 340       }
 341       catch (JMSException e) {
 342         log.warn(e);
 343       }
 344     }
 345   }
 346
 347   protected void processResponse(SOAPMessageContext
  soapContext) throws JAXRPCException  { 348     try {
 349       SOAPEnvelope envelope = soapContext.getMessage().getSOAPPart().getEnvelope();
 350             SOAPBody body = envelope.getBody();
 352       Name
  responseName = ((SOAPElement) body.getChildElements().next()).getElementName(); 353       body.detachNode();
 354             body = envelope.addBody();
 356       SOAPBodyElement responseElement = body.addBodyElement(responseName);
 357             Element
  messageElement = (Element  ) soapContext.getProperty(MESSAGE_OBJECT_PROP); 359       NodeUtil.copy(responseElement, messageElement);
 360     }
 361     catch (SOAPException e) {
 362       throw new JAXRPCException
  ("could not complete saaj operation", e); 363     }
 364   }
 365
 366   protected void processFault(SOAPMessageContext
  soapContext) throws JAXRPCException  { 367     try {
 368       SOAPEnvelope envelope = soapContext.getMessage().getSOAPPart().getEnvelope();
 369       SOAPElement body = envelope.getBody();
 370             body.detachNode();
 372             String
  prefix = envelope.getPrefix(); 374       body = envelope.addChildElement("Body", prefix);
 375             SOAPElement fault = body.addChildElement("Fault", prefix);
 377             QName
  faultName = (QName  ) soapContext.getProperty(FAULT_NAME_PROP); 379       String
  faultString; 380       if (faultName.equals(TIMEOUT_FAULTCODE)) {
 381         faultString = TIMEOUT_FAULTSTRING;
 382       }
 383       else {
 384                 faultName = PROCESS_FAULTCODE;
 386         faultString = PROCESS_FAULTSTRING;
 387       }
 388       SOAPElement faultcode = fault.addChildElement("faultcode", prefix);
 389       faultcode.setValue(prefix + ':' + faultName.getLocalPart());
 390       fault.addChildElement("faultstring", prefix).setValue(faultString);
 391             SOAPElement detail = fault.addChildElement("detail", prefix);
 393             Element
  messageElement = (Element  ) soapContext.getProperty(MESSAGE_OBJECT_PROP); 395       SOAPElement detailEntry = detail.addChildElement(envelope.createName(messageElement.getLocalName(),
 396           messageElement.getPrefix(), messageElement.getNamespaceURI()));
 397       NodeUtil.copy(detailEntry, messageElement);
 398     }
 399     catch (SOAPException e) {
 400       throw new JAXRPCException
  ("could not access soap message", e); 401     }
 402   }
 403
 404   public static String
  messageToString(Message message) throws JMSException { 405     StringBuffer
  result = new StringBuffer  (); 406         result.append("id=").append(message.getJMSMessageID());
 408     result.append(", destination=").append(message.getJMSDestination());
 409         Destination replyTo = message.getJMSReplyTo();
 411     if (replyTo != null) {
 412       result.append(", replyTo=").append(replyTo);
 413       result.append(", correlationId=").append(message.getJMSCorrelationID());
 414     }
 415         Enumeration
  propertyNames = message.getPropertyNames(); 417     while (propertyNames.hasMoreElements()) {
 418       String
  propertyName = (String  ) propertyNames.nextElement(); 419       result.append(", ").append(propertyName).append('=').append(message.getObjectProperty(propertyName));
 420     }
 421     return result.toString();
 422   }
 423 }
 424
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |