KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jbpm > bpel > service > def > BpelEndpointHandler


1 package org.jbpm.bpel.service.def;
2
3 import java.io.Serializable JavaDoc;
4 import java.util.Enumeration JavaDoc;
5 import java.util.Iterator JavaDoc;
6 import java.util.Map JavaDoc;
7
8 import javax.jms.*;
9 import javax.naming.Context JavaDoc;
10 import javax.naming.InitialContext JavaDoc;
11 import javax.naming.NamingException JavaDoc;
12 import javax.wsdl.Operation;
13 import javax.wsdl.OperationType;
14 import javax.xml.namespace.QName JavaDoc;
15 import javax.xml.rpc.JAXRPCException JavaDoc;
16 import javax.xml.rpc.handler.Handler JavaDoc;
17 import javax.xml.rpc.handler.HandlerInfo JavaDoc;
18 import javax.xml.rpc.handler.MessageContext JavaDoc;
19 import javax.xml.rpc.handler.soap.SOAPMessageContext JavaDoc;
20 import javax.xml.rpc.soap.SOAPFaultException JavaDoc;
21 import javax.xml.soap.*;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.w3c.dom.Element JavaDoc;
26 import org.w3c.dom.Node JavaDoc;
27
28 import com.ibm.wsdl.util.xml.DOMUtils;
29
30 import org.jbpm.bpel.data.def.MessageTypeInfo;
31 import org.jbpm.bpel.data.exe.MessageVariableInstance;
32 import org.jbpm.bpel.def.BpelDefinition;
33 import org.jbpm.bpel.par.JndiProcessDeployer;
34 import org.jbpm.bpel.wsdl.def.Property;
35 import org.jbpm.bpel.wsdl.def.PropertyAlias;
36 import org.jbpm.bpel.xml.util.NodeUtil;
37
38 /**
39  * @author Alejandro Guízar
40  * @version $Revision: 1.10 $ $Date: 2005/06/23 02:22:57 $
41  */

42 public class BpelEndpointHandler implements Handler JavaDoc {
43   
44   private QName JavaDoc[] headers;
45   private BpelEndpointInfo endpointInfo;
46   private long responseTimeout;
47   private long oneWayTimeout;
48   
49   private BpelDefinition process;
50   private PartnerLinkDefinition partnerLink;
51   
52   private Destination destination;
53   private Connection connection;
54   
55   /** Name of the process element. */
56   public static final String JavaDoc PROCESS_NAME_PARAM = "processName";
57   /** Name of the partner link element. */
58   public static final String JavaDoc PARTNER_LINK_NAME_PARAM = "partnerLinkName";
59   /** Time to wait for response messages, in milliseconds. */
60   public static final String JavaDoc RESPONSE_TIMEOUT_PARAM = "responseTimeout";
61   /** Time to expire one-way messages, in milliseconds. */
62   public static final String JavaDoc ONE_WAY_TIMEOUT_PARAM = "oneWayTimeout";
63   
64   /** The fault code for faults returned by the business process. */
65   public static final QName JavaDoc PROCESS_FAULTCODE = new QName JavaDoc(SOAPConstants.URI_NS_SOAP_ENVELOPE, "Server.BusinessProcess", "soapenv");
66   /** The fault string for faults returned by the business process. */
67   public static final String JavaDoc PROCESS_FAULTSTRING = "Business Process Fault";
68   
69   /** The fault code for faults caused by response timeouts. */
70   public static final QName JavaDoc TIMEOUT_FAULTCODE = new QName JavaDoc(SOAPConstants.URI_NS_SOAP_ENVELOPE, "Server.Timeout");
71   /** The fault string for faults caused by response timeouts. */
72   public static final String JavaDoc TIMEOUT_FAULTSTRING = "Response Timeout";
73   
74   /** JMS property for the partner link identifier, in the request message. */
75   public static final String JavaDoc PARTNER_LINK_ID_PROP = "_$partnerLinkId";
76   /** JMS property for the operation name, in the request message. */
77   public static final String JavaDoc OPERATION_NAME_PROP = "_$operationName";
78   /** JMS property for the fault qualified name, in the response message. */
79   public static final String JavaDoc FAULT_NAME_PROP = "_$faultName";
80   
81   /** Message context property for the message element. */
82   static final String JavaDoc MESSAGE_OBJECT_PROP = "_$messageObject";
83   
84   private static final Log log = LogFactory.getLog(BpelEndpointHandler.class);
85   
86   /** {@inheritDoc} */
87   public void init(HandlerInfo JavaDoc handlerInfo) throws JAXRPCException JavaDoc {
88     // save headers
89
headers = handlerInfo.getHeaders();
90     // get handler configuration
91
Map JavaDoc handlerConfig = handlerInfo.getHandlerConfig();
92     // process name parameter
93
String JavaDoc processName = (String JavaDoc) handlerConfig.get(PROCESS_NAME_PARAM);
94     if (processName == null) {
95       throw new JAXRPCException JavaDoc("Parameter '" + PROCESS_NAME_PARAM +
96           "' is mandatory in the handler configuration");
97     }
98     // partner link alias parameter
99
String JavaDoc partnerLinkAlias = (String JavaDoc) handlerConfig.get(PARTNER_LINK_NAME_PARAM);
100     if (partnerLinkAlias == null) {
101       throw new JAXRPCException JavaDoc("Parameter '" + PARTNER_LINK_NAME_PARAM +
102           "' is mandatory in the handler configuration");
103     }
104     // response timeout parameter
105
String JavaDoc receiveTimeoutText = (String JavaDoc) handlerConfig.get(RESPONSE_TIMEOUT_PARAM);
106     if (receiveTimeoutText != null) {
107       try {
108         responseTimeout = Long.parseLong(receiveTimeoutText);
109       }
110       catch (NumberFormatException JavaDoc e) {
111         throw new JAXRPCException JavaDoc("Parameter '" + RESPONSE_TIMEOUT_PARAM +
112             "' does not contain a parsable long", e);
113       }
114     }
115     // one-way timeout parameter
116
String JavaDoc oneWayTimeoutText = (String JavaDoc) handlerConfig.get(ONE_WAY_TIMEOUT_PARAM);
117     if (oneWayTimeoutText != null) {
118       try {
119         oneWayTimeout = Long.parseLong(oneWayTimeoutText);
120       }
121       catch (NumberFormatException JavaDoc e) {
122         throw new JAXRPCException JavaDoc("Parameter '" + ONE_WAY_TIMEOUT_PARAM +
123             "' does not contain a parsable long", e);
124       }
125     }
126     Context JavaDoc initialContext = null;
127     try {
128       initialContext = new InitialContext JavaDoc();
129       // get the process definition
130
JndiProcessDeployer deployer = new JndiProcessDeployer(initialContext);
131       process = (BpelDefinition) deployer.findProcessDefinition(processName);
132       log.debug("retrieved process definition: " + processName);
133       // get the endpoint information
134
endpointInfo = deployer.findEndpointInfo(processName, partnerLinkAlias);
135       log.debug("retrieved endpoint info: " + partnerLinkAlias);
136       // partner link
137
PartnerLinkFinder finder = new PartnerLinkFinder(endpointInfo.getPartnerLinkId());
138       finder.visit(process);
139       partnerLink = finder.getPartnerLink();
140       // destination
141
destination = (Destination) initialContext.lookup(endpointInfo.getDestinationName());
142       // connection factory
143
ConnectionFactory factory = (ConnectionFactory) initialContext.lookup(endpointInfo.getConnectionFactoryName());
144       // create a jms connection
145
connection = factory.createConnection();
146       connection.start();
147     }
148     catch (NamingException JavaDoc e) {
149       log.error(e);
150       throw new JAXRPCException JavaDoc("could not lookup object", e);
151     }
152     catch (JMSException e) {
153       log.error(e);
154       throw new JAXRPCException JavaDoc("could not create jms connection", e);
155     }
156     finally {
157       if (initialContext != null) {
158         try {
159           initialContext.close();
160         }
161         catch (NamingException JavaDoc e) {
162           log.warn("could not close naming context", e);
163         }
164       }
165     }
166   }
167
168   /** {@inheritDoc} */
169   public void destroy() {
170     if (connection != null) {
171       try {
172         connection.close();
173       }
174       catch (JMSException e) {
175         log.warn("could not close jms connection", e);
176       }
177     }
178   }
179   
180   /** {@inheritDoc} */
181   public QName JavaDoc[] getHeaders() {
182     return headers;
183   }
184
185   /** {@inheritDoc} */
186   public boolean handleRequest(MessageContext JavaDoc context) throws JAXRPCException JavaDoc, SOAPFaultException JavaDoc {
187     if (!(context instanceof SOAPMessageContext JavaDoc))
188       return true;
189     
190     SOAPMessageContext JavaDoc soapContext = (SOAPMessageContext JavaDoc) context;
191     Session session = null;
192     try {
193       session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
194       TemporaryQueue replyTo = sendRequest(soapContext, session);
195       if (replyTo != null) {
196         receiveResponse(soapContext, session, replyTo);
197       }
198       return true;
199     }
200     catch (JMSException e) {
201       throw new JAXRPCException JavaDoc("could not complete jms operation", e);
202     }
203     catch (SOAPException e) {
204       throw new JAXRPCException JavaDoc("could not complete saaj operation", e);
205     }
206     finally {
207       if (session != null) {
208         try {
209           session.close();
210         }
211         catch (JMSException e) {
212           log.warn("could not close jms session", e);
213         }
214       }
215     }
216   }
217
218   /** {@inheritDoc} */
219   public boolean handleResponse(MessageContext JavaDoc context) throws JAXRPCException JavaDoc {
220     if (context instanceof SOAPMessageContext JavaDoc && context.containsProperty(MESSAGE_OBJECT_PROP)) {
221       SOAPMessageContext JavaDoc soapContext = (SOAPMessageContext JavaDoc) context;
222       if (soapContext.containsProperty(FAULT_NAME_PROP))
223         processFault(soapContext);
224       else
225         processResponse(soapContext);
226     }
227     return true;
228   }
229
230   /** {@inheritDoc} */
231   public boolean handleFault(MessageContext JavaDoc context) throws JAXRPCException JavaDoc {
232     return true;
233   }
234   
235   public BpelEndpointInfo getEndpointInfo() {
236     return endpointInfo;
237   }
238   
239   public long getResponseTimeout() {
240     return responseTimeout;
241   }
242   
243   public long getOneWayTimeout() {
244     return oneWayTimeout;
245   }
246
247   protected Destination getDestination() {
248     return destination;
249   }
250   
251   protected Connection getConnection() {
252     return connection;
253   }
254   
255   protected TemporaryQueue sendRequest(SOAPMessageContext JavaDoc soapContext, Session session)
256   throws JMSException, SOAPException {
257     ObjectMessage message = session.createObjectMessage();
258     // partner link property
259
message.setLongProperty(PARTNER_LINK_ID_PROP, endpointInfo.getPartnerLinkId());
260     // operation property
261
Element JavaDoc operationElem = DOMUtils.getFirstChildElement(soapContext.getMessage().getSOAPBody());
262     String JavaDoc operationName = operationElem.getLocalName();
263     message.setStringProperty(OPERATION_NAME_PROP, operationName);
264     // message properties
265
Operation operation = partnerLink.getMyRole().getPortType().getOperation(operationName, null, null);
266     MessageTypeInfo inputTypeInfo = process.getImports().getMessageTypeInfo(operation.getInput().getMessage().getQName());
267     Map JavaDoc aliases = inputTypeInfo.getPropertyAliases();
268     if (aliases != null) {
269       Iterator JavaDoc aliasIt = aliases.values().iterator();
270       while (aliasIt.hasNext()) {
271         PropertyAlias alias = (PropertyAlias) aliasIt.next();
272         Property property = alias.getProperty();
273         Node JavaDoc value = (Node JavaDoc) alias.getQuery().getScript().evaluate(operationElem);
274         // TODO ensure the value's type matches the property's type
275
message.setObjectProperty(property.getQName().getLocalPart(), NodeUtil.getValue(value));
276       }
277     }
278     // set up producer
279
MessageProducer producer = session.createProducer(destination);
280     producer.setDisableMessageTimestamp(true);
281     // arrange a reply destination, if the operation is request/response
282
TemporaryQueue replyTo = null;
283     if (operation.getStyle().equals(OperationType.REQUEST_RESPONSE)) {
284       replyTo = session.createTemporaryQueue();
285       message.setJMSReplyTo(replyTo);
286       // the request message is no longer needed after the response timeout expires
287
producer.setTimeToLive(responseTimeout);
288     }
289     else {
290       producer.setTimeToLive(oneWayTimeout);
291     }
292     // set message data
293
Element JavaDoc messageElement = MessageVariableInstance.createMessageElement();
294     NodeUtil.copy(messageElement, operationElem);
295     message.setObject((Serializable JavaDoc) messageElement);
296     // send the input message
297
producer.send(message);
298     /* session.commit(); */
299     log.debug("sent request: " + messageToString(message));
300     // it might take a while until a response arrives, so release producer resources
301
producer.close();
302     return replyTo;
303   }
304   
305   protected void receiveResponse(SOAPMessageContext JavaDoc soapContext, Session session, TemporaryQueue replyTo)
306   throws JMSException, SOAPFaultException JavaDoc {
307     MessageConsumer consumer = null;
308     try {
309       consumer = session.createConsumer(replyTo);
310       // receive the output message
311
log.debug("listening for response: " + replyTo.getQueueName());
312       ObjectMessage message = (ObjectMessage) consumer.receive(responseTimeout);
313       // did a message arrive in time?
314
if (message != null) {
315         log.debug("received response: " + messageToString(message));
316         // keep the message element
317
Element JavaDoc messageElement = (Element JavaDoc) message.getObject();
318         soapContext.setProperty(MESSAGE_OBJECT_PROP, messageElement);
319         String JavaDoc faultName = message.getStringProperty(FAULT_NAME_PROP);
320         message.acknowledge();
321         // is this a fault?
322
if (faultName != null) {
323           soapContext.setProperty(FAULT_NAME_PROP, QName.valueOf(faultName));
324           /* throw new SOAPFaultException(PROCESS_FAULTCODE, PROCESS_FAULTSTRING, null, null); */
325         }
326       }
327       else {
328         log.debug("response timeout expired: " + replyTo.getQueueName());
329         soapContext.setProperty(FAULT_NAME_PROP, TIMEOUT_FAULTCODE);
330         /* throw new SOAPFaultException(TIMEOUT_FAULTCODE, TIMEOUT_FAULTSTRING, null, null); */
331       }
332     }
333     finally {
334       try {
335         if (consumer != null) {
336           // close consumer so that the temporary destination can be deleted
337
consumer.close();
338         }
339         replyTo.delete();
340       }
341       catch (JMSException e) {
342         log.warn(e);
343       }
344     }
345   }
346   
347   protected void processResponse(SOAPMessageContext JavaDoc soapContext) throws JAXRPCException JavaDoc {
348     try {
349       SOAPEnvelope envelope = soapContext.getMessage().getSOAPPart().getEnvelope();
350       // remove existing body, since it might be immutable
351
SOAPBody body = envelope.getBody();
352       Name JavaDoc responseName = ((SOAPElement) body.getChildElements().next()).getElementName();
353       body.detachNode();
354       // re-create body
355
body = envelope.addBody();
356       SOAPBodyElement responseElement = body.addBodyElement(responseName);
357       // fill response element with message parts
358
Element JavaDoc messageElement = (Element JavaDoc) soapContext.getProperty(MESSAGE_OBJECT_PROP);
359       NodeUtil.copy(responseElement, messageElement);
360     }
361     catch (SOAPException e) {
362       throw new JAXRPCException JavaDoc("could not complete saaj operation", e);
363     }
364   }
365   
366   protected void processFault(SOAPMessageContext JavaDoc soapContext) throws JAXRPCException JavaDoc {
367     try {
368       SOAPEnvelope envelope = soapContext.getMessage().getSOAPPart().getEnvelope();
369       SOAPElement body = envelope.getBody();
370       // remove existing body, since it might be immutable
371
body.detachNode();
372       // re-create body
373
String JavaDoc prefix = envelope.getPrefix();
374       body = envelope.addChildElement("Body", prefix);
375       // SOAPFault fault = body.addFault();
376
SOAPElement fault = body.addChildElement("Fault", prefix);
377       // set fault fields
378
QName JavaDoc faultName = (QName JavaDoc) soapContext.getProperty(FAULT_NAME_PROP);
379       String JavaDoc faultString;
380       if (faultName.equals(TIMEOUT_FAULTCODE)) {
381         faultString = TIMEOUT_FAULTSTRING;
382       }
383       else {
384         // fault.setFaultCode(envelope.createName(faultName.getLocalPart(), faultName.getPrefix(), faultName.getNamespaceURI()));
385
faultName = PROCESS_FAULTCODE;
386         faultString = PROCESS_FAULTSTRING;
387       }
388       SOAPElement faultcode = fault.addChildElement("faultcode", prefix);
389       faultcode.setValue(prefix + ':' + faultName.getLocalPart());
390       fault.addChildElement("faultstring", prefix).setValue(faultString);
391       // fill detail
392
SOAPElement detail = fault.addChildElement("detail", prefix);
393       // the fault message is one element corresponding to the single part
394
Element JavaDoc messageElement = (Element JavaDoc) soapContext.getProperty(MESSAGE_OBJECT_PROP);
395       SOAPElement detailEntry = detail.addChildElement(envelope.createName(messageElement.getLocalName(),
396           messageElement.getPrefix(), messageElement.getNamespaceURI()));
397       NodeUtil.copy(detailEntry, messageElement);
398     }
399     catch (SOAPException e) {
400       throw new JAXRPCException JavaDoc("could not access soap message", e);
401     }
402   }
403
404   public static String JavaDoc messageToString(Message message) throws JMSException {
405     StringBuffer JavaDoc result = new StringBuffer JavaDoc();
406     // ID & destination
407
result.append("id=").append(message.getJMSMessageID());
408     result.append(", destination=").append(message.getJMSDestination());
409     // replyTo && correlationID
410
Destination replyTo = message.getJMSReplyTo();
411     if (replyTo != null) {
412       result.append(", replyTo=").append(replyTo);
413       result.append(", correlationId=").append(message.getJMSCorrelationID());
414     }
415     // properties
416
Enumeration JavaDoc propertyNames = message.getPropertyNames();
417     while (propertyNames.hasMoreElements()) {
418       String JavaDoc propertyName = (String JavaDoc) propertyNames.nextElement();
419       result.append(", ").append(propertyName).append('=').append(message.getObjectProperty(propertyName));
420     }
421     return result.toString();
422   }
423 }
424
Popular Tags