1 18 package org.apache.activemq; 19 20 import java.net.URI ; 21 import java.net.URISyntaxException ; 22 import java.util.Map ; 23 import java.util.Properties ; 24 import java.util.concurrent.Executor ; 25 import java.util.concurrent.ScheduledThreadPoolExecutor ; 26 import java.util.concurrent.ThreadFactory ; 27 28 import javax.jms.Connection ; 29 import javax.jms.ConnectionFactory ; 30 import javax.jms.JMSException ; 31 import javax.jms.QueueConnection ; 32 import javax.jms.QueueConnectionFactory ; 33 import javax.jms.TopicConnection ; 34 import javax.jms.TopicConnectionFactory ; 35 import javax.naming.Context ; 36 37 import org.apache.activemq.blob.BlobTransferPolicy; 38 import org.apache.activemq.jndi.JNDIBaseStorable; 39 import org.apache.activemq.management.JMSStatsImpl; 40 import org.apache.activemq.management.StatsCapable; 41 import org.apache.activemq.management.StatsImpl; 42 import org.apache.activemq.transport.Transport; 43 import org.apache.activemq.transport.TransportFactory; 44 import org.apache.activemq.util.IdGenerator; 45 import org.apache.activemq.util.IntrospectionSupport; 46 import org.apache.activemq.util.JMSExceptionSupport; 47 import org.apache.activemq.util.URISupport; 48 import org.apache.activemq.util.URISupport.CompositeData; 49 50 59 public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory , QueueConnectionFactory , TopicConnectionFactory , StatsCapable, Cloneable { 60 61 public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616"; 62 public static final String DEFAULT_USER = null; 63 public static final String DEFAULT_PASSWORD = null; 64 public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; 65 66 private IdGenerator clientIdGenerator; 67 private String clientIDPrefix; 68 protected URI brokerURL; 69 protected String userName; 70 protected String password; 71 protected String clientID; 72 73 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 75 private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 76 private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 77 private MessageTransformer transformer; 78 79 private boolean disableTimeStampsByDefault = false; 80 private boolean optimizedMessageDispatch = true; 81 private boolean copyMessageOnSend = true; 82 private boolean useCompression = false; 83 private boolean objectMessageSerializationDefered = false; 84 protected boolean dispatchAsync = false; 85 protected boolean alwaysSessionAsync=true; 86 private boolean useAsyncSend = false; 87 private boolean optimizeAcknowledge = false; 88 private int closeTimeout = 15000; 89 private boolean useRetroactiveConsumer; 90 private boolean nestedMapAndListEnabled = true; 91 JMSStatsImpl factoryStats = new JMSStatsImpl(); 92 private boolean alwaysSyncSend; 93 private boolean watchTopicAdvisories=true; 94 private int producerWindowSize=DEFAULT_PRODUCER_WINDOW_SIZE; 95 96 static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor (5, new ThreadFactory () { 97 public Thread newThread(Runnable run) { 98 Thread thread = new Thread (run); 99 thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION); 100 return thread; 101 } 102 }); 103 104 110 public ActiveMQConnectionFactory() { 111 this(DEFAULT_BROKER_URL); 112 } 113 114 public ActiveMQConnectionFactory(String brokerURL) { 115 this(createURI(brokerURL)); 116 } 117 118 121 public ActiveMQConnectionFactory copy() { 122 try { 123 return (ActiveMQConnectionFactory) super.clone(); 124 } 125 catch (CloneNotSupportedException e) { 126 throw new RuntimeException ("This should never happen: " + e, e); 127 } 128 } 129 130 135 private static URI createURI(String brokerURL) { 136 try { 137 return new URI (brokerURL); 138 } 139 catch (URISyntaxException e) { 140 throw (IllegalArgumentException ) new IllegalArgumentException ("Invalid broker URI: " + brokerURL).initCause(e); 141 } 142 } 143 144 public ActiveMQConnectionFactory(URI brokerURL) { 145 setBrokerURL(brokerURL.toString()); 146 } 147 148 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) { 149 setUserName(userName); 150 setPassword(password); 151 setBrokerURL(brokerURL.toString()); 152 } 153 154 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { 155 setUserName(userName); 156 setPassword(password); 157 setBrokerURL(brokerURL); 158 } 159 160 163 public Connection createConnection() throws JMSException { 164 return createActiveMQConnection(); 165 } 166 167 170 public Connection createConnection(String userName, String password) throws JMSException { 171 return createActiveMQConnection(userName, password); 172 } 173 174 178 public QueueConnection createQueueConnection() throws JMSException { 179 return createActiveMQConnection(); 180 } 181 182 185 public QueueConnection createQueueConnection(String userName, String password) throws JMSException { 186 return createActiveMQConnection(userName, password); 187 } 188 189 193 public TopicConnection createTopicConnection() throws JMSException { 194 return createActiveMQConnection(); 195 } 196 197 200 public TopicConnection createTopicConnection(String userName, String password) throws JMSException { 201 return createActiveMQConnection(userName, password); 202 } 203 204 public StatsImpl getStats() { 205 return null; 207 } 208 209 215 216 protected ActiveMQConnection createActiveMQConnection() throws JMSException { 217 return createActiveMQConnection(userName, password); 218 } 219 220 231 protected Transport createTransport() throws JMSException { 232 try { 233 return TransportFactory.connect(brokerURL,DEFAULT_CONNECTION_EXECUTOR); 234 } catch (Exception e) { 235 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); 236 } 237 } 238 239 242 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { 243 if (brokerURL == null) { 244 throw new ConfigurationException("brokerURL not set."); 245 } 246 ActiveMQConnection connection=null; 247 try { 248 Transport transport = createTransport(); 249 connection = createActiveMQConnection(transport, factoryStats); 250 251 connection.setUserName(userName); 252 connection.setPassword(password); 253 connection.setPrefetchPolicy(getPrefetchPolicy()); 254 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); 255 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); 256 connection.setCopyMessageOnSend(isCopyMessageOnSend()); 257 connection.setUseCompression(isUseCompression()); 258 connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); 259 connection.setDispatchAsync(isDispatchAsync()); 260 connection.setUseAsyncSend(isUseAsyncSend()); 261 connection.setAlwaysSyncSend(isAlwaysSyncSend()); 262 connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); 263 connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); 264 connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); 265 connection.setRedeliveryPolicy(getRedeliveryPolicy()); 266 connection.setTransformer(getTransformer()); 267 connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); 268 connection.setWatchTopicAdvisories(watchTopicAdvisories); 269 connection.setProducerWindowSize(producerWindowSize); 270 transport.start(); 271 272 if( clientID !=null ) 273 connection.setDefaultClientID(clientID); 274 275 return connection; 276 } 277 catch (JMSException e) { 278 try { connection.close(); } catch ( Throwable ignore ) {} 280 throw e; 281 } 282 catch (Exception e) { 283 try { connection.close(); } catch ( Throwable ignore ) {} 285 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); 286 } 287 } 288 289 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { 290 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats); 291 return connection; 292 } 293 294 300 public String getBrokerURL() { 301 return brokerURL==null?null:brokerURL.toString(); 302 } 303 304 309 public void setBrokerURL(String brokerURL) { 310 this.brokerURL = createURI(brokerURL); 311 312 if( this.brokerURL.getQuery() !=null ) { 315 try { 317 318 Map map = URISupport.parseQuery(this.brokerURL.getQuery()); 319 if( buildFromMap(IntrospectionSupport.extractProperties(map, "jms.")) ) { 320 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map); 321 } 322 323 } catch (URISyntaxException e) { 324 } 325 326 } else { 327 328 try { 330 CompositeData data = URISupport.parseComposite(this.brokerURL); 331 if( buildFromMap(IntrospectionSupport.extractProperties(data.getParameters(), "jms.")) ) { 332 this.brokerURL = data.toURI(); 333 } 334 } catch (URISyntaxException e) { 335 } 336 } 337 } 338 339 public String getClientID() { 340 return clientID; 341 } 342 343 347 public void setClientID(String clientID) { 348 this.clientID = clientID; 349 } 350 351 public boolean isCopyMessageOnSend() { 352 return copyMessageOnSend; 353 } 354 355 361 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 362 this.copyMessageOnSend = copyMessageOnSend; 363 } 364 365 public boolean isDisableTimeStampsByDefault() { 366 return disableTimeStampsByDefault; 367 } 368 369 373 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { 374 this.disableTimeStampsByDefault = disableTimeStampsByDefault; 375 } 376 377 public boolean isOptimizedMessageDispatch() { 378 return optimizedMessageDispatch; 379 } 380 381 385 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 386 this.optimizedMessageDispatch = optimizedMessageDispatch; 387 } 388 389 public String getPassword() { 390 return password; 391 } 392 393 396 public void setPassword(String password) { 397 this.password = password; 398 } 399 400 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 401 return prefetchPolicy; 402 } 403 404 409 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 410 this.prefetchPolicy = prefetchPolicy; 411 } 412 413 public boolean isUseAsyncSend() { 414 return useAsyncSend; 415 } 416 417 public BlobTransferPolicy getBlobTransferPolicy() { 418 return blobTransferPolicy; 419 } 420 421 425 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 426 this.blobTransferPolicy = blobTransferPolicy; 427 } 428 429 436 public void setUseAsyncSend(boolean useAsyncSend) { 437 this.useAsyncSend = useAsyncSend; 438 } 439 440 public synchronized boolean isWatchTopicAdvisories() { 441 return watchTopicAdvisories; 442 } 443 444 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 445 this.watchTopicAdvisories = watchTopicAdvisories; 446 } 447 448 451 public boolean isAlwaysSyncSend(){ 452 return this.alwaysSyncSend; 453 } 454 455 459 public void setAlwaysSyncSend(boolean alwaysSyncSend){ 460 this.alwaysSyncSend=alwaysSyncSend; 461 } 462 463 public String getUserName() { 464 return userName; 465 } 466 467 470 public void setUserName(String userName) { 471 this.userName = userName; 472 } 473 474 public boolean isUseRetroactiveConsumer() { 475 return useRetroactiveConsumer; 476 } 477 478 483 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 484 this.useRetroactiveConsumer = useRetroactiveConsumer; 485 } 486 487 public RedeliveryPolicy getRedeliveryPolicy() { 488 return redeliveryPolicy; 489 } 490 491 494 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 495 this.redeliveryPolicy = redeliveryPolicy; 496 } 497 498 public MessageTransformer getTransformer() { 499 return transformer; 500 } 501 502 506 public void setTransformer(MessageTransformer transformer) { 507 this.transformer = transformer; 508 } 509 510 public void buildFromProperties(Properties properties) { 511 512 if (properties == null) { 513 properties = new Properties (); 514 } 515 516 String temp = properties.getProperty(Context.PROVIDER_URL); 517 if (temp == null || temp.length() == 0) { 518 temp = properties.getProperty("brokerURL"); 519 } 520 if (temp != null && temp.length() > 0) { 521 setBrokerURL(temp); 522 } 523 524 buildFromMap(properties); 525 } 526 527 public boolean buildFromMap(Map properties) { 528 boolean rc=false; 529 530 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy(); 531 if( IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.") ) { 532 setPrefetchPolicy(p); 533 rc = true; 534 } 535 536 RedeliveryPolicy rp = new RedeliveryPolicy(); 537 if ( IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.") ) { 538 setRedeliveryPolicy(rp); 539 rc = true; 540 } 541 542 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 543 if ( IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.") ) { 544 setBlobTransferPolicy(blobTransferPolicy); 545 rc = true; 546 } 547 548 rc |= IntrospectionSupport.setProperties(this, properties); 549 550 return rc; 551 } 552 553 public void populateProperties(Properties props) { 554 props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync())); 555 556 if (getBrokerURL() != null) { 557 props.setProperty(Context.PROVIDER_URL, getBrokerURL()); 558 props.setProperty("brokerURL", getBrokerURL()); 559 } 560 561 if (getClientID() != null) { 562 props.setProperty("clientID", getClientID()); 563 } 564 565 IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy."); 566 IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy."); 567 IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy."); 568 569 props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend())); 570 props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault())); 571 props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered())); 572 props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch())); 573 574 if (getPassword() != null) { 575 props.setProperty("password", getPassword()); 576 } 577 578 props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); 579 props.setProperty("useCompression", Boolean.toString(isUseCompression())); 580 props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); 581 props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories())); 582 583 if (getUserName() != null) { 584 props.setProperty("userName", getUserName()); 585 } 586 587 props.setProperty("closeTimeout", Integer.toString(getCloseTimeout())); 588 props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync())); 589 props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); 590 props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled())); 591 props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend())); 592 props.setProperty("producerWindowSize", Integer.toString(producerWindowSize)); 593 } 594 595 public boolean isUseCompression() { 596 return useCompression; 597 } 598 599 602 public void setUseCompression(boolean useCompression) { 603 this.useCompression = useCompression; 604 } 605 606 public boolean isObjectMessageSerializationDefered() { 607 return objectMessageSerializationDefered; 608 } 609 610 616 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 617 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 618 } 619 620 public boolean isDispatchAsync() { 621 return dispatchAsync; 622 } 623 624 640 public void setDispatchAsync(boolean asyncDispatch) { 641 this.dispatchAsync = asyncDispatch; 642 } 643 644 647 public int getCloseTimeout(){ 648 return closeTimeout; 649 } 650 651 657 public void setCloseTimeout(int closeTimeout){ 658 this.closeTimeout=closeTimeout; 659 } 660 661 664 public boolean isAlwaysSessionAsync(){ 665 return alwaysSessionAsync; 666 } 667 668 674 public void setAlwaysSessionAsync(boolean alwaysSessionAsync){ 675 this.alwaysSessionAsync=alwaysSessionAsync; 676 } 677 678 681 public boolean isOptimizeAcknowledge(){ 682 return optimizeAcknowledge; 683 } 684 685 688 public void setOptimizeAcknowledge(boolean optimizeAcknowledge){ 689 this.optimizeAcknowledge=optimizeAcknowledge; 690 } 691 692 public boolean isNestedMapAndListEnabled() { 693 return nestedMapAndListEnabled ; 694 } 695 696 702 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 703 this.nestedMapAndListEnabled = structuredMapsEnabled; 704 } 705 706 public String getClientIDPrefix() { 707 return clientIDPrefix; 708 } 709 710 716 public void setClientIDPrefix(String clientIDPrefix) { 717 this.clientIDPrefix = clientIDPrefix; 718 } 719 720 protected synchronized IdGenerator getClientIdGenerator() { 721 if (clientIdGenerator == null) { 722 if (clientIDPrefix != null) { 723 clientIdGenerator = new IdGenerator(clientIDPrefix); 724 } 725 else { 726 clientIdGenerator = new IdGenerator(); 727 } 728 } 729 return clientIdGenerator; 730 } 731 732 protected void setClientIdGenerator(IdGenerator clientIdGenerator) { 733 this.clientIdGenerator = clientIdGenerator; 734 } 735 736 737 740 public boolean isStatsEnabled(){ 741 return this.factoryStats.isEnabled(); 742 } 743 744 745 748 public void setStatsEnabled(boolean statsEnabled){ 749 this.factoryStats.setEnabled(statsEnabled); 750 } 751 752 synchronized public int getProducerWindowSize() { 753 return producerWindowSize; 754 } 755 756 synchronized public void setProducerWindowSize(int producerWindowSize) { 757 this.producerWindowSize = producerWindowSize; 758 } 759 } 760 | Popular Tags |