KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > router > Router


1 /**
2  * This file is part of Presumo.
3  *
4  * Presumo is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * Presumo is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with Presumo; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  * Copyright (c) 2001, 2002 Dan Greff
19  */

20 package com.presumo.jms.router;
21
22 import com.presumo.jms.message.JmsMessage;
23 import com.presumo.jms.plugin.MessageQueue;
24 import com.presumo.jms.resources.Resources;
25 import com.presumo.jms.selector.JmsOperand;
26 import com.presumo.jms.selector.Parser;
27
28 import com.presumo.util.log.Logger;
29 import com.presumo.util.log.LoggerFactory;
30
31 import java.io.IOException JavaDoc;
32 import java.util.ArrayList JavaDoc;
33 import javax.jms.ExceptionListener JavaDoc;
34 import javax.jms.DeliveryMode JavaDoc;
35 import javax.jms.JMSException JavaDoc;
36
37
38 /**
39  * The main routing functionality. The Router maintains a list of
40  * <code>RoutingTarget</code>'s, and for every message it is asked
41  * to route, it simply hands it to each routing target asking. The
42  * RoutingTarget itself determines if it needs the message and keeps
43  * it if so.
44  * <p>
45  * The Router is also responsible for maintaining the filters.
46  *
47  * @author Dan Greff
48  */

49 public final class Router extends RouterAdapter
50 {
51   /** Local instance of the parser singleton for convienance. **/
52   private final Parser parser;
53   
54   /** All possible routing targets to route messages to **/
55   private RoutingTarget [] targets = new RoutingTarget[0];
56
57   /** Number of routing targets **/
58   private int numOfTargets;
59
60   /** Synchronization lock **/
61   private final Object JavaDoc targetChangeLock = "JVM_Router_Target_change_lock";
62
63   /** Data structure of all exception listeners **/
64   private final ArrayList JavaDoc eListeners = new ArrayList JavaDoc();
65
66   /** Number of messages routed by this instance **/
67   private long msgsRouted;
68   
69   /** Name of the router **/
70   private String JavaDoc name = "router";
71   
72     /////////////////////////////////////////////////////////////////////////
73
// Constructors //
74
/////////////////////////////////////////////////////////////////////////
75

76   public Router(MessageQueue queue)
77   {
78     super(queue, 100, "JVM Router");
79     parser = Parser.getInstance();
80   }
81
82     /////////////////////////////////////////////////////////////////////////
83
// Public Methods //
84
/////////////////////////////////////////////////////////////////////////
85

86   public final String JavaDoc getName()
87   {
88     return name;
89   }
90
91   public final void setName(String JavaDoc value)
92   {
93     name = value;
94   }
95
96   /**
97    * Add the given target to this instances' list of
98    * <code>RoutingTarget</code>'s to route to.
99    */

100   public final void addTarget(RoutingTarget target)
101   {
102     synchronized (targetChangeLock) {
103       logger.entry("addTarget: " + target.toString());
104       
105       boolean targetAdded = false;
106       for (int i=0; i < targets.length; ++i) {
107         if (targets[i] == null) {
108           targets[i] = target;
109           target.setTargetID(i);
110           targetAdded = true;
111           break;
112         }
113       }
114     
115       if (! targetAdded) {
116         int newSize = targets.length + 1;
117         int targetID = targets.length;
118         RoutingTarget [] tmp = new RoutingTarget[newSize];
119         System.arraycopy(targets, 0, tmp, 0, targets.length);
120         tmp[targetID] = target;
121         targets = tmp;
122         target.setTargetID(targetID);
123       }
124       
125       ++numOfTargets;
126       recalculateFilters(true);
127       
128       if (numOfTargets == 1)
129         this.startRouter();
130         
131       logger.exit("addTarget");
132     }
133   }
134   
135   /**
136    *
137    */

138   public final void removeTarget(RoutingTarget target)
139   {
140     synchronized (targetChangeLock)
141     {
142       logger.entry("removeTarget");
143       
144       for (int i=0; i < targets.length; ++i) {
145         if (target.equals(targets[i])) {
146           targets[i] = null;
147           --numOfTargets;
148           recalculateFilters(false);
149         }
150       }
151
152       if (numOfTargets == 0) {
153         this.stopRouter();
154       }
155       logger.exit("removeTarget");
156     }
157   }
158   
159   /**
160    *
161    */

162   public final void recalculateFilters(boolean filterAdded)
163   {
164     synchronized (targetChangeLock) {
165       logger.entry("recalculateFilters");
166
167       int i,j, targetLoc;
168
169       for (i=0; i < targets.length; ++i) {
170         RoutingTarget t = targets[i];
171         if (t != null && t.needsFilterUpdates()) {
172           if (numOfTargets == 1) {
173              try {
174                t.setRemoteRoutingFilter(parser.parseFilter("false"), false);
175              } catch (javax.jms.InvalidSelectorException JavaDoc ise) {}
176              break;
177           }
178           JmsOperand [] allFilters = new JmsOperand[numOfTargets-1];
179           
180           for (j=0, targetLoc=0; targetLoc < targets.length; ++targetLoc) {
181  
182             // Don't include the current or null filters.
183
if (targetLoc != i && targets[targetLoc] != null) {
184               allFilters[j] = targets[targetLoc].getRoutingFilter();
185               ++j;
186             }
187           }
188           JmsOperand joinedFilter = parser.orTogether(allFilters);
189           t.setRemoteRoutingFilter(joinedFilter, filterAdded);
190         }
191       } // end for(i
192

193       logger.exit("recalculateFilters");
194     } // end synchronized
195
}
196
197   public final void routeMessage(JmsMessage msg) throws IOException JavaDoc
198   {
199     ++msgsRouted;
200     queueMessage(msg);
201   }
202   
203   public final void routeMessages(JmsMessage [] msgs) throws IOException JavaDoc
204   {
205     msgsRouted += msgs.length;
206     queueMessages(msgs);
207   }
208   
209   public final void addExceptionListener(ExceptionListener JavaDoc listener)
210   {
211     synchronized (eListeners) {
212       eListeners.add(listener);
213     }
214   }
215   
216   public final void removeExceptionListener(ExceptionListener JavaDoc listener)
217   {
218     synchronized (eListeners) {
219       eListeners.remove(listener);
220     }
221   }
222   
223
224     /////////////////////////////////////////////////////////////////////////
225
// Protected Methods //
226
/////////////////////////////////////////////////////////////////////////
227

228   /**
229    *
230    *
231    */

232   protected final void routeMessages(int batchsize)
233   {
234     
235     try {
236       JmsMessage [] msgs = getNext(batchsize); // Possible Disk I/O
237

238       if (msgs == null) return;
239       int length = msgs.length;
240
241       synchronized(targetChangeLock)
242       {
243         parser.obtainLock();
244         int targetLength = targets.length;
245         
246         for (int i=0; i < length; i++)
247         {
248           JmsMessage message = msgs[i]; // save on array accesses
249
// TODO:: message.setDeleteBlock();
250
parser.resetEvaluateOnce(); // @see Parser
251

252           // For each target...
253
for (int j=0; j < targetLength; j++)
254           {
255             RoutingTarget t = targets[j];
256             
257             if ( t != null) {
258               message = t.takeMessage(message);
259               if (message == null)
260                 break;
261             }
262           }
263           // TODO:: message.removeDeleteBlock();
264
}
265       }
266       parser.releaseLock();
267     } catch (IOException JavaDoc ioe) {
268       // TODO:: handle resynch scenerio
269
}
270   }
271
272     /////////////////////////////////////////////////////////////////////////
273
// Private Methods //
274
/////////////////////////////////////////////////////////////////////////
275

276   /**
277    * Convienancs function to report an exception to all registered
278    * ExceptionListeners.
279    */

280   private void reportException(Exception JavaDoc e)
281   {
282     JMSException JavaDoc jmsex;
283     if (e instanceof JMSException JavaDoc) {
284       jmsex = (JMSException JavaDoc) e;
285     } else {
286       jmsex = new JMSException JavaDoc("An exception occurred in the Router: " +
287                                e.toString());
288       jmsex.setLinkedException(e);
289     }
290     synchronized (eListeners) {
291       if (eListeners.size() == 0) {
292         jmsex.printStackTrace();
293       } else {
294         for (int i=0; i < eListeners.size(); ++i)
295           ( (ExceptionListener JavaDoc) eListeners.get(i)).onException(jmsex);
296       }
297     }
298   }
299  
300   ////////////////////////////// Misc stuff ////////////////////////////////
301
private static Logger logger =
302           LoggerFactory.getLogger(Router.class, Resources.getBundle());
303   ///////////////////////////////////////////////////////////////////////////
304
}
305
Popular Tags