1 2 package org.jgroups.protocols; 3 4 import org.jgroups.*; 5 import org.jgroups.stack.Protocol; 6 7 import java.util.*; 8 9 10 28 public abstract class Discovery extends Protocol { 29 final Vector members=new Vector(11); 30 final Set members_set=new HashSet(11); Address local_addr=null; 32 String group_addr=null; 33 long timeout=3000; 34 int num_initial_members=2; 35 boolean is_server=false; 36 PingWaiter ping_waiter; 37 38 39 40 int num_ping_requests=2; 41 42 43 public abstract String getName(); 44 45 46 public void localAddressSet(Address addr) { 47 ; 48 } 49 50 public abstract void sendGetMembersRequest(); 51 52 53 54 public void handleConnectOK() { 55 ; 56 } 57 58 public void handleDisconnect() { 59 ; 60 } 61 62 public void handleConnect() { 63 ; 64 } 65 66 67 public Vector providedUpServices() { 68 Vector ret=new Vector(1); 69 ret.addElement(new Integer (Event.FIND_INITIAL_MBRS)); 70 return ret; 71 } 72 73 82 public boolean setProperties(Properties props) { 83 String str; 84 85 super.setProperties(props); 86 str=props.getProperty("timeout"); if(str != null) { 88 timeout=Long.parseLong(str); 89 if(timeout <= 0) { 90 if(log.isErrorEnabled()) log.error("timeout must be > 0"); 91 return false; 92 } 93 props.remove("timeout"); 94 } 95 96 str=props.getProperty("num_initial_members"); if(str != null) { 98 num_initial_members=Integer.parseInt(str); 99 props.remove("num_initial_members"); 100 } 101 102 str=props.getProperty("num_ping_requests"); if(str != null) { 104 num_ping_requests=Integer.parseInt(str); 105 props.remove("num_ping_requests"); 106 if(num_ping_requests < 1) 107 num_ping_requests=1; 108 } 109 110 if(props.size() > 0) { 111 StringBuffer sb=new StringBuffer (); 112 for(Enumeration e=props.propertyNames(); e.hasMoreElements();) { 113 sb.append(e.nextElement().toString()); 114 if(e.hasMoreElements()) { 115 sb.append(", "); 116 } 117 } 118 if(log.isErrorEnabled()) log.error("The following properties are not recognized: " + sb); 119 return false; 120 } 121 return true; 122 } 123 124 public void start() throws Exception { 125 super.start(); 126 PingSender ping_sender=new PingSender(timeout, num_ping_requests, this); 127 if(ping_waiter == null) 128 ping_waiter=new PingWaiter(timeout, num_initial_members, this, ping_sender); 129 } 130 131 public void stop() { 132 is_server=false; 133 if(ping_waiter != null) 134 ping_waiter.stop(); 135 } 136 137 138 159 160 public void up(Event evt) { 161 Message msg, rsp_msg; 162 Object obj; 163 PingHeader hdr, rsp_hdr; 164 PingRsp rsp; 165 Address coord; 166 167 switch(evt.getType()) { 168 169 case Event.MSG: 170 msg=(Message)evt.getArg(); 171 obj=msg.getHeader(getName()); 172 if(obj == null || !(obj instanceof PingHeader)) { 173 passUp(evt); 174 return; 175 } 176 hdr=(PingHeader)msg.removeHeader(getName()); 177 178 switch(hdr.type) { 179 180 case PingHeader.GET_MBRS_REQ: if(local_addr != null && msg.getSrc() != null && local_addr.equals(msg.getSrc())) { 182 if(log.isTraceEnabled()) 183 log.trace("discarded my own discovery request"); 184 return; 185 } 186 synchronized(members) { 187 coord=members.size() > 0 ? (Address)members.firstElement() : local_addr; 188 } 189 190 PingRsp ping_rsp=new PingRsp(local_addr, coord, is_server); 191 rsp_msg=new Message(msg.getSrc(), null, null); 192 rsp_hdr=new PingHeader(PingHeader.GET_MBRS_RSP, ping_rsp); 193 rsp_msg.putHeader(getName(), rsp_hdr); 194 if(log.isTraceEnabled()) 195 log.trace("received GET_MBRS_REQ from " + msg.getSrc() + ", sending response " + rsp_hdr); 196 passDown(new Event(Event.MSG, rsp_msg)); 197 return; 198 199 case PingHeader.GET_MBRS_RSP: rsp=hdr.arg; 201 202 if(log.isTraceEnabled()) 203 log.trace("received GET_MBRS_RSP, rsp=" + rsp); 204 ping_waiter.addResponse(rsp); 205 return; 206 207 default: 208 if(log.isWarnEnabled()) log.warn("got PING header with unknown type (" + hdr.type + ')'); 209 return; 210 } 211 212 213 case Event.SET_LOCAL_ADDRESS: 214 passUp(evt); 215 local_addr=(Address)evt.getArg(); 216 localAddressSet(local_addr); 217 break; 218 219 case Event.CONNECT_OK: 220 handleConnectOK(); 221 passUp(evt); 222 break; 223 224 default: 225 passUp(evt); break; 227 } 228 } 229 230 231 244 public void down(Event evt) { 245 246 switch(evt.getType()) { 247 248 case Event.FIND_INITIAL_MBRS: ping_waiter.start(); 251 break; 252 253 case Event.TMP_VIEW: 254 case Event.VIEW_CHANGE: 255 Vector tmp; 256 if((tmp=((View)evt.getArg()).getMembers()) != null) { 257 synchronized(members) { 258 members.clear(); 259 members.addAll(tmp); 260 members_set.clear(); 261 members_set.addAll(tmp); 262 } 263 } 264 passDown(evt); 265 break; 266 267 case Event.BECOME_SERVER: passDown(evt); 269 is_server=true; 270 break; 271 272 case Event.CONNECT: 273 group_addr=(String )evt.getArg(); 274 passDown(evt); 275 handleConnect(); 276 break; 277 278 case Event.DISCONNECT: 279 handleDisconnect(); 280 passDown(evt); 281 break; 282 283 default: 284 passDown(evt); break; 286 } 287 } 288 289 290 291 292 293 294 protected View makeView(Vector mbrs) { 295 Address coord=null; 296 long id=0; 297 ViewId view_id=new ViewId(local_addr); 298 299 coord=view_id.getCoordAddress(); 300 id=view_id.getId(); 301 302 return new View(coord, id, mbrs); 303 } 304 305 306 307 } 308 | Popular Tags |