KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > Protocol


1 // $Id: Protocol.java,v 1.25 2005/04/20 20:25:49 belaban Exp $
2

3 package org.jgroups.stack;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.Event;
9 import org.jgroups.util.Queue;
10 import org.jgroups.util.QueueClosedException;
11 import org.jgroups.util.Util;
12
13 import java.util.Properties JavaDoc;
14 import java.util.Vector JavaDoc;
15
16
17
18
19 class UpHandler extends Thread JavaDoc {
20     private Queue mq=null;
21     private Protocol handler=null;
22     private ProtocolObserver observer=null;
23     protected final Log log=LogFactory.getLog(this.getClass());
24
25
26     public UpHandler(Queue mq, Protocol handler, ProtocolObserver observer) {
27         this.mq=mq;
28         this.handler=handler;
29         this.observer=observer;
30         if(handler != null)
31             setName("UpHandler (" + handler.getName() + ')');
32         else
33             setName("UpHandler");
34         setDaemon(true);
35     }
36
37
38     public void setObserver(ProtocolObserver observer) {
39         this.observer=observer;
40     }
41
42
43     /** Removes events from mq and calls handler.up(evt) */
44     public void run() {
45         Event evt;
46         while(!mq.closed()) {
47             try {
48                 evt=(Event)mq.remove();
49                 if(evt == null) {
50                     if(log.isWarnEnabled()) log.warn("removed null event");
51                     continue;
52                 }
53
54                 if(observer != null) { // call debugger hook (if installed)
55
if(observer.up(evt, mq.size()) == false) { // false means discard event
56
return;
57                     }
58                 }
59                 handler.up(evt);
60             }
61             catch(QueueClosedException queue_closed) {
62                 break;
63             }
64             catch(Throwable JavaDoc e) {
65                 if(log.isWarnEnabled()) log.warn(getName() + " exception: " + e);
66                 e.printStackTrace();
67             }
68         }
69     }
70
71 }
72
73
74 class DownHandler extends Thread JavaDoc {
75     private Queue mq=null;
76     private Protocol handler=null;
77     private ProtocolObserver observer=null;
78     protected final Log log=LogFactory.getLog(this.getClass());
79
80
81
82     public DownHandler(Queue mq, Protocol handler, ProtocolObserver observer) {
83         this.mq=mq;
84         this.handler=handler;
85         this.observer=observer;
86         if(handler != null)
87             setName("DownHandler (" + handler.getName() + ')');
88         else
89             setName("DownHandler");
90         setDaemon(true);
91     }
92
93
94     public void setObserver(ProtocolObserver observer) {
95         this.observer=observer;
96     }
97
98
99     /** Removes events from mq and calls handler.down(evt) */
100     public void run() {
101         Event evt;
102         while(!mq.closed()) {
103             try {
104                 evt=(Event)mq.remove();
105                 if(evt == null) {
106                     if(log.isWarnEnabled()) log.warn("removed null event");
107                     continue;
108                 }
109
110                 if(observer != null) { // call debugger hook (if installed)
111
if(observer.down(evt, mq.size()) == false) { // false means discard event
112
continue;
113                     }
114                 }
115
116                 int type=evt.getType();
117                 if(type == Event.START || type == Event.STOP) {
118                     if(handler.handleSpecialDownEvent(evt) == false)
119                         continue;
120                 }
121                 handler.down(evt);
122             }
123             catch(QueueClosedException queue_closed) {
124                 break;
125             }
126             catch(Throwable JavaDoc e) {
127                 if(log.isWarnEnabled()) log.warn(getName() + " exception is " + e);
128                 e.printStackTrace();
129             }
130         }
131     }
132
133 }
134
135
136 /**
137  * The Protocol class provides a set of common services for protocol layers. Each layer has to
138  * be a subclass of Protocol and override a number of methods (typically just <code>up()</code>,
139  * <code>Down</code> and <code>getName</code>. Layers are stacked in a certain order to form
140  * a protocol stack. <a HREF=org.jgroups.Event.html>Events</a> are passed from lower
141  * layers to upper ones and vice versa. E.g. a Message received by the UDP layer at the bottom
142  * will be passed to its higher layer as an Event. That layer will in turn pass the Event to
143  * its layer and so on, until a layer handles the Message and sends a response or discards it,
144  * the former resulting in another Event being passed down the stack.<p>
145  * Each layer has 2 FIFO queues, one for up Events and one for down Events. When an Event is
146  * received by a layer (calling the internal upcall <code>ReceiveUpEvent</code>), it is placed
147  * in the up-queue where it will be retrieved by the up-handler thread which will invoke method
148  * <code>Up</code> of the layer. The same applies for Events traveling down the stack. Handling
149  * of the up-handler and down-handler threads and the 2 FIFO queues is donw by the Protocol
150  * class, subclasses will almost never have to override this behavior.<p>
151  * The important thing to bear in mind is that Events have to passed on between layers in FIFO
152  * order which is guaranteed by the Protocol implementation and must be guranteed by subclasses
153  * implementing their on Event queuing.<p>
154  * <b>Note that each class implementing interface Protocol MUST provide an empty, public
155  * constructor !</b>
156  */

157 public abstract class Protocol {
158     protected final Properties JavaDoc props=new Properties JavaDoc();
159     protected Protocol up_prot=null, down_prot=null;
160     protected ProtocolStack stack=null;
161     protected final Queue up_queue=new Queue();
162     protected final Queue down_queue=new Queue();
163     protected UpHandler up_handler=null;
164     protected int up_thread_prio=-1;
165     protected DownHandler down_handler=null;
166     protected int down_thread_prio=-1;
167     protected ProtocolObserver observer=null; // hook for debugger
168
private final static long THREAD_JOIN_TIMEOUT=1000;
169     protected boolean down_thread=true; // determines whether the down_handler thread should be started
170
protected boolean up_thread=true; // determines whether the up_handler thread should be started
171
protected final Log log=LogFactory.getLog(this.getClass());
172
173
174     /**
175      * Configures the protocol initially. A configuration string consists of name=value
176      * items, separated by a ';' (semicolon), e.g.:<pre>
177      * "loopback=false;unicast_inport=4444"
178      * </pre>
179      */

180     public boolean setProperties(Properties JavaDoc props) {
181         if(props != null)
182             this.props.putAll(props);
183         return true;
184     }
185
186
187     /** Called by Configurator. Removes 2 properties which are used by the Protocol directly and then
188      * calls setProperties(), which might invoke the setProperties() method of the actual protocol instance.
189      */

190     public boolean setPropertiesInternal(Properties JavaDoc props) {
191         String JavaDoc str;
192         this.props.putAll(props);
193
194         str=props.getProperty("down_thread");
195         if(str != null) {
196             down_thread=Boolean.valueOf(str).booleanValue();
197             props.remove("down_thread");
198         }
199
200         str=props.getProperty("down_thread_prio");
201         if(str != null) {
202             down_thread_prio=Integer.parseInt(str);
203             props.remove("down_thread_prio");
204         }
205
206         str=props.getProperty("up_thread");
207         if(str != null) {
208             up_thread=Boolean.valueOf(str).booleanValue();
209             props.remove("up_thread");
210         }
211
212         str=props.getProperty("up_thread_prio");
213         if(str != null) {
214             up_thread_prio=Integer.parseInt(str);
215             props.remove("up_thread_prio");
216         }
217
218         return setProperties(props);
219     }
220
221
222     public Properties JavaDoc getProperties() {
223         return props;
224     }
225
226
227     public void setObserver(ProtocolObserver observer) {
228         this.observer=observer;
229         observer.setProtocol(this);
230         if(up_handler != null)
231             up_handler.setObserver(observer);
232         if(down_handler != null)
233             down_handler.setObserver(observer);
234     }
235
236     /**
237      * Called after instance has been created (null constructor) and before protocol is started.
238      * Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
239      * @exception Exception Thrown if protocol cannot be initialized successfully. This will cause the
240      * ProtocolStack to fail, so the channel constructor will throw an exception
241      */

242     public void init() throws Exception JavaDoc {
243     }
244
245     /**
246      * This method is called on a {@link org.jgroups.Channel#connect(String)}. Starts work.
247      * Protocols are connected and queues are ready to receive events.
248      * Will be called <em>from bottom to top</em>. This call will replace
249      * the <b>START</b> and <b>START_OK</b> events.
250      * @exception Exception Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
251      * to fail, so {@link org.jgroups.Channel#connect(String)} will throw an exception
252      */

253     public void start() throws Exception JavaDoc {
254     }
255
256     /**
257      * This method is called on a {@link org.jgroups.Channel#disconnect()}. Stops work (e.g. by closing multicast socket).
258      * Will be called <em>from top to bottom</em>. This means that at the time of the method invocation the
259      * neighbor protocol below is still working. This method will replace the
260      * <b>STOP</b>, <b>STOP_OK</b>, <b>CLEANUP</b> and <b>CLEANUP_OK</b> events. The ProtocolStack guarantees that
261      * when this method is called all messages in the down queue will have been flushed
262      */

263     public void stop() {
264     }
265
266
267     /**
268      * This method is called on a {@link org.jgroups.Channel#close()}.
269      * Does some cleanup; after the call the VM will terminate
270      */

271     public void destroy() {
272     }
273
274
275     public Queue getUpQueue() {
276         return up_queue;
277     } // used by Debugger (ProtocolView)
278

279     public Queue getDownQueue() {
280         return down_queue;
281     } // used by Debugger (ProtocolView)
282

283
284     /** List of events that are required to be answered by some layer above.
285      @return Vector (of Integers) */

286     public Vector JavaDoc requiredUpServices() {
287         return null;
288     }
289
290     /** List of events that are required to be answered by some layer below.
291      @return Vector (of Integers) */

292     public Vector JavaDoc requiredDownServices() {
293         return null;
294     }
295
296     /** List of events that are provided to layers above (they will be handled when sent down from
297      above).
298      @return Vector (of Integers) */

299     public Vector JavaDoc providedUpServices() {
300         return null;
301     }
302
303     /** List of events that are provided to layers below (they will be handled when sent down from
304      below).
305      @return Vector (of Integers) */

306     public Vector JavaDoc providedDownServices() {
307         return null;
308     }
309
310
311     public abstract String JavaDoc getName(); // all protocol names have to be unique !
312

313     public Protocol getUpProtocol() {
314         return up_prot;
315     }
316
317     public Protocol getDownProtocol() {
318         return down_prot;
319     }
320
321     public void setUpProtocol(Protocol up_prot) {
322         this.up_prot=up_prot;
323     }
324
325     public void setDownProtocol(Protocol down_prot) {
326         this.down_prot=down_prot;
327     }
328
329     public void setProtocolStack(ProtocolStack stack) {
330         this.stack=stack;
331     }
332
333
334     /** Used internally. If overridden, call this method first. Only creates the up_handler thread
335      if down_thread is true */

336     public void startUpHandler() {
337         if(up_thread) {
338             if(up_handler == null) {
339                 up_handler=new UpHandler(up_queue, this, observer);
340                 if(up_thread_prio >= 0) {
341                     try {
342                         up_handler.setPriority(up_thread_prio);
343                     }
344                     catch(Throwable JavaDoc t) {
345                         if(log.isErrorEnabled()) log.error("priority " + up_thread_prio +
346                                     " could not be set for thread: " + Util.getStackTrace(t));
347                     }
348                 }
349                 up_handler.start();
350             }
351         }
352     }
353
354
355     /** Used internally. If overridden, call this method first. Only creates the down_handler thread
356      if down_thread is true */

357     public void startDownHandler() {
358         if(down_thread) {
359             if(down_handler == null) {
360                 down_handler=new DownHandler(down_queue, this, observer);
361                 if(down_thread_prio >= 0) {
362                     try {
363                         down_handler.setPriority(down_thread_prio);
364                     }
365                     catch(Throwable JavaDoc t) {
366                         if(log.isErrorEnabled()) log.error("priority " + down_thread_prio +
367                                     " could not be set for thread: " + Util.getStackTrace(t));
368                     }
369                 }
370                 down_handler.start();
371             }
372         }
373     }
374
375
376     /** Used internally. If overridden, call parent's method first */
377     public void stopInternal() {
378         up_queue.close(false); // this should terminate up_handler thread
379

380         if(up_handler != null && up_handler.isAlive()) {
381             try {
382                 up_handler.join(THREAD_JOIN_TIMEOUT);
383             }
384             catch(Exception JavaDoc ex) {
385             }
386             if(up_handler != null && up_handler.isAlive()) {
387                 up_handler.interrupt(); // still alive ? let's just kill it without mercy...
388
try {
389                     up_handler.join(THREAD_JOIN_TIMEOUT);
390                 }
391                 catch(Exception JavaDoc ex) {
392                 }
393                 if(up_handler != null && up_handler.isAlive())
394                     if(log.isErrorEnabled()) log.error("up_handler thread for " + getName() +
395                                                            " was interrupted (in order to be terminated), but is still alive");
396             }
397         }
398         up_handler=null;
399
400         down_queue.close(false); // this should terminate down_handler thread
401
if(down_handler != null && down_handler.isAlive()) {
402             try {
403                 down_handler.join(THREAD_JOIN_TIMEOUT);
404             }
405             catch(Exception JavaDoc ex) {
406             }
407             if(down_handler != null && down_handler.isAlive()) {
408                 down_handler.interrupt(); // still alive ? let's just kill it without mercy...
409
try {
410                     down_handler.join(THREAD_JOIN_TIMEOUT);
411                 }
412                 catch(Exception JavaDoc ex) {
413                 }
414                 if(down_handler != null && down_handler.isAlive())
415                     if(log.isErrorEnabled()) log.error("down_handler thread for " + getName() +
416                                                            " was interrupted (in order to be terminated), but is is still alive");
417             }
418         }
419         down_handler=null;
420     }
421
422
423     /**
424      * Internal method, should not be called by clients. Used by ProtocolStack. I would have
425      * used the 'friends' modifier, but this is available only in C++ ... If the up_handler thread
426      * is not available (down_thread == false), then directly call the up() method: we will run on the
427      * caller's thread (e.g. the protocol layer below us).
428      */

429     protected void receiveUpEvent(Event evt) {
430         if(up_handler == null) {
431             if(observer != null) { // call debugger hook (if installed)
432
if(observer.up(evt, up_queue.size()) == false) { // false means discard event
433
return;
434                 }
435             }
436             up(evt);
437             return;
438         }
439         try {
440             up_queue.add(evt);
441         }
442         catch(Exception JavaDoc e) {
443             if(log.isWarnEnabled()) log.warn("exception: " + e);
444         }
445     }
446
447     /**
448      * Internal method, should not be called by clients. Used by ProtocolStack. I would have
449      * used the 'friends' modifier, but this is available only in C++ ... If the down_handler thread
450      * is not available (down_thread == false), then directly call the down() method: we will run on the
451      * caller's thread (e.g. the protocol layer above us).
452      */

453     protected void receiveDownEvent(Event evt) {
454         if(down_handler == null) {
455             if(observer != null) { // call debugger hook (if installed)
456
if(observer.down(evt, down_queue.size()) == false) { // false means discard event
457
return;
458                 }
459             }
460             int type=evt.getType();
461             if(type == Event.START || type == Event.STOP) {
462                 if(handleSpecialDownEvent(evt) == false)
463                     return;
464             }
465             down(evt);
466             return;
467         }
468         try {
469             down_queue.add(evt);
470         }
471         catch(Exception JavaDoc e) {
472             if(log.isWarnEnabled()) log.warn("exception: " + e);
473         }
474     }
475
476     /**
477      * Causes the event to be forwarded to the next layer up in the hierarchy. Typically called
478      * by the implementation of <code>Up</code> (when done).
479      */

480     public void passUp(Event evt) {
481         if(observer != null) { // call debugger hook (if installed)
482
if(observer.passUp(evt) == false) { // false means don't pass up (=discard) event
483
return;
484             }
485         }
486
487         if(up_prot != null) {
488             up_prot.receiveUpEvent(evt);
489         }
490         else
491             if(log.isErrorEnabled()) log.error("no upper layer available");
492     }
493
494     /**
495      * Causes the event to be forwarded to the next layer down in the hierarchy.Typically called
496      * by the implementation of <code>Down</code> (when done).
497      */

498     public void passDown(Event evt) {
499         if(observer != null) { // call debugger hook (if installed)
500
if(observer.passDown(evt) == false) { // false means don't pass down (=discard) event
501
return;
502             }
503         }
504
505         if(down_prot != null) {
506             down_prot.receiveDownEvent(evt);
507         }
508         else
509             if(log.isErrorEnabled()) log.error("no lower layer available");
510     }
511
512
513     /**
514      * An event was received from the layer below. Usually the current layer will want to examine
515      * the event type and - depending on its type - perform some computation
516      * (e.g. removing headers from a MSG event type, or updating the internal membership list
517      * when receiving a VIEW_CHANGE event).
518      * Finally the event is either a) discarded, or b) an event is sent down
519      * the stack using <code>passDown()</code> or c) the event (or another event) is sent up
520      * the stack using <code>passUp()</code>.
521      */

522     public void up(Event evt) {
523         passUp(evt);
524     }
525
526     /**
527      * An event is to be sent down the stack. The layer may want to examine its type and perform
528      * some action on it, depending on the event's type. If the event is a message MSG, then
529      * the layer may need to add a header to it (or do nothing at all) before sending it down
530      * the stack using <code>passDown()</code>. In case of a GET_ADDRESS event (which tries to
531      * retrieve the stack's address from one of the bottom layers), the layer may need to send
532      * a new response event back up the stack using <code>passUp()</code>.
533      */

534     public void down(Event evt) {
535         passDown(evt);
536     }
537
538
539     /** These are special internal events that should not be handled by protocols
540      * @return boolean True: the event should be passed further down the stack. False: the event should
541      * be discarded (not passed down the stack)
542      */

543     protected boolean handleSpecialDownEvent(Event evt) {
544         switch(evt.getType()) {
545             case Event.START:
546                 try {
547                     start();
548
549                     // if we're the transport protocol, reply with a START_OK up the stack
550
if(down_prot == null) {
551                         passUp(new Event(Event.START_OK, Boolean.TRUE));
552                         return false; // don't pass down the stack
553
}
554                     else
555                         return true; // pass down the stack
556
}
557                 catch(Exception JavaDoc e) {
558                     passUp(new Event(Event.START_OK, new Exception JavaDoc("exception caused by " + getName() + ".start(): " + e)));
559                     return false;
560                 }
561             case Event.STOP:
562                 stop();
563                 if(down_prot == null) {
564                     passUp(new Event(Event.STOP_OK, Boolean.TRUE));
565                     return false; // don't pass down the stack
566
}
567                 else
568                     return true; // pass down the stack
569
default:
570                 return true; // pass down by default
571
}
572     }
573 }
574
Popular Tags