1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.*; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.stack.RouterStub; 9 import org.jgroups.util.List; 10 import org.jgroups.util.Util; 11 12 import java.io.IOException ; 13 import java.io.ObjectInput ; 14 import java.io.ObjectOutput ; 15 import java.util.Enumeration ; 16 import java.util.Properties ; 17 import java.util.Vector ; 18 19 20 29 public class MERGE extends Protocol implements Runnable { 30 Vector members=new Vector (); 31 Address local_addr=null; 32 String group_addr=null; 33 final String groupname=null; 34 Thread hello_thread=null; long timeout=5000; 37 String router_host=null; 38 int router_port=0; 39 40 RouterStub client=null; 41 boolean is_server=false; 42 boolean is_coord=false; 43 boolean merging=false; 44 45 46 public String getName() { 47 return "MERGE"; 48 } 49 50 51 public boolean setProperties(Properties props) { 52 String str; 53 54 super.setProperties(props); 55 str=props.getProperty("timeout"); if(str != null) { 57 timeout=Long.parseLong(str); 58 props.remove("timeout"); 59 } 60 61 str=props.getProperty("router_host"); if(str != null) { 63 router_host=str; 64 props.remove("router_host"); 65 } 66 67 str=props.getProperty("router_port"); 68 if(str != null) { 69 router_port=Integer.parseInt(str); 70 props.remove("router_port"); 71 } 72 73 if(router_host != null && router_port != 0) 74 client=new RouterStub(router_host, router_port); 75 76 if(props.size() > 0) { 77 System.err.println("MERGE.setProperties(): the following properties are not recognized:"); 78 props.list(System.out); 79 return false; 80 } 81 return true; 82 } 83 84 85 public void start() throws Exception { 86 if(hello_thread == null) { 87 hello_thread=new Thread (this, "MERGE Thread"); 88 hello_thread.setDaemon(true); 89 hello_thread.start(); 90 } 91 } 92 93 94 public void stop() { 95 Thread tmp=null; 96 if(hello_thread != null && hello_thread.isAlive()) { 97 tmp=hello_thread; 98 hello_thread=null; 99 tmp.interrupt(); 100 try { 101 tmp.join(1000); 102 } 103 catch(Exception ex) { 104 } 105 } 106 hello_thread=null; 107 } 108 109 110 public void up(Event evt) { 111 Message msg; 112 Object obj; 113 MergeHeader hdr; 114 Address sender; 115 boolean contains; 116 Vector tmp; 117 118 119 switch(evt.getType()) { 120 121 case Event.MSG: 122 msg=(Message)evt.getArg(); 123 obj=msg.getHeader(getName()); 124 if(obj == null || !(obj instanceof MergeHeader)) { 125 passUp(evt); 126 return; 127 } 128 hdr=(MergeHeader)msg.removeHeader(getName()); 129 130 switch(hdr.type) { 131 132 case MergeHeader.HELLO: if(!is_server || !is_coord) { 134 return; 135 } 136 if(merging) { 137 return; 138 } 139 sender=msg.getSrc(); 140 if((sender != null) && (members.size() >= 0)) { 141 synchronized(members) { 142 contains=members.contains(sender); 143 } 144 if(!contains && sender.compareTo(local_addr) < 0) { 146 if(log.isInfoEnabled()) 147 log.info("membership " + members + 148 " does not contain " + sender + "; merging it"); 149 tmp=new Vector (); 150 tmp.addElement(sender); 151 merging=true; 152 passUp(new Event(Event.MERGE, tmp)); 153 } 154 } 155 return; 156 157 default: 158 if(log.isErrorEnabled()) log.error("got MERGE hdr with unknown type (" + hdr.type + ')'); 159 return; 160 } 161 162 case Event.SET_LOCAL_ADDRESS: 163 local_addr=(Address)evt.getArg(); 164 passUp(evt); 165 break; 166 167 default: 168 passUp(evt); break; 170 } 171 } 172 173 174 public void down(Event evt) { 175 176 switch(evt.getType()) { 177 178 case Event.TMP_VIEW: 179 passDown(evt); 180 break; 181 182 case Event.MERGE_DENIED: 183 merging=false; 184 passDown(evt); 185 break; 186 187 case Event.VIEW_CHANGE: 188 merging=false; 189 members=((View)evt.getArg()).getMembers(); 190 if((members == null) || (members.size() == 0)) { 191 if(log.isFatalEnabled()) log.fatal("received VIEW_CHANGE with null or empty vector"); 192 System.exit(6); 193 } 194 if(members.elementAt(0).equals(local_addr)) 195 is_coord=true; 196 else 197 is_coord=false; 198 passDown(evt); 199 if(is_coord) { 200 if(log.isInfoEnabled()) log.info("start sending Hellos"); 201 try { 202 start(); 203 } 204 catch(Exception ex) { 205 if(log.isWarnEnabled()) log.warn("exception calling start(): " + ex); 206 } 207 } 208 else { 209 if(log.isInfoEnabled()) log.info("stop sending Hellos"); 210 stop(); 211 } 212 break; 213 214 case Event.BECOME_SERVER: passDown(evt); 216 try { 217 start(); 218 is_server=true; 219 } 220 catch(Exception ex) { 221 if(log.isWarnEnabled()) log.warn("exception calling start(): " + ex); 222 } 223 break; 224 225 case Event.CONNECT: 226 group_addr=(String )evt.getArg(); 227 passDown(evt); 228 break; 229 230 case Event.DISCONNECT: 231 if(local_addr != null && evt.getArg() != null && local_addr.equals(evt.getArg())) 232 stop(); 233 passDown(evt); 234 break; 235 236 default: 237 passDown(evt); break; 239 } 240 } 241 242 243 248 public void run() { 249 Message hello_msg; 250 MergeHeader hdr; 251 List rsps; 252 Vector members_to_merge=new Vector (), tmp; 253 Object mbr; 254 255 256 try { 257 Thread.sleep(3000); 258 } catch(Exception e) { 260 } 261 262 263 while(hello_thread != null) { 264 Util.sleep(timeout); 265 if(hello_thread == null) break; 266 267 if(client == null) { hello_msg=new Message(null, null, null); 269 hdr=new MergeHeader(MergeHeader.HELLO); 270 hello_msg.putHeader(getName(), hdr); 271 passDown(new Event(Event.MSG, hello_msg)); 272 } 273 else { rsps=client.get(group_addr); 275 276 synchronized(members) { 277 members_to_merge.removeAllElements(); 278 279 for(Enumeration e=rsps.elements(); e.hasMoreElements();) { 280 mbr=e.nextElement(); 281 if(!members.contains(mbr)) { 282 283 if(log.isInfoEnabled()) 284 log.info("membership " + members + 285 " does not contain " + mbr + "; merging it"); 286 287 members_to_merge.addElement(mbr); 288 } 289 } 290 if(members_to_merge.size() > 0) { 291 Membership new_membership=new Membership(members_to_merge); 292 new_membership.sort(); 293 Address coord=(Address)new_membership.elementAt(0); 294 tmp=new Vector (); 295 tmp.addElement(coord); 296 if(coord.compareTo(local_addr) < 0) 297 passUp(new Event(Event.MERGE, tmp)); 298 } 299 } 300 } 301 } 302 } 303 304 305 306 307 308 309 310 311 312 public static class MergeHeader extends Header { 313 public static final int HELLO=1; 315 public int type=0; 316 317 public MergeHeader() { 318 } 320 public MergeHeader(int type) { 321 this.type=type; 322 } 323 324 public String toString() { 325 return "[MERGE: type=" + type2Str(type) + ']'; 326 } 327 328 String type2Str(int t) { 329 switch(t) { 330 case HELLO: 331 return "HELLO"; 332 default: 333 return "<unkown type (" + t + ")>"; 334 } 335 } 336 337 public void writeExternal(ObjectOutput out) throws IOException { 338 out.writeInt(type); 339 } 340 341 342 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 343 type=in.readInt(); 344 } 345 } 346 347 } 348 | Popular Tags |