1 18 package org.apache.activemq.ra; 19 20 import java.io.Serializable ; 21 import java.net.URI ; 22 import java.net.URISyntaxException ; 23 import java.util.HashMap ; 24 25 import javax.jms.Connection ; 26 import javax.jms.JMSException ; 27 import javax.jms.XAConnection ; 28 import javax.jms.XASession ; 29 import javax.resource.NotSupportedException ; 30 import javax.resource.ResourceException ; 31 import javax.resource.spi.ActivationSpec ; 32 import javax.resource.spi.BootstrapContext ; 33 import javax.resource.spi.ResourceAdapterInternalException ; 34 import javax.resource.spi.endpoint.MessageEndpointFactory ; 35 import javax.transaction.xa.XAResource ; 36 37 import org.apache.activemq.ActiveMQConnection; 38 import org.apache.activemq.ActiveMQConnectionFactory; 39 import org.apache.activemq.RedeliveryPolicy; 40 import org.apache.activemq.broker.BrokerFactory; 41 import org.apache.activemq.broker.BrokerService; 42 import org.apache.activemq.util.ServiceSupport; 43 import org.apache.commons.logging.Log; 44 import org.apache.commons.logging.LogFactory; 45 46 56 public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializable { 57 58 private static final long serialVersionUID = -5417363537865649130L; 59 private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class); 60 61 private final HashMap endpointWorkers = new HashMap (); 62 private final ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo(); 63 64 private BootstrapContext bootstrapContext; 65 private String brokerXmlConfig; 66 private BrokerService broker; 67 private ActiveMQConnectionFactory connectionFactory; 68 69 72 public ActiveMQResourceAdapter() { 73 super(); 74 } 75 76 79 public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException { 80 this.bootstrapContext = bootstrapContext; 81 if (brokerXmlConfig!=null && brokerXmlConfig.trim().length()>0 ) { 82 try { 83 broker = BrokerFactory.createBroker(new URI (brokerXmlConfig)); 84 broker.start(); 85 } catch (Throwable e) { 86 throw new ResourceAdapterInternalException ("Failed to startup an embedded broker: "+brokerXmlConfig+", due to: "+e, e); 87 } 88 } 89 } 90 91 94 public ActiveMQConnection makeConnection() throws JMSException { 95 if (connectionFactory != null) { 96 return makeConnection(info, connectionFactory); 97 } 98 return makeConnection(info); 99 } 100 101 103 public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException { 104 105 ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info); 106 return makeConnection(info, connectionFactory); 107 } 108 109 112 public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException { 113 String userName = info.getUserName(); 114 String password = info.getPassword(); 115 ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password); 116 117 String clientId = info.getClientid(); 118 if (clientId != null && clientId.length() > 0) { 119 physicalConnection.setClientID(clientId); 120 } 121 return physicalConnection; 122 } 123 124 127 public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException { 128 ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info); 129 String userName = defaultValue(activationSpec.getUserName(), info.getUserName()); 130 String password = defaultValue(activationSpec.getPassword(), info.getPassword()); 131 String clientId = activationSpec.getClientId(); 132 if (clientId != null) { 133 connectionFactory.setClientID(clientId); 134 } 135 else { 136 if (activationSpec.isDurableSubscription()) { 137 log.warn("No clientID specified for durable subscription: " + activationSpec); 138 } 139 } 140 ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password); 141 142 RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy(); 144 if (redeliveryPolicy != null) { 145 physicalConnection.setRedeliveryPolicy(redeliveryPolicy); 146 } 147 return physicalConnection; 148 } 149 150 155 synchronized private ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException { 156 ActiveMQConnectionFactory factory = connectionFactory; 157 if (factory != null && info.isConnectionFactoryConfigured()) { 158 factory = factory.copy(); 159 } 160 else if (factory == null) { 161 factory = new ActiveMQConnectionFactory(); 162 } 163 info.configure(factory); 164 return factory; 165 } 166 167 private String defaultValue(String value, String defaultValue) { 168 if (value != null) 169 return value; 170 return defaultValue; 171 } 172 173 176 public void stop() { 177 while (endpointWorkers.size() > 0) { 178 ActiveMQEndpointActivationKey key = (ActiveMQEndpointActivationKey) endpointWorkers.keySet().iterator().next(); 179 endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec()); 180 } 181 if (broker != null) { 182 ServiceSupport.dispose(broker); 183 broker = null; 184 } 185 this.bootstrapContext = null; 186 } 187 188 191 public BootstrapContext getBootstrapContext() { 192 return bootstrapContext; 193 } 194 195 199 public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) 200 throws ResourceException { 201 202 if (!equals(activationSpec.getResourceAdapter())) { 204 throw new ResourceException ("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")"); 205 } 206 207 if (!(activationSpec instanceof MessageActivationSpec)) { 208 throw new NotSupportedException ("That type of ActicationSpec not supported: " + activationSpec.getClass()); 209 } 210 211 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, 212 (MessageActivationSpec) activationSpec); 213 if (endpointWorkers.containsKey(key)) { 216 throw new IllegalStateException ("Endpoint previously activated"); 217 } 218 219 ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key); 220 221 endpointWorkers.put(key, worker); 222 worker.start(); 223 } 224 225 229 public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) { 230 231 if (activationSpec instanceof MessageActivationSpec) { 232 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec) activationSpec); 233 ActiveMQEndpointWorker worker = (ActiveMQEndpointWorker) endpointWorkers.remove(key); 234 if (worker == null) { 235 return; 239 } 240 try { 241 worker.stop(); 242 } catch (InterruptedException e) { 243 Thread.currentThread().interrupt(); 248 } 249 250 } 251 252 } 253 254 260 public XAResource [] getXAResources(ActivationSpec [] activationSpecs) throws ResourceException { 261 Connection connection = null; 262 try { 263 connection = makeConnection(); 264 if (connection instanceof XAConnection ) { 265 XASession session = ((XAConnection ) connection).createXASession(); 266 XAResource xaResource = session.getXAResource(); 267 return new XAResource [] { xaResource }; 268 } 269 return new XAResource [] {}; 270 } catch (JMSException e) { 271 throw new ResourceException (e); 272 } finally { 273 try { 274 connection.close(); 275 } catch (Throwable ignore) { 276 } 278 } 279 } 280 281 287 290 public String getClientid() { 291 return emptyToNull(info.getClientid()); 292 } 293 294 297 public String getPassword() { 298 return emptyToNull(info.getPassword()); 299 } 300 301 304 public String getServerUrl() { 305 return info.getServerUrl(); 306 } 307 308 311 public String getUserName() { 312 return emptyToNull(info.getUserName()); 313 } 314 315 318 public void setClientid(String clientid) { 319 info.setClientid(clientid); 320 } 321 322 325 public void setPassword(String password) { 326 info.setPassword(password); 327 } 328 329 332 public void setServerUrl(String url) { 333 info.setServerUrl(url); 334 } 335 336 339 public void setUserName(String userid) { 340 info.setUserName(userid); 341 } 342 343 346 public String getBrokerXmlConfig() { 347 return brokerXmlConfig; 348 } 349 350 362 public void setBrokerXmlConfig(String brokerXmlConfig) { 363 this.brokerXmlConfig=brokerXmlConfig; 364 } 365 366 369 public Integer getDurableTopicPrefetch() { 370 return info.getDurableTopicPrefetch(); 371 } 372 373 376 public Long getInitialRedeliveryDelay() { 377 return info.getInitialRedeliveryDelay(); 378 } 379 380 383 public Integer getInputStreamPrefetch() { 384 return info.getInputStreamPrefetch(); 385 } 386 387 390 public Integer getMaximumRedeliveries() { 391 return info.getMaximumRedeliveries(); 392 } 393 394 397 public Integer getQueueBrowserPrefetch() { 398 return info.getQueueBrowserPrefetch(); 399 } 400 401 404 public Integer getQueuePrefetch() { 405 return info.getQueuePrefetch(); 406 } 407 408 411 public Short getRedeliveryBackOffMultiplier() { 412 return info.getRedeliveryBackOffMultiplier(); 413 } 414 415 418 public Boolean getRedeliveryUseExponentialBackOff() { 419 return info.getRedeliveryUseExponentialBackOff(); 420 } 421 422 425 public Integer getTopicPrefetch() { 426 return info.getTopicPrefetch(); 427 } 428 429 432 public boolean isUseInboundSessionEnabled() { 433 return info.isUseInboundSessionEnabled(); 434 } 435 436 439 public void setAllPrefetchValues(Integer i) { 440 info.setAllPrefetchValues(i); 441 } 442 443 446 public void setDurableTopicPrefetch(Integer durableTopicPrefetch) { 447 info.setDurableTopicPrefetch(durableTopicPrefetch); 448 } 449 450 453 public void setInitialRedeliveryDelay(Long value) { 454 info.setInitialRedeliveryDelay(value); 455 } 456 457 460 public void setInputStreamPrefetch(Integer inputStreamPrefetch) { 461 info.setInputStreamPrefetch(inputStreamPrefetch); 462 } 463 464 467 public void setMaximumRedeliveries(Integer value) { 468 info.setMaximumRedeliveries(value); 469 } 470 471 474 public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) { 475 info.setQueueBrowserPrefetch(queueBrowserPrefetch); 476 } 477 478 481 public void setQueuePrefetch(Integer queuePrefetch) { 482 info.setQueuePrefetch(queuePrefetch); 483 } 484 485 488 public void setRedeliveryBackOffMultiplier(Short value) { 489 info.setRedeliveryBackOffMultiplier(value); 490 } 491 492 495 public void setRedeliveryUseExponentialBackOff(Boolean value) { 496 info.setRedeliveryUseExponentialBackOff(value); 497 } 498 499 502 public void setTopicPrefetch(Integer topicPrefetch) { 503 info.setTopicPrefetch(topicPrefetch); 504 } 505 506 509 public ActiveMQConnectionRequestInfo getInfo() { 510 return info; 511 } 512 513 516 @Override 517 public boolean equals(Object o) { 518 if (this == o) { 519 return true; 520 } 521 if (!(o instanceof MessageResourceAdapter)) { 522 return false; 523 } 524 525 final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter) o; 526 527 if (!info.equals(activeMQResourceAdapter.getInfo())) { 528 return false; 529 } 530 if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig()) ) { 531 return false; 532 } 533 534 return true; 535 } 536 537 private boolean notEqual(Object o1, Object o2) { 538 return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2)); 539 } 540 541 542 545 @Override 546 public int hashCode() { 547 int result; 548 result = info.hashCode(); 549 if( brokerXmlConfig !=null ) { 550 result ^= brokerXmlConfig.hashCode(); 551 } 552 return result; 553 } 554 555 private String emptyToNull(String value) { 556 if (value == null || value.length() == 0) { 557 return null; 558 } 559 return value; 560 } 561 562 565 public Boolean getUseInboundSession() { 566 return info.getUseInboundSession(); 567 } 568 569 572 public void setUseInboundSession(Boolean useInboundSession) { 573 info.setUseInboundSession(useInboundSession); 574 } 575 576 579 public ActiveMQConnectionFactory getConnectionFactory() { 580 return connectionFactory; 581 } 582 583 588 public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { 589 this.connectionFactory = connectionFactory; 590 } 591 592 593 } 594 | Popular Tags |