KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > QUEUE


1 // $Id: QUEUE.java,v 1.7 2004/09/23 16:29:42 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.Event;
6 import org.jgroups.Message;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.Util;
9
10 import java.util.Vector JavaDoc;
11
12
13 /**
14  * Queuing layer. Upon reception of event START_QUEUEING, all events traveling through
15  * this layer upwards/downwards (depending on direction of event) will be queued. Upon
16  * reception of a STOP_QUEUEING event, all events will be released. Finally, the
17  * queueing flag is reset.
18  * When queueing, only event STOP_QUEUEING (received up or downwards) will be allowed
19  * to release queueing.
20  * @author Bela Ban
21  */

22
23 public class QUEUE extends Protocol {
24     final Vector JavaDoc up_vec=new Vector JavaDoc();
25     final Vector JavaDoc dn_vec=new Vector JavaDoc();
26     boolean queueing_up=false, queueing_dn=false;
27     Observer observer=null;
28
29
30     public interface Observer {
31     /** Called before event is added. Blocks until call returns.
32         @param evt The event
33         @param num_events The number of events in the up vector <em>before</em>
34         this event is added
35         @return boolean True if event should be added. False if it should be discarded */

36     boolean addingToUpVector(Event evt, int num_events);
37
38     /** Called before event is added. Blocks until call returns.
39         @param evt The event
40         @param num_events The number of events in the down vector <em>before</em>
41         this event is added
42         @return boolean True if event should be added. False if it should be discarded */

43     boolean addingToDownVector(Event evt, int num_events);
44     }
45
46     /** Only 1 observer is allowed. More than one might slow down the system. Will be called
47     when an event is queued (up or down) */

48     public void setObserver(Observer observer) {this.observer=observer;}
49
50     public Vector JavaDoc getUpVector() {return up_vec;}
51     public Vector JavaDoc getDownVector() {return dn_vec;}
52     public boolean getQueueingUp() {return queueing_up;}
53     public boolean getQueueingDown() {return queueing_dn;}
54
55
56     /** All protocol names have to be unique ! */
57     public String JavaDoc getName() {return "QUEUE";}
58
59
60     public Vector JavaDoc providedUpServices() {
61     Vector JavaDoc ret=new Vector JavaDoc();
62     ret.addElement(new Integer JavaDoc(Event.START_QUEUEING));
63     ret.addElement(new Integer JavaDoc(Event.STOP_QUEUEING));
64     return ret;
65     }
66
67     public Vector JavaDoc providedDownServices() {
68     Vector JavaDoc ret=new Vector JavaDoc();
69     ret.addElement(new Integer JavaDoc(Event.START_QUEUEING));
70     ret.addElement(new Integer JavaDoc(Event.STOP_QUEUEING));
71     return ret;
72     }
73
74
75
76
77     /**
78        Queues or passes up events. No queue sync. necessary, as this method is never called
79        concurrently.
80      */

81     public void up(Event evt) {
82     Message msg;
83     Vector JavaDoc event_list; // to be passed up *before* replaying event queue
84
Event e;
85
86
87     switch(evt.getType()) {
88
89     case Event.START_QUEUEING: // start queueing all up events
90
if(log.isInfoEnabled()) log.info("received START_QUEUEING");
91         queueing_up=true;
92         return;
93
94     case Event.STOP_QUEUEING: // stop queueing all up events
95
event_list=(Vector JavaDoc)evt.getArg();
96         if(event_list != null)
97         for(int i=0; i < event_list.size(); i++)
98             passUp((Event)event_list.elementAt(i));
99         
100          if(log.isInfoEnabled()) log.info("replaying up events");
101         
102         for(int i=0; i < up_vec.size(); i++) {
103         e=(Event)up_vec.elementAt(i);
104         passUp(e);
105         }
106
107         up_vec.removeAllElements();
108         queueing_up=false;
109         return;
110     }
111     
112     if(queueing_up) {
113          {
114         if(log.isInfoEnabled()) log.info("queued up event " + evt);
115         }
116         if(observer != null) {
117         if(observer.addingToUpVector(evt, up_vec.size()) == false)
118             return; // discard event (don't queue)
119
}
120         up_vec.addElement(evt);
121     }
122     else
123         passUp(evt); // Pass up to the layer above us
124
}
125
126
127
128
129     
130     public void down(Event evt) {
131     Message msg;
132     Vector JavaDoc event_list; // to be passed down *before* replaying event queue
133

134     switch(evt.getType()) {
135         
136     case Event.START_QUEUEING: // start queueing all down events
137
if(log.isInfoEnabled()) log.info("received START_QUEUEING");
138         queueing_dn=true;
139         return;
140
141     case Event.STOP_QUEUEING: // stop queueing all down events
142
if(log.isInfoEnabled()) log.info("received STOP_QUEUEING");
143         event_list=(Vector JavaDoc)evt.getArg();
144         if(event_list != null) // play events first (if available)
145
for(int i=0; i < event_list.size(); i++)
146             passDown((Event)event_list.elementAt(i));
147         
148          if(log.isInfoEnabled()) log.info("replaying down events ("+ dn_vec.size() +')');
149         
150         for(int i=0; i < dn_vec.size(); i++) {
151         passDown((Event)dn_vec.elementAt(i));
152         }
153
154         dn_vec.removeAllElements();
155         queueing_dn=false;
156         return;
157     }
158         
159     if(queueing_dn) {
160
161         if(log.isInfoEnabled()) log.info("queued down event: " + Util.printEvent(evt));
162
163         if(observer != null) {
164         if(observer.addingToDownVector(evt, dn_vec.size()) == false)
165             return; // discard event (don't queue)
166
}
167         dn_vec.addElement(evt);
168     }
169     else
170         passDown(evt); // Pass up to the layer below us
171
}
172
173
174
175 }
176
Popular Tags