KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > router > RemoteSession


1 /**
2  * This file is part of Presumo.
3  *
4  * Presumo is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * Presumo is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with Presumo; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  *
19  * Copyright 2001 Dan Greff
20  */

21 package com.presumo.jms.router;
22
23 import com.presumo.jms.message.JmsMessage;
24 import com.presumo.jms.message.SystemMessageConstants;
25 import com.presumo.jms.plugin.implementation.MemoryMessageQueue;
26 import com.presumo.jms.plugin.transport.Transport;
27 import com.presumo.jms.selector.JmsOperand;
28 import com.presumo.jms.selector.Parser;
29
30
31 import com.presumo.jms.resources.Resources;
32 import com.presumo.util.log.Logger;
33 import com.presumo.util.log.LoggerFactory;
34
35 import java.io.IOException JavaDoc;
36 import java.util.ArrayList JavaDoc;
37
38 import javax.jms.InvalidSelectorException JavaDoc;
39 import javax.jms.JMSException JavaDoc;
40
41 /**
42  * Class that sits between the Router and the Transport layer in
43  * terms of message flow. Essentially encapsulates the remote
44  * connection from the router, as well as handling the persistent
45  * message acknowledgment scheme.
46  *
47  * @author Dan Greff
48  */

49 public class RemoteSession implements RoutingTarget
50 {
51
52   /** Largest batch of messages that will be bundled together **/
53   protected static final int MSG_BATCH_SIZE = 2000;
54
55   /** Time that between sending batches of messages **/
56   protected static final long latency = 500;
57
58   /** Property name for sending filters via system messages **/
59   protected static final String JavaDoc FILTER_PROPERTY = "JMSX_FILTER";
60
61   /** Property name for sending acks via system messages **/
62   protected static final String JavaDoc ACKS_PROPERTY = "JMSX_ACKS";
63   
64
65   /** Filter sent to this JVM from the other side of the connection
66    * which indicates what messages the router on the other side is
67    * interested in.
68    **/

69   protected JmsOperand remoteFilter;
70
71   /** Local filter which the represents what messages the local
72    * router is interested in. This filter is sent to other routers.
73    */

74   protected JmsOperand localFilter;
75   
76   /** Router this RemoteSession is connected to **/
77   protected final Router router;
78
79   /** Interface which messages are sent to and received from the network **/
80   protected Transport transport;
81
82   /** Listener to be notified of if the connection is lost **/
83   protected ConnectionListener connxListener;
84
85   /** Handles acknowledgement logic for persistent messages **/
86   protected PersistentAckHandler persistentAckHandler;
87
88   /** Instance of inner class that reads message from the Transport **/
89   protected MessageReader msgReader;
90
91   /** Instance of inner class that writes message from the Transport **/
92   protected MessageWriter msgWriter;
93
94   /** Set unique ID assigned to the remote session by the router **/
95   protected int targetID;
96
97   /** Local copy of the Parser singleton **/
98   private final Parser parser = Parser.getInstance();
99   
100   /** Messages queued up to be send **/
101   private final ArrayList JavaDoc outbox = new ArrayList JavaDoc();
102
103
104     /////////////////////////////////////////////////////////////////////////
105
// Constructors //
106
/////////////////////////////////////////////////////////////////////////
107

108   public RemoteSession(Router router,
109                        Transport transport,
110                        ConnectionListener listener)
111   {
112     logger.entry("RemoteSession<init>");
113     
114     this.router = router;
115     this.transport = transport;
116     this.connxListener = listener;
117
118     try {
119       remoteFilter = parser.parseFilter("true");
120     } catch (InvalidSelectorException JavaDoc ise) {}
121
122     persistentAckHandler = new PersistentAckHandler(this);
123     router.addTarget(this);
124     start();
125     
126     logger.exit("RemoteSession<init>");
127   }
128
129
130     /////////////////////////////////////////////////////////////////////////
131
// Public methods //
132
/////////////////////////////////////////////////////////////////////////
133

134   public void setTransport(Transport t)
135   {
136     logger.entry("setTransport", t);
137
138     // TODO:: Add error checking to make sure this is only being called
139
// as a result of a reconnection. DTG
140
this.transport = t;
141
142     logger.exit("setTransport");
143   }
144
145
146   public synchronized void start()
147   {
148     logger.entry("start");
149     
150     if (msgReader == null) {
151       msgReader = new MessageReader();
152       msgReader.start();
153     }
154     if (msgWriter == null) {
155       msgWriter = new MessageWriter();
156       msgWriter.start();
157     }
158     
159     logger.exit("start");
160   }
161
162   public synchronized void stop()
163   {
164     logger.entry("stop");
165     
166     if (msgReader != null) {
167       msgReader.stopReader();
168       msgReader = null;
169     }
170     if (msgWriter != null) {
171       msgWriter.stopWriter();
172       msgWriter = null;
173     }
174     logger.exit("stop");
175   }
176   
177   public void close()
178   {
179     logger.entry("close");
180     this.stop();
181       
182     if (transport != null) {
183       transport.close();
184       transport = null;
185     }
186
187     router.removeTarget(this);
188     logger.exit("close");
189   }
190   
191   ///////////////////////////////////////////////////////////////////////////
192
// Begin implementation of RoutingTarget interface methods
193
//
194

195   public void setTargetID(int id)
196   {
197     logger.entry("setTargetID", new Integer JavaDoc(id));
198     this.targetID = id;
199     logger.exit("setTargetID");
200   }
201   
202   public JmsOperand getRoutingFilter()
203   {
204     logger.entry("getRoutingFilter");
205     JmsOperand retval = remoteFilter;
206     logger.exit("getRoutingFilter", retval);
207     return retval;
208   }
209   
210   public void setRemoteRoutingFilter(JmsOperand filter, boolean add)
211   {
212     logger.entry("setRemoteRoutingFilter", filter, new Boolean JavaDoc(add));
213
214     if (filter != localFilter)
215     {
216       JmsMessage msg = new JmsMessage(router.getName());
217       try {
218         String JavaDoc newFilter = parser.unparse(filter);
219         msg.setJMSSystemMsgType(SystemMessageConstants.REMOTE_FILTER_CHANGE_TYPE);
220         msg.setStringProperty(FILTER_PROPERTY, parser.unparse(filter));
221         localFilter = filter;
222         
223         if (logger.isDebugEnabled()) {
224           logger.debug("setRemoteRoutingFilter() local filter changed to:\n"+newFilter);
225         }
226         queueMessage(msg);
227       } catch (Exception JavaDoc e) {
228         // shouldn't happen with a in memory message queue but, dump it anyways
229
logger.exception(e);
230       }
231     }
232     logger.exit("setRemoteRoutingFilter");
233   }
234   
235   public boolean needsFilterUpdates()
236   {
237     return true;
238   }
239   
240   /**
241    * Called by the router to give a potential message to this
242    * routing target.
243    *
244    * Implementes RoutingTarget.takeMessage();
245    */

246   public JmsMessage takeMessage(JmsMessage msg)
247   {
248     boolean taken = false;
249     if (msg.getSendingTargetID() != this.targetID) // prevent backtracking
250
{
251       taken = parser.evaluate(remoteFilter, msg);
252       if (taken) {
253         persistentAckHandler.handleOutgoingMsg(msg);
254
255         synchronized(outbox) {
256           outbox.add(msg);
257           outbox.notifyAll();
258         }
259
260       }
261     }
262     return msg;
263   }
264   
265   
266   //
267
// End implementaiton of RoutingTarget interface methods
268
///////////////////////////////////////////////////////////////////////////
269

270   
271   /**
272    * Called by MessageReader.run() when messages come from the transport.
273    */

274   public void receiveMessages(JmsMessage [] msgs)
275   {
276     int length = msgs.length;
277     int msgsRemoved = 0;
278     for (int i=0; i < length; ++i)
279     {
280       if (logger.isDebugEnabled())
281         logger.debug("receiveMessages() \n" + msgs[i].toString());
282   
283       boolean systemMsg = false;
284       int type = msgs[i].getJMSSystemMsgType();
285       switch (type)
286       {
287         case(SystemMessageConstants.REMOTE_FILTER_CHANGE_TYPE):
288           handleRemoteFilterMsg(msgs[i]);
289           systemMsg = true;
290           break;
291         case(SystemMessageConstants.ACKS_MSG_TYPE):
292           String JavaDoc acks = (String JavaDoc) msgs[i].getObjectProperty(ACKS_PROPERTY);
293           persistentAckHandler.handleAcks(acks);
294           systemMsg = true;
295           break;
296       }
297       
298       if (systemMsg) {
299         ++msgsRemoved;
300         msgs[i] = null;
301       }
302       else {
303
304         // Filter out duplicates
305
boolean duplicate = persistentAckHandler.isDuplicate(msgs[i]);
306         if (duplicate) {
307           msgs[i] = null;
308           ++msgsRemoved;
309         }
310         else {
311           // Valid user message... start acknowledgment tracking
312
msgs[i].setSendingTargetID( targetID );
313           persistentAckHandler.setOriginator(msgs[i]);
314         }
315       }
316     }
317           
318     if (msgsRemoved > 0) {
319       msgs = collapseMsgArray(msgs, length - msgsRemoved);
320     }
321
322     try {
323       router.routeMessages(msgs);
324       persistentAckHandler.handleIncomingMsgs(msgs);
325     } catch (IOException JavaDoc ioe) {
326       // TODO:: add some error handling
327
logger.error("An exception occurred while routing msgs: \n", ioe);
328     }
329
330   }
331
332
333     /////////////////////////////////////////////////////////////////////////
334
// Package methods //
335
/////////////////////////////////////////////////////////////////////////
336

337   void acksAvailable()
338   {
339     synchronized(outbox) {
340       outbox.notifyAll();
341     }
342   }
343
344     /////////////////////////////////////////////////////////////////////////
345
// Private methods //
346
/////////////////////////////////////////////////////////////////////////
347

348   private void queueMessage(JmsMessage msg)
349   {
350     synchronized(outbox) {
351       outbox.add(msg);
352       outbox.notifyAll();
353     }
354   }
355
356   
357   private void handleRemoteFilterMsg(JmsMessage msg)
358   {
359     String JavaDoc filter = (String JavaDoc) msg.getObjectProperty(FILTER_PROPERTY);
360     try {
361       remoteFilter = parser.parseFilter(filter);
362       router.recalculateFilters(true);
363     } catch (javax.jms.InvalidSelectorException JavaDoc ex) {
364       logger.error("Remote client sent invalid routing filter: " +
365                    transport.getRemoteID() + " :: " + filter);
366     }
367   }
368   
369   /*
370    * Utility function to remove null values from the array
371    */

372   private JmsMessage [] collapseMsgArray(JmsMessage [] msgs, int size)
373   {
374     JmsMessage [] retval = new JmsMessage[size];
375     for (int i=0, j=0; i < msgs.length; ++i) {
376       if (msgs[i] != null) {
377         retval[j] = msgs[i];
378         ++j;
379       }
380     }
381     return retval;
382   }
383
384
385   /*
386    * Called by the internal methods reading and sending messages
387    * to the transport layer when an IOException is thrown from
388    * the transport layer. This indicates a connection loss or
389    * corruption.
390    *
391    * TODO:: explain why this needs to spawn off as a thread
392    */

393   private void connectionLost(IOException JavaDoc ioe)
394   {
395     Thread JavaDoc t = new Thread JavaDoc("ConnectionLost Thread") {
396       public void run() {
397         logger.entry("connectionLostThread.run()");
398         RemoteSession.this.stop();
399         if (transport != null) {
400           transport.close();
401           transport = null;
402         }
403
404         connxListener.connectionLost(RemoteSession.this);
405         logger.exit("connectionLostThread.run()");
406       }
407     };
408     t.start();
409
410   }
411
412
413   private JmsMessage createAckMessage()
414   {
415     JmsMessage ackMessage = null;
416     String JavaDoc acks = persistentAckHandler.getAckString();
417     
418     if (acks != null) {
419       
420       ackMessage = new JmsMessage(router.getName());
421
422       try {
423         ackMessage.setJMSSystemMsgType(SystemMessageConstants.ACKS_MSG_TYPE);
424         ackMessage.setStringProperty(ACKS_PROPERTY, acks);
425       } catch (JMSException JavaDoc jmsex) {}
426     }
427
428     return ackMessage;
429   }
430
431   
432     /////////////////////////////////////////////////////////////////////////
433
// Begin Inner Class MessageReader //
434
/////////////////////////////////////////////////////////////////////////
435
/**
436    * Inner class reperesenting thread reading messages from the transport
437    * implementation.
438    */

439   protected class MessageReader extends Thread JavaDoc
440   {
441  
442     private volatile boolean stopped = false;
443     
444     public MessageReader() { super("MessageReader"); }
445
446     public void stopReader()
447     {
448       if (! stopped) stopped = true;
449     }
450     
451     public void run()
452     {
453       JmsMessage [] msgs;
454       while (stopped == false) {
455         try {
456           msgs = transport.receiveMessages();
457           if (msgs != null && stopped == false) {
458             receiveMessages(msgs);
459             msgs = null;
460           }
461         } catch (IOException JavaDoc ioe) {
462           if (! stopped) {
463             // TODO:: handle this properly
464
stopped = true;
465             connectionLost(ioe);
466           }
467         }
468       }
469     }
470   }
471     /////////////////////////////////////////////////////////////////////////
472
// End Inner Class MessageReader //
473
/////////////////////////////////////////////////////////////////////////
474

475     /////////////////////////////////////////////////////////////////////////
476
// Begin Inner Class MessageWriter //
477
/////////////////////////////////////////////////////////////////////////
478

479   /**
480    * Inner class writing messages to the Transport
481    */

482   protected class MessageWriter extends Thread JavaDoc
483   {
484     private volatile boolean stopped = false;
485     private volatile boolean sendMessages;
486
487     public MessageWriter() { super("MessageWriter"); }
488
489     public final void stopWriter()
490     {
491       if (! stopped) {
492         synchronized (outbox) {
493           stopped = true;
494           outbox.notifyAll();
495         }
496       }
497     }
498
499     public final void sendMessagesNow()
500     {
501       sendMessages = true;
502     }
503
504     public final void run()
505     {
506       long nextSend = System.currentTimeMillis() + latency;
507       long sleepTime = 0;
508
509       while (!stopped) {
510
511         synchronized (outbox) {
512           sendMessages = false;
513
514           if (outbox.size() >= MSG_BATCH_SIZE) {
515             sendMessages = true;
516           } else if(outbox.size() > 0 ||
517                     persistentAckHandler.acksAvailable() ) {
518             long currentTime = System.currentTimeMillis();
519             long diff = nextSend - currentTime;
520             if (diff <= 0) {
521               sendMessages = true;
522             }
523             else {
524               sleepTime = diff;
525             }
526           } else {
527             sleepTime = 0;
528           }
529
530           if (sendMessages == false && !stopped) {
531             try {
532               outbox.wait(sleepTime);
533             } catch (InterruptedException JavaDoc ie) {}
534           }
535           
536         }
537
538         if (sendMessages == true) {
539           send();
540           nextSend = System.currentTimeMillis() + latency;
541           sleepTime = 0;
542         }
543       }
544     }
545  
546
547
548   private final void send()
549   {
550     try {
551       JmsMessage [] msgs = null;
552       JmsMessage ackMessage = createAckMessage();
553
554       synchronized(outbox) {
555         if (ackMessage != null) {
556           outbox.add(ackMessage);
557         }
558         msgs = new JmsMessage[outbox.size()];
559         msgs = (JmsMessage[]) outbox.toArray(msgs);
560         outbox.clear();
561       }
562       
563       if (msgs != null && msgs.length != 0 && transport != null) {
564         transport.sendMessages(msgs);
565       }
566       
567     } catch (IOException JavaDoc ioe) {
568       connectionLost(ioe);
569     }
570   }
571
572   
573   }
574     /////////////////////////////////////////////////////////////////////////
575
// End Inner Class MessageReader //
576
/////////////////////////////////////////////////////////////////////////
577

578
579   ////////////////////////////// Misc stuff ////////////////////////////////
580
private static Logger logger =
581     LoggerFactory.getLogger(RemoteSession.class, Resources.getBundle());
582   ///////////////////////////////////////////////////////////////////////////
583

584 }
585
Popular Tags