1 package org.jgroups.protocols; 2 3 4 import org.jgroups.*; 5 import org.jgroups.annotations.GuardedBy; 6 import org.jgroups.stack.Protocol; 7 import org.jgroups.util.Streamable; 8 import org.jgroups.util.TimeScheduler; 9 import org.jgroups.util.Util; 10 11 import java.io.*; 12 import java.util.Properties ; 13 import java.util.Vector ; 14 import java.util.concurrent.Future ; 15 import java.util.concurrent.locks.Lock ; 16 import java.util.concurrent.locks.ReentrantLock ; 17 18 19 26 public class VIEW_SYNC extends Protocol { 27 Address local_addr=null; 28 final Vector mbrs=new Vector (); 29 View my_view=null; 30 ViewId my_vid=null; 31 32 33 long avg_send_interval=60000; 34 35 private int num_views_sent=0; 36 private int num_views_adjusted=0; 37 38 @GuardedBy("view_task_lock") 39 private Future view_send_task_future=null; 41 private final Lock view_task_lock=new ReentrantLock (); 42 43 TimeScheduler timer=null; 44 static final String name="VIEW_SYNC"; 45 46 47 48 public String getName() { 49 return name; 50 } 51 52 public long getAverageSendInterval() { 53 return avg_send_interval; 54 } 55 56 public void setAverageSendInterval(long gossip_interval) { 57 avg_send_interval=gossip_interval; 58 } 59 60 public int getNumViewsSent() { 61 return num_views_sent; 62 } 63 64 public int getNumViewsAdjusted() { 65 return num_views_adjusted; 66 } 67 68 public void resetStats() { 69 super.resetStats(); 70 num_views_adjusted=num_views_sent=0; 71 } 72 73 74 75 public boolean setProperties(Properties props) { 76 String str; 77 78 super.setProperties(props); 79 80 str=props.getProperty("avg_send_interval"); 81 if(str != null) { 82 avg_send_interval=Long.parseLong(str); 83 props.remove("avg_send_interval"); 84 } 85 86 if(!props.isEmpty()) { 87 log.error("these properties are not recognized: " + props); 88 return false; 89 } 90 return true; 91 } 92 93 94 public void start() throws Exception { 95 if(stack != null && stack.timer != null) 96 timer=stack.timer; 97 else 98 throw new Exception ("timer cannot be retrieved from protocol stack"); 99 } 100 101 public void stop() { 102 stopViewSender(); 103 } 104 105 106 public void sendViewRequest() { 107 Message msg=new Message(null); 108 msg.setFlag(Message.OOB); 109 ViewSyncHeader hdr=new ViewSyncHeader(ViewSyncHeader.VIEW_SYNC_REQ, null); 110 msg.putHeader(name, hdr); 111 down_prot.down(new Event(Event.MSG, msg)); 112 } 113 114 123 124 public Object up(Event evt) { 125 Message msg; 126 ViewSyncHeader hdr; 127 int type=evt.getType(); 128 129 switch(type) { 130 131 case Event.MSG: 132 msg=(Message)evt.getArg(); 133 hdr=(ViewSyncHeader)msg.getHeader(name); 134 if(hdr == null) 135 break; 136 Address sender=msg.getSrc(); 137 switch(hdr.type) { 138 case ViewSyncHeader.VIEW_SYNC: 139 handleView(hdr.view, sender); 140 break; 141 case ViewSyncHeader.VIEW_SYNC_REQ: 142 if(!sender.equals(local_addr)) 143 sendView(); 144 break; 145 default: 146 if(log.isErrorEnabled()) log.error("ViewSyncHeader type " + hdr.type + " not known"); 147 } 148 return null; 149 150 case Event.VIEW_CHANGE: 151 View view=(View)evt.getArg(); 152 handleViewChange(view); 153 break; 154 155 case Event.SET_LOCAL_ADDRESS: 156 local_addr=(Address)evt.getArg(); 157 break; 158 } 159 160 return up_prot.up(evt); 161 } 162 163 164 165 public Object down(Event evt) { 166 switch(evt.getType()) { 167 case Event.VIEW_CHANGE: 168 View v=(View)evt.getArg(); 169 handleViewChange(v); 170 break; 171 } 172 return down_prot.down(evt); 173 } 174 175 176 177 178 179 private void handleView(View v, Address sender) { 180 Vector members=v.getMembers(); 181 if(!members.contains(local_addr)) { 182 if(log.isWarnEnabled()) 183 log.warn("discarding view as I (" + local_addr + ") am not member of view (" + v + ")"); 184 return; 185 } 186 187 ViewId vid=v.getVid(); 188 int rc=vid.compareTo(my_vid); 189 if(rc > 0) { if(log.isTraceEnabled()) 191 log.trace("view from " + sender + " (" + vid + ") is greater than my own view (" + my_vid + ");" + 192 " will update my own view"); 193 194 Message view_change=new Message(local_addr, local_addr, null); 195 org.jgroups.protocols.pbcast.GMS.GmsHeader hdr; 196 hdr=new org.jgroups.protocols.pbcast.GMS.GmsHeader(org.jgroups.protocols.pbcast.GMS.GmsHeader.VIEW, v); 197 view_change.putHeader(name, hdr); 198 up_prot.up(new Event(Event.MSG, view_change)); 199 num_views_adjusted++; 200 } 201 } 202 203 private void handleViewChange(View view) { 204 Vector tmp=view.getMembers(); 205 if(tmp != null) { 206 mbrs.clear(); 207 mbrs.addAll(tmp); 208 } 209 my_view=(View)view.clone(); 210 my_vid=my_view.getVid(); 211 if(my_view.size() > 1) { 212 startViewSender(); 213 } 214 else { 215 stopViewSender(); 216 } 217 } 218 219 private void sendView() { 220 View tmp=(View)(my_view != null? my_view.clone() : null); 221 if(tmp == null) return; 222 Message msg=new Message(null); msg.setFlag(Message.OOB); 224 ViewSyncHeader hdr=new ViewSyncHeader(ViewSyncHeader.VIEW_SYNC, tmp); 225 msg.putHeader(name, hdr); 226 down_prot.down(new Event(Event.MSG, msg)); 227 num_views_sent++; 228 } 229 230 231 void startViewSender() { 232 try { 233 view_task_lock.lock(); 234 if(view_send_task_future == null || view_send_task_future.isDone()) { 235 ViewSendTask view_send_task=new ViewSendTask(); 236 view_send_task_future=timer.scheduleWithDynamicInterval(view_send_task, true); if(log.isTraceEnabled()) 238 log.trace("view send task started"); 239 } 240 } 241 finally { 242 view_task_lock.unlock(); 243 } 244 } 245 246 247 void stopViewSender() { 248 try { 249 view_task_lock.lock(); 250 if(view_send_task_future != null) { 251 view_send_task_future.cancel(false); 252 view_send_task_future=null; 253 if(log.isTraceEnabled()) 254 log.trace("view send task stopped"); 255 } 256 } 257 finally { 258 view_task_lock.unlock(); 259 } 260 } 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 public static class ViewSyncHeader extends Header implements Streamable { 276 public static final int VIEW_SYNC = 1; public static final int VIEW_SYNC_REQ = 2; 279 int type=0; 280 View view=null; 281 282 public ViewSyncHeader() { 283 } 284 285 286 public ViewSyncHeader(int type, View view) { 287 this.type=type; 288 this.view=view; 289 } 290 291 public int getType() { 292 return type; 293 } 294 295 public View getView() { 296 return view; 297 } 298 299 static String type2String(int t) { 300 switch(t) { 301 case VIEW_SYNC: 302 return "VIEW_SYNC"; 303 case VIEW_SYNC_REQ: 304 return "VIEW_SYNC_REQ"; 305 default: 306 return "<unknown>"; 307 } 308 } 309 310 public String toString() { 311 StringBuilder sb=new StringBuilder ("[").append(type2String(type)).append("]"); 312 if(view != null) 313 sb.append(", view= ").append(view); 314 return sb.toString(); 315 } 316 317 318 public void writeExternal(ObjectOutput out) throws IOException { 319 out.writeInt(type); 320 if(view == null) { 321 out.writeBoolean(false); 322 return; 323 } 324 out.writeBoolean(true); 325 view.writeExternal(out); 326 } 327 328 329 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 330 type=in.readInt(); 331 boolean available=in.readBoolean(); 332 if(available) { 333 view=new View(); 334 view.readExternal(in); 335 } 336 } 337 338 public int size() { 339 int retval=Global.INT_SIZE + Global.BYTE_SIZE + Global.BYTE_SIZE; if(view != null) 341 retval+=view.serializedSize(); 342 return retval; 343 } 344 345 public void writeTo(DataOutputStream out) throws IOException { 346 out.writeInt(type); 347 byte b=(byte)(view == null? 0 : (view instanceof MergeView? 2 : 1)); 349 out.writeByte(b); 350 Util.writeStreamable(view, out); 351 } 352 353 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 354 type=in.readInt(); 355 byte b=in.readByte(); 356 Class clazz=b == 2? MergeView.class : View.class; 357 view=(View)Util.readStreamable(clazz, in); 358 } 359 360 361 } 362 363 364 365 366 369 private class ViewSendTask implements TimeScheduler.Task { 370 371 public long nextInterval() { 372 long interval=computeSleepTime(); 373 if(interval <= 0) 374 return 10000; 375 else 376 return interval; 377 } 378 379 380 public void run() { 381 sendView(); 382 } 383 384 long computeSleepTime() { 385 int num_mbrs=Math.max(mbrs.size(), 1); 386 return getRandom((num_mbrs * avg_send_interval * 2)); 387 } 388 389 long getRandom(long range) { 390 return (long)((Math.random() * range) % range); 391 } 392 } 393 394 395 396 } 397 | Popular Tags |