1 18 package org.apache.activemq; 19 20 import java.io.IOException ; 21 import java.io.InputStream ; 22 import java.io.OutputStream ; 23 import java.net.URI ; 24 import java.net.URISyntaxException ; 25 import java.util.HashMap ; 26 import java.util.Iterator ; 27 import java.util.Map ; 28 import java.util.concurrent.ConcurrentHashMap ; 29 import java.util.concurrent.CopyOnWriteArrayList ; 30 import java.util.concurrent.CountDownLatch ; 31 import java.util.concurrent.LinkedBlockingQueue ; 32 import java.util.concurrent.ThreadFactory ; 33 import java.util.concurrent.ThreadPoolExecutor ; 34 import java.util.concurrent.TimeUnit ; 35 import java.util.concurrent.atomic.AtomicBoolean ; 36 import java.util.concurrent.atomic.AtomicInteger ; 37 38 import javax.jms.Connection ; 39 import javax.jms.ConnectionConsumer ; 40 import javax.jms.ConnectionMetaData ; 41 import javax.jms.DeliveryMode ; 42 import javax.jms.Destination ; 43 import javax.jms.ExceptionListener ; 44 import javax.jms.IllegalStateException ; 45 import javax.jms.InvalidDestinationException ; 46 import javax.jms.JMSException ; 47 import javax.jms.Queue ; 48 import javax.jms.QueueConnection ; 49 import javax.jms.QueueSession ; 50 import javax.jms.ServerSessionPool ; 51 import javax.jms.Session ; 52 import javax.jms.Topic ; 53 import javax.jms.TopicConnection ; 54 import javax.jms.TopicSession ; 55 import javax.jms.XAConnection ; 56 57 import org.apache.activemq.blob.BlobTransferPolicy; 58 import org.apache.activemq.command.ActiveMQDestination; 59 import org.apache.activemq.command.ActiveMQMessage; 60 import org.apache.activemq.command.ActiveMQTempDestination; 61 import org.apache.activemq.command.ActiveMQTempQueue; 62 import org.apache.activemq.command.ActiveMQTempTopic; 63 import org.apache.activemq.command.BrokerInfo; 64 import org.apache.activemq.command.Command; 65 import org.apache.activemq.command.CommandTypes; 66 import org.apache.activemq.command.ConnectionControl; 67 import org.apache.activemq.command.ConnectionError; 68 import org.apache.activemq.command.ConnectionId; 69 import org.apache.activemq.command.ConnectionInfo; 70 import org.apache.activemq.command.ConsumerControl; 71 import org.apache.activemq.command.ConsumerId; 72 import org.apache.activemq.command.ConsumerInfo; 73 import org.apache.activemq.command.ControlCommand; 74 import org.apache.activemq.command.DestinationInfo; 75 import org.apache.activemq.command.ExceptionResponse; 76 import org.apache.activemq.command.Message; 77 import org.apache.activemq.command.MessageDispatch; 78 import org.apache.activemq.command.MessageId; 79 import org.apache.activemq.command.ProducerAck; 80 import org.apache.activemq.command.ProducerId; 81 import org.apache.activemq.command.RemoveSubscriptionInfo; 82 import org.apache.activemq.command.Response; 83 import org.apache.activemq.command.SessionId; 84 import org.apache.activemq.command.ShutdownInfo; 85 import org.apache.activemq.command.WireFormatInfo; 86 import org.apache.activemq.management.JMSConnectionStatsImpl; 87 import org.apache.activemq.management.JMSStatsImpl; 88 import org.apache.activemq.management.StatsCapable; 89 import org.apache.activemq.management.StatsImpl; 90 import org.apache.activemq.state.CommandVisitorAdapter; 91 import org.apache.activemq.thread.TaskRunnerFactory; 92 import org.apache.activemq.transport.Transport; 93 import org.apache.activemq.transport.TransportListener; 94 import org.apache.activemq.util.IdGenerator; 95 import org.apache.activemq.util.IntrospectionSupport; 96 import org.apache.activemq.util.JMSExceptionSupport; 97 import org.apache.activemq.util.LongSequenceGenerator; 98 import org.apache.activemq.util.ServiceSupport; 99 import org.apache.commons.logging.Log; 100 import org.apache.commons.logging.LogFactory; 101 102 103 public class ActiveMQConnection implements Connection , TopicConnection , QueueConnection , StatsCapable, Closeable, StreamConnection, TransportListener { 104 105 private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000); 106 private final ThreadPoolExecutor asyncConnectionThread; 107 108 private static final Log log = LogFactory.getLog(ActiveMQConnection.class); 109 private static final IdGenerator connectionIdGenerator = new IdGenerator(); 110 111 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; 112 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 113 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 114 115 private final ConnectionInfo info; 117 private ExceptionListener exceptionListener; 118 private boolean clientIDSet; 119 private boolean isConnectionInfoSentToBroker; 120 private boolean userSpecifiedClientID; 121 122 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 124 private BlobTransferPolicy blobTransferPolicy; 125 private RedeliveryPolicy redeliveryPolicy; 126 private MessageTransformer transformer; 127 128 private boolean disableTimeStampsByDefault = false; 129 private boolean optimizedMessageDispatch = true; 130 private boolean copyMessageOnSend = true; 131 private boolean useCompression = false; 132 private boolean objectMessageSerializationDefered = false; 133 protected boolean dispatchAsync = false; 134 protected boolean alwaysSessionAsync=true; 135 private boolean useAsyncSend = false; 136 private boolean optimizeAcknowledge = false; 137 private boolean nestedMapAndListEnabled = true; 138 private boolean useRetroactiveConsumer; 139 private boolean alwaysSyncSend; 140 private int closeTimeout = 15000; 141 private boolean useSyncSend=false; 142 private boolean watchTopicAdvisories=true; 143 144 private final Transport transport; 145 private final IdGenerator clientIdGenerator; 146 private final JMSStatsImpl factoryStats; 147 private final JMSConnectionStatsImpl stats; 148 149 private final AtomicBoolean started = new AtomicBoolean (false); 150 private final AtomicBoolean closing = new AtomicBoolean (false); 151 private final AtomicBoolean closed = new AtomicBoolean (false); 152 private final AtomicBoolean transportFailed = new AtomicBoolean (false); 153 private final CopyOnWriteArrayList sessions = new CopyOnWriteArrayList (); 154 private final CopyOnWriteArrayList connectionConsumers = new CopyOnWriteArrayList (); 155 private final CopyOnWriteArrayList inputStreams = new CopyOnWriteArrayList (); 156 private final CopyOnWriteArrayList outputStreams = new CopyOnWriteArrayList (); 157 private final CopyOnWriteArrayList transportListeners = new CopyOnWriteArrayList (); 158 159 private final ConcurrentHashMap dispatchers = new ConcurrentHashMap (); 161 private final ConcurrentHashMap <ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap <ProducerId, ActiveMQMessageProducer>(); 162 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 163 private final SessionId connectionSessionId; 164 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 165 private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 166 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator(); 167 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator(); 168 final ConcurrentHashMap activeTempDestinations = new ConcurrentHashMap (); 169 170 private AdvisoryConsumer advisoryConsumer; 171 private final CountDownLatch brokerInfoReceived = new CountDownLatch (1); 172 private BrokerInfo brokerInfo; 173 private IOException firstFailureError; 174 private int producerWindowSize=ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE; 175 176 private AtomicInteger protocolVersion=new AtomicInteger (CommandTypes.PROTOCOL_VERSION); 179 180 186 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) 187 throws Exception { 188 189 this.transport = transport; 190 this.clientIdGenerator = clientIdGenerator; 191 this.factoryStats = factoryStats; 192 193 asyncConnectionThread = new ThreadPoolExecutor (1,1,5,TimeUnit.SECONDS, new LinkedBlockingQueue (), new ThreadFactory () { 195 public Thread newThread(Runnable r) { 196 Thread thread = new Thread (r, "AcitveMQ Connection Worker: "+transport); 197 thread.setDaemon(true); 198 return thread; 199 }}); 200 202 this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId())); 203 this.info.setManageable(true); 204 this.connectionSessionId = new SessionId(info.getConnectionId(), -1); 205 206 this.transport.setTransportListener(this); 207 208 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection ); 209 this.factoryStats.addConnection(this); 210 } 211 212 213 protected void setUserName(String userName) { 214 this.info.setUserName(userName); 215 } 216 217 protected void setPassword(String password) { 218 this.info.setPassword(password); 219 } 220 221 227 public static ActiveMQConnection makeConnection() throws JMSException { 228 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); 229 return (ActiveMQConnection) factory.createConnection(); 230 } 231 232 239 public static ActiveMQConnection makeConnection(String uri) throws JMSException , URISyntaxException { 240 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); 241 return (ActiveMQConnection) factory.createConnection(); 242 } 243 244 253 public static ActiveMQConnection makeConnection(String user, String password, String uri) 254 throws JMSException , URISyntaxException { 255 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI (uri)); 256 return (ActiveMQConnection) factory.createConnection(); 257 } 258 259 262 public JMSConnectionStatsImpl getConnectionStats() { 263 return stats; 264 } 265 266 288 public Session createSession(boolean transacted,int acknowledgeMode) throws JMSException { 289 checkClosedOrFailed(); 290 ensureConnectionInfoSent(); 291 boolean doSessionAsync=alwaysSessionAsync||sessions.size()>0||transacted 292 ||acknowledgeMode==Session.CLIENT_ACKNOWLEDGE; 293 return new ActiveMQSession(this,getNextSessionId(),(transacted?Session.SESSION_TRANSACTED 294 :(acknowledgeMode==Session.SESSION_TRANSACTED?Session.AUTO_ACKNOWLEDGE:acknowledgeMode)), 295 dispatchAsync,alwaysSessionAsync); 296 } 297 298 301 protected SessionId getNextSessionId() { 302 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId()); 303 } 304 305 318 public String getClientID() throws JMSException { 319 checkClosedOrFailed(); 320 return this.info.getClientId(); 321 } 322 323 364 public void setClientID(String newClientID) throws JMSException { 365 checkClosedOrFailed(); 366 367 if (this.clientIDSet) { 368 throw new IllegalStateException ("The clientID has already been set"); 369 } 370 371 if (this.isConnectionInfoSentToBroker) { 372 throw new IllegalStateException ("Setting clientID on a used Connection is not allowed"); 373 } 374 375 this.info.setClientId(newClientID); 376 this.userSpecifiedClientID = true; 377 ensureConnectionInfoSent(); 378 } 379 380 384 public void setDefaultClientID(String clientID) throws JMSException { 385 this.info.setClientId(clientID); 386 this.userSpecifiedClientID = true; 387 } 388 389 390 399 public ConnectionMetaData getMetaData() throws JMSException { 400 checkClosedOrFailed(); 401 return ActiveMQConnectionMetaData.INSTANCE; 402 } 403 404 417 public ExceptionListener getExceptionListener() throws JMSException { 418 checkClosedOrFailed(); 419 return this.exceptionListener; 420 } 421 422 446 public void setExceptionListener(ExceptionListener listener) throws JMSException { 447 checkClosedOrFailed(); 448 this.exceptionListener = listener; 449 } 450 451 461 public void start() throws JMSException { 462 checkClosedOrFailed(); 463 ensureConnectionInfoSent(); 464 if (started.compareAndSet(false, true)) { 465 for (Iterator i = sessions.iterator(); i.hasNext();) { 466 ActiveMQSession session = (ActiveMQSession) i.next(); 467 session.start(); 468 } 469 } 470 } 471 472 504 public void stop() throws JMSException { 505 checkClosedOrFailed(); 506 if (started.compareAndSet(true, false)) { 507 for (Iterator i = sessions.iterator(); i.hasNext();) { 508 ActiveMQSession s = (ActiveMQSession) i.next(); 509 s.stop(); 510 } 511 } 512 } 513 514 557 public void close() throws JMSException { 558 checkClosed(); 559 560 try { 561 stop(); 563 564 synchronized (this) { 565 if (!closed.get()) { 566 closing.set(true); 567 568 if( advisoryConsumer!=null ) { 569 advisoryConsumer.dispose(); 570 advisoryConsumer=null; 571 } 572 573 for (Iterator i = this.sessions.iterator(); i.hasNext();) { 574 ActiveMQSession s = (ActiveMQSession) i.next(); 575 s.dispose(); 576 } 577 for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) { 578 ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next(); 579 c.dispose(); 580 } 581 for (Iterator i = this.inputStreams.iterator(); i.hasNext();) { 582 ActiveMQInputStream c = (ActiveMQInputStream) i.next(); 583 c.dispose(); 584 } 585 for (Iterator i = this.outputStreams.iterator(); i.hasNext();) { 586 ActiveMQOutputStream c = (ActiveMQOutputStream) i.next(); 587 c.dispose(); 588 } 589 590 if (isConnectionInfoSentToBroker) { 591 syncSendPacket(info.createRemoveCommand(), closeTimeout); 594 asyncSendPacket(new ShutdownInfo()); 595 } 596 597 ServiceSupport.dispose(this.transport); 598 599 started.set(false); 600 601 sessionTaskRunner.shutdown(); 605 606 closed.set(true); 607 closing.set(false); 608 } 609 } 610 } 611 finally { 612 factoryStats.removeConnection(this); 613 } 614 } 615 616 621 628 629 630 661 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, 662 String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 663 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false); 664 } 665 666 700 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, 701 String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) 702 throws JMSException { 703 checkClosedOrFailed(); 704 ensureConnectionInfoSent(); 705 SessionId sessionId = new SessionId(info.getConnectionId(), -1); 706 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator 707 .getNextSequenceId())); 708 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic)); 709 info.setSubscriptionName(subscriptionName); 710 info.setSelector(messageSelector); 711 info.setPrefetchSize(maxMessages); 712 info.setDispatchAsync(dispatchAsync); 713 714 if( info.getDestination().getOptions()!=null ) { 716 HashMap options = new HashMap (info.getDestination().getOptions()); 717 IntrospectionSupport.setProperties(this.info, options, "consumer."); 718 } 719 720 return new ActiveMQConnectionConsumer(this, sessionPool, info); 721 } 722 723 724 727 732 public boolean isStarted() { 733 return started.get(); 734 } 735 736 739 public boolean isClosed() { 740 return closed.get(); 741 } 742 743 746 public boolean isClosing() { 747 return closing.get(); 748 } 749 750 753 public boolean isTransportFailed() { 754 return transportFailed.get(); 755 } 756 757 760 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 761 return prefetchPolicy; 762 } 763 764 769 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 770 this.prefetchPolicy = prefetchPolicy; 771 } 772 773 775 public Transport getTransportChannel() { 776 return transport; 777 } 778 779 784 public String getInitializedClientID() throws JMSException { 785 ensureConnectionInfoSent(); 786 return info.getClientId(); 787 } 788 789 793 public boolean isDisableTimeStampsByDefault() { 794 return disableTimeStampsByDefault; 795 } 796 797 801 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) { 802 this.disableTimeStampsByDefault = timeStampsDisableByDefault; 803 } 804 805 809 public boolean isOptimizedMessageDispatch() { 810 return optimizedMessageDispatch; 811 } 812 813 817 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) { 818 this.optimizedMessageDispatch = dispatchOptimizedMessage; 819 } 820 821 824 public int getCloseTimeout(){ 825 return closeTimeout; 826 } 827 828 829 835 public void setCloseTimeout(int closeTimeout){ 836 this.closeTimeout=closeTimeout; 837 } 838 839 843 public ConnectionInfo getConnectionInfo() { 844 return this.info; 845 } 846 847 public boolean isUseRetroactiveConsumer() { 848 return useRetroactiveConsumer; 849 } 850 851 856 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 857 this.useRetroactiveConsumer = useRetroactiveConsumer; 858 } 859 860 public boolean isNestedMapAndListEnabled() { 861 return nestedMapAndListEnabled; 862 } 863 864 870 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 871 this.nestedMapAndListEnabled = structuredMapsEnabled; 872 } 873 874 875 879 public void addTransportListener(TransportListener transportListener) { 880 transportListeners.add(transportListener); 881 } 882 883 public void removeTransportListener(TransportListener transportListener) { 884 transportListeners.remove(transportListener); 885 } 886 887 public TaskRunnerFactory getSessionTaskRunner() { 888 return sessionTaskRunner; 889 } 890 891 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 892 this.sessionTaskRunner = sessionTaskRunner; 893 } 894 895 public MessageTransformer getTransformer() { 896 return transformer; 897 } 898 899 903 public void setTransformer(MessageTransformer transformer) { 904 this.transformer = transformer; 905 } 906 907 910 public boolean isStatsEnabled(){ 911 return this.stats.isEnabled(); 912 } 913 914 915 918 public void setStatsEnabled(boolean statsEnabled){ 919 this.stats.setEnabled(statsEnabled); 920 } 921 922 923 926 933 protected void addSession(ActiveMQSession session) throws JMSException { 934 this.sessions.add(session); 935 if (sessions.size()>1 || session.isTransacted()){ 936 optimizedMessageDispatch = false; 937 } 938 } 939 940 945 protected void removeSession(ActiveMQSession session) { 946 this.sessions.remove(session); 947 } 948 949 955 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException { 956 this.connectionConsumers.add(connectionConsumer); 957 } 958 959 964 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) { 965 this.connectionConsumers.remove(connectionConsumer); 966 } 967 968 989 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { 990 return new ActiveMQTopicSession((ActiveMQSession) createSession(transacted, acknowledgeMode)); 991 } 992 993 1021 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1022 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false); 1023 } 1024 1025 1053 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, 1054 ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1055 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false); 1056 } 1057 1058 1087 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, 1088 ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1089 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false); 1090 } 1091 1092 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException { 1093 1094 checkClosedOrFailed(); 1095 ensureConnectionInfoSent(); 1096 1097 ConsumerId consumerId = createConsumerId(); 1098 ConsumerInfo info = new ConsumerInfo(consumerId); 1099 info.setDestination(ActiveMQMessageTransformation.transformDestination(destination)); 1100 info.setSelector(messageSelector); 1101 info.setPrefetchSize(maxMessages); 1102 info.setNoLocal(noLocal); 1103 info.setDispatchAsync(dispatchAsync); 1104 1105 if( info.getDestination().getOptions()!=null ) { 1107 HashMap options = new HashMap (info.getDestination().getOptions()); 1108 IntrospectionSupport.setProperties(info, options, "consumer."); 1109 } 1110 1111 return new ActiveMQConnectionConsumer(this, sessionPool, info); 1112 } 1113 1114 1117 private ConsumerId createConsumerId() { 1118 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId()); 1119 } 1120 1121 1124 private ProducerId createProducerId() { 1125 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId()); 1126 } 1127 1128 1129 1150 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { 1151 return new ActiveMQQueueSession((ActiveMQSession) createSession(transacted, acknowledgeMode)); 1152 } 1153 1154 1162 public void checkClientIDWasManuallySpecified() throws JMSException { 1163 if (!userSpecifiedClientID) { 1164 throw new JMSException ( 1165 "You cannot create a durable subscriber without specifying a unique clientID on a Connection"); 1166 } 1167 } 1168 1169 1175 public void asyncSendPacket(Command command) throws JMSException { 1176 if (isClosed()) { 1177 throw new ConnectionClosedException(); 1178 } else { 1179 1180 try { 1181 this.transport.oneway(command); 1182 } catch (IOException e) { 1183 throw JMSExceptionSupport.create(e); 1184 } 1185 } 1186 } 1187 1188 1195 public Response syncSendPacket(Command command) throws JMSException { 1196 if (isClosed()) { 1197 throw new ConnectionClosedException(); 1198 } else { 1199 1200 try { 1201 Response response = (Response) this.transport.request(command); 1202 if (response.isException()) { 1203 ExceptionResponse er = (ExceptionResponse) response; 1204 if (er.getException() instanceof JMSException ) 1205 throw (JMSException ) er.getException(); 1206 else 1207 throw JMSExceptionSupport.create(er.getException()); 1208 } 1209 return response; 1210 } catch (IOException e) { 1211 throw JMSExceptionSupport.create(e); 1212 } 1213 } 1214 } 1215 1216 1223 public Response syncSendPacket(Command command, int timeout) throws JMSException { 1224 if (isClosed()) { 1225 throw new ConnectionClosedException(); 1226 } else { 1227 1228 try { 1229 Response response = (Response) this.transport.request(command,timeout); 1230 if (response!=null && response.isException()) { 1231 ExceptionResponse er = (ExceptionResponse) response; 1232 if (er.getException() instanceof JMSException ) 1233 throw (JMSException ) er.getException(); 1234 else 1235 throw JMSExceptionSupport.create(er.getException()); 1236 } 1237 return response; 1238 } catch (IOException e) { 1239 throw JMSExceptionSupport.create(e); 1240 } 1241 } 1242 } 1243 1244 1247 public StatsImpl getStats() { 1248 return stats; 1249 } 1250 1251 1257 protected synchronized void checkClosedOrFailed() throws JMSException { 1258 checkClosed(); 1259 if (transportFailed.get()){ 1260 throw new ConnectionFailedException(firstFailureError); 1261 } 1262 } 1263 1264 1269 protected synchronized void checkClosed() throws JMSException { 1270 if (closed.get()) { 1271 throw new ConnectionClosedException(); 1272 } 1273 } 1274 1275 1280 protected synchronized void ensureConnectionInfoSent() throws JMSException { 1281 if (isConnectionInfoSentToBroker || closed.get()) { 1283 return; 1284 } 1285 1286 if (info.getClientId() == null || info.getClientId().trim().length() == 0) { 1287 info.setClientId(clientIdGenerator.generateId()); 1288 } 1289 syncSendPacket(info); 1290 1291 this.isConnectionInfoSentToBroker = true; 1292 1296 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1),consumerIdGenerator.getNextSequenceId()); 1297 if( watchTopicAdvisories ) { 1298 advisoryConsumer = new AdvisoryConsumer(this, consumerId); 1299 } 1300 } 1301 1302 1303 1306 public boolean isUseAsyncSend() { 1307 return useAsyncSend; 1308 } 1309 1310 public void setUseSyncSend(boolean forceSyncSend) { 1311 this.useSyncSend = forceSyncSend; 1312 } 1313 1314 1315 public synchronized boolean isWatchTopicAdvisories() { 1316 return watchTopicAdvisories; 1317 } 1318 1319 1320 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 1321 this.watchTopicAdvisories = watchTopicAdvisories; 1322 } 1323 1324 1325 1332 public void setUseAsyncSend(boolean useAsyncSend) { 1333 this.useAsyncSend = useAsyncSend; 1334 } 1335 1336 1339 public boolean isAlwaysSyncSend(){ 1340 return this.alwaysSyncSend; 1341 } 1342 1343 1347 public void setAlwaysSyncSend(boolean alwaysSyncSend){ 1348 this.alwaysSyncSend=alwaysSyncSend; 1349 } 1350 1351 1352 1359 public void cleanup() throws JMSException { 1360 1361 if( advisoryConsumer!=null ) { 1362 advisoryConsumer.dispose(); 1363 advisoryConsumer=null; 1364 } 1365 1366 for (Iterator i = this.sessions.iterator(); i.hasNext();) { 1367 ActiveMQSession s = (ActiveMQSession) i.next(); 1368 s.dispose(); 1369 } 1370 for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) { 1371 ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next(); 1372 c.dispose(); 1373 } 1374 for (Iterator i = this.inputStreams.iterator(); i.hasNext();) { 1375 ActiveMQInputStream c = (ActiveMQInputStream) i.next(); 1376 c.dispose(); 1377 } 1378 for (Iterator i = this.outputStreams.iterator(); i.hasNext();) { 1379 ActiveMQOutputStream c = (ActiveMQOutputStream) i.next(); 1380 c.dispose(); 1381 } 1382 1383 if(isConnectionInfoSentToBroker){ 1384 if(!transportFailed.get() && !closing.get()){ 1385 asyncSendPacket(info.createRemoveCommand()); 1386 } 1387 isConnectionInfoSentToBroker=false; 1388 } 1389 if( userSpecifiedClientID ) { 1390 info.setClientId(null); 1391 userSpecifiedClientID=false; 1392 } 1393 clientIDSet = false; 1394 1395 started.set(false); 1396 } 1397 1398 1407 public void changeUserInfo(String userName, String password) throws JMSException { 1408 if (isConnectionInfoSentToBroker) 1409 throw new IllegalStateException ("changeUserInfo used Connection is not allowed"); 1410 1411 this.info.setUserName(userName); 1412 this.info.setPassword(password); 1413 } 1414 1415 1419 public String getResourceManagerId() throws JMSException { 1420 waitForBrokerInfo(); 1421 if( brokerInfo==null ) 1422 throw new JMSException ("Connection failed before Broker info was received."); 1423 return brokerInfo.getBrokerId().getValue(); 1424 } 1425 1426 1429 public String getBrokerName() { 1430 if (brokerInfo == null) { 1431 return null; 1432 } 1433 return brokerInfo.getBrokerName(); 1434 } 1435 1436 1439 public BrokerInfo getBrokerInfo() { 1440 return brokerInfo; 1441 } 1442 1443 1447 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException { 1448 return redeliveryPolicy; 1449 } 1450 1451 1454 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 1455 this.redeliveryPolicy = redeliveryPolicy; 1456 } 1457 1458 public BlobTransferPolicy getBlobTransferPolicy() { 1459 if (blobTransferPolicy == null) { 1460 blobTransferPolicy = createBlobTransferPolicy(); 1461 } 1462 return blobTransferPolicy; 1463 } 1464 1465 1469 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1470 this.blobTransferPolicy = blobTransferPolicy; 1471 } 1472 1473 1476 public boolean isAlwaysSessionAsync(){ 1477 return alwaysSessionAsync; 1478 } 1479 1480 1481 1487 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 1488 this.alwaysSessionAsync = alwaysSessionAsync; 1489 } 1490 1491 1494 public boolean isOptimizeAcknowledge(){ 1495 return optimizeAcknowledge; 1496 } 1497 1498 1499 1504 public void setOptimizeAcknowledge(boolean optimizeAcknowledge){ 1505 this.optimizeAcknowledge=optimizeAcknowledge; 1506 } 1507 1508 1509 private void waitForBrokerInfo() throws JMSException { 1510 try { 1511 brokerInfoReceived.await(); 1512 } catch (InterruptedException e) { 1513 Thread.currentThread().interrupt(); 1514 throw JMSExceptionSupport.create(e); 1515 } 1516 } 1517 1518 Transport getTransport() { 1520 return transport; 1521 } 1522 1523 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) { 1524 producers.put(producerId, producer); 1525 } 1526 public void removeProducer(ProducerId producerId) { 1527 producers.remove(producerId); 1528 } 1529 1530 1531 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) { 1532 dispatchers.put(consumerId, dispatcher); 1533 } 1534 public void removeDispatcher(ConsumerId consumerId) { 1535 dispatchers.remove(consumerId); 1536 } 1537 1538 1541 public void onCommand(final Object o) { 1542 final Command command = (Command) o; 1543 if (!closed.get() && command != null) { 1544 try { 1545 command.visit(new CommandVisitorAdapter(){ 1546 @Override 1547 public Response processMessageDispatch(MessageDispatch md) throws Exception { 1548 ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId()); 1549 if (dispatcher != null) { 1550 Message msg = md.getMessage(); 1553 if( msg!=null ) { 1554 msg = msg.copy(); 1555 msg.setReadOnlyBody(true); 1556 msg.setReadOnlyProperties(true); 1557 msg.setRedeliveryCounter(md.getRedeliveryCounter()); 1558 msg.setConnection(ActiveMQConnection.this); 1559 md.setMessage( msg ); 1560 } 1561 dispatcher.dispatch(md); 1562 } 1563 return null; 1564 } 1565 1566 @Override 1567 public Response processProducerAck(ProducerAck pa) throws Exception { 1568 ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); 1569 if( producer!=null ) { 1570 producer.onProducerAck(pa); 1571 } 1572 return null; 1573 } 1574 1575 @Override 1576 public Response processBrokerInfo(BrokerInfo info) throws Exception { 1577 brokerInfo=info; 1578 brokerInfoReceived.countDown(); 1579 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration(); 1580 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl()); 1581 return null; 1582 } 1583 1584 @Override 1585 public Response processConnectionError(final ConnectionError error) throws Exception { 1586 asyncConnectionThread.execute(new Runnable (){ 1587 public void run() { 1588 onAsyncException(error.getException()); 1589 } 1590 }); 1591 new Thread ("Async error worker") { 1592 }.start(); 1593 return null; 1594 } 1595 @Override 1596 public Response processControlCommand(ControlCommand command) throws Exception { 1597 onControlCommand(command); 1598 return null; 1599 } 1600 @Override 1601 public Response processConnectionControl(ConnectionControl control) throws Exception { 1602 onConnectionControl((ConnectionControl) command); 1603 return null; 1604 } 1605 @Override 1606 public Response processConsumerControl(ConsumerControl control) throws Exception { 1607 onConsumerControl((ConsumerControl) command); 1608 return null; 1609 } 1610 @Override 1611 public Response processWireFormat(WireFormatInfo info) throws Exception { 1612 onWireFormatInfo((WireFormatInfo) command); 1613 return null; 1614 } 1615 }); 1616 } catch (Exception e) { 1617 onAsyncException(e); 1618 } 1619 1620 } 1621 for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { 1622 TransportListener listener = (TransportListener) iter.next(); 1623 listener.onCommand(command); 1624 } 1625 } 1626 1627 protected void onWireFormatInfo(WireFormatInfo info) { 1628 protocolVersion.set(info.getVersion()); 1629 } 1630 1631 1632 1637 public void onAsyncException(Throwable error) { 1638 if (!closed.get() && !closing.get()) { 1639 if (this.exceptionListener != null) { 1640 1641 if (!(error instanceof JMSException )) 1642 error = JMSExceptionSupport.create(error); 1643 final JMSException e = (JMSException ) error; 1644 1645 asyncConnectionThread.execute(new Runnable (){ 1646 public void run() { 1647 ActiveMQConnection.this.exceptionListener.onException(e); 1648 } 1649 }); 1650 1651 } else { 1652 log.warn("Async exception with no exception listener: " + error, error); 1653 } 1654 } 1655 } 1656 1657 public void onException(final IOException error) { 1658 onAsyncException(error); 1659 asyncConnectionThread.execute(new Runnable (){ 1660 public void run() { 1661 transportFailed(error); 1662 ServiceSupport.dispose(ActiveMQConnection.this.transport); 1663 brokerInfoReceived.countDown(); 1664 1665 for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { 1666 TransportListener listener = (TransportListener) iter.next(); 1667 listener.onException(error); 1668 } 1669 } 1670 }); 1671 } 1672 1673 public void transportInterupted() { 1674 for (Iterator i = this.sessions.iterator(); i.hasNext();) { 1675 ActiveMQSession s = (ActiveMQSession) i.next(); 1676 s.clearMessagesInProgress(); 1677 } 1678 for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { 1679 TransportListener listener = (TransportListener) iter.next(); 1680 listener.transportInterupted(); 1681 } 1682 } 1683 1684 public void transportResumed() { 1685 for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { 1686 TransportListener listener = (TransportListener) iter.next(); 1687 listener.transportResumed(); 1688 } 1689 for (Iterator i = this.sessions.iterator(); i.hasNext();) { 1690 ActiveMQSession s = (ActiveMQSession) i.next(); 1691 s.deliverAcks(); 1692 } 1693 } 1694 1695 1696 1703 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException { 1704 1705 ActiveMQTempDestination dest; 1707 if( topic ) { 1708 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 1709 } else { 1710 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 1711 } 1712 1713 DestinationInfo info = new DestinationInfo(); 1714 info.setConnectionId(this.info.getConnectionId()); 1715 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 1716 info.setDestination(dest); 1717 syncSendPacket(info); 1718 1719 dest.setConnection(this); 1720 activeTempDestinations.put(dest,dest); 1721 return dest; 1722 } 1723 1724 1729 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { 1730 1731 checkClosedOrFailed(); 1732 1733 for(Iterator i=this.sessions.iterator();i.hasNext();){ 1734 ActiveMQSession s=(ActiveMQSession) i.next(); 1735 if( s.isInUse(destination) ) { 1736 throw new JMSException ("A consumer is consuming from the temporary destination"); 1737 } 1738 } 1739 1740 activeTempDestinations.remove(destination); 1741 1742 DestinationInfo info = new DestinationInfo(); 1743 info.setConnectionId(this.info.getConnectionId()); 1744 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 1745 info.setDestination(destination); 1746 info.setTimeout(0); 1747 syncSendPacket(info); 1748 } 1749 1750 1751 1752 public boolean isDeleted(ActiveMQDestination dest) { 1753 1754 if( advisoryConsumer==null ) 1757 return false; 1758 1759 return !activeTempDestinations.contains(dest); 1760 } 1761 1762 public boolean isCopyMessageOnSend() { 1763 return copyMessageOnSend; 1764 } 1765 1766 public LongSequenceGenerator getLocalTransactionIdGenerator() { 1767 return localTransactionIdGenerator; 1768 } 1769 1770 public boolean isUseCompression() { 1771 return useCompression; 1772 } 1773 1774 1777 public void setUseCompression(boolean useCompression) { 1778 this.useCompression = useCompression; 1779 } 1780 1781 public void destroyDestination(ActiveMQDestination destination) throws JMSException { 1782 1783 checkClosedOrFailed(); 1784 ensureConnectionInfoSent(); 1785 1786 DestinationInfo info = new DestinationInfo(); 1787 info.setConnectionId(this.info.getConnectionId()); 1788 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 1789 info.setDestination(destination); 1790 info.setTimeout(0); 1791 syncSendPacket(info); 1792 1793 } 1794 1795 public boolean isDispatchAsync() { 1796 return dispatchAsync; 1797 } 1798 1799 1815 public void setDispatchAsync(boolean asyncDispatch) { 1816 this.dispatchAsync = asyncDispatch; 1817 } 1818 1819 public boolean isObjectMessageSerializationDefered() { 1820 return objectMessageSerializationDefered; 1821 } 1822 1823 1829 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 1830 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 1831 } 1832 1833 public InputStream createInputStream(Destination dest) throws JMSException { 1834 return createInputStream(dest, null); 1835 } 1836 1837 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException { 1838 return createInputStream(dest, messageSelector, false); 1839 } 1840 1841 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { 1842 return doCreateInputStream(dest, messageSelector, noLocal, null); 1843 } 1844 1845 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { 1846 return createInputStream(dest, null, false); 1847 } 1848 1849 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException { 1850 return createDurableInputStream(dest, name, messageSelector, false); 1851 } 1852 1853 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException { 1854 return doCreateInputStream(dest, messageSelector, noLocal, name); 1855 } 1856 1857 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException { 1858 checkClosedOrFailed(); 1859 ensureConnectionInfoSent(); 1860 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch()); 1861 } 1862 1863 1864 1867 public OutputStream createOutputStream(Destination dest) throws JMSException { 1868 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 1869 } 1870 1871 1874 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException { 1875 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 1876 } 1877 1878 1890 public OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { 1891 checkClosedOrFailed(); 1892 ensureConnectionInfoSent(); 1893 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive); 1894 } 1895 1896 1916 public void unsubscribe(String name) throws JMSException { 1917 checkClosedOrFailed(); 1918 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 1919 rsi.setConnectionId(getConnectionInfo().getConnectionId()); 1920 rsi.setSubcriptionName(name); 1921 rsi.setClientId(getConnectionInfo().getClientId()); 1922 syncSendPacket(rsi); 1923 } 1924 1925 1932 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { 1933 checkClosedOrFailed(); 1934 1935 if( destination.isTemporary() && isDeleted(destination) ) { 1936 throw new JMSException ("Cannot publish to a deleted Destination: "+destination); 1937 } 1938 1939 msg.setJMSDestination(destination); 1940 msg.setJMSDeliveryMode(deliveryMode); 1941 long expiration = 0L; 1942 1943 if (!isDisableTimeStampsByDefault()) { 1944 long timeStamp = System.currentTimeMillis(); 1945 msg.setJMSTimestamp(timeStamp); 1946 if (timeToLive > 0) { 1947 expiration = timeToLive + timeStamp; 1948 } 1949 } 1950 1951 msg.setJMSExpiration(expiration); 1952 msg.setJMSPriority(priority); 1953 1954 msg.setJMSRedelivered(false); 1955 msg.setMessageId( messageId ); 1956 1957 msg.onSend(); 1958 1959 msg.setProducerId(msg.getMessageId().getProducerId()); 1960 1961 if (log.isDebugEnabled()) { 1962 log.debug("Sending message: " + msg); 1963 } 1964 1965 if( async) { 1966 asyncSendPacket(msg); 1967 } else { 1968 syncSendPacket(msg); 1969 } 1970 1971 } 1972 1973 public void addOutputStream(ActiveMQOutputStream stream) { 1974 outputStreams.add(stream); 1975 } 1976 public void removeOutputStream(ActiveMQOutputStream stream) { 1977 outputStreams.remove(stream); 1978 } 1979 public void addInputStream(ActiveMQInputStream stream) { 1980 inputStreams.add(stream); 1981 } 1982 public void removeInputStream(ActiveMQInputStream stream) { 1983 inputStreams.remove(stream); 1984 } 1985 1986 protected void onControlCommand(ControlCommand command) { 1987 String text = command.getCommand(); 1988 if (text != null) { 1989 if (text.equals("shutdown")) { 1990 log.info("JVM told to shutdown"); 1991 System.exit(0); 1992 } 1993 } 1994 } 1995 1996 protected void onConnectionControl(ConnectionControl command){ 1997 if (command.isFaultTolerant()){ 1998 this.optimizeAcknowledge = false; 1999 for(Iterator i=this.sessions.iterator();i.hasNext();){ 2000 ActiveMQSession s=(ActiveMQSession) i.next(); 2001 s.setOptimizeAcknowledge(false); 2002 } 2003 } 2004 } 2005 2006 protected void onConsumerControl(ConsumerControl command){ 2007 if(command.isClose()){ 2008 for(Iterator i=this.sessions.iterator();i.hasNext();){ 2009 ActiveMQSession s=(ActiveMQSession) i.next(); 2010 s.close(command.getConsumerId()); 2011 } 2012 }else{ 2013 for(Iterator i=this.sessions.iterator();i.hasNext();){ 2014 ActiveMQSession s=(ActiveMQSession) i.next(); 2015 s.setPrefetchSize(command.getConsumerId(),command.getPrefetch()); 2016 } 2017 } 2018 } 2019 2020 protected void transportFailed(IOException error){ 2021 transportFailed.set(true); 2022 if (firstFailureError == null) { 2023 firstFailureError = error; 2024 } 2025 if (!closed.get() && !closing.get()) { 2026 try{ 2027 cleanup(); 2028 }catch(JMSException e){ 2029 log.warn("Cleanup failed",e); 2030 } 2031 } 2032 } 2033 2034 2040 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 2041 this.copyMessageOnSend = copyMessageOnSend; 2042 } 2043 2044 public String toString() { 2045 return "ActiveMQConnection {id="+info.getConnectionId()+",clientId="+info.getClientId()+",started="+started.get()+"}"; 2046 } 2047 2048 protected BlobTransferPolicy createBlobTransferPolicy() { 2049 return new BlobTransferPolicy(); 2050 } 2051 2052 2053 public int getProtocolVersion() { 2054 return protocolVersion.get(); 2055 } 2056 2057 2058 public int getProducerWindowSize() { 2059 return producerWindowSize; 2060 } 2061 2062 2063 public void setProducerWindowSize(int producerWindowSize) { 2064 this.producerWindowSize = producerWindowSize; 2065 } 2066 2067 2068} 2069 | Popular Tags |