1 3 package org.jgroups.tests; 4 5 import junit.framework.Test; 6 import junit.framework.TestCase; 7 import junit.framework.TestSuite; 8 import org.jgroups.*; 9 import org.jgroups.debug.ProtocolTester; 10 import org.jgroups.stack.IpAddress; 11 import org.jgroups.stack.Protocol; 12 13 import java.util.Hashtable ; 14 import java.util.Vector ; 15 16 17 public class NakackTest extends TestCase { 18 static final long WAIT_TIME=5000; 19 public static final long NUM_MSGS=10000; 20 long num_msgs_received=0; 21 long num_msgs_sent=0; 22 23 24 public NakackTest(String name) { 25 super(name); 26 } 27 28 29 public void setUp() throws Exception { 30 super.setUp(); 31 num_msgs_received=0; 32 num_msgs_sent=0; 33 } 34 35 36 37 public void test0() throws Exception { 38 Object mutex=new Object (); 39 CheckNoGaps check=new CheckNoGaps(1, this, mutex); 40 ProtocolTester t=new ProtocolTester("pbcast.NAKACK", check); 41 Address my_addr=new IpAddress("localhost", 10000); 42 ViewId vid=new ViewId(my_addr, 322649); 43 Vector mbrs=new Vector (); 44 View view; 45 46 mbrs.addElement(my_addr); 47 view=new View(vid, mbrs); 48 49 t.start(); 50 t.getBottom().up(new Event(Event.SET_LOCAL_ADDRESS, my_addr)); 51 52 53 check.down(new Event(Event.BECOME_SERVER)); 54 check.down(new Event(Event.VIEW_CHANGE, view)); 55 56 for(long i=1; i <= NUM_MSGS; i++) { 57 if(i % 1000 == 0) 58 System.out.println("sending msg #" + i); 59 check.down(new Event(Event.MSG, new Message(null, my_addr, new Long (i)))); 60 num_msgs_sent++; 61 } 62 63 synchronized(mutex) { 64 while(!check.isDone()) 65 mutex.wait(WAIT_TIME); 66 } 67 System.out.println("\nMessages sent: " + num_msgs_sent + ", messages received: " + num_msgs_received); 68 assertEquals(num_msgs_received, num_msgs_sent); 69 t.stop(); 70 } 71 72 73 public static Test suite() { 74 return new TestSuite(NakackTest.class); 75 } 76 77 public static void main(String [] args) { 78 junit.textui.TestRunner.run(suite()); 79 } 80 81 82 private static class CheckNoGaps extends Protocol { 83 long starting_seqno=1; 84 Hashtable senders=new Hashtable (); NakackTest t=null; 86 final Object mut; 87 long highest_seqno=starting_seqno; 88 boolean done=false; 89 90 91 92 CheckNoGaps(long seqno, NakackTest t, Object mut) { 93 starting_seqno=seqno; 94 this.t=t; 95 this.mut=mut; 96 } 97 98 public String getName() { 99 return "CheckNoGaps"; 100 } 101 102 103 public boolean isDone() { 104 return done; 105 } 106 107 public Object up(Event evt) { 108 Message msg=null; 109 Address sender; 110 long received_seqno; 111 Long s; 112 113 if(evt == null) 114 return null; 115 116 if(evt.getType() == Event.SET_LOCAL_ADDRESS) { 117 System.out.println("local address is " + evt.getArg()); 118 } 119 120 if(evt.getType() != Event.MSG) 121 return null; 122 msg=(Message)evt.getArg(); 123 sender=msg.getSrc(); 124 if(sender == null) { 125 log.error("NakackTest.CheckNoGaps.up(): sender is null; discarding msg"); 126 return null; 127 } 128 s=(Long )senders.get(sender); 129 if(s == null) { 130 s=new Long (starting_seqno); 131 senders.put(sender, s); 132 } 133 134 try { 135 s=(Long )msg.getObject(); 136 received_seqno=s.longValue(); 137 if(received_seqno == highest_seqno) { 138 if(received_seqno % 1000 == 0 && received_seqno > 0) 140 System.out.println("PASS: received msg #" + received_seqno); 141 senders.put(sender, new Long (highest_seqno)); 142 highest_seqno++; 143 144 synchronized(mut) { 145 t.num_msgs_received++; 146 if(t.num_msgs_received >= NakackTest.NUM_MSGS) { 147 done=true; 148 mut.notifyAll(); 149 } 150 151 } 152 } 153 else { 154 fail("FAIL: received msg #" + received_seqno + ", expected " + highest_seqno); 156 } 157 } 158 catch(Exception ex) { 159 log.error("NakackTest.CheckNoGaps.up()", ex); 160 } 161 return null; 162 } 163 164 165 } 166 } 167 168 169 | Popular Tags |