1 package org.jgroups.tests; 2 3 import java.io.IOException ; 4 import java.io.InputStream ; 5 import java.io.ObjectInputStream ; 6 import java.io.ObjectOutputStream ; 7 import java.io.OutputStream ; 8 import java.util.ArrayList ; 9 import java.util.HashMap ; 10 import java.util.List ; 11 import java.util.Map ; 12 import java.util.Properties ; 13 14 import junit.framework.Test; 15 import junit.framework.TestSuite; 16 17 import org.jgroups.Address; 18 import org.jgroups.Channel; 19 import org.jgroups.ChannelException; 20 import org.jgroups.Event; 21 import org.jgroups.ExtendedReceiverAdapter; 22 import org.jgroups.JChannel; 23 import org.jgroups.Message; 24 import org.jgroups.View; 25 import org.jgroups.protocols.DISCARD; 26 import org.jgroups.stack.Protocol; 27 import org.jgroups.stack.ProtocolStack; 28 import org.jgroups.util.Util; 29 30 37 public class ReconciliationTest extends ChannelTestBase { 38 39 private JChannel c1, c2; 40 41 private List <JChannel> channels; 42 private List <MyReceiver> receivers; 43 44 public ReconciliationTest(){ 45 super(); 46 } 47 48 public ReconciliationTest(String name){ 49 super(name); 50 } 51 52 public void setUp() throws Exception { 53 super.setUp(); 54 CHANNEL_CONFIG = System.getProperty("channel.conf.flush", "flush-udp.xml"); 55 } 56 57 public void tearDown() throws Exception { 58 if(channels != null){ 59 for(JChannel channel:channels){ 60 channel.close(); 61 }} 62 63 Util.sleep(500); 64 super.tearDown(); 65 } 66 67 public boolean useBlocking() { 68 return true; 69 } 70 71 84 public void testReconciliationFlushTriggeredByNewMemberJoin() throws Exception { 85 86 FlushTrigger t = new FlushTrigger() { 87 public void triggerFlush() { 88 log.info("Joining D, this will trigger FLUSH and a subsequent view change to {A,B,C,D}"); 89 JChannel newChannel; 90 try{ 91 newChannel = createChannel(); 92 newChannel.connect("x"); 93 channels.add(newChannel); 94 }catch(ChannelException e){ 95 e.printStackTrace(); 96 } 97 }; 98 }; 99 String apps [] = createApplicationNames(3); 100 reconciliationHelper(apps,t); 101 } 102 103 116 public void testReconciliationFlushTriggeredByManualFlush() throws Exception { 117 118 FlushTrigger t = new FlushTrigger() { 119 public void triggerFlush() { 120 JChannel channel = channels.get(0); 121 boolean rc = channel.startFlush(0, false); 122 log.info("manual flush success="+rc); 123 channel.stopFlush(); 124 }; 125 }; 126 String apps [] = createApplicationNames(3); 127 reconciliationHelper(apps,t); 128 } 129 130 143 public void testReconciliationFlushTriggeredByMemberCrashing() throws Exception { 144 145 FlushTrigger t = new FlushTrigger() { 146 public void triggerFlush() { 147 JChannel channel = channels.remove(channels.size()-1); 148 channel.shutdown(); 149 }; 150 }; 151 String apps [] = createApplicationNames(3); 152 reconciliationHelper(apps,t); 153 } 154 155 public void reconciliationHelper(String [] names,FlushTrigger ft) throws Exception { 156 157 int channelCount = names.length; 159 channels = new ArrayList <JChannel>(names.length); 160 receivers = new ArrayList <MyReceiver>(names.length); 161 for(int i = 0;i < channelCount;i++){ 162 JChannel channel = createChannel(); 163 MyReceiver r = new MyReceiver(channel,names[i]); 164 receivers.add(r); 165 channels.add(channel); 166 channel.setReceiver(r); 167 channel.connect("x"); 168 Util.sleep(250); 169 } 170 JChannel last = channels.get(channels.size()-1); 171 JChannel nextToLast = channels.get(channels.size()-2); 172 173 insertDISCARD(nextToLast, last.getLocalAddress()); 174 175 String lastsName = names[names.length-1]; 176 String nextToLastName = names[names.length-2]; 177 printDigests(channels,"\nDigests before " + lastsName +" sends any messages:"); 178 179 180 log.info("\n" + lastsName + " sending 5 messages;" + nextToLastName + " will ignore them, but others will receive them"); 182 for(int i = 1;i <= 5;i++){ 183 last.send(null, null, new Integer (i)); 184 } 185 Util.sleep(1000); 188 printDigests(channels,"\nDigests after " + lastsName +" sent messages:"); 189 190 191 MyReceiver lastReceiver = receivers.get(receivers.size()-1); 192 MyReceiver nextToLastReceiver = receivers.get(receivers.size()-2); 193 194 Map <Address, List <Integer >> map = lastReceiver.getMsgs(); 196 assertEquals("we should have only 1 sender, namely C at this time", 1, map.size()); 197 List <Integer > list = map.get(last.getLocalAddress()); 198 log.info(lastsName + ": messages received from "+ lastsName + ",list=" +list); 199 assertEquals("correct msgs: " + list, 5, list.size()); 200 201 map = nextToLastReceiver.getMsgs(); 203 assertEquals("we should have no sender at this time", 0, map.size()); 204 list = map.get(last.getLocalAddress()); 205 log.info(nextToLastName+": messages received from "+lastsName +" : " + list); 206 assertNull(list); 207 208 List <MyReceiver> otherReceivers = receivers.subList(0, receivers.size()-2); 209 210 for(MyReceiver receiver:otherReceivers){ 212 map = receiver.getMsgs(); 213 assertEquals("we should have only 1 sender", 1, map.size()); 214 list = map.get(last.getLocalAddress()); 215 log.info(receiver.name +" messages received from "+lastsName +":" + list); 216 assertEquals("correct msgs" + list, 5, list.size()); 217 } 218 219 220 removeDISCARD(nextToLast); 221 222 Address address = last.getLocalAddress(); 223 ft.triggerFlush(); 224 225 int cnt = 1000; 226 View v; 227 while((v = channels.get(0).getView()) != null && cnt > 0){ 228 cnt--; 229 if(v.size() == channels.size()) 230 break; 231 Util.sleep(500); 232 } 233 234 printDigests(channels,""); 235 236 map = nextToLastReceiver.getMsgs(); 238 assertEquals("we should have 1 sender at this time", 1, map.size()); 239 list = map.get(address); 240 log.info(nextToLastName+": messages received from "+lastsName+" : " + list); 241 assertEquals(5, list.size()); 242 } 243 244 private void printDigests(List <JChannel> channels,String message) { 245 log.info(message); 246 for(JChannel channel:channels){ 247 log.info(channel.downcall(Event.GET_DIGEST_EVT)); 248 } 249 } 250 251 private static void insertDISCARD(JChannel ch, Address exclude) throws Exception { 252 Properties prop = new Properties (); 253 prop.setProperty("excludeitself", "true"); DISCARD discard = new DISCARD(); 256 discard.setProperties(prop); 257 discard.addIgnoreMember(exclude); ch.getProtocolStack().insertProtocol(discard, ProtocolStack.BELOW, "NAKACK"); 259 } 260 261 private static void removeDISCARD(JChannel... channels) throws Exception { 262 for(JChannel ch:channels){ 263 ch.getProtocolStack().removeProtocol("DISCARD"); 264 } 265 } 266 267 private interface FlushTrigger { 268 void triggerFlush(); 269 } 270 271 private static class MyReceiver extends ExtendedReceiverAdapter { 272 Map <Address, List <Integer >> msgs = new HashMap <Address, List <Integer >>(10); 273 274 Channel channel; 275 276 String name; 277 278 public MyReceiver(Channel ch,String name){ 279 this.channel = ch; 280 this.name = name; 281 } 282 283 public Map <Address, List <Integer >> getMsgs() { 284 return msgs; 285 } 286 287 public void reset() { 288 msgs.clear(); 289 } 290 291 public void receive(Message msg) { 292 List <Integer > list = msgs.get(msg.getSrc()); 293 if(list == null){ 294 list = new ArrayList <Integer >(); 295 msgs.put(msg.getSrc(), list); 296 } 297 list.add((Integer ) msg.getObject()); 298 System.out.println("[" + name 299 + " / " 300 + channel.getLocalAddress() 301 + "]: received message from " 302 + msg.getSrc() 303 + ": " 304 + msg.getObject()); 305 } 306 307 public void viewAccepted(View new_view) { 308 System.out.println("[" + name + " / " + channel.getLocalAddress() + "]: " + new_view); 309 } 310 } 311 312 public void testVirtualSynchrony() throws Exception { 313 c1 = createChannel(CHANNEL_CONFIG); 314 Cache cache_1 = new Cache(c1, "cache-1"); 315 c1.connect("bla"); 316 317 c2 = createChannel(CHANNEL_CONFIG); 318 Cache cache_2 = new Cache(c2, "cache-2"); 319 c2.connect("bla"); 320 assertEquals("view: " + c1.getView(), 2, c2.getView().size()); 321 322 flush(c1, 5000); 326 for(int i = 1;i <= 20;i++){ 327 if(i % 2 == 0){ 328 cache_1.put("key-" + i, Boolean.TRUE); }else{ 330 cache_2.put("key-" + i, Boolean.TRUE); } 332 } 333 334 flush(c1, 5000); 335 System.out.println("cache_1 (" + cache_1.size() 336 + " elements): " 337 + cache_1 338 + "\ncache_2 (" 339 + cache_2.size() 340 + " elements): " 341 + cache_2); 342 assertEquals(cache_1.size(), cache_2.size()); 343 assertEquals(20, cache_1.size()); 344 } 345 346 private static void flush(Channel channel, long timeout) { 347 if(channel.flushSupported()){ 348 boolean success = channel.startFlush(timeout, true); 349 System.out.println("startFlush(): " + success); 350 assertTrue(success); 351 }else 352 Util.sleep(timeout); 353 } 354 355 protected JChannel createChannel() throws ChannelException { 356 JChannel ret = new JChannel(CHANNEL_CONFIG); 357 ret.setOpt(Channel.BLOCK, Boolean.TRUE); 358 Protocol flush = ret.getProtocolStack().findProtocol("FLUSH"); 359 if(flush != null){ 360 Properties p = new Properties (); 361 p.setProperty("timeout", "0"); 362 flush.setProperties(p); 363 364 Map <Object , Object > map = new HashMap <Object , Object >(); 367 map.put("flush_timeout", new Long (0)); 368 flush.getUpProtocol().up(new Event(Event.CONFIG, map)); 369 flush.getDownProtocol().down(new Event(Event.CONFIG, map)); 370 } 371 return ret; 372 } 373 374 private static class Cache extends ExtendedReceiverAdapter { 375 protected final Map <Object , Object > data; 376 377 Channel ch; 378 379 String name; 380 381 public Cache(Channel ch,String name){ 382 this.data = new HashMap <Object , Object >(); 383 this.ch = ch; 384 this.name = name; 385 this.ch.setReceiver(this); 386 } 387 388 protected Object get(Object key) { 389 synchronized(data){ 390 return data.get(key); 391 } 392 } 393 394 protected void put(Object key, Object val) throws Exception { 395 Object [] buf = new Object [2]; 396 buf[0] = key; 397 buf[1] = val; 398 Message msg = new Message(null, null, buf); 399 ch.send(msg); 400 } 401 402 protected int size() { 403 synchronized(data){ 404 return data.size(); 405 } 406 } 407 408 public void receive(Message msg) { 409 Object [] modification = (Object []) msg.getObject(); 410 Object key = modification[0]; 411 Object val = modification[1]; 412 synchronized(data){ 413 data.put(key, val); 417 } 418 } 419 420 public byte[] getState() { 421 byte[] state = null; 422 synchronized(data){ 423 try{ 424 state = Util.objectToByteBuffer(data); 425 }catch(Exception e){ 426 e.printStackTrace(); 427 return null; 428 } 429 } 430 return state; 431 } 432 433 public byte[] getState(String state_id) { 434 return getState(); 435 } 436 437 public void setState(byte[] state) { 438 Map <Object , Object > m; 439 try{ 440 m = (Map <Object , Object >) Util.objectFromByteBuffer(state); 441 synchronized(data){ 442 data.clear(); 443 data.putAll(m); 444 } 445 }catch(Exception e){ 446 e.printStackTrace(); 447 } 448 } 449 450 public void setState(String state_id, byte[] state) { 451 setState(state); 452 } 453 454 public void getState(OutputStream ostream) { 455 ObjectOutputStream oos = null; 456 try{ 457 oos = new ObjectOutputStream (ostream); 458 synchronized(data){ 459 oos.writeObject(data); 460 } 461 oos.flush(); 462 }catch(IOException e){ 463 }finally{ 464 try{ 465 if(oos != null) 466 oos.close(); 467 }catch(IOException e){ 468 System.err.println(e); 469 } 470 } 471 } 472 473 public void getState(String state_id, OutputStream ostream) { 474 getState(ostream); 475 } 476 477 public void setState(InputStream istream) { 478 ObjectInputStream ois = null; 479 try{ 480 ois = new ObjectInputStream (istream); 481 Map <Object , Object > m = (Map <Object , Object >) ois.readObject(); 482 synchronized(data){ 483 data.clear(); 484 data.putAll(m); 485 } 486 487 }catch(Exception e){ 488 }finally{ 489 try{ 490 if(ois != null) 491 ois.close(); 492 }catch(IOException e){ 493 System.err.println(e); 494 } 495 } 496 } 497 498 public void setState(String state_id, InputStream istream) { 499 setState(istream); 500 } 501 502 public void clear() { 503 synchronized(data){ 504 data.clear(); 505 } 506 } 507 508 public void viewAccepted(View new_view) { 509 log("view is " + new_view); 510 } 511 512 public String toString() { 513 synchronized(data){ 514 return data.toString(); 515 } 516 } 517 518 private void log(String msg) { 519 System.out.println("-- [" + name + "] " + msg); 520 } 521 522 } 523 524 public static Test suite() { 525 return new TestSuite(ReconciliationTest.class); 526 } 527 528 public static void main(String [] args) { 529 junit.textui.TestRunner.run(ReconciliationTest.suite()); 530 } 531 } 532 | Popular Tags |