1 24 package org.ofbiz.service.jms; 25 26 import java.util.ArrayList ; 27 import java.util.HashMap ; 28 import java.util.Iterator ; 29 import java.util.List ; 30 import java.util.Map ; 31 32 import javax.jms.JMSException ; 33 import javax.jms.MapMessage ; 34 import javax.jms.Message ; 35 import javax.jms.Queue ; 36 import javax.jms.QueueConnection ; 37 import javax.jms.QueueConnectionFactory ; 38 import javax.jms.QueueSender ; 39 import javax.jms.QueueSession ; 40 import javax.jms.Session ; 41 import javax.jms.Topic ; 42 import javax.jms.TopicConnection ; 43 import javax.jms.TopicConnectionFactory ; 44 import javax.jms.TopicPublisher ; 45 import javax.jms.TopicSession ; 46 import javax.jms.XAQueueConnection ; 47 import javax.jms.XAQueueConnectionFactory ; 48 import javax.jms.XAQueueSession ; 49 import javax.naming.InitialContext ; 50 import javax.naming.NamingException ; 51 import javax.transaction.xa.XAResource ; 52 53 import org.ofbiz.base.config.GenericConfigException; 54 import org.ofbiz.base.util.Debug; 55 import org.ofbiz.base.util.GeneralException; 56 import org.ofbiz.base.util.JNDIContextFactory; 57 import org.ofbiz.base.util.UtilXml; 58 import org.ofbiz.entity.serialize.XmlSerializer; 59 import org.ofbiz.entity.transaction.GenericTransactionException; 60 import org.ofbiz.entity.transaction.TransactionUtil; 61 import org.ofbiz.service.GenericRequester; 62 import org.ofbiz.service.GenericServiceException; 63 import org.ofbiz.service.ModelService; 64 import org.ofbiz.service.ServiceDispatcher; 65 import org.ofbiz.service.ServiceUtil; 66 import org.ofbiz.service.config.ServiceConfigUtil; 67 import org.ofbiz.service.engine.AbstractEngine; 68 import org.w3c.dom.Element ; 69 70 77 public class JmsServiceEngine extends AbstractEngine { 78 79 public static final String module = JmsServiceEngine.class.getName(); 80 81 public JmsServiceEngine(ServiceDispatcher dispatcher) { 82 super(dispatcher); 83 } 84 85 protected Element getServiceElement(ModelService modelService) throws GenericServiceException { 86 Element rootElement = null; 87 88 try { 89 rootElement = ServiceConfigUtil.getXmlRootElement(); 90 } catch (GenericConfigException e) { 91 throw new GenericServiceException("Error getting JMS Service element", e); 92 } 93 94 String location = this.getLocation(modelService); 95 96 Element serviceElement = UtilXml.firstChildElement(rootElement, "jms-service", "name", location); 97 98 if (serviceElement == null) { 99 throw new GenericServiceException("Cannot find an JMS service definition for the name [" + location + "] in the serviceengine.xml file"); 100 } 101 return serviceElement; 102 } 103 104 protected Message makeMessage(Session session, ModelService modelService, Map context) 105 throws GenericServiceException, JMSException { 106 List outParams = modelService.getParameterNames(ModelService.OUT_PARAM, false); 107 108 if (outParams != null && outParams.size() > 0) 109 throw new GenericServiceException("JMS service cannot have required OUT parameters; no parameters will be returned."); 110 String xmlContext = null; 111 112 try { 113 if (Debug.verboseOn()) Debug.logVerbose("Serializing Context --> " + context, module); 114 xmlContext = XmlSerializer.serialize(context); 115 } catch (Exception e) { 116 throw new GenericServiceException("Cannot serialize context.", e); 117 } 118 MapMessage message = session.createMapMessage(); 119 120 message.setString("serviceName", modelService.invoke); 121 message.setString("serviceContext", xmlContext); 122 return message; 123 } 124 125 protected List serverList(Element serviceElement) throws GenericServiceException { 126 String sendMode = serviceElement.getAttribute("send-mode"); 127 List serverList = UtilXml.childElementList(serviceElement, "server"); 128 129 if (sendMode.equals("none")) { 130 return new ArrayList (); 131 } else if (sendMode.equals("all")) { 132 return serverList; 133 } else { 134 throw new GenericServiceException("Requested send mode not supported."); 135 } 136 } 137 138 protected Map runTopic(ModelService modelService, Map context, Element server) throws GenericServiceException { 139 String serverName = server.getAttribute("jndi-server-name"); 140 String jndiName = server.getAttribute("jndi-name"); 141 String topicName = server.getAttribute("topic-queue"); 142 String userName = server.getAttribute("username"); 143 String password = server.getAttribute("password"); 144 String clientId = server.getAttribute("client-id"); 145 146 InitialContext jndi = null; 147 TopicConnectionFactory factory = null; 148 TopicConnection con = null; 149 150 try { 151 jndi = JNDIContextFactory.getInitialContext(serverName); 152 factory = (TopicConnectionFactory ) jndi.lookup(jndiName); 153 } catch (GeneralException ge) { 154 throw new GenericServiceException("Problems getting JNDI InitialContext.", ge.getNested()); 155 } catch (NamingException ne) { 156 JNDIContextFactory.clearInitialContext(serverName); 157 try { 158 jndi = JNDIContextFactory.getInitialContext(serverName); 159 factory = (TopicConnectionFactory ) jndi.lookup(jndiName); 160 } catch (GeneralException ge2) { 161 throw new GenericServiceException("Problems getting JNDI InitialContext.", ge2.getNested()); 162 } catch (NamingException ne2) { 163 throw new GenericServiceException("JNDI lookup problems.", ne); 164 } 165 } 166 167 try { 168 con = factory.createTopicConnection(userName, password); 169 170 if (clientId != null && clientId.length() > 1) 171 con.setClientID(clientId); 172 con.start(); 173 174 TopicSession session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 175 Topic topic = (Topic ) jndi.lookup(topicName); 176 TopicPublisher publisher = session.createPublisher(topic); 177 178 Message message = makeMessage(session, modelService, context); 180 181 publisher.publish(message); 182 if (Debug.verboseOn()) Debug.logVerbose("Sent JMS Message to " + topicName, module); 183 184 publisher.close(); 186 session.close(); 187 con.close(); 188 } catch (NamingException ne) { 189 throw new GenericServiceException("Problems with JNDI lookup.", ne); 190 } catch (JMSException je) { 191 throw new GenericServiceException("JMS Internal Error.", je); 192 } 193 return ServiceUtil.returnSuccess(); 194 195 } 196 197 protected Map runQueue(ModelService modelService, Map context, Element server) throws GenericServiceException { 198 String serverName = server.getAttribute("jndi-server-name"); 199 String jndiName = server.getAttribute("jndi-name"); 200 String queueName = server.getAttribute("topic-queue"); 201 String userName = server.getAttribute("username"); 202 String password = server.getAttribute("password"); 203 String clientId = server.getAttribute("client-id"); 204 205 InitialContext jndi = null; 206 QueueConnectionFactory factory = null; 207 QueueConnection con = null; 208 209 try { 210 jndi = JNDIContextFactory.getInitialContext(serverName); 211 factory = (QueueConnectionFactory ) jndi.lookup(jndiName); 212 } catch (GeneralException ge){ 213 throw new GenericServiceException("Problems getting JNDI InitialContext.", ge.getNested()); 214 } catch (NamingException ne) { 215 JNDIContextFactory.clearInitialContext(serverName); 216 try { 217 jndi = JNDIContextFactory.getInitialContext(serverName); 218 factory = (QueueConnectionFactory ) jndi.lookup(jndiName); 219 } catch (GeneralException ge2) { 220 throw new GenericServiceException("Problems getting JNDI InitialContext.", ge2.getNested()); 221 } catch (NamingException ne2) { 222 throw new GenericServiceException("JNDI lookup problem.", ne2); 223 } 224 } 225 226 try { 227 con = factory.createQueueConnection(userName, password); 228 229 if (clientId != null && clientId.length() > 1) 230 con.setClientID(clientId); 231 con.start(); 232 233 QueueSession session = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 234 Queue queue = (Queue ) jndi.lookup(queueName); 235 QueueSender sender = session.createSender(queue); 236 237 Message message = makeMessage(session, modelService, context); 239 240 sender.send(message); 241 if (Debug.verboseOn()) Debug.logVerbose("Sent JMS Message to " + queueName, module); 242 243 sender.close(); 245 session.close(); 246 con.close(); 247 } catch (NamingException ne) { 248 throw new GenericServiceException("Problems with JNDI lookup.", ne); 249 } catch (JMSException je) { 250 throw new GenericServiceException("JMS Internal Error.", je); 251 } 252 return ServiceUtil.returnSuccess(); 253 } 254 255 protected Map runXaQueue(ModelService modelService, Map context, Element server) throws GenericServiceException { 256 String serverName = server.getAttribute("jndi-server-name"); 257 String jndiName = server.getAttribute("jndi-name"); 258 String queueName = server.getAttribute("topic-queue"); 259 String userName = server.getAttribute("username"); 260 String password = server.getAttribute("password"); 261 String clientId = server.getAttribute("client-id"); 262 263 InitialContext jndi = null; 264 XAQueueConnectionFactory factory = null; 265 XAQueueConnection con = null; 266 267 try { 268 jndi = JNDIContextFactory.getInitialContext(serverName); 269 factory = (XAQueueConnectionFactory ) jndi.lookup(jndiName); 270 } catch (GeneralException ge){ 271 throw new GenericServiceException("Problems getting JNDI InitialContext.", ge.getNested()); 272 } catch (NamingException ne) { 273 JNDIContextFactory.clearInitialContext(serverName); 274 try { 275 jndi = JNDIContextFactory.getInitialContext(serverName); 276 factory = (XAQueueConnectionFactory ) jndi.lookup(jndiName); 277 } catch (GeneralException ge2){ 278 throw new GenericServiceException("Problems getting JNDI InitialContext.", ge2.getNested()); 279 } catch (NamingException ne2) { 280 throw new GenericServiceException("JNDI lookup problems.", ne2); 281 } 282 } 283 284 try { 285 con = factory.createXAQueueConnection(userName, password); 286 287 if (clientId != null && clientId.length() > 1) 288 con.setClientID(userName); 289 con.start(); 290 291 XAQueueSession session = con.createXAQueueSession(); 293 XAResource resource = session.getXAResource(); 294 295 if (TransactionUtil.getStatus() == TransactionUtil.STATUS_ACTIVE) 296 TransactionUtil.enlistResource(resource); 297 298 Queue queue = (Queue ) jndi.lookup(queueName); 299 QueueSession qSession = session.getQueueSession(); 300 QueueSender sender = qSession.createSender(queue); 301 302 Message message = makeMessage(session, modelService, context); 304 305 sender.send(message); 306 307 if (TransactionUtil.getStatus() != TransactionUtil.STATUS_ACTIVE) 308 session.commit(); 309 310 Debug.logInfo("Message sent.", module); 311 312 sender.close(); 314 session.close(); 315 con.close(); 316 } catch (GenericTransactionException gte) { 317 throw new GenericServiceException("Problems enlisting resource w/ transaction manager.", gte.getNested()); 318 } catch (NamingException ne) { 319 throw new GenericServiceException("Problems with JNDI lookup.", ne); 320 } catch (JMSException je) { 321 throw new GenericServiceException("JMS Internal Error.", je); 322 } 323 return ServiceUtil.returnSuccess(); 324 } 325 326 protected Map run(ModelService modelService, Map context) throws GenericServiceException { 327 Element serviceElement = getServiceElement(modelService); 328 List serverList = serverList(serviceElement); 329 330 Map result = new HashMap (); 331 Iterator i = serverList.iterator(); 332 333 while (i.hasNext()) { 334 Element server = (Element ) i.next(); 335 String serverType = server.getAttribute("type"); 336 337 if (serverType.equals("topic")) 338 result.putAll(runTopic(modelService, context, server)); 339 else if (serverType.equals("queue")) 340 result.putAll(runQueue(modelService, context, server)); 341 else 342 throw new GenericServiceException("Illegal server messaging type."); 343 } 344 return result; 345 } 346 347 350 public Map runSync(String localName, ModelService modelService, Map context) throws GenericServiceException { 351 return run(modelService, context); 352 } 353 354 357 public void runSyncIgnore(String localName, ModelService modelService, Map context) throws GenericServiceException { 358 run(modelService, context); 359 } 360 361 364 public void runAsync(String localName, ModelService modelService, Map context, GenericRequester requester, boolean persist) throws GenericServiceException { 365 Map result = run(modelService, context); 366 367 requester.receiveResult(result); 368 } 369 370 373 public void runAsync(String localName, ModelService modelService, Map context, boolean persist) throws GenericServiceException { 374 run(modelService, context); 375 } 376 377 } 378 | Popular Tags |