1 3 package org.jgroups.protocols.pbcast; 4 5 import org.jgroups.Address; 6 import org.jgroups.Event; 7 import org.jgroups.Message; 8 import org.jgroups.View; 9 import org.jgroups.stack.Protocol; 10 import org.jgroups.util.Util; 11 12 import java.util.Enumeration ; 13 import java.util.Hashtable ; 14 import java.util.Properties ; 15 import java.util.Vector ; 16 17 18 34 public class FD extends Protocol implements Runnable { 35 Address local_addr=null; 36 Thread checker=null; final Object checker_lock=new Object (); 38 long timeout=6000; final Hashtable members=new Hashtable (11); final Vector suspected_mbrs=new Vector (11); 43 44 static class Entry { 45 long timestamp; 46 47 Entry(long timestamp) { 48 this.timestamp=timestamp; 49 } 50 51 public String toString() { 52 return Long.toString(timestamp); 53 } 54 } 55 56 57 public String getName() { 58 return "FD"; 59 } 60 61 62 public boolean setProperties(Properties props) { 63 String str; 64 65 super.setProperties(props); 66 str=props.getProperty("timeout"); 67 if(str != null) { 68 timeout=Long.parseLong(str); 69 props.remove("timeout"); 70 } 71 72 if(props.size() > 0) { 73 System.err.println("FD.setProperties(): the following properties are not recognized:"); 74 props.list(System.out); 75 return false; 76 } 77 return true; 78 } 79 80 81 public void stop() { 82 stopChecker(); 83 } 84 85 public void up(Event evt) { 86 Message msg; 87 Address sender; 88 89 switch(evt.getType()) { 90 91 case Event.SET_LOCAL_ADDRESS: 92 local_addr=(Address)evt.getArg(); 93 break; 94 95 case Event.MSG: 96 msg=(Message)evt.getArg(); 97 sender=msg.getSrc(); 98 updateSender(sender); 99 break; 100 } 101 102 passUp(evt); } 104 105 106 public void down(Event evt) { 107 View v; 108 Vector mbrs; 109 Address mbr; 110 111 switch(evt.getType()) { 112 113 case Event.VIEW_CHANGE: 114 v=(View)evt.getArg(); 115 mbrs=v.getMembers(); 116 passDown(evt); 117 for(Enumeration e=members.keys(); e.hasMoreElements();) { 118 mbr=(Address)e.nextElement(); 119 if(!mbrs.contains(mbr)) { 120 members.remove(mbr); 121 continue; 122 } 123 } 124 members.remove(local_addr); 125 if(members.size() > 0 && checker == null) 126 startChecker(); 127 return; 128 129 case Event.HEARD_FROM: 131 updateSenders((Vector )evt.getArg()); 132 return; } 134 135 passDown(evt); 136 } 137 138 139 public void run() { 140 Address mbr; 141 long timestamp, diff; 142 143 while(checker != null && Thread.currentThread().equals(checker) && members.size() > 0) { 144 for(Enumeration e=members.keys(); e.hasMoreElements();) { 145 mbr=(Address)e.nextElement(); 146 timestamp=((Entry)members.get(mbr)).timestamp; 147 diff=System.currentTimeMillis() - timestamp; 148 if(diff >= timeout) { 149 if(log.isInfoEnabled()) log.info("suspecting " + mbr); 150 passUp(new Event(Event.SUSPECT, mbr)); 151 if(!suspected_mbrs.contains(mbr)) 152 suspected_mbrs.addElement(mbr); 153 } 154 } 155 Util.sleep(timeout); 156 } 157 checker=null; 158 } 159 160 161 void startChecker() { 162 synchronized(checker_lock) { 163 if(checker == null) { 164 checker=new Thread (this, "FD.CheckerThread"); 165 checker.setDaemon(true); 166 checker.start(); 167 } 168 } 169 } 170 171 void stopChecker() { 172 Thread tmp=null; 173 synchronized(checker_lock) { 174 if(checker != null && checker.isAlive()) { 175 tmp=checker; 176 checker=null; 177 tmp.interrupt(); 178 try { 179 tmp.join(timeout); 180 } 181 catch(Exception ex) { 182 } 183 if(tmp.isAlive()) 184 if(log.isWarnEnabled()) log.warn("interrupted checker thread is still alive !"); 185 } 186 checker=null; 187 } 188 } 189 190 191 void updateSender(Address mbr) { 192 Entry entry; 193 long curr_time=0; 194 195 if(mbr == null) { 196 if(log.isDebugEnabled()) log.debug("member " + mbr + " not found"); 197 return; 198 } 199 200 if(suspected_mbrs.size() > 0 && suspected_mbrs.contains(mbr)) { 201 passUp(new Event(Event.UNSUSPECT, mbr)); 202 suspected_mbrs.remove(mbr); 203 } 204 205 if(mbr.equals(local_addr)) 206 return; 207 entry=(Entry)members.get(mbr); 208 curr_time=System.currentTimeMillis(); 209 if(entry != null) 210 entry.timestamp=curr_time; 211 else 212 members.put(mbr, new Entry(curr_time)); 213 } 214 215 216 void updateSenders(Vector v) { 217 Address mbr; 218 if(v == null) return; 219 for(int i=0; i < v.size(); i++) { 220 mbr=(Address)v.elementAt(i); 221 updateSender(mbr); 222 } 223 } 224 225 226 String printTimestamps() { 227 StringBuffer sb=new StringBuffer (); 228 Address mbr; 229 230 synchronized(members) { 231 for(Enumeration e=members.keys(); e.hasMoreElements();) { 232 mbr=(Address)e.nextElement(); 233 sb.append("\n" + mbr + ": " + (System.currentTimeMillis() - ((Entry)members.get(mbr)).timestamp)); 234 } 235 } 236 return sb.toString(); 237 } 238 239 240 } 241 | Popular Tags |