package org.jbpm.bpel.service.exe;

import java.util.Iterator;
import java.util.Map;

import javax.jms.*;
import javax.xml.namespace.QName;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.jbpm.bpel.messager.MessagerService;
import org.jbpm.bpel.messager.MessagerSession;
import org.jbpm.bpel.messager.PartnerLinkInfo;
import org.jbpm.bpel.service.def.BpelEndpointHandler;
import org.jbpm.bpel.service.def.PartnerLinkDefinition;
import org.jbpm.bpel.service.def.Receiver;

/**
 * JMS message listener that waits for a request addressed to a
 * {@link Receiver} and hands it to {@link #deliverRequest(ObjectMessage)}
 * inside a transaction of the owning {@link MessagerSession}.
 * <p>
 * Subclasses supply the delivery logic and the extra properties that narrow
 * the message selector (see {@link #getReceptionProperties()}).
 * </p>
 */
public abstract class RequestListener implements MessageListener {

  private Receiver receiver;
  private MessagerSession messagerSession;
  private MessageConsumer consumer;

  static final Log log = LogFactory.getLog(RequestListener.class);

  RequestListener() {
  }

  RequestListener(Receiver receiver) {
    this.receiver = receiver;
  }

  /**
   * @return the receiver whose partner link and operation this listener
   *         selects requests for
   */
  public Receiver getReceiver() {
    return receiver;
  }

  /**
   * @param receiver the receiver whose partner link and operation this
   *        listener selects requests for
   */
  public void setReceiver(Receiver receiver) {
    this.receiver = receiver;
  }

  /**
   * Starts listening for a request. Creates a consumer on the destination
   * registered for the receiver's partner link, filtered by the selector
   * from {@link #buildSelector()}, and installs this object as its message
   * listener.
   * 
   * @param msgSession the session that supplies the JMS session and the
   *        partner link information; it is retained for use in
   *        {@link #onMessage(Message)}
   * @throws RuntimeException if the consumer cannot be created or the
   *         listener cannot be installed; the cause is the JMS exception
   */
  public void receiveRequest(MessagerSession msgSession) {
    messagerSession = msgSession;
    MessagerService msgService = msgSession.getMessagerService();
    // resolve the destination associated with the partner link
    PartnerLinkDefinition partnerLink = receiver.getPartnerLink();
    PartnerLinkInfo info = msgService.getPartnerLinkInfo(partnerLink);
    Destination destination = info.getDestination();
    String selector = buildSelector();
    Session jmsSession = msgSession.getJmsSession();
    try {
      consumer = jmsSession.createConsumer(destination, selector);
      consumer.setMessageListener(this);
      log.debug("listening for request: " + selector);
    }
    catch (JMSException e) {
      // pass the exception as the throwable so the stack trace is logged
      log.error("could not start listening for request", e);
      throw new RuntimeException("could not start listening for request", e);
    }
  }

  /**
   * Delivers the incoming message within a transaction of the messager
   * session. If anything goes wrong the failure is logged, this listener
   * stops listening and the transaction is rolled back.
   * 
   * @param message the incoming message; expected to be an
   *        {@link ObjectMessage}
   */
  public void onMessage(Message message) {
    messagerSession.setAsCurrentSession();
    try {
      if (log.isDebugEnabled()) {
        log.debug("received request: " + BpelEndpointHandler.messageToString(message));
      }
      messagerSession.beginTransaction();
      deliverRequest((ObjectMessage) message);
      messagerSession.commitTransaction();
    }
    catch (Exception e) {
      // a failed delivery shuts this listener down, so it must not go unnoticed
      log.error("could not deliver request", e);
      try {
        close();
      }
      finally {
        // roll back even if closing the consumer fails unexpectedly
        messagerSession.rollbackTransaction();
      }
    }
  }

  /**
   * Hands the request over to its target. Invoked from
   * {@link #onMessage(Message)} between the begin and the commit of the
   * messager session transaction.
   * 
   * @param request the incoming request
   * @throws JMSException if the request cannot be read
   */
  protected abstract void deliverRequest(ObjectMessage request) throws JMSException;

  /**
   * Stops listening for requests by closing the consumer, if there is one.
   * A JMS failure while closing is logged and otherwise ignored. The
   * consumer reference is discarded in every case, so repeated calls are
   * harmless.
   */
  public void close() {
    if (consumer == null) {
      return;
    }
    try {
      String selector = consumer.getMessageSelector();
      consumer.close();
      log.debug("stopped listening for request: " + selector);
    }
    catch (JMSException e) {
      log.warn("could not stop listening for request", e);
    }
    finally {
      consumer = null;
    }
  }

  /**
   * Builds the message selector for the consumer. The selector matches the
   * receiver's partner link id and operation name, plus one equality test
   * per entry of {@link #getReceptionProperties()}, using the local part of
   * the property name as the identifier.
   * <p>
   * String values are emitted as selector string literals; embedded single
   * quotes are doubled so a value cannot terminate the literal early.
   * </p>
   * 
   * @return the selector expression
   */
  protected String buildSelector() {
    StringBuffer selector = new StringBuffer();
    // partner link id is appended unquoted, operation name as a string literal
    selector.append(BpelEndpointHandler.PARTNER_LINK_ID_PROP)
        .append('=')
        .append(receiver.getPartnerLink().getId());
    selector.append(" AND ")
        .append(BpelEndpointHandler.OPERATION_NAME_PROP)
        .append('=')
        .append(toStringLiteral(receiver.getOperation().getName()));
    // one additional condition per reception property
    Map properties = getReceptionProperties();
    Iterator namePropertyIter = properties.entrySet().iterator();
    while (namePropertyIter.hasNext()) {
      Map.Entry nameProperty = (Map.Entry) namePropertyIter.next();
      QName name = (QName) nameProperty.getKey();
      String property = (String) nameProperty.getValue();
      selector.append(" AND ")
          .append(name.getLocalPart())
          .append('=')
          .append(toStringLiteral(property));
    }
    return selector.toString();
  }

  /**
   * Supplies the properties that further restrict which requests are
   * accepted. {@link #buildSelector()} reads the keys as {@link QName}s and
   * the values as strings.
   * 
   * @return a map of property names to the values they must equal
   */
  protected abstract Map getReceptionProperties();

  /**
   * Formats a value as a message selector string literal: enclosed in single
   * quotes, with every embedded single quote doubled. A <code>null</code>
   * value is rendered as the text <code>null</code>, as string concatenation
   * would.
   */
  private static String toStringLiteral(Object value) {
    String text = String.valueOf(value);
    StringBuffer literal = new StringBuffer(text.length() + 2);
    literal.append('\'');
    for (int i = 0, n = text.length(); i < n; i++) {
      char ch = text.charAt(i);
      if (ch == '\'') {
        // selector syntax represents a quote inside a literal as two quotes
        literal.append('\'');
      }
      literal.append(ch);
    }
    return literal.append('\'').toString();
  }
}