package org.jgroups.protocols;

import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;

import java.util.Vector;


/**
 * Queueing protocol layer.
 * <p>
 * Each direction (up and down) has its own flag and its own event vector:
 * <ul>
 * <li>A <code>START_QUEUEING</code> event switches queueing on for the direction
 *     it was received in. The event itself is consumed and not passed on.</li>
 * <li>While queueing is on, every other event travelling in that direction is
 *     appended to the corresponding vector instead of being passed on, unless
 *     the registered {@link Observer} rejects it, in which case it is dropped.</li>
 * <li>A <code>STOP_QUEUEING</code> event first passes on the events contained in
 *     its argument (a <code>Vector</code> of <code>Event</code>s, may be
 *     <code>null</code>), then replays the queued events in insertion order,
 *     empties the vector and switches queueing off. The event itself is
 *     consumed.</li>
 * <li>While queueing is off, events are passed straight through.</li>
 * </ul>
 * No synchronization is performed by this class on the flags or on the
 * replay/clear sequence.
 */
public class QUEUE extends Protocol {
    /** Events held back while up-queueing is on (elements are <code>Event</code>s). */
    final Vector up_vec=new Vector();
    /** Events held back while down-queueing is on (elements are <code>Event</code>s). */
    final Vector dn_vec=new Vector();
    boolean queueing_up=false, queueing_dn=false;
    /** Optional callback consulted before an event is queued; <code>null</code> means none. */
    Observer observer=null;


    /**
     * Callback consulted synchronously, on the calling thread, right before an
     * event is appended to one of the queues.
     */
    public interface Observer {

        /**
         * Invoked by {@link QUEUE#up(Event)} before an event is added to the up vector.
         *
         * @param evt        the event about to be queued
         * @param num_events size of the up vector <em>before</em> the event is added
         * @return <code>true</code> to queue the event, <code>false</code> to discard it
         */
        boolean addingToUpVector(Event evt, int num_events);

        /**
         * Invoked by {@link QUEUE#down(Event)} before an event is added to the down vector.
         *
         * @param evt        the event about to be queued
         * @param num_events size of the down vector <em>before</em> the event is added
         * @return <code>true</code> to queue the event, <code>false</code> to discard it
         */
        boolean addingToDownVector(Event evt, int num_events);
    }


    /**
     * Installs the single observer, replacing any previous one.
     *
     * @param observer the observer to consult, or <code>null</code> to disable callbacks
     */
    public void setObserver(Observer observer) {
        this.observer=observer;
    }

    /** @return the live (not copied) vector of queued up events */
    public Vector getUpVector() {
        return up_vec;
    }

    /** @return the live (not copied) vector of queued down events */
    public Vector getDownVector() {
        return dn_vec;
    }

    /** @return <code>true</code> while up events are being queued */
    public boolean getQueueingUp() {
        return queueing_up;
    }

    /** @return <code>true</code> while down events are being queued */
    public boolean getQueueingDown() {
        return queueing_dn;
    }


    /** @return the name of this protocol, <code>"QUEUE"</code> */
    public String getName() {
        return "QUEUE";
    }


    /**
     * @return a new vector holding <code>Event.START_QUEUEING</code> and
     *         <code>Event.STOP_QUEUEING</code> as <code>Integer</code>s
     */
    public Vector providedUpServices() {
        return queueingServices();
    }

    /**
     * @return a new vector holding <code>Event.START_QUEUEING</code> and
     *         <code>Event.STOP_QUEUEING</code> as <code>Integer</code>s
     */
    public Vector providedDownServices() {
        return queueingServices();
    }

    /** Builds a fresh list of the two event types handled by this protocol. */
    private static Vector queueingServices() {
        Vector ret=new Vector();
        ret.addElement(new Integer(Event.START_QUEUEING));
        ret.addElement(new Integer(Event.STOP_QUEUEING));
        return ret;
    }


    /**
     * Handles an event travelling up: starts/stops up-queueing on the two
     * control events, otherwise queues the event or passes it up depending on
     * the current queueing state.
     *
     * @param evt the event received from the layer below
     */
    public void up(Event evt) {
        switch(evt.getType()) {

            case Event.START_QUEUEING:
                if(log.isInfoEnabled()) log.info("received START_QUEUEING");
                queueing_up=true;
                return;

            case Event.STOP_QUEUEING:
                releaseUp(evt);
                return;
        }

        if(!queueing_up) {
            passUp(evt);
            return;
        }

        // Logged before the observer is asked, so a rejected event is logged too.
        if(log.isInfoEnabled()) log.info("queued up event " + evt);

        if(observer != null && !observer.addingToUpVector(evt, up_vec.size()))
            return; // observer vetoed: the event is neither queued nor passed up

        up_vec.addElement(evt);
    }

    /** Passes up the events carried by the STOP_QUEUEING event, replays the up queue and turns up-queueing off. */
    private void releaseUp(Event evt) {
        if(log.isInfoEnabled()) log.info("received STOP_QUEUEING");

        // Events supplied with STOP_QUEUEING go up before the queued ones.
        Vector event_list=(Vector)evt.getArg();
        if(event_list != null) {
            for(int i=0; i < event_list.size(); i++)
                passUp((Event)event_list.elementAt(i));
        }

        if(log.isInfoEnabled()) log.info("replaying up events");

        // size() is re-read on every iteration, so anything appended to the
        // vector while replaying is passed up as well.
        for(int i=0; i < up_vec.size(); i++)
            passUp((Event)up_vec.elementAt(i));

        up_vec.removeAllElements();
        queueing_up=false;
    }


    /**
     * Handles an event travelling down: starts/stops down-queueing on the two
     * control events, otherwise queues the event or passes it down depending on
     * the current queueing state.
     *
     * @param evt the event received from the layer above
     */
    public void down(Event evt) {
        switch(evt.getType()) {

            case Event.START_QUEUEING:
                if(log.isInfoEnabled()) log.info("received START_QUEUEING");
                queueing_dn=true;
                return;

            case Event.STOP_QUEUEING:
                releaseDown(evt);
                return;
        }

        if(!queueing_dn) {
            passDown(evt);
            return;
        }

        // Logged before the observer is asked, so a rejected event is logged too.
        if(log.isInfoEnabled()) log.info("queued down event: " + Util.printEvent(evt));

        if(observer != null && !observer.addingToDownVector(evt, dn_vec.size()))
            return; // observer vetoed: the event is neither queued nor passed down

        dn_vec.addElement(evt);
    }

    /** Passes down the events carried by the STOP_QUEUEING event, replays the down queue and turns down-queueing off. */
    private void releaseDown(Event evt) {
        if(log.isInfoEnabled()) log.info("received STOP_QUEUEING");

        // Events supplied with STOP_QUEUEING go down before the queued ones.
        Vector event_list=(Vector)evt.getArg();
        if(event_list != null) {
            for(int i=0; i < event_list.size(); i++)
                passDown((Event)event_list.elementAt(i));
        }

        if(log.isInfoEnabled()) log.info("replaying down events ("+ dn_vec.size() +')');

        // size() is re-read on every iteration, so anything appended to the
        // vector while replaying is passed down as well.
        for(int i=0; i < dn_vec.size(); i++)
            passDown((Event)dn_vec.elementAt(i));

        dn_vec.removeAllElements();
        queueing_dn=false;
    }
}