1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.*; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.util.TimeScheduler; 9 import org.jgroups.util.Util; 10 11 import java.io.IOException ; 12 import java.io.ObjectInput ; 13 import java.io.ObjectOutput ; 14 import java.util.*; 15 16 17 18 19 35 public class MERGE3 extends Protocol { 36 Address local_addr=null; 37 long min_interval=5000; long max_interval=20000; boolean is_coord=false; 40 final Vector mbrs=new Vector(); 41 TimeScheduler timer=null; 42 CoordinatorAnnouncer announcer_task=null; 43 final Set announcements=Collections.synchronizedSet(new HashSet()); 44 45 46 boolean use_separate_thread=false; 47 48 49 50 51 public String getName() { 52 return "MERGE3"; 53 } 54 55 56 public boolean setProperties(Properties props) { 57 String str; 58 59 super.setProperties(props); 60 str=props.getProperty("min_interval"); 61 if(str != null) { 62 min_interval=Long.parseLong(str); 63 props.remove("min_interval"); 64 } 65 66 str=props.getProperty("max_interval"); 67 if(str != null) { 68 max_interval=Long.parseLong(str); 69 props.remove("max_interval"); 70 } 71 72 if(min_interval <= 0 || max_interval <= 0) { 73 if(log.isErrorEnabled()) log.error("min_interval and max_interval have to be > 0"); 74 return false; 75 } 76 if(max_interval <= min_interval) { 77 if(log.isErrorEnabled()) log.error("max_interval has to be greater than min_interval"); 78 return false; 79 } 80 81 str=props.getProperty("use_separate_thread"); 82 if(str != null) { 83 use_separate_thread=Boolean.valueOf(str).booleanValue(); 84 props.remove("use_separate_thread"); 85 } 86 87 if(props.size() > 0) { 88 System.err.println("MERGE2.setProperties(): the following properties are not recognized:"); 89 props.list(System.out); 90 return false; 91 } 92 return true; 93 } 94 95 public void init() throws Exception { 96 timer=stack.timer; 97 } 98 99 100 103 public void startUpHandler() { 104 ; 105 } 106 107 108 111 public void startDownHandler() { 112 ; 113 } 114 115 116 public void up(Event evt) { 117 switch(evt.getType()) { 118 119 case Event.MSG: 120 Message msg=(Message)evt.getArg(); 121 CoordAnnouncement hdr=(CoordAnnouncement)msg.removeHeader(getName()); 122 if(hdr != null) { 123 if(hdr.coord_addr != null && is_coord) { 124 boolean contains; 125 contains=announcements.contains(hdr.coord_addr); 126 announcements.add(hdr.coord_addr); 127 if(log.isDebugEnabled()) { 128 if(contains) 129 log.debug("discarded duplicate announcement: " + hdr.coord_addr + 130 ", announcements=" + announcements); 131 else 132 log.debug("received announcement: " + hdr.coord_addr + ", announcements=" + announcements); 133 } 134 135 if(announcements.size() > 1 && is_coord) { 136 processAnnouncements(); 137 } 138 } 139 } 140 else 141 passUp(evt); 142 break; 143 144 case Event.SET_LOCAL_ADDRESS: 145 local_addr=(Address)evt.getArg(); 146 passUp(evt); 147 break; 148 149 default: 150 passUp(evt); break; 152 } 153 } 154 155 156 public void down(Event evt) { 157 Vector tmp=null; 158 Address coord; 159 160 switch(evt.getType()) { 161 162 case Event.VIEW_CHANGE: 163 passDown(evt); 164 tmp=((View)evt.getArg()).getMembers(); 165 mbrs.clear(); 166 mbrs.addAll(tmp); 167 coord=(Address)mbrs.elementAt(0); 168 if(coord.equals(local_addr)) { 169 if(is_coord == false) { 170 is_coord=true; 171 startCoordAnnouncerTask(); 172 } 173 } 174 else { 175 if(is_coord == true) { 176 is_coord=false; 177 stopCoordAnnouncerTask(); 178 } 179 } 180 break; 181 182 default: 183 passDown(evt); break; 185 } 186 } 187 188 189 void startCoordAnnouncerTask() { 190 if(announcer_task == null) { 191 announcements.add(local_addr); 192 announcer_task=new CoordinatorAnnouncer(); 193 timer.add(announcer_task); 194 if(log.isDebugEnabled()) 195 log.debug("coordinator announcement task started, announcements=" + announcements); 196 } 197 } 198 199 void stopCoordAnnouncerTask() { 200 if(announcer_task != null) { 201 announcer_task.stop(); 202 announcer_task=null; 203 announcements.clear(); 204 if(log.isDebugEnabled()) 205 log.debug("coordinator announcement task stopped"); 206 } 207 } 208 209 210 211 214 long computeInterval() { 215 long retval=min_interval + Util.random(max_interval - min_interval); 216 return retval; 217 } 218 219 220 221 void sendCoordinatorAnnouncement(Address coord) { 222 Message coord_announcement=new Message(); CoordAnnouncement hdr=new CoordAnnouncement(coord); 224 coord_announcement.putHeader(getName(), hdr); 225 passDown(new Event(Event.MSG, coord_announcement)); 226 } 227 228 void processAnnouncements() { 229 if(announcements.size() > 1) { 230 Vector coords=new Vector(announcements); if(coords.size() > 1) { 232 if(log.isDebugEnabled()) 233 log.debug("passing up MERGE event, coords=" + coords); 234 final Event evt=new Event(Event.MERGE, coords); 235 if(use_separate_thread) { 236 Thread merge_notifier=new Thread () { 237 public void run() { 238 passUp(evt); 239 } 240 }; 241 merge_notifier.setDaemon(true); 242 merge_notifier.setName("merge notifier thread"); 243 } 244 else { 245 passUp(evt); 246 } 247 } 248 announcements.clear(); 249 } 250 } 251 252 253 class CoordinatorAnnouncer implements TimeScheduler.Task { 254 boolean cancelled=false; 255 256 public void start() { 257 cancelled=false; 258 } 259 260 public void stop() { 261 cancelled=true; 262 } 263 264 public boolean cancelled() { 265 return cancelled; 266 } 267 268 public long nextInterval() { 269 return computeInterval(); 270 } 271 272 public void run() { 273 if(is_coord) 274 sendCoordinatorAnnouncement(local_addr); 275 } 276 } 277 278 279 280 public static class CoordAnnouncement extends Header { 281 Address coord_addr=null; 282 283 public CoordAnnouncement() { 284 } 285 286 public CoordAnnouncement(Address coord) { 287 this.coord_addr=coord; 288 } 289 290 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 291 coord_addr=(Address)in.readObject(); 292 } 293 294 public void writeExternal(ObjectOutput out) throws IOException { 295 out.writeObject(coord_addr); 296 } 297 } 298 299 } 300 | Popular Tags |