KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > router > RouterAdapter


/**
 * This file is part of Presumo.
 *
 * Presumo is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * Presumo is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Presumo; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 *
 * Copyright 2001 Dan Greff
 */

21 package com.presumo.jms.router;
22
23 import com.presumo.jms.message.AckHelper;
24 import com.presumo.jms.message.JmsMessage;
25 import com.presumo.jms.plugin.MessageQueue;
26
27 import java.io.IOException JavaDoc;
28
29 import com.presumo.jms.resources.Resources;
30 import com.presumo.util.log.Logger;
31 import com.presumo.util.log.LoggerFactory;
32
33 /**
34  * Class to represent the functionality of starting stoping and closing
35  * a thread that is doing routing. This handles all of the multithreading
36  * logic for the router, and was pulled out to an abstract base class to
37  * simplify the actual Router implementation.
38  *
39  * WARNING: If you change ANYTHING in this class unit test!!!!!
40  */

41 public abstract class RouterAdapter implements Runnable JavaDoc
42 {
43
44     /////////////////////////////////////////////////////////////////////////
45
// Private Instance Variables //
46
/////////////////////////////////////////////////////////////////////////
47
private volatile boolean closed;
48   private volatile boolean stopRouting = true;
49   private int batchSize;
50   private final Object JavaDoc startStopLock = new String JavaDoc("startStopLock");
51   private final Object JavaDoc routerLock = new String JavaDoc("routerLock");
52   private Thread JavaDoc routingThread;
53   private final String JavaDoc threadName;
54   private MessageQueue inbox;
55
56     /////////////////////////////////////////////////////////////////////////
57
// Constructors //
58
/////////////////////////////////////////////////////////////////////////
59

60   public RouterAdapter(MessageQueue queue, int batchSize, String JavaDoc threadName)
61   {
62     super();
63     this.batchSize = batchSize;
64     this.inbox = queue;
65     this.threadName = threadName;
66   }
67
68     /////////////////////////////////////////////////////////////////////////
69
// Public Methods //
70
/////////////////////////////////////////////////////////////////////////
71

72   /**
73    * Implementation of runnable. The method will remain in the loop until
74    * closeRouter() is called. This loop will call routeMessages() while
75    * there are messages to be routed. If there are no messages to be routed
76    * the thread will wait() on the inbox until there are messages.
77    */

78   public void run()
79   {
80     logger.entry(threadName + ": run");
81     synchronized (startStopLock) {
82       
83       for(;;) {
84         
85         if (stopRouting == true)
86           break;
87
88         routeMessages(batchSize);
89
90         synchronized (inbox) {
91           while (stopRouting == false && inbox.size() == 0) {
92             try { inbox.wait(3000); } catch (InterruptedException JavaDoc ie) {}
93             if (stopRouting == false && inbox.size() == 0)
94               timerTick();
95           }
96         }
97       }
98     }
99
100     logger.exit(threadName + ": run");
101   }
102
103     
104     /////////////////////////////////////////////////////////////////////////
105
// Public Methods //
106
/////////////////////////////////////////////////////////////////////////
107

108   public void setMessageQueue(MessageQueue queue)
109   {
110     logger.entry("setMessageQueue", queue);
111
112     synchronized(routerLock) {
113       if (stopRouting == true) throw new
114         IllegalStateException JavaDoc("Attempt to change message queue on running router");
115       
116       synchronized (inbox) {
117         inbox = queue;
118       }
119     }
120     logger.exit("setMessageQueue");
121   }
122   
123   /**
124    *
125    */

126   public void closeRouter()
127   {
128     logger.entry(threadName + ": closeRouter");
129     
130     synchronized (routerLock) {
131       if (closed) return;
132       stopRouter();
133       closed = true;
134     }
135     
136     logger.exit(threadName + ": closeRouter");
137   }
138
139
140   /**
141    * Starts the delivery of asynchronous messages.
142    */

143   public void startRouter()
144   {
145     logger.entry(threadName + ": startRouter");
146     
147     synchronized (routerLock) {
148       
149       if (closed)
150         throw new IllegalStateException JavaDoc("start called on a closed router");
151       if (stopRouting == false) return;
152
153       stopRouting = false;
154       routingThread = new Thread JavaDoc(this, threadName);
155       routingThread.start();
156
157     }
158     
159     logger.exit(threadName + ": startRouter");
160   }
161
162   /**
163    * Stops the delivery of asynchronous messages.
164    */

165   public void stopRouter()
166   {
167     logger.entry(threadName + ": stopRouter");
168     
169     synchronized (routerLock) {
170       
171       if (closed) throw new IllegalStateException JavaDoc("stop() called on a closed connection");
172       if (stopRouting == true) return;
173       
174       // bump the thread out of its inbox.wait
175
synchronized(inbox) {
176         stopRouting = true;
177         inbox.notifyAll();
178       }
179       
180       try {
181         routingThread.join();
182         routingThread = null;
183       } catch (InterruptedException JavaDoc ie) {
184         ie.printStackTrace();
185       }
186     }
187     logger.exit(threadName + ": stopRouter");
188   }
189
190   /**
191    * The size of batches which the router processes messages is a potential
192    * touch point. Consequently I'm making it configurable on the fly.
193    */

194   public void setBatchSize(int batchSize)
195   {
196     this.batchSize = batchSize;
197   }
198
199     //////////////////////////////////////////////////////////////////////////
200
// Protected Methods //
201
//////////////////////////////////////////////////////////////////////////
202

203   /**
204    * Subclasses of this adapter implement this method to add the final
205    * piece of functionality. The thread embedded in this class will
206    * call this method when there are messages on the inbox that need
207    * to be routed.
208    *
209    * @param number Indicates a batch interval in terms of the number of
210    * messages routeMessages() should try to router before
211    * returning.
212    */

213   protected abstract void routeMessages(int number);
214
215   /**
216    * Called when the routing thread has done nothing for a while
217    */

218   protected void timerTick()
219   {
220   }
221   
222   /**
223    * @return The next message off of the queue. The queue maintains FIFO.
224    */

225   protected final JmsMessage [] getNext(int batchsize) throws IOException JavaDoc
226   {
227     JmsMessage [] msgs = null;
228     synchronized (inbox) {
229       msgs = inbox.getNext(batchsize);
230     }
231     return msgs;
232   }
233
234   protected final int queueSize()
235   {
236     synchronized (inbox) {
237       return inbox.size();
238     }
239   }
240
241   /**
242    * Puts a message on the queue and notifies the routing thread.
243    */

244   protected final void queueMessage(JmsMessage msg) throws IOException JavaDoc
245   {
246     if (inbox.isPersistent()) {
247       AckHelper ah = msg.getAckHelper();
248       if (ah != null) {
249         ah.setMessageQueue(inbox);
250       }
251     }
252
253     synchronized (inbox) {
254       inbox.push(msg);
255       inbox.notifyAll();
256     }
257
258   }
259
260   /**
261   protected final void queueMessageFront(JmsMessage msg) throws IOException
262   {
263     // TODO::
264     synchronized (inbox) {
265       inbox.push(msg);
266       inbox.notifyAll();
267     }
268
269   }
270   **/

271
272   /**
273    * Puts an array of messages on the queue and notifies the routing thread.
274    */

275   protected final void queueMessages(JmsMessage [] msgs) throws IOException JavaDoc
276   {
277     if (inbox.isPersistent()) {
278       for (int i=0; i < msgs.length; ++i) {
279         AckHelper ah = msgs[i].getAckHelper();
280         if (ah != null) {
281           ah.setMessageQueue(inbox);
282         }
283       }
284     }
285
286     synchronized (inbox) {
287       inbox.push(msgs);
288       inbox.notifyAll();
289     }
290
291   }
292
293   ////////////////////////////// Misc stuff ////////////////////////////////
294
private static Logger logger =
295      LoggerFactory.getLogger(RouterAdapter.class, Resources.getBundle());
296   ///////////////////////////////////////////////////////////////////////////
297
}
298
299
300
301
302
303
304
305
306
307
308
309
310
Popular Tags