KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ActiveMQConnectionConsumer


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

import javax.jms.ConnectionConsumer;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;

import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageDispatch;

35 /**
36  * For application servers, <CODE>Connection</CODE> objects provide a special
37  * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
38  * messages it is to consume are specified by a <CODE>Destination</CODE> and
39  * a message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
40  * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
41  * <p/>
42  * <P>
43  * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
44  * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
45  * and starts it. As traffic picks up, messages can back up. If this happens, a
46  * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
47  * with more than one message. This reduces the thread context switches and
48  * minimizes resource use at the expense of some serialization of message
49  * processing.
50  *
51  * @see javax.jms.Connection#createConnectionConsumer
52  * @see javax.jms.Connection#createDurableConnectionConsumer
53  * @see javax.jms.QueueConnection#createConnectionConsumer
54  * @see javax.jms.TopicConnection#createConnectionConsumer
55  * @see javax.jms.TopicConnection#createDurableConnectionConsumer
56  */

57
58 public class ActiveMQConnectionConsumer implements ConnectionConsumer JavaDoc, ActiveMQDispatcher {
59
60     private ActiveMQConnection connection;
61     private ServerSessionPool JavaDoc sessionPool;
62     private ConsumerInfo consumerInfo;
63     private boolean closed;
64
65     protected final List JavaDoc messageQueue = Collections.synchronizedList(new LinkedList JavaDoc());
66     
67
68     /**
69      * Create a ConnectionConsumer
70      *
71      * @param theConnection
72      * @param theSessionPool
73      * @param theConsumerInfo
74      * @param theMaximumMessages
75      * @throws JMSException
76      */

77     protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection,
78                                          ServerSessionPool JavaDoc theSessionPool,
79                                          ConsumerInfo theConsumerInfo) throws JMSException JavaDoc {
80         this.connection = theConnection;
81         this.sessionPool = theSessionPool;
82         this.consumerInfo = theConsumerInfo;
83         
84         this.connection.addConnectionConsumer(this);
85         this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
86         this.connection.syncSendPacket(this.consumerInfo);
87     }
88
89     /**
90      * Gets the server session pool associated with this connection consumer.
91      *
92      * @return the server session pool used by this connection consumer
93      * @throws JMSException if the JMS provider fails to get the server session pool
94      * associated with this consumer due to some internal error.
95      */

96
97     public ServerSessionPool JavaDoc getServerSessionPool() throws JMSException JavaDoc {
98         if (closed) {
99             throw new IllegalStateException JavaDoc("The Connection Consumer is closed");
100         }
101         return this.sessionPool;
102     }
103
104     /**
105      * Closes the connection consumer.
106      * <p/>
107      * <P>
108      * Since a provider may allocate some resources on behalf of a connection
109      * consumer outside the Java virtual machine, clients should close these
110      * resources when they are not needed. Relying on garbage collection to
111      * eventually reclaim these resources may not be timely enough.
112      *
113      * @throws JMSException
114      */

115
116     public void close() throws JMSException JavaDoc {
117         if (!closed) {
118             dispose();
119             this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand());
120         }
121
122     }
123
124     public void dispose() {
125         if (!closed) {
126             this.connection.removeDispatcher(consumerInfo.getConsumerId());
127             this.connection.removeConnectionConsumer(this);
128             closed = true;
129         }
130     }
131
132     public void dispatch(MessageDispatch messageDispatch) {
133         try {
134             messageDispatch.setConsumer(this);
135             
136             ServerSession JavaDoc serverSession = sessionPool.getServerSession();
137             Session JavaDoc s = serverSession.getSession();
138             ActiveMQSession session = null;
139             
140             
141             if( s instanceof ActiveMQSession ) {
142                 session = (ActiveMQSession) s;
143             } else if (s instanceof ActiveMQTopicSession) {
144                 ActiveMQTopicSession topicSession = (ActiveMQTopicSession) s;
145                 session = (ActiveMQSession) topicSession.getNext();
146             } else if (s instanceof ActiveMQQueueSession) {
147                 ActiveMQQueueSession queueSession = (ActiveMQQueueSession) s;
148                 session = (ActiveMQSession) queueSession.getNext();
149             } else {
150                 connection.onAsyncException(new JMSException JavaDoc("Session pool provided an invalid session type: "+s.getClass()));
151                 return;
152             }
153             
154             session.dispatch(messageDispatch);
155             serverSession.start();
156         } catch (JMSException JavaDoc e) {
157             connection.onAsyncException(e);
158         }
159     }
160     
161     public String JavaDoc toString() {
162         return "ActiveMQConnectionConsumer { value=" +consumerInfo.getConsumerId()+" }";
163     }
164 }
165
Popular Tags