package org.jgroups.protocols;


import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.View;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;

import java.util.Properties;
import java.util.Vector;


/**
 * Protocol that looks for diverged subgroups. While the local member is the
 * first member of the current view, a background task periodically sends a
 * FIND_INITIAL_MBRS event down the stack, collects the coordinator addresses
 * named in the responses and, if more than one distinct coordinator is
 * reported, passes a MERGE event (carrying those addresses) up the stack.
 * <p>
 * Recognized properties: <code>min_interval</code>, <code>max_interval</code>
 * and <code>use_separate_thread</code>.
 */
public class MERGE2 extends Protocol {
    /** Address of the local member, taken from the SET_LOCAL_ADDRESS event. */
    Address local_addr=null;

    /** Background discovery task; created lazily, guarded by {@link #task_lock}. */
    FindSubgroups task=null;

    /** Guards creation, start and stop of {@link #task}. */
    private final Object task_lock=new Object();

    /** Lower bound of the pause between discovery rounds (handed to Util.sleep(); presumably milliseconds). */
    long min_interval=5000;

    /** Upper bound of the pause between discovery rounds (same unit as min_interval). */
    long max_interval=20000;

    /** True while the local member is the first member of the most recent view. */
    boolean is_coord=false;

    /** Receives the argument of the FIND_INITIAL_MBRS_OK event travelling up the stack. */
    final Promise find_promise=new Promise();

    /** If true, the MERGE event is passed up on a dedicated thread instead of the task thread. */
    boolean use_separate_thread=false;


    public String getName() {
        return "MERGE2";
    }


    /**
     * Reads <code>min_interval</code>, <code>max_interval</code> and
     * <code>use_separate_thread</code> from <code>props</code>, removing each
     * property that was consumed.
     *
     * @param props the configuration properties for this protocol
     * @return false if an interval is not positive, if max_interval is not
     *         greater than min_interval, or if unrecognized properties remain;
     *         true otherwise
     * @throws NumberFormatException if an interval value is not a valid long
     */
    public boolean setProperties(Properties props) {
        String str;

        super.setProperties(props);

        str=props.getProperty("min_interval");
        if(str != null) {
            min_interval=Long.parseLong(str);
            props.remove("min_interval");
        }

        str=props.getProperty("max_interval");
        if(str != null) {
            max_interval=Long.parseLong(str);
            props.remove("max_interval");
        }

        if(min_interval <= 0 || max_interval <= 0) {
            if(log.isErrorEnabled()) log.error("min_interval and max_interval have to be > 0");
            return false;
        }
        if(max_interval <= min_interval) {
            if(log.isErrorEnabled()) log.error("max_interval has to be greater than min_interval");
            return false;
        }

        str=props.getProperty("use_separate_thread");
        if(str != null) {
            use_separate_thread=Boolean.valueOf(str).booleanValue();
            props.remove("use_separate_thread");
        }

        if(props.size() > 0) {
            // Header and listing go to the same stream so the diagnostic stays together
            System.err.println("MERGE2.setProperties(): the following properties are not recognized:");
            props.list(System.err);
            return false;
        }
        return true;
    }


    /**
     * @return a Vector holding a single Integer, Event.FIND_INITIAL_MBRS: the
     *         event type this protocol sends down the stack
     */
    public Vector requiredDownServices() {
        Vector retval=new Vector(1);
        retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));
        return retval;
    }


    /** Clears the coordinator flag and stops the discovery task. */
    public void stop() {
        is_coord=false;
        stopTask();
    }


    /**
     * Deliberately does nothing.
     * NOTE(review): presumably overrides the Protocol default so that no
     * up-handler thread is started for this protocol - confirm against Protocol.
     */
    public void startUpHandler() {
    }


    /**
     * Deliberately does nothing.
     * NOTE(review): presumably overrides the Protocol default so that no
     * down-handler thread is started for this protocol - confirm against Protocol.
     */
    public void startDownHandler() {
    }


    /**
     * Handles events travelling up the stack. Every event is passed on
     * unchanged; SET_LOCAL_ADDRESS is additionally recorded in
     * {@link #local_addr} and the argument of FIND_INITIAL_MBRS_OK is handed to
     * {@link #find_promise}.
     *
     * @param evt the event received from the layer below
     */
    public void up(Event evt) {
        switch(evt.getType()) {

            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address)evt.getArg();
                passUp(evt);
                break;

            case Event.FIND_INITIAL_MBRS_OK:
                // Hands the discovery result to the task waiting in findInitialMembers()
                find_promise.setResult(evt.getArg());
                passUp(evt);
                break;

            default:
                passUp(evt);
                break;
        }
    }


    /**
     * Handles events travelling down the stack. Every event is passed on; on
     * VIEW_CHANGE the discovery task is started if the local address is the
     * first member of the new view and stopped otherwise.
     *
     * @param evt the event received from the layer above
     */
    public void down(Event evt) {
        Vector mbrs;
        Address coord;

        switch(evt.getType()) {

            case Event.VIEW_CHANGE:
                passDown(evt);
                mbrs=((View)evt.getArg()).getMembers();
                if(mbrs == null || mbrs.size() == 0 || local_addr == null) {
                    stopTask();
                    break;
                }
                coord=(Address)mbrs.elementAt(0);
                if(coord.equals(local_addr)) {
                    is_coord=true;
                    startTask();
                }
                else {
                    is_coord=false;
                    stopTask();
                }
                break;

            default:
                passDown(evt);
                break;
        }
    }


    /** Creates the discovery task if necessary and (re)starts it. */
    void startTask() {
        synchronized(task_lock) {
            if(task == null) {
                task=new FindSubgroups();
            }
            task.start();
        }
    }


    /** Stops and discards the discovery task, if there is one. */
    void stopTask() {
        synchronized(task_lock) {
            if(task != null) {
                task.stop();
                task=null;
            }
        }
    }


    /**
     * Background task which, in a loop, sleeps for a random interval, fetches
     * the initial members and passes a MERGE event up the stack when the
     * responses name more than one coordinator.
     */
    private class FindSubgroups implements Runnable {
        /**
         * Thread currently running this task, or null once stop was requested.
         * Volatile because it is cleared by stop() on one thread and polled by
         * run() on another as the loop's termination signal.
         */
        volatile Thread thread=null;


        /** Starts a new daemon thread for this task unless one is still alive. */
        public void start() {
            Thread current=thread;
            if(current == null || !current.isAlive()) {
                Thread t=new Thread(this, "MERGE2.FindSubgroups thread");
                t.setDaemon(true);
                // Publish before starting so that run() sees itself as the current thread
                thread=t;
                t.start();
            }
        }


        /**
         * Signals the task thread to terminate: clears the thread reference,
         * interrupts the thread and resets find_promise.
         */
        public void stop() {
            Thread tmp=thread;
            thread=null;
            if(tmp != null) {
                tmp.interrupt();
                find_promise.reset();
            }
        }


        /**
         * Task loop; runs for as long as the executing thread is the one
         * recorded in {@link #thread}.
         */
        public void run() {
            long interval;
            Vector coords;
            Vector initial_mbrs;

            if(log.isDebugEnabled()) log.debug("merge task started as I'm the coordinator");
            while(Thread.currentThread().equals(thread)) {
                interval=computeInterval();
                Util.sleep(interval);
                if(thread == null) break;
                initial_mbrs=findInitialMembers();
                if(thread == null) break;
                if(log.isDebugEnabled()) log.debug("initial_mbrs=" + initial_mbrs);
                coords=detectMultipleCoordinators(initial_mbrs);
                if(coords != null && coords.size() > 1) {
                    if(log.isDebugEnabled())
                        log.debug("found multiple coordinators: " + coords + "; sending up MERGE event");
                    final Event evt=new Event(Event.MERGE, coords);
                    if(use_separate_thread) {
                        Thread merge_notifier=new Thread() {
                            public void run() {
                                passUp(evt);
                            }
                        };
                        merge_notifier.setDaemon(true);
                        merge_notifier.setName("merge notifier thread");
                        merge_notifier.start();
                    }
                    else {
                        passUp(evt);
                    }
                }
                else {
                    if(log.isTraceEnabled())
                        log.trace("didn't find multiple coordinators in " + initial_mbrs + ", no need for merge");
                }
            }
            if(log.isTraceEnabled())
                log.trace("MERGE2.FindSubgroups thread terminated");
        }


        /**
         * @return min_interval plus a random value obtained from Util.random()
         *         for the range (max_interval - min_interval)
         */
        long computeInterval() {
            return min_interval + Util.random(max_interval - min_interval);
        }


        /**
         * Sends FIND_INITIAL_MBRS down the stack and waits on find_promise for
         * the result. If the local member is coordinator and its own response
         * is not part of the result, that response is appended.
         *
         * @return the Vector delivered through find_promise; may be null
         */
        Vector findInitialMembers() {
            PingRsp own_rsp=new PingRsp(local_addr, local_addr, true);

            find_promise.reset();
            passDown(Event.FIND_INITIAL_MBRS_EVT);
            // NOTE(review): a timeout of 0 presumably blocks until a result is set or the promise is reset - confirm in Promise
            Vector retval=(Vector)find_promise.getResult(0);
            if(retval != null && is_coord && local_addr != null && !retval.contains(own_rsp)) {
                retval.add(own_rsp);
            }
            return retval;
        }


        /**
         * Collects the distinct coordinator addresses reported by the
         * responses whose is_server flag is set. Null responses and null
         * coordinator addresses are skipped, so that they can neither cause a
         * NullPointerException nor be counted as an additional coordinator.
         *
         * @param initial_mbrs Vector of PingRsp elements; may be null
         * @return the distinct coordinator addresses (possibly empty), or null
         *         if initial_mbrs is null
         */
        Vector detectMultipleCoordinators(Vector initial_mbrs) {
            if(initial_mbrs == null) return null;

            Vector ret=new Vector(11);
            for(int i=0; i < initial_mbrs.size(); i++) {
                PingRsp rsp=(PingRsp)initial_mbrs.elementAt(i);
                if(rsp == null || !rsp.is_server) {
                    continue;
                }
                Address coord=rsp.getCoordAddress();
                if(coord != null && !ret.contains(coord)) {
                    ret.addElement(coord);
                }
            }
            return ret;
        }

    }

}