KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ActiveMQSessionExecutor


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq;
20
21 import java.util.Iterator JavaDoc;
22 import java.util.List JavaDoc;
23
24 import javax.jms.JMSException JavaDoc;
25
26 import org.apache.activemq.command.ConsumerId;
27 import org.apache.activemq.command.MessageDispatch;
28 import org.apache.activemq.thread.Task;
29 import org.apache.activemq.thread.TaskRunner;
30 import org.apache.activemq.util.JMSExceptionSupport;
31
32 /**
33  * A utility class used by the Session for dispatching messages asynchronously to consumers
34  *
35  * @version $Revision$
36  * @see javax.jms.Session
37  */

38 public class ActiveMQSessionExecutor implements Task {
39     
40     private ActiveMQSession session;
41     private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
42     private boolean dispatchedBySessionPool;
43     private TaskRunner taskRunner;
44
45     ActiveMQSessionExecutor(ActiveMQSession session) {
46         this.session = session;
47     }
48
49     void setDispatchedBySessionPool(boolean value) {
50         dispatchedBySessionPool = value;
51         wakeup();
52     }
53     
54
55     void execute(MessageDispatch message) throws InterruptedException JavaDoc {
56         if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){
57             dispatch(message);
58         }else {
59             messageQueue.enqueue(message);
60             wakeup();
61         }
62     }
63
64     public void wakeup() {
65         if( !dispatchedBySessionPool ) {
66             if( session.isSessionAsyncDispatch() ) {
67                 try {
68                     if( taskRunner == null ) {
69                         taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId());
70                     }
71                     taskRunner.wakeup();
72                 } catch (InterruptedException JavaDoc e) {
73                     Thread.currentThread().interrupt();
74                 }
75             } else {
76                 while( iterate() )
77                     ;
78             }
79         }
80     }
81
82     void executeFirst(MessageDispatch message) {
83         messageQueue.enqueueFirst(message);
84         wakeup();
85     }
86
87     public boolean hasUncomsumedMessages() {
88         return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
89     }
90
91     void dispatch(MessageDispatch message){
92
93         // TODO - we should use a Map for this indexed by consumerId
94

95         for (Iterator JavaDoc i = this.session.consumers.iterator(); i.hasNext();) {
96             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
97             ConsumerId consumerId = message.getConsumerId();
98             if( consumerId.equals(consumer.getConsumerId()) ) {
99                 consumer.dispatch(message);
100             }
101         }
102     }
103     
104     synchronized void start() {
105         if( !messageQueue.isRunning() ) {
106             messageQueue.start();
107             if( hasUncomsumedMessages() )
108                 wakeup();
109         }
110     }
111
112     void stop() throws JMSException JavaDoc {
113         try {
114             if( messageQueue.isRunning() ) {
115                 messageQueue.stop();
116                 if( taskRunner!=null ) {
117                     taskRunner.shutdown();
118                     taskRunner=null;
119                 }
120             }
121         } catch (InterruptedException JavaDoc e) {
122             Thread.currentThread().interrupt();
123             throw JMSExceptionSupport.create(e);
124         }
125     }
126     
127     boolean isRunning() {
128         return messageQueue.isRunning();
129     }
130
131     void close() {
132         messageQueue.close();
133     }
134
135     void clear() {
136         messageQueue.clear();
137     }
138
139     MessageDispatch dequeueNoWait() {
140         return (MessageDispatch) messageQueue.dequeueNoWait();
141     }
142     
143     protected void clearMessagesInProgress(){
144         messageQueue.clear();
145     }
146
147     public boolean isEmpty() {
148         return messageQueue.isEmpty();
149     }
150
151     public boolean iterate() {
152
153         // Deliver any messages queued on the consumer to their listeners.
154
for (Iterator JavaDoc i = this.session.consumers.iterator(); i.hasNext();) {
155             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
156             if( consumer.iterate() ) {
157                 return true;
158             }
159         }
160         
161         // No messages left queued on the listeners.. so now dispatch messages queued on the session
162
MessageDispatch message = messageQueue.dequeueNoWait();
163         if( message==null ) {
164             return false;
165         } else {
166             dispatch(message);
167             return !messageQueue.isEmpty();
168         }
169     }
170
171     List JavaDoc getUnconsumedMessages() {
172         return messageQueue.removeAll();
173     }
174     
175 }
176
Popular Tags