KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ActiveMQConnection


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import java.io.IOException JavaDoc;
21 import java.io.InputStream JavaDoc;
22 import java.io.OutputStream JavaDoc;
23 import java.net.URI JavaDoc;
24 import java.net.URISyntaxException JavaDoc;
25 import java.util.HashMap JavaDoc;
26 import java.util.Iterator JavaDoc;
27 import java.util.Map JavaDoc;
28 import java.util.concurrent.ConcurrentHashMap JavaDoc;
29 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
30 import java.util.concurrent.CountDownLatch JavaDoc;
31 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
32 import java.util.concurrent.ThreadFactory JavaDoc;
33 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
34 import java.util.concurrent.TimeUnit JavaDoc;
35 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
36 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
37
38 import javax.jms.Connection JavaDoc;
39 import javax.jms.ConnectionConsumer JavaDoc;
40 import javax.jms.ConnectionMetaData JavaDoc;
41 import javax.jms.DeliveryMode JavaDoc;
42 import javax.jms.Destination JavaDoc;
43 import javax.jms.ExceptionListener JavaDoc;
44 import javax.jms.IllegalStateException JavaDoc;
45 import javax.jms.InvalidDestinationException JavaDoc;
46 import javax.jms.JMSException JavaDoc;
47 import javax.jms.Queue JavaDoc;
48 import javax.jms.QueueConnection JavaDoc;
49 import javax.jms.QueueSession JavaDoc;
50 import javax.jms.ServerSessionPool JavaDoc;
51 import javax.jms.Session JavaDoc;
52 import javax.jms.Topic JavaDoc;
53 import javax.jms.TopicConnection JavaDoc;
54 import javax.jms.TopicSession JavaDoc;
55 import javax.jms.XAConnection JavaDoc;
56
57 import org.apache.activemq.blob.BlobTransferPolicy;
58 import org.apache.activemq.command.ActiveMQDestination;
59 import org.apache.activemq.command.ActiveMQMessage;
60 import org.apache.activemq.command.ActiveMQTempDestination;
61 import org.apache.activemq.command.ActiveMQTempQueue;
62 import org.apache.activemq.command.ActiveMQTempTopic;
63 import org.apache.activemq.command.BrokerInfo;
64 import org.apache.activemq.command.Command;
65 import org.apache.activemq.command.CommandTypes;
66 import org.apache.activemq.command.ConnectionControl;
67 import org.apache.activemq.command.ConnectionError;
68 import org.apache.activemq.command.ConnectionId;
69 import org.apache.activemq.command.ConnectionInfo;
70 import org.apache.activemq.command.ConsumerControl;
71 import org.apache.activemq.command.ConsumerId;
72 import org.apache.activemq.command.ConsumerInfo;
73 import org.apache.activemq.command.ControlCommand;
74 import org.apache.activemq.command.DestinationInfo;
75 import org.apache.activemq.command.ExceptionResponse;
76 import org.apache.activemq.command.Message;
77 import org.apache.activemq.command.MessageDispatch;
78 import org.apache.activemq.command.MessageId;
79 import org.apache.activemq.command.ProducerAck;
80 import org.apache.activemq.command.ProducerId;
81 import org.apache.activemq.command.RemoveSubscriptionInfo;
82 import org.apache.activemq.command.Response;
83 import org.apache.activemq.command.SessionId;
84 import org.apache.activemq.command.ShutdownInfo;
85 import org.apache.activemq.command.WireFormatInfo;
86 import org.apache.activemq.management.JMSConnectionStatsImpl;
87 import org.apache.activemq.management.JMSStatsImpl;
88 import org.apache.activemq.management.StatsCapable;
89 import org.apache.activemq.management.StatsImpl;
90 import org.apache.activemq.state.CommandVisitorAdapter;
91 import org.apache.activemq.thread.TaskRunnerFactory;
92 import org.apache.activemq.transport.Transport;
93 import org.apache.activemq.transport.TransportListener;
94 import org.apache.activemq.util.IdGenerator;
95 import org.apache.activemq.util.IntrospectionSupport;
96 import org.apache.activemq.util.JMSExceptionSupport;
97 import org.apache.activemq.util.LongSequenceGenerator;
98 import org.apache.activemq.util.ServiceSupport;
99 import org.apache.commons.logging.Log;
100 import org.apache.commons.logging.LogFactory;
101
102
103 public class ActiveMQConnection implements Connection JavaDoc, TopicConnection JavaDoc, QueueConnection JavaDoc, StatsCapable, Closeable, StreamConnection, TransportListener {
104
105     private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
106     private final ThreadPoolExecutor JavaDoc asyncConnectionThread;
107
108     private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
109     private static final IdGenerator connectionIdGenerator = new IdGenerator();
110
111     public static final String JavaDoc DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
112     public static final String JavaDoc DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
113     public static final String JavaDoc DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
114      
115     // Connection state variables
116
private final ConnectionInfo info;
117     private ExceptionListener JavaDoc exceptionListener;
118     private boolean clientIDSet;
119     private boolean isConnectionInfoSentToBroker;
120     private boolean userSpecifiedClientID;
121     
122     // Configuration options variables
123
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
124     private BlobTransferPolicy blobTransferPolicy;
125     private RedeliveryPolicy redeliveryPolicy;
126     private MessageTransformer transformer;
127
128     private boolean disableTimeStampsByDefault = false;
129     private boolean optimizedMessageDispatch = true;
130     private boolean copyMessageOnSend = true;
131     private boolean useCompression = false;
132     private boolean objectMessageSerializationDefered = false;
133     protected boolean dispatchAsync = false;
134     protected boolean alwaysSessionAsync=true;
135     private boolean useAsyncSend = false;
136     private boolean optimizeAcknowledge = false;
137     private boolean nestedMapAndListEnabled = true;
138     private boolean useRetroactiveConsumer;
139     private boolean alwaysSyncSend;
140     private int closeTimeout = 15000;
141     private boolean useSyncSend=false;
142     private boolean watchTopicAdvisories=true;
143     
144     private final Transport transport;
145     private final IdGenerator clientIdGenerator;
146     private final JMSStatsImpl factoryStats;
147     private final JMSConnectionStatsImpl stats;
148     
149     private final AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
150     private final AtomicBoolean JavaDoc closing = new AtomicBoolean JavaDoc(false);
151     private final AtomicBoolean JavaDoc closed = new AtomicBoolean JavaDoc(false);
152     private final AtomicBoolean JavaDoc transportFailed = new AtomicBoolean JavaDoc(false);
153     private final CopyOnWriteArrayList JavaDoc sessions = new CopyOnWriteArrayList JavaDoc();
154     private final CopyOnWriteArrayList JavaDoc connectionConsumers = new CopyOnWriteArrayList JavaDoc();
155     private final CopyOnWriteArrayList JavaDoc inputStreams = new CopyOnWriteArrayList JavaDoc();
156     private final CopyOnWriteArrayList JavaDoc outputStreams = new CopyOnWriteArrayList JavaDoc();
157     private final CopyOnWriteArrayList JavaDoc transportListeners = new CopyOnWriteArrayList JavaDoc();
158
159     // Maps ConsumerIds to ActiveMQConsumer objects
160
private final ConcurrentHashMap JavaDoc dispatchers = new ConcurrentHashMap JavaDoc();
161     private final ConcurrentHashMap JavaDoc<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap JavaDoc<ProducerId, ActiveMQMessageProducer>();
162     private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
163     private final SessionId connectionSessionId;
164     private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
165     private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
166     private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
167     private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
168     final ConcurrentHashMap JavaDoc activeTempDestinations = new ConcurrentHashMap JavaDoc();
169
170     private AdvisoryConsumer advisoryConsumer;
171     private final CountDownLatch JavaDoc brokerInfoReceived = new CountDownLatch JavaDoc(1);
172     private BrokerInfo brokerInfo;
173     private IOException JavaDoc firstFailureError;
174     private int producerWindowSize=ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
175     
176     // Assume that protocol is the latest. Change to the actual protocol
177
// version when a WireFormatInfo is received.
178
private AtomicInteger JavaDoc protocolVersion=new AtomicInteger JavaDoc(CommandTypes.PROTOCOL_VERSION);
179
180     /**
181      * Construct an <code>ActiveMQConnection</code>
182      * @param transport
183      * @param factoryStats
184      * @throws Exception
185      */

186     protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats)
187             throws Exception JavaDoc {
188        
189         this.transport = transport;
190         this.clientIdGenerator = clientIdGenerator;
191         this.factoryStats = factoryStats;
192         
193         // Configure a single threaded executor who's core thread can timeout if idle
194
asyncConnectionThread = new ThreadPoolExecutor JavaDoc(1,1,5,TimeUnit.SECONDS, new LinkedBlockingQueue JavaDoc(), new ThreadFactory JavaDoc() {
195             public Thread JavaDoc newThread(Runnable JavaDoc r) {
196                 Thread JavaDoc thread = new Thread JavaDoc(r, "AcitveMQ Connection Worker: "+transport);
197                 thread.setDaemon(true);
198                 return thread;
199             }});
200         //asyncConnectionThread.allowCoreThreadTimeOut(true);
201

202         this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId()));
203         this.info.setManageable(true);
204         this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
205         
206         this.transport.setTransportListener(this);
207
208         this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection JavaDoc);
209         this.factoryStats.addConnection(this);
210     }
211
212
213     protected void setUserName(String JavaDoc userName) {
214         this.info.setUserName(userName);
215     }
216
217     protected void setPassword(String JavaDoc password) {
218         this.info.setPassword(password);
219     }
220
221     /**
222      * A static helper method to create a new connection
223      *
224      * @return an ActiveMQConnection
225      * @throws JMSException
226      */

227     public static ActiveMQConnection makeConnection() throws JMSException JavaDoc {
228         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
229         return (ActiveMQConnection) factory.createConnection();
230     }
231
232     /**
233      * A static helper method to create a new connection
234      *
235      * @param uri
236      * @return and ActiveMQConnection
237      * @throws JMSException
238      */

239     public static ActiveMQConnection makeConnection(String JavaDoc uri) throws JMSException JavaDoc, URISyntaxException JavaDoc {
240         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
241         return (ActiveMQConnection) factory.createConnection();
242     }
243
244     /**
245      * A static helper method to create a new connection
246      *
247      * @param user
248      * @param password
249      * @param uri
250      * @return an ActiveMQConnection
251      * @throws JMSException
252      */

253     public static ActiveMQConnection makeConnection(String JavaDoc user, String JavaDoc password, String JavaDoc uri)
254             throws JMSException JavaDoc, URISyntaxException JavaDoc {
255         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI JavaDoc(uri));
256         return (ActiveMQConnection) factory.createConnection();
257     }
258
259     /**
260      * @return a number unique for this connection
261      */

262     public JMSConnectionStatsImpl getConnectionStats() {
263         return stats;
264     }
265
266     /**
267      * Creates a <CODE>Session</CODE> object.
268      *
269      * @param transacted
270      * indicates whether the session is transacted
271      * @param acknowledgeMode
272      * indicates whether the consumer or the client will acknowledge
273      * any messages it receives; ignored if the session is
274      * transacted. Legal values are
275      * <code>Session.AUTO_ACKNOWLEDGE</code>,
276      * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
277      * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
278      * @return a newly created session
279      * @throws JMSException
280      * if the <CODE>Connection</CODE> object fails to create a
281      * session due to some internal error or lack of support for the
282      * specific transaction and acknowledgement mode.
283      * @see Session#AUTO_ACKNOWLEDGE
284      * @see Session#CLIENT_ACKNOWLEDGE
285      * @see Session#DUPS_OK_ACKNOWLEDGE
286      * @since 1.1
287      */

288     public Session JavaDoc createSession(boolean transacted,int acknowledgeMode) throws JMSException JavaDoc{
289         checkClosedOrFailed();
290         ensureConnectionInfoSent();
291         boolean doSessionAsync=alwaysSessionAsync||sessions.size()>0||transacted
292                         ||acknowledgeMode==Session.CLIENT_ACKNOWLEDGE;
293         return new ActiveMQSession(this,getNextSessionId(),(transacted?Session.SESSION_TRANSACTED
294                         :(acknowledgeMode==Session.SESSION_TRANSACTED?Session.AUTO_ACKNOWLEDGE:acknowledgeMode)),
295                         dispatchAsync,alwaysSessionAsync);
296     }
297
298     /**
299      * @return sessionId
300      */

301     protected SessionId getNextSessionId() {
302         return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
303     }
304
305     /**
306      * Gets the client identifier for this connection.
307      * <P>
308      * This value is specific to the JMS provider. It is either preconfigured by
309      * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
310      * dynamically by the application by calling the <code>setClientID</code>
311      * method.
312      *
313      * @return the unique client identifier
314      * @throws JMSException
315      * if the JMS provider fails to return the client ID for this
316      * connection due to some internal error.
317      */

318     public String JavaDoc getClientID() throws JMSException JavaDoc {
319         checkClosedOrFailed();
320         return this.info.getClientId();
321     }
322
323     /**
324      * Sets the client identifier for this connection.
325      * <P>
326      * The preferred way to assign a JMS client's client identifier is for it to
327      * be configured in a client-specific <CODE>ConnectionFactory</CODE>
328      * object and transparently assigned to the <CODE>Connection</CODE> object
329      * it creates.
330      * <P>
331      * Alternatively, a client can set a connection's client identifier using a
332      * provider-specific value. The facility to set a connection's client
333      * identifier explicitly is not a mechanism for overriding the identifier
334      * that has been administratively configured. It is provided for the case
335      * where no administratively specified identifier exists. If one does exist,
336      * an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>.
337      * If a client sets the client identifier explicitly, it must do so
338      * immediately after it creates the connection and before any other action
339      * on the connection is taken. After this point, setting the client
340      * identifier is a programming error that should throw an <CODE>IllegalStateException</CODE>.
341      * <P>
342      * The purpose of the client identifier is to associate a connection and its
343      * objects with a state maintained on behalf of the client by a provider.
344      * The only such state identified by the JMS API is that required to support
345      * durable subscriptions.
346      * <P>
347      * If another connection with the same <code>clientID</code> is already
348      * running when this method is called, the JMS provider should detect the
349      * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
350      *
351      * @param newClientID
352      * the unique client identifier
353      * @throws JMSException
354      * if the JMS provider fails to set the client ID for this
355      * connection due to some internal error.
356      * @throws javax.jms.InvalidClientIDException
357      * if the JMS client specifies an invalid or duplicate client
358      * ID.
359      * @throws javax.jms.IllegalStateException
360      * if the JMS client attempts to set a connection's client ID at
361      * the wrong time or when it has been administratively
362      * configured.
363      */

364     public void setClientID(String JavaDoc newClientID) throws JMSException JavaDoc {
365         checkClosedOrFailed();
366
367         if (this.clientIDSet) {
368             throw new IllegalStateException JavaDoc("The clientID has already been set");
369         }
370
371         if (this.isConnectionInfoSentToBroker) {
372             throw new IllegalStateException JavaDoc("Setting clientID on a used Connection is not allowed");
373         }
374
375         this.info.setClientId(newClientID);
376         this.userSpecifiedClientID = true;
377         ensureConnectionInfoSent();
378     }
379     
380     /**
381      * Sets the default client id that the connection will use if explicitly not set with
382      * the setClientId() call.
383      */

384     public void setDefaultClientID(String JavaDoc clientID) throws JMSException JavaDoc {
385         this.info.setClientId(clientID);
386         this.userSpecifiedClientID = true;
387     }
388
389
390     /**
391      * Gets the metadata for this connection.
392      *
393      * @return the connection metadata
394      * @throws JMSException
395      * if the JMS provider fails to get the connection metadata for
396      * this connection.
397      * @see javax.jms.ConnectionMetaData
398      */

399     public ConnectionMetaData JavaDoc getMetaData() throws JMSException JavaDoc {
400         checkClosedOrFailed();
401         return ActiveMQConnectionMetaData.INSTANCE;
402     }
403
404     /**
405      * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
406      * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
407      * associated with it.
408      *
409      * @return the <CODE>ExceptionListener</CODE> for this connection, or
410      * null. if no <CODE>ExceptionListener</CODE> is associated with
411      * this connection.
412      * @throws JMSException
413      * if the JMS provider fails to get the <CODE>ExceptionListener</CODE>
414      * for this connection.
415      * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
416      */

417     public ExceptionListener JavaDoc getExceptionListener() throws JMSException JavaDoc {
418         checkClosedOrFailed();
419         return this.exceptionListener;
420     }
421
422     /**
423      * Sets an exception listener for this connection.
424      * <P>
425      * If a JMS provider detects a serious problem with a connection, it informs
426      * the connection's <CODE> ExceptionListener</CODE>, if one has been
427      * registered. It does this by calling the listener's <CODE>onException
428      * </CODE> method, passing it a <CODE>JMSException</CODE> object
429      * describing the problem.
430      * <P>
431      * An exception listener allows a client to be notified of a problem
432      * asynchronously. Some connections only consume messages, so they would
433      * have no other way to learn their connection has failed.
434      * <P>
435      * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
436      * <P>
437      * A JMS provider should attempt to resolve connection problems itself
438      * before it notifies the client of them.
439      *
440      * @param listener
441      * the exception listener
442      * @throws JMSException
443      * if the JMS provider fails to set the exception listener for
444      * this connection.
445      */

446     public void setExceptionListener(ExceptionListener JavaDoc listener) throws JMSException JavaDoc {
447         checkClosedOrFailed();
448         this.exceptionListener = listener;
449     }
450
451     /**
452      * Starts (or restarts) a connection's delivery of incoming messages. A call
453      * to <CODE>start</CODE> on a connection that has already been started is
454      * ignored.
455      *
456      * @throws JMSException
457      * if the JMS provider fails to start message delivery due to
458      * some internal error.
459      * @see javax.jms.Connection#stop()
460      */

461     public void start() throws JMSException JavaDoc {
462         checkClosedOrFailed();
463         ensureConnectionInfoSent();
464         if (started.compareAndSet(false, true)) {
465             for (Iterator JavaDoc i = sessions.iterator(); i.hasNext();) {
466                 ActiveMQSession session = (ActiveMQSession) i.next();
467                 session.start();
468             }
469         }
470     }
471
472     /**
473      * Temporarily stops a connection's delivery of incoming messages. Delivery
474      * can be restarted using the connection's <CODE>start</CODE> method. When
475      * the connection is stopped, delivery to all the connection's message
476      * consumers is inhibited: synchronous receives block, and messages are not
477      * delivered to message listeners.
478      * <P>
479      * This call blocks until receives and/or message listeners in progress have
480      * completed.
481      * <P>
482      * Stopping a connection has no effect on its ability to send messages. A
483      * call to <CODE>stop</CODE> on a connection that has already been stopped
484      * is ignored.
485      * <P>
486      * A call to <CODE>stop</CODE> must not return until delivery of messages
487      * has paused. This means that a client can rely on the fact that none of
488      * its message listeners will be called and that all threads of control
489      * waiting for <CODE>receive</CODE> calls to return will not return with a
490      * message until the connection is restarted. The receive timers for a
491      * stopped connection continue to advance, so receives may time out while
492      * the connection is stopped.
493      * <P>
494      * If message listeners are running when <CODE>stop</CODE> is invoked, the
495      * <CODE>stop</CODE> call must wait until all of them have returned before
496      * it may return. While these message listeners are completing, they must
497      * have the full services of the connection available to them.
498      *
499      * @throws JMSException
500      * if the JMS provider fails to stop message delivery due to
501      * some internal error.
502      * @see javax.jms.Connection#start()
503      */

504     public void stop() throws JMSException JavaDoc {
505         checkClosedOrFailed();
506         if (started.compareAndSet(true, false)) {
507             for (Iterator JavaDoc i = sessions.iterator(); i.hasNext();) {
508                 ActiveMQSession s = (ActiveMQSession) i.next();
509                 s.stop();
510             }
511         }
512     }
513
514     /**
515      * Closes the connection.
516      * <P>
517      * Since a provider typically allocates significant resources outside the
518      * JVM on behalf of a connection, clients should close these resources when
519      * they are not needed. Relying on garbage collection to eventually reclaim
520      * these resources may not be timely enough.
521      * <P>
522      * There is no need to close the sessions, producers, and consumers of a
523      * closed connection.
524      * <P>
525      * Closing a connection causes all temporary destinations to be deleted.
526      * <P>
527      * When this method is invoked, it should not return until message
528      * processing has been shut down in an orderly fashion. This means that all
529      * message listeners that may have been running have returned, and that all
530      * pending receives have returned. A close terminates all pending message
531      * receives on the connection's sessions' consumers. The receives may return
532      * with a message or with null, depending on whether there was a message
533      * available at the time of the close. If one or more of the connection's
534      * sessions' message listeners is processing a message at the time when
535      * connection <CODE>close</CODE> is invoked, all the facilities of the
536      * connection and its sessions must remain available to those listeners
537      * until they return control to the JMS provider.
538      * <P>
539      * Closing a connection causes any of its sessions' transactions in progress
540      * to be rolled back. In the case where a session's work is coordinated by
541      * an external transaction manager, a session's <CODE>commit</CODE> and
542      * <CODE> rollback</CODE> methods are not used and the result of a closed
543      * session's work is determined later by the transaction manager. Closing a
544      * connection does NOT force an acknowledgment of client-acknowledged
545      * sessions.
546      * <P>
547      * Invoking the <CODE>acknowledge</CODE> method of a received message from
548      * a closed connection's session must throw an <CODE>IllegalStateException</CODE>.
549      * Closing a closed connection must NOT throw an exception.
550      *
551      * @throws JMSException
552      * if the JMS provider fails to close the connection due to some
553      * internal error. For example, a failure to release resources
554      * or to close a socket connection can cause this exception to
555      * be thrown.
556      */

557     public void close() throws JMSException JavaDoc {
558         checkClosed();
559
560         try {
561             // If we were running, lets stop first.
562
stop();
563
564             synchronized (this) {
565                 if (!closed.get()) {
566                     closing.set(true);
567
568                 if( advisoryConsumer!=null ) {
569                     advisoryConsumer.dispose();
570                     advisoryConsumer=null;
571                 }
572
573                     for (Iterator JavaDoc i = this.sessions.iterator(); i.hasNext();) {
574                         ActiveMQSession s = (ActiveMQSession) i.next();
575                         s.dispose();
576                     }
577                     for (Iterator JavaDoc i = this.connectionConsumers.iterator(); i.hasNext();) {
578                         ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
579                         c.dispose();
580                     }
581                     for (Iterator JavaDoc i = this.inputStreams.iterator(); i.hasNext();) {
582                         ActiveMQInputStream c = (ActiveMQInputStream) i.next();
583                         c.dispose();
584                     }
585                     for (Iterator JavaDoc i = this.outputStreams.iterator(); i.hasNext();) {
586                         ActiveMQOutputStream c = (ActiveMQOutputStream) i.next();
587                         c.dispose();
588                     }
589
590                     if (isConnectionInfoSentToBroker) {
591                         // If we announced ourselfs to the broker.. Try to let the broker
592
// know that the connection is being shutdown.
593
syncSendPacket(info.createRemoveCommand(), closeTimeout);
594                         asyncSendPacket(new ShutdownInfo());
595                     }
596
597                     ServiceSupport.dispose(this.transport);
598
599                     started.set(false);
600
601                     // TODO if we move the TaskRunnerFactory to the connection factory
602
// then we may need to call
603
// factory.onConnectionClose(this);
604
sessionTaskRunner.shutdown();
605
606                     closed.set(true);
607                     closing.set(false);
608                 }
609             }
610         }
611         finally {
612             factoryStats.removeConnection(this);
613         }
614     }
615
616     /**
617      * Tells the broker to terminate its VM. This can be used to cleanly
618      * terminate a broker running in a standalone java process. Server must have
619      * property enable.vm.shutdown=true defined to allow this to work.
620      */

621     // TODO : org.apache.activemq.message.BrokerAdminCommand not yet implemented.
622
/*
623      * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
624      * command = new BrokerAdminCommand();
625      * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
626      * asyncSendPacket(command); }
627      */

628
629
630     /**
631      * Create a durable connection consumer for this connection (optional
632      * operation). This is an expert facility not used by regular JMS clients.
633      *
634      * @param topic
635      * topic to access
636      * @param subscriptionName
637      * durable subscription name
638      * @param messageSelector
639      * only messages with properties matching the message selector
640      * expression are delivered. A value of null or an empty string
641      * indicates that there is no message selector for the message
642      * consumer.
643      * @param sessionPool
644      * the server session pool to associate with this durable
645      * connection consumer
646      * @param maxMessages
647      * the maximum number of messages that can be assigned to a
648      * server session at one time
649      * @return the durable connection consumer
650      * @throws JMSException
651      * if the <CODE>Connection</CODE> object fails to create a
652      * connection consumer due to some internal error or invalid
653      * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
654      * @throws javax.jms.InvalidDestinationException
655      * if an invalid destination is specified.
656      * @throws javax.jms.InvalidSelectorException
657      * if the message selector is invalid.
658      * @see javax.jms.ConnectionConsumer
659      * @since 1.1
660      */

661     public ConnectionConsumer JavaDoc createDurableConnectionConsumer(Topic JavaDoc topic, String JavaDoc subscriptionName,
662             String JavaDoc messageSelector, ServerSessionPool JavaDoc sessionPool, int maxMessages) throws JMSException JavaDoc {
663         return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
664     }
665
666     /**
667      * Create a durable connection consumer for this connection (optional
668      * operation). This is an expert facility not used by regular JMS clients.
669      *
670      * @param topic
671      * topic to access
672      * @param subscriptionName
673      * durable subscription name
674      * @param messageSelector
675      * only messages with properties matching the message selector
676      * expression are delivered. A value of null or an empty string
677      * indicates that there is no message selector for the message
678      * consumer.
679      * @param sessionPool
680      * the server session pool to associate with this durable
681      * connection consumer
682      * @param maxMessages
683      * the maximum number of messages that can be assigned to a
684      * server session at one time
685      * @param noLocal
686      * set true if you want to filter out messages published locally
687      *
688      * @return the durable connection consumer
689      * @throws JMSException
690      * if the <CODE>Connection</CODE> object fails to create a
691      * connection consumer due to some internal error or invalid
692      * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
693      * @throws javax.jms.InvalidDestinationException
694      * if an invalid destination is specified.
695      * @throws javax.jms.InvalidSelectorException
696      * if the message selector is invalid.
697      * @see javax.jms.ConnectionConsumer
698      * @since 1.1
699      */

700     public ConnectionConsumer JavaDoc createDurableConnectionConsumer(Topic JavaDoc topic, String JavaDoc subscriptionName,
701             String JavaDoc messageSelector, ServerSessionPool JavaDoc sessionPool, int maxMessages, boolean noLocal)
702             throws JMSException JavaDoc {
703         checkClosedOrFailed();
704         ensureConnectionInfoSent();
705         SessionId sessionId = new SessionId(info.getConnectionId(), -1);
706         ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator
707                 .getNextSequenceId()));
708         info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
709         info.setSubscriptionName(subscriptionName);
710         info.setSelector(messageSelector);
711         info.setPrefetchSize(maxMessages);
712         info.setDispatchAsync(dispatchAsync);
713
714         // Allows the options on the destination to configure the consumerInfo
715
if( info.getDestination().getOptions()!=null ) {
716             HashMap JavaDoc options = new HashMap JavaDoc(info.getDestination().getOptions());
717             IntrospectionSupport.setProperties(this.info, options, "consumer.");
718         }
719
720         return new ActiveMQConnectionConsumer(this, sessionPool, info);
721     }
722
723
724     // Properties
725
// -------------------------------------------------------------------------
726

727     /**
728      * Returns true if this connection has been started
729      *
730      * @return true if this Connection is started
731      */

732     public boolean isStarted() {
733         return started.get();
734     }
735
736     /**
737      * Returns true if the connection is closed
738      */

739     public boolean isClosed() {
740         return closed.get();
741     }
742
743     /**
744      * Returns true if the connection is in the process of being closed
745      */

746     public boolean isClosing() {
747         return closing.get();
748     }
749     
750     /**
751      * Returns true if the underlying transport has failed
752      */

753     public boolean isTransportFailed() {
754         return transportFailed.get();
755     }
756     
757     /**
758      * @return Returns the prefetchPolicy.
759      */

760     public ActiveMQPrefetchPolicy getPrefetchPolicy() {
761         return prefetchPolicy;
762     }
763
764     /**
765      * Sets the <a
766      * HREF="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
767      * policy</a> for consumers created by this connection.
768      */

769     public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
770         this.prefetchPolicy = prefetchPolicy;
771     }
772
773     /**
774      */

775     public Transport getTransportChannel() {
776         return transport;
777     }
778
779     /**
780      *
781      * @return Returns the clientID of the connection, forcing one to be
782      * generated if one has not yet been configured.
783      */

784     public String JavaDoc getInitializedClientID() throws JMSException JavaDoc {
785         ensureConnectionInfoSent();
786         return info.getClientId();
787     }
788
789     /**
790      *
791      * @return Returns the timeStampsDisableByDefault.
792      */

793     public boolean isDisableTimeStampsByDefault() {
794         return disableTimeStampsByDefault;
795     }
796
797     /**
798      * Sets whether or not timestamps on messages should be disabled or not. If
799      * you disable them it adds a small performance boost.
800      */

801     public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
802         this.disableTimeStampsByDefault = timeStampsDisableByDefault;
803     }
804
805     /**
806      *
807      * @return Returns the dispatchOptimizedMessage.
808      */

809     public boolean isOptimizedMessageDispatch() {
810         return optimizedMessageDispatch;
811     }
812
813     /**
814      * If this flag is set then an larger prefetch limit is used - only
815      * applicable for durable topic subscribers.
816      */

817     public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
818         this.optimizedMessageDispatch = dispatchOptimizedMessage;
819     }
820
821     /**
822      * @return Returns the closeTimeout.
823      */

824     public int getCloseTimeout(){
825         return closeTimeout;
826     }
827
828
829     /**
830      * Sets the timeout before a close is considered complete. Normally a
831      * close() on a connection waits for confirmation from the broker; this
832      * allows that operation to timeout to save the client hanging if there is
833      * no broker
834      */

835     public void setCloseTimeout(int closeTimeout){
836         this.closeTimeout=closeTimeout;
837     }
838
839     /**
840      *
841      * @return ConnectionInfo
842      */

843     public ConnectionInfo getConnectionInfo() {
844         return this.info;
845     }
846
847     public boolean isUseRetroactiveConsumer() {
848         return useRetroactiveConsumer;
849     }
850
851     /**
852      * Sets whether or not retroactive consumers are enabled. Retroactive consumers allow
853      * non-durable topic subscribers to receive old messages that were published before the
854      * non-durable subscriber started.
855      */

856     public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
857         this.useRetroactiveConsumer = useRetroactiveConsumer;
858     }
859
860     public boolean isNestedMapAndListEnabled() {
861         return nestedMapAndListEnabled;
862     }
863
864     /**
865      * Enables/disables whether or not Message properties and MapMessage entries
866      * support <a
867      * HREF="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
868      * Structures</a> of Map and List objects
869      */

870     public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
871         this.nestedMapAndListEnabled = structuredMapsEnabled;
872     }
873
874
875     /**
876      * Adds a transport listener so that a client can be notified of events in the underlying
877      * transport
878      */

879     public void addTransportListener(TransportListener transportListener) {
880         transportListeners.add(transportListener);
881     }
882     
883     public void removeTransportListener(TransportListener transportListener) {
884         transportListeners.remove(transportListener);
885     }
886     
887     public TaskRunnerFactory getSessionTaskRunner() {
888         return sessionTaskRunner;
889     }
890
891     public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
892         this.sessionTaskRunner = sessionTaskRunner;
893     }
894
895     public MessageTransformer getTransformer() {
896         return transformer;
897     }
898
899     /**
900      * Sets the transformer used to transform messages before they are sent on to the JMS bus
901      * or when they are received from the bus but before they are delivered to the JMS client
902      */

903     public void setTransformer(MessageTransformer transformer) {
904         this.transformer = transformer;
905     }
906
907     /**
908      * @return the statsEnabled
909      */

910     public boolean isStatsEnabled(){
911         return this.stats.isEnabled();
912     }
913
914     
915     /**
916      * @param statsEnabled the statsEnabled to set
917      */

918     public void setStatsEnabled(boolean statsEnabled){
919         this.stats.setEnabled(statsEnabled);
920     }
921
922     
923     // Implementation methods
924
// -------------------------------------------------------------------------
925

926     /**
927      * Used internally for adding Sessions to the Connection
928      *
929      * @param session
930      * @throws JMSException
931      * @throws JMSException
932      */

933     protected void addSession(ActiveMQSession session) throws JMSException JavaDoc {
934         this.sessions.add(session);
935         if (sessions.size()>1 || session.isTransacted()){
936             optimizedMessageDispatch = false;
937         }
938     }
939
940     /**
941      * Used interanlly for removing Sessions from a Connection
942      *
943      * @param session
944      */

945     protected void removeSession(ActiveMQSession session) {
946         this.sessions.remove(session);
947     }
948
949     /**
950      * Add a ConnectionConsumer
951      *
952      * @param connectionConsumer
953      * @throws JMSException
954      */

955     protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException JavaDoc {
956         this.connectionConsumers.add(connectionConsumer);
957     }
958
959     /**
960      * Remove a ConnectionConsumer
961      *
962      * @param connectionConsumer
963      */

964     protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
965         this.connectionConsumers.remove(connectionConsumer);
966     }
967
968     /**
969      * Creates a <CODE>TopicSession</CODE> object.
970      *
971      * @param transacted
972      * indicates whether the session is transacted
973      * @param acknowledgeMode
974      * indicates whether the consumer or the client will acknowledge
975      * any messages it receives; ignored if the session is
976      * transacted. Legal values are
977      * <code>Session.AUTO_ACKNOWLEDGE</code>,
978      * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
979      * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
980      * @return a newly created topic session
981      * @throws JMSException
982      * if the <CODE>TopicConnection</CODE> object fails to create
983      * a session due to some internal error or lack of support for
984      * the specific transaction and acknowledgement mode.
985      * @see Session#AUTO_ACKNOWLEDGE
986      * @see Session#CLIENT_ACKNOWLEDGE
987      * @see Session#DUPS_OK_ACKNOWLEDGE
988      */

989     public TopicSession JavaDoc createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException JavaDoc {
990         return new ActiveMQTopicSession((ActiveMQSession) createSession(transacted, acknowledgeMode));
991     }
992
993     /**
994      * Creates a connection consumer for this connection (optional operation).
995      * This is an expert facility not used by regular JMS clients.
996      *
997      * @param topic
998      * the topic to access
999      * @param messageSelector
1000     * only messages with properties matching the message selector
1001     * expression are delivered. A value of null or an empty string
1002     * indicates that there is no message selector for the message
1003     * consumer.
1004     * @param sessionPool
1005     * the server session pool to associate with this connection
1006     * consumer
1007     * @param maxMessages
1008     * the maximum number of messages that can be assigned to a
1009     * server session at one time
1010     * @return the connection consumer
1011     * @throws JMSException
1012     * if the <CODE>TopicConnection</CODE> object fails to create
1013     * a connection consumer due to some internal error or invalid
1014     * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1015     * @throws javax.jms.InvalidDestinationException
1016     * if an invalid topic is specified.
1017     * @throws javax.jms.InvalidSelectorException
1018     * if the message selector is invalid.
1019     * @see javax.jms.ConnectionConsumer
1020     */

1021    public ConnectionConsumer JavaDoc createConnectionConsumer(Topic JavaDoc topic, String JavaDoc messageSelector, ServerSessionPool JavaDoc sessionPool, int maxMessages) throws JMSException JavaDoc {
1022        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1023    }
1024    
1025    /**
1026     * Creates a connection consumer for this connection (optional operation).
1027     * This is an expert facility not used by regular JMS clients.
1028     *
1029     * @param queue
1030     * the queue to access
1031     * @param messageSelector
1032     * only messages with properties matching the message selector
1033     * expression are delivered. A value of null or an empty string
1034     * indicates that there is no message selector for the message
1035     * consumer.
1036     * @param sessionPool
1037     * the server session pool to associate with this connection
1038     * consumer
1039     * @param maxMessages
1040     * the maximum number of messages that can be assigned to a
1041     * server session at one time
1042     * @return the connection consumer
1043     * @throws JMSException
1044     * if the <CODE>QueueConnection</CODE> object fails to create
1045     * a connection consumer due to some internal error or invalid
1046     * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1047     * @throws javax.jms.InvalidDestinationException
1048     * if an invalid queue is specified.
1049     * @throws javax.jms.InvalidSelectorException
1050     * if the message selector is invalid.
1051     * @see javax.jms.ConnectionConsumer
1052     */

1053    public ConnectionConsumer JavaDoc createConnectionConsumer(Queue JavaDoc queue, String JavaDoc messageSelector,
1054            ServerSessionPool JavaDoc sessionPool, int maxMessages) throws JMSException JavaDoc {
1055        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1056    }
1057
1058    /**
1059     * Creates a connection consumer for this connection (optional operation).
1060     * This is an expert facility not used by regular JMS clients.
1061     *
1062     * @param destination
1063     * the destination to access
1064     * @param messageSelector
1065     * only messages with properties matching the message selector
1066     * expression are delivered. A value of null or an empty string
1067     * indicates that there is no message selector for the message
1068     * consumer.
1069     * @param sessionPool
1070     * the server session pool to associate with this connection
1071     * consumer
1072     * @param maxMessages
1073     * the maximum number of messages that can be assigned to a
1074     * server session at one time
1075     * @return the connection consumer
1076     * @throws JMSException
1077     * if the <CODE>Connection</CODE> object fails to create a
1078     * connection consumer due to some internal error or invalid
1079     * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1080     * @throws javax.jms.InvalidDestinationException
1081     * if an invalid destination is specified.
1082     * @throws javax.jms.InvalidSelectorException
1083     * if the message selector is invalid.
1084     * @see javax.jms.ConnectionConsumer
1085     * @since 1.1
1086     */

1087    public ConnectionConsumer JavaDoc createConnectionConsumer(Destination JavaDoc destination, String JavaDoc messageSelector,
1088            ServerSessionPool JavaDoc sessionPool, int maxMessages) throws JMSException JavaDoc {
1089        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1090    }
1091
1092    public ConnectionConsumer JavaDoc createConnectionConsumer(Destination JavaDoc destination, String JavaDoc messageSelector, ServerSessionPool JavaDoc sessionPool, int maxMessages, boolean noLocal) throws JMSException JavaDoc {
1093        
1094        checkClosedOrFailed();
1095        ensureConnectionInfoSent();
1096        
1097        ConsumerId consumerId = createConsumerId();
1098        ConsumerInfo info = new ConsumerInfo(consumerId);
1099        info.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1100        info.setSelector(messageSelector);
1101        info.setPrefetchSize(maxMessages);
1102        info.setNoLocal(noLocal);
1103        info.setDispatchAsync(dispatchAsync);
1104        
1105        // Allows the options on the destination to configure the consumerInfo
1106
if( info.getDestination().getOptions()!=null ) {
1107            HashMap JavaDoc options = new HashMap JavaDoc(info.getDestination().getOptions());
1108            IntrospectionSupport.setProperties(info, options, "consumer.");
1109        }
1110        
1111        return new ActiveMQConnectionConsumer(this, sessionPool, info);
1112    }
1113
1114    /**
1115     * @return
1116     */

1117    private ConsumerId createConsumerId() {
1118        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1119    }
1120    
1121    /**
1122     * @return
1123     */

1124    private ProducerId createProducerId() {
1125        return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1126    }
1127
1128
1129    /**
1130     * Creates a <CODE>QueueSession</CODE> object.
1131     *
1132     * @param transacted
1133     * indicates whether the session is transacted
1134     * @param acknowledgeMode
1135     * indicates whether the consumer or the client will acknowledge
1136     * any messages it receives; ignored if the session is
1137     * transacted. Legal values are
1138     * <code>Session.AUTO_ACKNOWLEDGE</code>,
1139     * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1140     * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1141     * @return a newly created queue session
1142     * @throws JMSException
1143     * if the <CODE>QueueConnection</CODE> object fails to create
1144     * a session due to some internal error or lack of support for
1145     * the specific transaction and acknowledgement mode.
1146     * @see Session#AUTO_ACKNOWLEDGE
1147     * @see Session#CLIENT_ACKNOWLEDGE
1148     * @see Session#DUPS_OK_ACKNOWLEDGE
1149     */

1150    public QueueSession JavaDoc createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException JavaDoc {
1151        return new ActiveMQQueueSession((ActiveMQSession) createSession(transacted, acknowledgeMode));
1152    }
1153
1154    /**
1155     * Ensures that the clientID was manually specified and not auto-generated.
1156     * If the clientID was not specified this method will throw an exception.
1157     * This method is used to ensure that the clientID + durableSubscriber name
1158     * are used correctly.
1159     *
1160     * @throws JMSException
1161     */

1162    public void checkClientIDWasManuallySpecified() throws JMSException JavaDoc {
1163        if (!userSpecifiedClientID) {
1164            throw new JMSException JavaDoc(
1165                    "You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1166        }
1167    }
1168
1169    /**
1170     * send a Packet through the Connection - for internal use only
1171     *
1172     * @param command
1173     * @throws JMSException
1174     */

1175    public void asyncSendPacket(Command command) throws JMSException JavaDoc {
1176        if (isClosed()) {
1177            throw new ConnectionClosedException();
1178        } else {
1179
1180            try {
1181                this.transport.oneway(command);
1182            } catch (IOException JavaDoc e) {
1183                throw JMSExceptionSupport.create(e);
1184            }
1185        }
1186    }
1187
1188    /**
1189     * Send a packet through a Connection - for internal use only
1190     *
1191     * @param command
1192     * @return
1193     * @throws JMSException
1194     */

1195    public Response syncSendPacket(Command command) throws JMSException JavaDoc {
1196        if (isClosed()) {
1197            throw new ConnectionClosedException();
1198        } else {
1199
1200            try {
1201                Response response = (Response) this.transport.request(command);
1202                if (response.isException()) {
1203                    ExceptionResponse er = (ExceptionResponse) response;
1204                    if (er.getException() instanceof JMSException JavaDoc)
1205                        throw (JMSException JavaDoc) er.getException();
1206                    else
1207                        throw JMSExceptionSupport.create(er.getException());
1208                }
1209                return response;
1210            } catch (IOException JavaDoc e) {
1211                throw JMSExceptionSupport.create(e);
1212            }
1213        }
1214    }
1215    
1216    /**
1217     * Send a packet through a Connection - for internal use only
1218     *
1219     * @param command
1220     * @return
1221     * @throws JMSException
1222     */

1223    public Response syncSendPacket(Command command, int timeout) throws JMSException JavaDoc {
1224        if (isClosed()) {
1225            throw new ConnectionClosedException();
1226        } else {
1227
1228            try {
1229                Response response = (Response) this.transport.request(command,timeout);
1230                if (response!=null && response.isException()) {
1231                    ExceptionResponse er = (ExceptionResponse) response;
1232                    if (er.getException() instanceof JMSException JavaDoc)
1233                        throw (JMSException JavaDoc) er.getException();
1234                    else
1235                        throw JMSExceptionSupport.create(er.getException());
1236                }
1237                return response;
1238            } catch (IOException JavaDoc e) {
1239                throw JMSExceptionSupport.create(e);
1240            }
1241        }
1242    }
1243
1244    /**
1245     * @return statistics for this Connection
1246     */

1247    public StatsImpl getStats() {
1248        return stats;
1249    }
1250
1251    /**
1252     * simply throws an exception if the Connection is already closed
1253     * or the Transport has failed
1254     *
1255     * @throws JMSException
1256     */

1257    protected synchronized void checkClosedOrFailed() throws JMSException JavaDoc {
1258        checkClosed();
1259        if (transportFailed.get()){
1260            throw new ConnectionFailedException(firstFailureError);
1261        }
1262    }
1263    
1264    /**
1265     * simply throws an exception if the Connection is already closed
1266     *
1267     * @throws JMSException
1268     */

1269    protected synchronized void checkClosed() throws JMSException JavaDoc {
1270        if (closed.get()) {
1271            throw new ConnectionClosedException();
1272        }
1273    }
1274
1275    /**
1276     * Send the ConnectionInfo to the Broker
1277     *
1278     * @throws JMSException
1279     */

1280    protected synchronized void ensureConnectionInfoSent() throws JMSException JavaDoc {
1281        // Can we skip sending the ConnectionInfo packet??
1282
if (isConnectionInfoSentToBroker || closed.get()) {
1283            return;
1284        }
1285
1286        if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1287            info.setClientId(clientIdGenerator.generateId());
1288        }
1289        syncSendPacket(info);
1290        
1291        this.isConnectionInfoSentToBroker = true;
1292        // Add a temp destination advisory consumer so that
1293
// We know what the valid temporary destinations are on the
1294
// broker without having to do an RPC to the broker.
1295

1296        ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1),consumerIdGenerator.getNextSequenceId());
1297        if( watchTopicAdvisories ) {
1298            advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1299        }
1300    }
1301
1302
1303    /**
1304     * @return Returns the useAsyncSend.
1305     */

1306    public boolean isUseAsyncSend() {
1307        return useAsyncSend;
1308    }
1309    
1310    public void setUseSyncSend(boolean forceSyncSend) {
1311        this.useSyncSend = forceSyncSend;
1312    }
1313
1314
1315    public synchronized boolean isWatchTopicAdvisories() {
1316        return watchTopicAdvisories;
1317    }
1318
1319
1320    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1321        this.watchTopicAdvisories = watchTopicAdvisories;
1322    }
1323        
1324
1325    /**
1326     * Forces the use of <a
1327     * HREF="http://activemq.apache.org/async-sends.html">Async Sends</a>
1328     * which adds a massive performance boost; but means that the send() method
1329     * will return immediately whether the message has been sent or not which
1330     * could lead to message loss.
1331     */

1332    public void setUseAsyncSend(boolean useAsyncSend) {
1333        this.useAsyncSend = useAsyncSend;
1334    }
1335    
1336    /**
1337     * @return true if always sync send messages
1338     */

1339    public boolean isAlwaysSyncSend(){
1340        return this.alwaysSyncSend;
1341    }
1342
1343    /**
1344     * Set true if always require messages to be sync sent
1345     * @param alwaysSyncSend
1346     */

1347    public void setAlwaysSyncSend(boolean alwaysSyncSend){
1348        this.alwaysSyncSend=alwaysSyncSend;
1349    }
1350
1351
1352    /**
1353     * Cleans up this connection so that it's state is as if the connection was
1354     * just created. This allows the Resource Adapter to clean up a connection
1355     * so that it can be reused without having to close and recreate the
1356     * connection.
1357     *
1358     */

1359    public void cleanup() throws JMSException JavaDoc {
1360        
1361        if( advisoryConsumer!=null ) {
1362            advisoryConsumer.dispose();
1363            advisoryConsumer=null;
1364        }
1365        
1366        for (Iterator JavaDoc i = this.sessions.iterator(); i.hasNext();) {
1367            ActiveMQSession s = (ActiveMQSession) i.next();
1368            s.dispose();
1369        }
1370        for (Iterator JavaDoc i = this.connectionConsumers.iterator(); i.hasNext();) {
1371            ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
1372            c.dispose();
1373        }
1374        for (Iterator JavaDoc i = this.inputStreams.iterator(); i.hasNext();) {
1375            ActiveMQInputStream c = (ActiveMQInputStream) i.next();
1376            c.dispose();
1377        }
1378        for (Iterator JavaDoc i = this.outputStreams.iterator(); i.hasNext();) {
1379            ActiveMQOutputStream c = (ActiveMQOutputStream) i.next();
1380            c.dispose();
1381        }
1382
1383        if(isConnectionInfoSentToBroker){
1384            if(!transportFailed.get() && !closing.get()){
1385                asyncSendPacket(info.createRemoveCommand());
1386            }
1387            isConnectionInfoSentToBroker=false;
1388        }
1389        if( userSpecifiedClientID ) {
1390            info.setClientId(null);
1391            userSpecifiedClientID=false;
1392        }
1393        clientIDSet = false;
1394
1395        started.set(false);
1396    }
1397
1398    /**
1399     * Changes the associated username/password that is associated with this
1400     * connection. If the connection has been used, you must called cleanup()
1401     * before calling this method.
1402     *
1403     * @throws IllegalStateException
1404     * if the connection is in used.
1405     *
1406     */

1407    public void changeUserInfo(String JavaDoc userName, String JavaDoc password) throws JMSException JavaDoc {
1408        if (isConnectionInfoSentToBroker)
1409            throw new IllegalStateException JavaDoc("changeUserInfo used Connection is not allowed");
1410
1411        this.info.setUserName(userName);
1412        this.info.setPassword(password);
1413    }
1414
1415    /**
1416     * @return Returns the resourceManagerId.
1417     * @throws JMSException
1418     */

1419    public String JavaDoc getResourceManagerId() throws JMSException JavaDoc {
1420        waitForBrokerInfo();
1421        if( brokerInfo==null )
1422            throw new JMSException JavaDoc("Connection failed before Broker info was received.");
1423        return brokerInfo.getBrokerId().getValue();
1424    }
1425
1426    /**
1427     * Returns the broker name if one is available or null if one is not available yet.
1428     */

1429    public String JavaDoc getBrokerName() {
1430        if (brokerInfo == null) {
1431            return null;
1432        }
1433        return brokerInfo.getBrokerName();
1434    }
1435   
1436    /**
1437     * Returns the broker information if it is available or null if it is not available yet.
1438     */

1439    public BrokerInfo getBrokerInfo() {
1440        return brokerInfo;
1441    }
1442
1443    /**
1444     * @return Returns the RedeliveryPolicy.
1445     * @throws JMSException
1446     */

1447    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException JavaDoc {
1448        return redeliveryPolicy;
1449    }
1450
1451    /**
1452     * Sets the redelivery policy to be used when messages are rolled back
1453     */

1454    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1455        this.redeliveryPolicy = redeliveryPolicy;
1456    }
1457
1458    public BlobTransferPolicy getBlobTransferPolicy() {
1459        if (blobTransferPolicy == null) {
1460            blobTransferPolicy = createBlobTransferPolicy();
1461        }
1462        return blobTransferPolicy;
1463    }
1464
1465    /**
1466     * Sets the policy used to describe how out-of-band BLOBs (Binary Large OBjects)
1467     * are transferred from producers to brokers to consumers
1468     */

1469    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1470        this.blobTransferPolicy = blobTransferPolicy;
1471    }
1472
1473    /**
1474     * @return Returns the alwaysSessionAsync.
1475     */

1476    public boolean isAlwaysSessionAsync(){
1477        return alwaysSessionAsync;
1478    }
1479
1480
1481    /**
1482     * If this flag is set then a separate thread is not used for dispatching
1483     * messages for each Session in the Connection. However, a separate thread
1484     * is always used if there is more than one session, or the session isn't in
1485     * auto acknowledge or duplicates ok mode
1486     */

1487    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1488        this.alwaysSessionAsync = alwaysSessionAsync;
1489    }
1490
1491    /**
1492     * @return Returns the optimizeAcknowledge.
1493     */

1494    public boolean isOptimizeAcknowledge(){
1495        return optimizeAcknowledge;
1496    }
1497
1498
1499    /**
1500     * Enables an optimised acknowledgement mode where messages are acknowledged in batches rather than individually
1501     *
1502     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1503     */

1504    public void setOptimizeAcknowledge(boolean optimizeAcknowledge){
1505        this.optimizeAcknowledge=optimizeAcknowledge;
1506    }
1507
1508
1509    private void waitForBrokerInfo() throws JMSException JavaDoc {
1510        try {
1511            brokerInfoReceived.await();
1512        } catch (InterruptedException JavaDoc e) {
1513            Thread.currentThread().interrupt();
1514            throw JMSExceptionSupport.create(e);
1515        }
1516    }
1517
1518    // Package protected so that it can be used in unit tests
1519
Transport getTransport() {
1520        return transport;
1521    }
1522    
1523    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1524        producers.put(producerId, producer);
1525    }
1526    public void removeProducer(ProducerId producerId) {
1527        producers.remove(producerId);
1528    }
1529
1530
1531    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1532        dispatchers.put(consumerId, dispatcher);
1533    }
1534    public void removeDispatcher(ConsumerId consumerId) {
1535        dispatchers.remove(consumerId);
1536    }
1537    
1538    /**
1539     * @param o - the command to consume
1540     */

1541    public void onCommand(final Object JavaDoc o) {
1542        final Command command = (Command) o;
1543        if (!closed.get() && command != null) {
1544            try {
1545                command.visit(new CommandVisitorAdapter(){
1546                    @Override JavaDoc
1547                    public Response processMessageDispatch(MessageDispatch md) throws Exception JavaDoc {
1548                        ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId());
1549                        if (dispatcher != null) {
1550                            // Copy in case a embedded broker is dispatching via vm://
1551
// md.getMessage() == null to signal end of queue browse.
1552
Message msg = md.getMessage();
1553                            if( msg!=null ) {
1554                                msg = msg.copy();
1555                                msg.setReadOnlyBody(true);
1556                                msg.setReadOnlyProperties(true);
1557                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1558                                msg.setConnection(ActiveMQConnection.this);
1559                                md.setMessage( msg );
1560                            }
1561                            dispatcher.dispatch(md);
1562                        }
1563                        return null;
1564                    }
1565                    
1566                    @Override JavaDoc
1567                    public Response processProducerAck(ProducerAck pa) throws Exception JavaDoc {
1568                        ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1569                        if( producer!=null ) {
1570                            producer.onProducerAck(pa);
1571                        }
1572                        return null;
1573                    }
1574                    
1575                    @Override JavaDoc
1576                    public Response processBrokerInfo(BrokerInfo info) throws Exception JavaDoc {
1577                        brokerInfo=info;
1578                        brokerInfoReceived.countDown();
1579                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1580                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1581                        return null;
1582                    }
1583                    
1584                    @Override JavaDoc
1585                    public Response processConnectionError(final ConnectionError error) throws Exception JavaDoc {
1586                        asyncConnectionThread.execute(new Runnable JavaDoc(){
1587                            public void run() {
1588                                onAsyncException(error.getException());
1589                            }
1590                        });
1591                        new Thread JavaDoc("Async error worker") {
1592                        }.start();
1593                        return null;
1594                    }
1595                    @Override JavaDoc
1596                    public Response processControlCommand(ControlCommand command) throws Exception JavaDoc {
1597                        onControlCommand(command);
1598                        return null;
1599                    }
1600                    @Override JavaDoc
1601                    public Response processConnectionControl(ConnectionControl control) throws Exception JavaDoc {
1602                        onConnectionControl((ConnectionControl) command);
1603                        return null;
1604                    }
1605                    @Override JavaDoc
1606                    public Response processConsumerControl(ConsumerControl control) throws Exception JavaDoc {
1607                        onConsumerControl((ConsumerControl) command);
1608                        return null;
1609                    }
1610                    @Override JavaDoc
1611                    public Response processWireFormat(WireFormatInfo info) throws Exception JavaDoc {
1612                        onWireFormatInfo((WireFormatInfo) command);
1613                        return null;
1614                    }
1615                });
1616            } catch (Exception JavaDoc e) {
1617                onAsyncException(e);
1618            }
1619            
1620        }
1621        for (Iterator JavaDoc iter = transportListeners.iterator(); iter.hasNext();) {
1622            TransportListener listener = (TransportListener) iter.next();
1623            listener.onCommand(command);
1624        }
1625    }
1626
1627    protected void onWireFormatInfo(WireFormatInfo info) {
1628        protocolVersion.set(info.getVersion());
1629    }
1630
1631
1632    /**
1633     * Used for handling async exceptions
1634     *
1635     * @param error
1636     */

1637    public void onAsyncException(Throwable JavaDoc error) {
1638        if (!closed.get() && !closing.get()) {
1639            if (this.exceptionListener != null) {
1640                
1641                if (!(error instanceof JMSException JavaDoc))
1642                    error = JMSExceptionSupport.create(error);
1643                final JMSException JavaDoc e = (JMSException JavaDoc) error;
1644                
1645                asyncConnectionThread.execute(new Runnable JavaDoc(){
1646                    public void run() {
1647                        ActiveMQConnection.this.exceptionListener.onException(e);
1648                    }
1649                });
1650                
1651            } else {
1652                log.warn("Async exception with no exception listener: " + error, error);
1653            }
1654        }
1655    }
1656    
1657    public void onException(final IOException JavaDoc error) {
1658        onAsyncException(error);
1659        asyncConnectionThread.execute(new Runnable JavaDoc(){
1660            public void run() {
1661                transportFailed(error);
1662                ServiceSupport.dispose(ActiveMQConnection.this.transport);
1663                brokerInfoReceived.countDown();
1664        
1665                for (Iterator JavaDoc iter = transportListeners.iterator(); iter.hasNext();) {
1666                    TransportListener listener = (TransportListener) iter.next();
1667                    listener.onException(error);
1668                }
1669            }
1670        });
1671    }
1672    
1673    public void transportInterupted() {
1674        for (Iterator JavaDoc i = this.sessions.iterator(); i.hasNext();) {
1675            ActiveMQSession s = (ActiveMQSession) i.next();
1676            s.clearMessagesInProgress();
1677        }
1678        for (Iterator JavaDoc iter = transportListeners.iterator(); iter.hasNext();) {
1679            TransportListener listener = (TransportListener) iter.next();
1680            listener.transportInterupted();
1681        }
1682    }
1683
1684    public void transportResumed() {
1685        for (Iterator JavaDoc iter = transportListeners.iterator(); iter.hasNext();) {
1686            TransportListener listener = (TransportListener) iter.next();
1687            listener.transportResumed();
1688        }
1689        for (Iterator JavaDoc i = this.sessions.iterator(); i.hasNext();) {
1690            ActiveMQSession s = (ActiveMQSession) i.next();
1691            s.deliverAcks();
1692        }
1693    }
1694
1695
1696    /**
1697     * Create the DestinationInfo object for the temporary destination.
1698     *
1699     * @param topic - if its true topic, else queue.
1700     * @return DestinationInfo
1701     * @throws JMSException
1702     */

1703    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException JavaDoc {
1704        
1705        // Check if Destination info is of temporary type.
1706
ActiveMQTempDestination dest;
1707        if( topic ) {
1708            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1709        } else {
1710            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1711        }
1712        
1713        DestinationInfo info = new DestinationInfo();
1714        info.setConnectionId(this.info.getConnectionId());
1715        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
1716        info.setDestination(dest);
1717        syncSendPacket(info);
1718        
1719        dest.setConnection(this);
1720        activeTempDestinations.put(dest,dest);
1721        return dest;
1722    }
1723    
1724    /**
1725     *
1726     * @param destination
1727     * @throws JMSException
1728     */

1729    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException JavaDoc {
1730        
1731        checkClosedOrFailed();
1732
1733        for(Iterator JavaDoc i=this.sessions.iterator();i.hasNext();){
1734            ActiveMQSession s=(ActiveMQSession) i.next();
1735            if( s.isInUse(destination) ) {
1736                throw new JMSException JavaDoc("A consumer is consuming from the temporary destination");
1737            }
1738        }
1739        
1740        activeTempDestinations.remove(destination);
1741
1742        DestinationInfo info = new DestinationInfo();
1743        info.setConnectionId(this.info.getConnectionId());
1744        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1745        info.setDestination(destination);
1746        info.setTimeout(0);
1747        syncSendPacket(info);
1748    }
1749
1750
1751
1752    public boolean isDeleted(ActiveMQDestination dest) {
1753        
1754        // If we are not watching the advisories.. then
1755
// we will assume that the temp destination does exist.
1756
if( advisoryConsumer==null )
1757            return false;
1758            
1759        return !activeTempDestinations.contains(dest);
1760    }
1761
1762    public boolean isCopyMessageOnSend() {
1763        return copyMessageOnSend;
1764    }
1765
1766    public LongSequenceGenerator getLocalTransactionIdGenerator() {
1767        return localTransactionIdGenerator;
1768    }
1769
1770    public boolean isUseCompression() {
1771        return useCompression;
1772    }
1773
1774    /**
1775     * Enables the use of compression of the message bodies
1776     */

1777    public void setUseCompression(boolean useCompression) {
1778        this.useCompression = useCompression;
1779    }
1780
1781    public void destroyDestination(ActiveMQDestination destination) throws JMSException JavaDoc {
1782        
1783        checkClosedOrFailed();
1784        ensureConnectionInfoSent();
1785
1786        DestinationInfo info = new DestinationInfo();
1787        info.setConnectionId(this.info.getConnectionId());
1788        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1789        info.setDestination(destination);
1790        info.setTimeout(0);
1791        syncSendPacket(info);
1792
1793    }
1794
1795    public boolean isDispatchAsync() {
1796        return dispatchAsync;
1797    }
1798
1799    /**
1800     * Enables or disables the default setting of whether or not consumers have
1801     * their messages <a
1802     * HREF="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
1803     * synchronously or asynchronously by the broker</a>.
1804     *
1805     * For non-durable topics for example we typically dispatch synchronously by
1806     * default to minimize context switches which boost performance. However
1807     * sometimes its better to go slower to ensure that a single blocked
1808     * consumer socket does not block delivery to other consumers.
1809     *
1810     * @param asyncDispatch
1811     * If true then consumers created on this connection will default
1812     * to having their messages dispatched asynchronously. The
1813     * default value is false.
1814     */

1815    public void setDispatchAsync(boolean asyncDispatch) {
1816        this.dispatchAsync = asyncDispatch;
1817    }
1818
1819    public boolean isObjectMessageSerializationDefered() {
1820        return objectMessageSerializationDefered;
1821    }
1822
1823    /**
1824     * When an object is set on an ObjectMessage, the JMS spec requires the
1825     * object to be serialized by that set method. Enabling this flag causes the
1826     * object to not get serialized. The object may subsequently get serialized
1827     * if the message needs to be sent over a socket or stored to disk.
1828     */

1829    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
1830        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
1831    }
1832
1833    public InputStream JavaDoc createInputStream(Destination JavaDoc dest) throws JMSException JavaDoc {
1834        return createInputStream(dest, null);
1835    }
1836
1837    public InputStream JavaDoc createInputStream(Destination JavaDoc dest, String JavaDoc messageSelector) throws JMSException JavaDoc {
1838        return createInputStream(dest, messageSelector, false);
1839    }
1840
1841    public InputStream JavaDoc createInputStream(Destination JavaDoc dest, String JavaDoc messageSelector, boolean noLocal) throws JMSException JavaDoc {
1842        return doCreateInputStream(dest, messageSelector, noLocal, null);
1843    }
1844
1845    public InputStream JavaDoc createDurableInputStream(Topic JavaDoc dest, String JavaDoc name) throws JMSException JavaDoc {
1846        return createInputStream(dest, null, false);
1847    }
1848
1849    public InputStream JavaDoc createDurableInputStream(Topic JavaDoc dest, String JavaDoc name, String JavaDoc messageSelector) throws JMSException JavaDoc {
1850        return createDurableInputStream(dest, name, messageSelector, false);
1851    }
1852
1853    public InputStream JavaDoc createDurableInputStream(Topic JavaDoc dest, String JavaDoc name, String JavaDoc messageSelector, boolean noLocal) throws JMSException JavaDoc {
1854        return doCreateInputStream(dest, messageSelector, noLocal, name);
1855    }
1856    
1857    private InputStream JavaDoc doCreateInputStream(Destination JavaDoc dest, String JavaDoc messageSelector, boolean noLocal, String JavaDoc subName) throws JMSException JavaDoc {
1858        checkClosedOrFailed();
1859        ensureConnectionInfoSent();
1860        return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch());
1861    }
1862
1863
1864    /**
1865     * Creates a persistent output stream; individual messages will be written to disk/database by the broker
1866     */

1867    public OutputStream JavaDoc createOutputStream(Destination JavaDoc dest) throws JMSException JavaDoc {
1868        return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
1869    }
1870
1871    /**
1872     * Creates a non persistent output stream; messages will not be written to disk
1873     */

1874    public OutputStream JavaDoc createNonPersistentOutputStream(Destination JavaDoc dest) throws JMSException JavaDoc {
1875        return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
1876    }
1877
1878    /**
1879     * Creates an output stream allowing full control over the delivery mode,
1880     * the priority and time to live of the messages and the properties added to
1881     * messages on the stream.
1882     *
1883     * @param streamProperties
1884     * defines a map of key-value pairs where the keys are strings
1885     * and the values are primitive values (numbers and strings)
1886     * which are appended to the messages similarly to using the
1887     * {@link javax.jms.Message#setObjectProperty(String, Object)}
1888     * method
1889     */

1890    public OutputStream JavaDoc createOutputStream(Destination JavaDoc dest, Map JavaDoc streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException JavaDoc {
1891        checkClosedOrFailed();
1892        ensureConnectionInfoSent();
1893        return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
1894    }
1895
1896    /**
1897     * Unsubscribes a durable subscription that has been created by a client.
1898     * <P>
1899     * This method deletes the state being maintained on behalf of the
1900     * subscriber by its provider.
1901     * <P>
1902     * It is erroneous for a client to delete a durable subscription while there
1903     * is an active <CODE>MessageConsumer </CODE> or <CODE>TopicSubscriber</CODE>
1904     * for the subscription, or while a consumed message is part of a pending
1905     * transaction or has not been acknowledged in the session.
1906     *
1907     * @param name
1908     * the name used to identify this subscription
1909     * @throws JMSException
1910     * if the session fails to unsubscribe to the durable
1911     * subscription due to some internal error.
1912     * @throws InvalidDestinationException
1913     * if an invalid subscription name is specified.
1914     * @since 1.1
1915     */

1916    public void unsubscribe(String JavaDoc name) throws JMSException JavaDoc {
1917        checkClosedOrFailed();
1918        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
1919        rsi.setConnectionId(getConnectionInfo().getConnectionId());
1920        rsi.setSubcriptionName(name);
1921        rsi.setClientId(getConnectionInfo().getClientId());
1922        syncSendPacket(rsi);
1923    }
1924
1925    /**
1926     * Internal send method optimized:
1927     * - It does not copy the message
1928     * - It can only handle ActiveMQ messages.
1929     * - You can specify if the send is async or sync
1930     * - Does not allow you to send /w a transaction.
1931     */

1932    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException JavaDoc {
1933        checkClosedOrFailed();
1934
1935        if( destination.isTemporary() && isDeleted(destination) ) {
1936            throw new JMSException JavaDoc("Cannot publish to a deleted Destination: "+destination);
1937        }
1938        
1939        msg.setJMSDestination(destination);
1940        msg.setJMSDeliveryMode(deliveryMode);
1941        long expiration = 0L;
1942
1943        if (!isDisableTimeStampsByDefault()) {
1944            long timeStamp = System.currentTimeMillis();
1945            msg.setJMSTimestamp(timeStamp);
1946            if (timeToLive > 0) {
1947                expiration = timeToLive + timeStamp;
1948            }
1949        }
1950
1951        msg.setJMSExpiration(expiration);
1952        msg.setJMSPriority(priority);
1953        
1954        msg.setJMSRedelivered(false);
1955        msg.setMessageId( messageId );
1956        
1957        msg.onSend();
1958        
1959        msg.setProducerId(msg.getMessageId().getProducerId());
1960
1961        if (log.isDebugEnabled()) {
1962            log.debug("Sending message: " + msg);
1963        }
1964
1965        if( async) {
1966            asyncSendPacket(msg);
1967        } else {
1968            syncSendPacket(msg);
1969        }
1970
1971    }
1972
1973    public void addOutputStream(ActiveMQOutputStream stream) {
1974        outputStreams.add(stream);
1975    }
1976    public void removeOutputStream(ActiveMQOutputStream stream) {
1977        outputStreams.remove(stream);
1978    }
1979    public void addInputStream(ActiveMQInputStream stream) {
1980        inputStreams.add(stream);
1981    }
1982    public void removeInputStream(ActiveMQInputStream stream) {
1983        inputStreams.remove(stream);
1984    }
1985
1986    protected void onControlCommand(ControlCommand command) {
1987        String JavaDoc text = command.getCommand();
1988        if (text != null) {
1989            if (text.equals("shutdown")) {
1990                log.info("JVM told to shutdown");
1991                System.exit(0);
1992            }
1993        }
1994    }
1995    
1996    protected void onConnectionControl(ConnectionControl command){
1997        if (command.isFaultTolerant()){
1998            this.optimizeAcknowledge = false;
1999            for(Iterator JavaDoc i=this.sessions.iterator();i.hasNext();){
2000                ActiveMQSession s=(ActiveMQSession) i.next();
2001                s.setOptimizeAcknowledge(false);
2002            }
2003        }
2004    }
2005    
2006    protected void onConsumerControl(ConsumerControl command){
2007        if(command.isClose()){
2008            for(Iterator JavaDoc i=this.sessions.iterator();i.hasNext();){
2009                ActiveMQSession s=(ActiveMQSession) i.next();
2010                s.close(command.getConsumerId());
2011            }
2012        }else{
2013            for(Iterator JavaDoc i=this.sessions.iterator();i.hasNext();){
2014                ActiveMQSession s=(ActiveMQSession) i.next();
2015                s.setPrefetchSize(command.getConsumerId(),command.getPrefetch());
2016            }
2017        }
2018    }
2019    
2020    protected void transportFailed(IOException JavaDoc error){
2021        transportFailed.set(true);
2022        if (firstFailureError == null) {
2023            firstFailureError = error;
2024        }
2025        if (!closed.get() && !closing.get()) {
2026            try{
2027                cleanup();
2028            }catch(JMSException JavaDoc e){
2029               log.warn("Cleanup failed",e);
2030            }
2031        }
2032    }
2033
2034    /**
2035     * Should a JMS message be copied to a new JMS Message object as part of the
2036     * send() method in JMS. This is enabled by default to be compliant with the
2037     * JMS specification. You can disable it if you do not mutate JMS messages
2038     * after they are sent for a performance boost
2039     */

2040    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2041        this.copyMessageOnSend = copyMessageOnSend;
2042    }
2043    
2044    public String JavaDoc toString() {
2045        return "ActiveMQConnection {id="+info.getConnectionId()+",clientId="+info.getClientId()+",started="+started.get()+"}";
2046    }
2047
2048    protected BlobTransferPolicy createBlobTransferPolicy() {
2049        return new BlobTransferPolicy();
2050    }
2051
2052
2053    public int getProtocolVersion() {
2054        return protocolVersion.get();
2055    }
2056
2057
2058    public int getProducerWindowSize() {
2059        return producerWindowSize;
2060    }
2061
2062
2063    public void setProducerWindowSize(int producerWindowSize) {
2064        this.producerWindowSize = producerWindowSize;
2065    }
2066
2067
2068}
2069
Popular Tags