1 package org.example; 2 3 import java.util.HashMap ; 4 import java.util.Map ; 5 6 import javax.jms.Connection ; 7 import javax.jms.ConnectionFactory ; 8 import javax.jms.Destination ; 9 import javax.jms.JMSException ; 10 import javax.jms.MapMessage ; 11 import javax.jms.MessageProducer ; 12 import javax.jms.Session ; 13 import javax.naming.Context ; 14 import javax.naming.InitialContext ; 15 import javax.naming.NamingException ; 16 17 import org.apache.commons.logging.Log; 18 import org.apache.commons.logging.LogFactory; 19 20 import org.jbpm.context.exe.ContextInstance; 21 import org.jbpm.graph.def.ActionHandler; 22 import org.jbpm.graph.exe.ExecutionContext; 23 import org.jbpm.graph.exe.Token; 24 25 29 public class AsyncAction implements ActionHandler { 30 31 private String operation; 33 private String variables; 35 36 private Connection jmsConnection; 37 private Map operationQueues = new HashMap (); 39 40 41 public static final String OPERATION_PROPERTY = "Operation"; 42 43 public static final String PROCESS_ID_ITEM = "_$processId"; 44 45 public static final String TOKEN_ID_ITEM = "_$tokenId"; 46 47 private static final String DEFAULT_CONNECTION_FACTORY = "ConnectionFactory"; 49 50 private static final String [] DEFAULT_OPERATION_QUEUES = { 52 "prong1", "queue/testQueue", 53 "prong2", "queue/testQueue", 54 "prong3", "queue/testQueue" 55 }; 56 57 private static final Log log = LogFactory.getLog(AsyncAction.class); 58 59 public AsyncAction() { 60 Context initialContext = null; 61 try { 62 initialContext = new InitialContext (); 63 createJmsConnection(initialContext); 64 registerOperationQueues(initialContext); 65 } 66 catch (NamingException e) { 67 log.error(e); 68 throw new RuntimeException ("could not create initial context", e); 69 } 70 finally { 71 if (initialContext != null) { 72 try { 73 initialContext.close(); 74 } 75 catch (NamingException e) { 76 log.warn("could not close initial context", e); 77 } 78 } 79 } 80 } 81 82 public void execute(ExecutionContext executionContext) { 83 ContextInstance contextInstance = executionContext.getContextInstance(); 84 Token token = executionContext.getToken(); 85 86 Session jmsSession = null; 87 try { 88 jmsSession = jmsConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 89 90 MapMessage message = jmsSession.createMapMessage(); 91 92 message.setStringProperty(OPERATION_PROPERTY, operation); 94 95 if (variables != null) { 97 String [] variableNames = variables.split("\\s"); 98 for (int i = 0, n = variableNames.length; i < n; i++) { 99 String name = variableNames[i]; 100 Object value = contextInstance.getVariable(name, token); 101 log.debug("setting variable in message body: name=" + name + ", value=" + value); 102 message.setObject(name, value); 103 } 104 } 105 106 long processId = token.getProcessInstance().getId(); 108 message.setLong(PROCESS_ID_ITEM, processId); 109 long tokenId = token.getId(); 110 message.setLong(TOKEN_ID_ITEM, tokenId); 111 112 log.debug("sending message: operation=" + operation + 114 ", processId=" + processId + ", tokenId=" + tokenId); 115 Destination queue = getQueueForOperation(operation); 116 MessageProducer sender = jmsSession.createProducer(queue); 117 sender.send(message); 118 } 119 catch (JMSException e) { 120 log.error(e); 121 throw new RuntimeException ("could not send message for operation: " + operation, e); 122 } 123 finally { 124 if (jmsSession != null) { 125 try { 126 jmsSession.close(); 127 } 128 catch (JMSException e) { 129 log.warn("could not close jms session", e); 130 } 131 } 132 } 133 } 134 135 protected Destination getQueueForOperation(String operation) { 136 return (Destination ) operationQueues.get(operation); 137 } 138 139 protected void createJmsConnection(Context namingContext) { 140 log.debug("creating jms connection: factory=" + DEFAULT_CONNECTION_FACTORY); 141 try { 142 ConnectionFactory factory = (ConnectionFactory ) namingContext.lookup(DEFAULT_CONNECTION_FACTORY); 143 jmsConnection = factory.createConnection(); 144 } 145 catch (NamingException e) { 146 log.error(e); 147 throw new RuntimeException ("could not retrieve jms connection factory bound to: " + 148 DEFAULT_CONNECTION_FACTORY, e); 149 } 150 catch (JMSException e) { 151 log.error(e); 152 throw new RuntimeException ("could not create jms connection", e); 153 } 154 } 155 156 protected void registerOperationQueues(Context namingContext) { 157 for (int i = 0, n = DEFAULT_OPERATION_QUEUES.length; i < n; i += 2) { 158 String operation = DEFAULT_OPERATION_QUEUES[i]; 159 String queueName = DEFAULT_OPERATION_QUEUES[i+1]; 160 log.debug("registering operation queue: operation=" + operation + ", queue=" + queueName); 161 try { 162 Destination queue = (Destination ) namingContext.lookup(queueName); 163 operationQueues.put(operation, queue); 164 } 165 catch (NamingException e) { 166 log.error(e); 167 throw new RuntimeException ("could not retrieve queue bound to: " + queueName, e); 168 } 169 } 170 } 171 } 172 | Popular Tags |