package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Promise;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.InetAddress;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Properties;
import java.util.Vector;


/**
 * Failure detection based on process IDs (PIDs).
 * <p>
 * Each member monitors exactly one other member: the one following it in the current view,
 * wrapping around to the first member (see {@link #determinePingDest()}). Instead of exchanging
 * heartbeats, the {@link Monitor} task periodically checks whether {@code /proc/<pid>} of the
 * monitored member exists. If it does not, a SUSPECT message for that member is sent to the group
 * and the next member in line becomes the monitored one.
 * <p>
 * PIDs are learned in three ways:
 * <ul>
 * <li>I_HAVE_PID announcements (each member announces its own PID on its first view change once
 * the PID is known);</li>
 * <li>WHO_HAS_PID queries, sent when the PID of the monitored member is unknown;</li>
 * <li>the coordinator's PID cache, fetched once via GET_PIDS / GET_PIDS_RSP.</li>
 * </ul>
 * I_HAVE_PID announcements from members whose IP address differs from ours are discarded.
 * <p>
 * NOTE(review): the liveness check relies on a {@code /proc} filesystem (presumably Linux-like
 * hosts) and on all monitored members running on the same host — confirm for your deployment.
 * <p>
 * Recognized properties: {@code timeout}, {@code get_pids_timeout}, {@code num_tries}.
 */
public class FD_PID extends Protocol {
    /** Member currently monitored by this instance, or null if there is none. */
    Address ping_dest=null;
    /** PID of ping_dest; 0 (or less) while unknown. */
    int ping_pid=0;
    /** Our own address, set by a SET_LOCAL_ADDRESS event. */
    Address local_addr=null;
    /** PID of this process, set by a SET_PID event coming down the stack. */
    int local_pid=0;
    /** Interval (ms) between two checks of the monitored member's PID. */
    long timeout=3000;
    /** Time (ms) to wait for the coordinator's GET_PIDS_RSP on each attempt. */
    long get_pids_timeout=3000;
    /** Pause (ms) between two attempts to fetch the coordinator's PID cache. */
    final long get_pids_retry_timeout=500;
    /** Number of attempts to fetch the coordinator's PID cache. */
    int num_tries=3;
    /** Members of the current view; element 0 is treated as the coordinator. */
    final Vector members=new Vector();
    /** PID cache: member Address -> Integer PID (only members on our host are added). */
    final Hashtable pids=new Hashtable();
    /** Whether our own I_HAVE_PID announcement has already been sent to the group. */
    boolean own_pid_sent=false;
    /** View members minus those the monitor has already suspected; used to pick ping_dest. */
    final Vector pingable_mbrs=new Vector();
    /** Receives the PID cache carried by a GET_PIDS_RSP. */
    final Promise get_pids_promise=new Promise();
    /** Whether fetching the coordinator's cache has been attempted (only done once). */
    boolean got_cache_from_coord=false;
    /** Timer taken from the protocol stack; runs the Monitor task. */
    TimeScheduler timer=null;
    /** Currently scheduled monitor task, or null. */
    Monitor monitor=null;


    public String getName() {
        return "FD_PID";
    }


    /**
     * Configures this protocol. Recognized keys are removed from {@code props}.
     *
     * @param props the protocol properties
     * @return false if any unrecognized property remains, true otherwise
     */
    public boolean setProperties(Properties props) {
        String str;

        super.setProperties(props);
        str=props.getProperty("timeout");
        if(str != null) {
            timeout=Long.parseLong(str);
            props.remove("timeout");
        }

        str=props.getProperty("get_pids_timeout");
        if(str != null) {
            get_pids_timeout=Long.parseLong(str);
            props.remove("get_pids_timeout");
        }

        str=props.getProperty("num_tries");
        if(str != null) {
            num_tries=Integer.parseInt(str);
            props.remove("num_tries");
        }

        if(props.size() > 0) {
            System.err.println("FD_PID.setProperties(): the following properties are not recognized:");
            // list them on the same stream as the header line above
            props.list(System.err);
            return false;
        }
        return true;
    }


    /**
     * Schedules the Monitor task on the protocol stack's timer unless one is already active.
     * If the stack or its timer is unavailable, only logs a warning and returns.
     */
    public void start() throws Exception {
        if(stack != null && stack.timer != null)
            timer=stack.timer;
        else {
            if(log.isWarnEnabled()) log.warn("TimeScheduler in protocol stack is null (or protocol stack is null)");
            return;
        }

        // nothing ever sets Monitor.started back to true, so a stopped monitor is
        // discarded and replaced with a fresh one
        if(monitor != null && !monitor.started) {
            monitor=null;
        }
        if(monitor == null) {
            monitor=new Monitor();
            timer.add(monitor, true);
        }
    }


    /** Stops the current Monitor task, if any; a later start() schedules a new one. */
    public void stop() {
        if(monitor != null) {
            monitor.stop();
            monitor=null;
        }
    }


    /**
     * Handles events travelling up the stack.
     * <p>
     * Messages carrying an FdHeader are consumed here: the header is removed and the message is
     * not passed further up. All other events, including SET_LOCAL_ADDRESS, are passed up.
     */
    public void up(Event evt) {
        Message msg;
        FdHeader hdr;
        Object tmphdr;

        switch(evt.getType()) {

            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address)evt.getArg();
                break;

            case Event.MSG:
                msg=(Message)evt.getArg();
                tmphdr=msg.getHeader(getName());
                if(!(tmphdr instanceof FdHeader))
                    break; // not an FD_PID message: pass it up unchanged
                hdr=(FdHeader)msg.removeHeader(getName());

                switch(hdr.type) {

                    case FdHeader.SUSPECT:
                        if(hdr.mbr != null) {
                            if(log.isInfoEnabled()) log.info("[SUSPECT] hdr: " + hdr);
                            passUp(new Event(Event.SUSPECT, hdr.mbr));
                            passDown(new Event(Event.SUSPECT, hdr.mbr));
                        }
                        break;

                    case FdHeader.WHO_HAS_PID:
                        // ignore our own query
                        if(local_addr != null && local_addr.equals(msg.getSrc()))
                            return;
                        if(hdr.mbr == null) {
                            if(log.isErrorEnabled()) log.error("[WHO_HAS_PID] hdr.mbr is null");
                            return;
                        }
                        // the query is about us: answer with our own PID
                        if(local_addr != null && local_addr.equals(hdr.mbr) && local_pid > 0) {
                            sendIHavePidMessage(msg.getSrc(), hdr.mbr, local_pid);
                            return;
                        }
                        // otherwise answer from the cache if we know the PID
                        if(pids.containsKey(hdr.mbr))
                            sendIHavePidMessage(msg.getSrc(), hdr.mbr, ((Integer)pids.get(hdr.mbr)).intValue());
                        break;

                    case FdHeader.I_HAVE_PID:
                        if(log.isInfoEnabled()) log.info("i-have pid: " + hdr.mbr + " --> " + hdr.pid);

                        if(hdr.mbr == null || hdr.pid <= 0) {
                            if(log.isErrorEnabled()) log.error("[I_HAVE_PID] hdr.mbr is null or hdr.pid <= 0");
                            return;
                        }

                        if(!sameHost(local_addr, hdr.mbr)) {
                            if(log.isErrorEnabled())
                                log.error(hdr.mbr + " is not on the same host as I (" +
                                          local_addr + "), discarding I_HAVE_PID event");
                            return;
                        }

                        pids.put(hdr.mbr, new Integer(hdr.pid));
                        if(log.isInfoEnabled()) log.info("[" + local_addr + "]: the cache is " + pids);

                        // the PID of the member we monitor just became known: make sure the monitor runs
                        if(ping_pid <= 0 && ping_dest != null && pids.containsKey(ping_dest)) {
                            ping_pid=((Integer)pids.get(ping_dest)).intValue();
                            try {
                                start();
                            }
                            catch(Exception ex) {
                                if(log.isWarnEnabled()) log.warn("exception when calling start(): " + ex);
                            }
                        }
                        break;

                    case FdHeader.GET_PIDS: {
                        if(hdr.mbr == null) {
                            if(log.isErrorEnabled()) log.error("[GET_PIDS]: hdr.mbr == null");
                            return;
                        }
                        // Take the requester from the *request* header. A freshly created
                        // FdHeader has mbr == null, and a message with a null destination is
                        // what this class uses for group-wide messages (presumably multicast),
                        // not a reply to a single member.
                        Address requester=hdr.mbr;
                        FdHeader rsp_hdr=new FdHeader(FdHeader.GET_PIDS_RSP);
                        rsp_hdr.pids=(Hashtable)pids.clone();
                        Message rsp=new Message(requester, null, null);
                        rsp.putHeader(getName(), rsp_hdr);
                        passDown(new Event(Event.MSG, rsp));
                        break;
                    }

                    case FdHeader.GET_PIDS_RSP:
                        if(hdr.pids == null) {
                            if(log.isErrorEnabled()) log.error("[GET_PIDS_RSP]: cache is null");
                            return;
                        }
                        get_pids_promise.setResult(hdr.pids);
                        break;
                }
                return;
        }

        passUp(evt);
    }


    /**
     * Handles events travelling down the stack.
     * <p>
     * SET_PID records our own PID and is not passed further down. VIEW_CHANGE is passed down
     * first and then used to refresh:
     * <ul>
     * <li>the membership;</li>
     * <li>the PID cache (fetched from the coordinator once, pruned of departed members);</li>
     * <li>our own PID announcement (sent once);</li>
     * <li>the monitored member.</li>
     * </ul>
     * All other events are passed down unchanged.
     */
    public void down(Event evt) {
        Integer pid;
        Address tmp_ping_dest;
        View v;

        switch(evt.getType()) {

            case Event.SET_PID:
                pid=(Integer)evt.getArg();
                if(pid == null) {
                    if(log.isErrorEnabled()) log.error("SET_PID did not contain a pid !");
                    return;
                }
                local_pid=pid.intValue();
                if(log.isInfoEnabled()) log.info("local_pid=" + local_pid);
                break;

            case Event.VIEW_CHANGE:
                synchronized(this) {
                    v=(View)evt.getArg();
                    members.removeAllElements();
                    members.addAll(v.getMembers());
                    pingable_mbrs.removeAllElements();
                    pingable_mbrs.addAll(members);
                    passDown(evt);

                    // first view: fetch the coordinator's PID cache (attempted only once;
                    // this blocks until a response arrives or all attempts are used up)
                    if(!got_cache_from_coord) {
                        getPidsFromCoordinator();
                        got_cache_from_coord=true;
                    }

                    // announce our own PID to the group, once it is known
                    if(!own_pid_sent) {
                        if(local_pid > 0) {
                            sendIHavePidMessage(null, local_addr, local_pid);
                            own_pid_sent=true;
                        }
                        else {
                            if(log.isWarnEnabled()) log.warn("[VIEW_CHANGE]: local_pid == 0");
                        }
                    }

                    // drop cached PIDs of members that are no longer in the view
                    pids.keySet().retainAll(members);

                    tmp_ping_dest=determinePingDest();
                    ping_pid=0; // re-resolved by the monitor from the cache (or via WHO_HAS_PID)
                    if(tmp_ping_dest == null) {
                        stop();
                        ping_dest=null;
                    }
                    else {
                        ping_dest=tmp_ping_dest;
                        try {
                            start();
                        }
                        catch(Exception ex) {
                            if(log.isWarnEnabled()) log.warn("exception when calling start(): " + ex);
                        }
                    }
                }
                break;

            default:
                passDown(evt);
                break;
        }
    }


    /**
     * Fetches the coordinator's PID cache and merges it into ours.
     * <p>
     * Each attempt sends GET_PIDS to the current coordinator and waits up to get_pids_timeout ms
     * for the response. There are up to num_tries attempts, with a pause of
     * get_pids_retry_timeout ms after each unsuccessful one. Returns immediately if we are the
     * coordinator ourselves.
     */
    void getPidsFromCoordinator() {
        Address coord;
        int attempts=num_tries;
        Message msg;
        FdHeader hdr;
        Hashtable result;

        get_pids_promise.reset();
        while(attempts > 0) {
            if((coord=determineCoordinator()) != null) {
                if(coord.equals(local_addr)) {
                    if(log.isInfoEnabled()) log.info("first member; cache is empty");
                    return;
                }
                hdr=new FdHeader(FdHeader.GET_PIDS);
                hdr.mbr=local_addr; // tells the coordinator where to send the response
                msg=new Message(coord, null, null);
                msg.putHeader(getName(), hdr);
                passDown(new Event(Event.MSG, msg));
                result=(Hashtable)get_pids_promise.getResult(get_pids_timeout);
                if(result != null) {
                    pids.putAll(result);
                    if(log.isInfoEnabled())
                        log.info("got cache from " + coord + ": cache is " + pids);
                    return;
                }
                else {
                    if(log.isErrorEnabled()) log.error("received null cache; retrying");
                }
            }

            Util.sleep(get_pids_retry_timeout);
            --attempts;
        }
    }


    /** Sends a SUSPECT message for suspected_mbr in a message without a destination. */
    void broadcastSuspectMessage(Address suspected_mbr) {
        Message suspect_msg;
        FdHeader hdr;

        if(log.isInfoEnabled())
            log.info("suspecting " + suspected_mbr + " (own address is " + local_addr + ')');

        hdr=new FdHeader(FdHeader.SUSPECT);
        hdr.mbr=suspected_mbr;
        suspect_msg=new Message();
        suspect_msg.putHeader(getName(), hdr);
        passDown(new Event(Event.MSG, suspect_msg));
    }


    /** Sends a WHO_HAS_PID query for mbr in a message without a destination. */
    void broadcastWhoHasPidMessage(Address mbr) {
        Message msg;
        FdHeader hdr;

        if(local_addr != null && mbr != null && log.isInfoEnabled())
            log.info("[" + local_addr + "]: who-has " + mbr);

        msg=new Message();
        hdr=new FdHeader(FdHeader.WHO_HAS_PID);
        hdr.mbr=mbr;
        msg.putHeader(getName(), hdr);
        passDown(new Event(Event.MSG, msg));
    }


    /**
     * Sends an I_HAVE_PID message stating that member mbr has PID pid.
     *
     * @param dst destination; null is used when announcing our own PID to the group
     * @param mbr the member whose PID is reported
     * @param pid the reported PID
     */
    void sendIHavePidMessage(Address dst, Address mbr, int pid) {
        Message msg=new Message(dst, null, null);
        FdHeader hdr=new FdHeader(FdHeader.I_HAVE_PID);
        hdr.mbr=mbr;
        hdr.pid=pid;
        msg.putHeader(getName(), hdr);
        passDown(new Event(Event.MSG, msg));
    }


    /**
     * Returns the member following us in pingable_mbrs, wrapping around to the first element.
     * Returns null if fewer than two members are pingable, if local_addr is unknown, or if we are
     * not in pingable_mbrs.
     */
    Address determinePingDest() {
        Address tmp;

        if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)
            return null;
        for(int i=0; i < pingable_mbrs.size(); i++) {
            tmp=(Address)pingable_mbrs.elementAt(i);
            if(local_addr.equals(tmp)) {
                if(i + 1 >= pingable_mbrs.size())
                    return (Address)pingable_mbrs.elementAt(0);
                else
                    return (Address)pingable_mbrs.elementAt(i + 1);
            }
        }
        return null;
    }


    /** Returns the first member of the current view (treated as coordinator), or null if empty. */
    Address determineCoordinator() {
        return members.size() > 0 ? (Address)members.elementAt(0) : null;
    }


    /**
     * Returns true if both addresses are IpAddresses whose IP addresses have the same textual
     * form (InetAddress.getHostAddress()). Returns false if either is null, not an IpAddress, or
     * has no IP address.
     */
    boolean sameHost(Address one, Address two) {
        InetAddress a, b;
        String host_a, host_b;

        if(one == null || two == null) return false;
        if(!(one instanceof IpAddress) || !(two instanceof IpAddress)) {
            if(log.isErrorEnabled()) log.error("addresses have to be of type IpAddress to be compared");
            return false;
        }

        a=((IpAddress)one).getIpAddress();
        b=((IpAddress)two).getIpAddress();
        if(a == null || b == null) return false;
        host_a=a.getHostAddress();
        host_b=b.getHostAddress();
        return host_a.equals(host_b);
    }


    /**
     * Header carried by all FD_PID messages. Which fields are meaningful depends on the type:
     * <ul>
     * <li>SUSPECT: mbr is the suspected member;</li>
     * <li>WHO_HAS_PID: mbr is the member whose PID is requested;</li>
     * <li>I_HAVE_PID: mbr and its pid;</li>
     * <li>GET_PIDS: mbr is the requester;</li>
     * <li>GET_PIDS_RSP: pids is a copy of the sender's PID cache.</li>
     * </ul>
     */
    public static class FdHeader extends Header {
        static final int SUSPECT=10;
        static final int WHO_HAS_PID=11;
        static final int I_HAVE_PID=12;
        static final int GET_PIDS=13;
        static final int GET_PIDS_RSP=14;

        int type=SUSPECT;
        Address mbr=null;
        int pid=0;
        Hashtable pids=null;

        /** No-arg constructor; presumably required for externalization (readExternal) — confirm. */
        public FdHeader() {
        }

        FdHeader(int type) {
            this.type=type;
        }


        public String toString() {
            StringBuffer sb=new StringBuffer();
            sb.append(type2String(type));
            if(mbr != null)
                sb.append(", mbr=" + mbr);
            if(pid > 0)
                sb.append(", pid=" + pid);
            if(pids != null)
                sb.append(", pids=" + pids);
            return sb.toString();
        }


        public static String type2String(int type) {
            switch(type) {
                case SUSPECT:
                    return "SUSPECT";
                case WHO_HAS_PID:
                    return "WHO_HAS_PID";
                case I_HAVE_PID:
                    return "I_HAVE_PID";
                case GET_PIDS:
                    return "GET_PIDS";
                case GET_PIDS_RSP:
                    return "GET_PIDS_RSP";
                default:
                    return "unknown type (" + type + ')';
            }
        }

        /** Writes the fields in the order type, mbr, pid, pids; readExternal() reads them back in that order. */
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(type);
            out.writeObject(mbr);
            out.writeInt(pid);
            out.writeObject(pids);
        }


        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            type=in.readInt();
            mbr=(Address)in.readObject();
            pid=in.readInt();
            pids=(Hashtable)in.readObject();
        }

    }


    /**
     * Periodic task (interval: timeout ms) that checks whether the monitored member's process
     * still exists.
     * <p>
     * If the PID is unknown, it is looked up in the cache or requested via WHO_HAS_PID. If
     * {@code /proc/<pid>} does not exist, the member is suspected and removed from
     * pingable_mbrs, and the next member becomes ping_dest.
     */
    private class Monitor implements TimeScheduler.Task {
        /** Cleared by stop(); cancelled() reports the task as cancelled once this is false. */
        boolean started=true;


        void stop() {
            started=false;
        }


        public boolean cancelled() {
            return !started;
        }


        public long nextInterval() {
            return timeout;
        }


        public void run() {
            if(ping_dest == null) {
                if(log.isWarnEnabled()) log.warn("ping_dest is null, skipping ping");
                return;
            }

            if(log.isInfoEnabled())
                log.info("ping_dest=" + ping_dest + ", ping_pid=" + ping_pid + ", cache=" + pids);

            if(ping_pid <= 0) {
                if(ping_dest != null && pids.containsKey(ping_dest)) {
                    ping_pid=((Integer)pids.get(ping_dest)).intValue();
                    if(log.isInfoEnabled())
                        log.info("found PID for " + ping_dest + " in cache (pid=" + ping_pid + ')');
                }
                else {
                    // ask the group; an I_HAVE_PID answer fills the cache for the next run
                    if(log.isErrorEnabled())
                        log.error("PID for " + ping_dest + " not known" + ", cache is " + pids);
                    broadcastWhoHasPidMessage(ping_dest);
                    return;
                }
            }

            if(!Util.fileExists("/proc/" + ping_pid)) {
                if(log.isInfoEnabled()) log.info("process " + ping_pid + " does not exist");
                broadcastSuspectMessage(ping_dest);
                pingable_mbrs.removeElement(ping_dest);
                ping_dest=determinePingDest();
                if(ping_dest == null)
                    stop(); // Monitor.stop(): cancels this task, not the protocol
                ping_pid=0;
            }
            else {
                if(log.isInfoEnabled()) log.info(ping_dest + " is alive");
            }
        }
    }

}