1 package org.jgroups.tests; 2 3 import junit.framework.TestCase; 4 import org.apache.commons.logging.Log; 5 import org.apache.commons.logging.LogFactory; 6 import org.jgroups.Header; 7 import org.jgroups.JChannel; 8 import org.jgroups.Message; 9 import org.jgroups.MessageListener; 10 import org.jgroups.blocks.PullPushAdapter; 11 12 import java.io.IOException ; 13 import java.io.ObjectInput ; 14 import java.io.ObjectOutput ; 15 import java.util.Iterator ; 16 import java.util.Vector ; 17 18 30 public class EncryptMessageOrderTestCase extends TestCase { 31 32 public static int MESSAGE_NUMBER=5 * 100; 33 34 public static boolean SLEEP_BETWEEN_SENDING=false; 35 36 public static int SLEEP_TIME=1; 37 38 String groupName = "ENCRYPT_ORDER_TEST"; 39 40 boolean orderCounterFailure = false; 41 42 protected Log log=LogFactory.getLog(this.getClass()); 43 44 public static final String properties ="EncryptNoKeyStore.xml"; 45 46 47 50 public EncryptMessageOrderTestCase(String string) { 51 super(string); 52 } 53 54 protected JChannel channel1; 55 protected PullPushAdapter adapter1; 56 57 protected JChannel channel2; 58 protected PullPushAdapter adapter2; 59 60 63 protected static void printSelectedOptions() { 64 System.out.println("will sleep : " + SLEEP_BETWEEN_SENDING); 65 if(SLEEP_BETWEEN_SENDING) 66 System.out.println("sleep time : " + SLEEP_TIME); 67 68 System.out.println("msg num : " + MESSAGE_NUMBER); 69 70 71 } 72 73 77 protected void setUp() throws Exception { 78 super.setUp(); 79 printSelectedOptions(); 80 81 82 83 84 channel1=new JChannel(properties); 85 System.out.print("Connecting to channel..."); 86 channel1.connect(groupName); 87 System.out.println("channel1 connected, view is " + channel1.getView()); 88 89 adapter1=new PullPushAdapter(channel1); 90 91 try { 93 Thread.sleep(1000); 94 } 95 catch(InterruptedException ex) { 96 } 97 98 channel2=new JChannel(properties); 99 channel2.connect(groupName); 100 System.out.println("channel2 connected, view is " + channel2.getView()); 101 102 adapter2=new PullPushAdapter(channel2); 103 104 try { 106 Thread.sleep(1000); 107 } 108 catch(InterruptedException ex) { 109 } 110 111 } 112 113 116 protected void tearDown() throws Exception { 117 super.tearDown(); 118 119 adapter2.stop(); 120 channel2.close(); 121 122 adapter1.stop(); 123 channel1.close(); 124 } 125 126 protected boolean finishedReceiving; 127 128 141 public void testLoad() { 142 try { 143 final String startMessage="start"; 144 final String stopMessage="stop"; 145 146 final Object mutex=new Object (); 147 148 final Vector receivedTimes=new Vector (MESSAGE_NUMBER); 149 final Vector normalMessages=new Vector (MESSAGE_NUMBER); 150 final Vector tooQuickMessages=new Vector (); 151 final Vector tooSlowMessages=new Vector (); 152 153 adapter1.setListener(new MessageListener() { 154 private boolean started=false; 155 private boolean stopped=false; 156 157 private long counter = 0L; 158 159 public byte[] getState() { 160 return null; 161 } 162 163 public void setState(byte[] state) { 164 } 165 166 public void receive(Message jgMessage) { 167 Object message=jgMessage.getObject(); 168 169 if(startMessage.equals(message)) { 170 started=true; 171 finishedReceiving=false; 172 } 173 else if(stopMessage.equals(message)) { 174 stopped=true; 175 finishedReceiving=true; 176 177 synchronized(mutex) { 178 mutex.notifyAll(); 179 } 180 181 } 182 else if(message instanceof Long ) { 183 Long travelTime=new Long (System.currentTimeMillis() - ((Long )message).longValue()); 184 185 try { 186 assertEquals(counter, ((EncryptOrderTestHeader)((Message)jgMessage).getHeader("EncryptOrderTest")).seqno); 187 counter++; 188 } catch (Exception e){ 189 log.warn(e); 190 orderCounterFailure =true; 191 } 192 if(!started) 193 tooQuickMessages.add(message); 194 else if(started && !stopped) { 195 receivedTimes.add(travelTime); 196 normalMessages.add(message); 197 } 198 else 199 tooSlowMessages.add(message); 200 } 201 } 202 }); 203 204 System.out.println("Free memory: " + Runtime.getRuntime().freeMemory()); 205 System.out.println("Total memory: " + Runtime.getRuntime().totalMemory()); 206 System.out.println("Starting sending messages."); 207 208 long time=System.currentTimeMillis(); 209 210 Message startJgMessage=new Message(); 211 startJgMessage.setObject(startMessage); 212 213 JChannel sender= channel2; 214 215 sender.send(startJgMessage); 216 217 for(int i=0; i < MESSAGE_NUMBER; i++) { 218 Long message=new Long (System.currentTimeMillis()); 219 220 221 Message jgMessage=new Message(); 222 jgMessage.putHeader("EncryptOrderTest", new EncryptOrderTestHeader(i)); 223 jgMessage.setObject(message); 224 225 sender.send(jgMessage); 226 227 if(i % 1000 == 0) 228 System.out.println("sent " + i + " messages."); 229 230 if(SLEEP_BETWEEN_SENDING) 231 org.jgroups.util.Util.sleep(1, true); 232 } 233 234 Message stopJgMessage=new Message(); 235 stopJgMessage.setObject(stopMessage); 236 sender.send(stopJgMessage); 237 238 time=System.currentTimeMillis() - time; 239 240 System.out.println("Finished sending messages. Operation took " + time); 241 242 synchronized(mutex) { 243 244 int received=0; 245 246 while(!finishedReceiving) { 247 mutex.wait(1000); 248 249 if(receivedTimes.size() != received) { 250 received=receivedTimes.size(); 251 252 System.out.println(); 253 System.out.print("Received " + receivedTimes.size() + " messages."); 254 } 255 else { 256 System.out.print("."); 257 } 258 } 259 } 260 261 try { 262 Thread.sleep(1000); 263 } 264 catch(Exception ex) { 265 } 266 267 double avgDeliveryTime=-1.0; 268 long maxDeliveryTime=Long.MIN_VALUE; 269 long minDeliveryTime=Long.MAX_VALUE; 270 271 Iterator iterator=receivedTimes.iterator(); 272 while(iterator.hasNext()) { 273 Long message=(Long )iterator.next(); 274 275 if(avgDeliveryTime == -1.0) 276 avgDeliveryTime=message.longValue(); 277 else 278 avgDeliveryTime=(avgDeliveryTime + message.doubleValue()) / 2.0; 279 280 if(message.longValue() > maxDeliveryTime) 281 maxDeliveryTime=message.longValue(); 282 283 if(message.longValue() < minDeliveryTime) 284 minDeliveryTime=message.longValue(); 285 } 286 287 System.out.println("Sent " + MESSAGE_NUMBER + " messages."); 288 System.out.println("Received " + receivedTimes.size() + " messages."); 289 System.out.println("Average delivery time " + avgDeliveryTime + " ms"); 290 System.out.println("Minimum delivery time " + minDeliveryTime + " ms"); 291 System.out.println("Maximum delivery time " + maxDeliveryTime + " ms"); 292 System.out.println("Received " + tooQuickMessages.size() + " too quick messages"); 293 System.out.println("Received " + tooSlowMessages.size() + " too slow messages"); 294 } 295 catch(Exception ex) { 296 ex.printStackTrace(); 297 } 298 299 System.out.println("Free memory: " + Runtime.getRuntime().freeMemory()); 300 System.out.println("Total memory: " + Runtime.getRuntime().totalMemory()); 301 302 System.out.println("Performing GC"); 303 304 Runtime.getRuntime().gc(); 305 306 try { 307 Thread.sleep(2000); 308 } 309 catch(InterruptedException ex) { 310 } 311 312 System.out.println("Free memory: " + Runtime.getRuntime().freeMemory()); 313 System.out.println("Total memory: " + Runtime.getRuntime().totalMemory()); 314 315 assertTrue("Message ordering is incorrect - check log output",(!orderCounterFailure)); 316 } 317 318 319 320 329 public static void main(String [] args) { 330 for(int i=0; i < args.length; i++) { 331 if("-sleep".equals(args[i])) { 332 SLEEP_BETWEEN_SENDING=true; 333 if(!(i < args.length - 1)) 334 throw new RuntimeException ("You have to specify sleep time"); 335 336 try { 337 SLEEP_TIME=Integer.parseInt(args[++i]); 338 } 339 catch(NumberFormatException nfex) { 340 throw new RuntimeException ("Cannot parse sleep time"); 341 } 342 343 } 344 else if("-msg_num".equals(args[i])) { 345 if(!(i < args.length - 1)) 346 throw new RuntimeException ("You have to specify messages number"); 347 348 try { 349 MESSAGE_NUMBER=Integer.parseInt(args[++i]); 350 } 351 catch(NumberFormatException nfex) { 352 throw new RuntimeException ("Cannot parse messages number"); 353 } 354 355 } 356 357 else if("-help".equals(args[i])) { 358 help(); 359 return; 360 } 361 } 362 363 junit.textui.TestRunner.run(EncryptMessageOrderTestCase.class); 364 } 365 366 static void help() { 367 System.out.println("EncryptOrderTest [-help] [-sleep <sleep time between sends (ms)>] " + 368 " [-msg_num <number of msgs to send>]"); 369 } 370 371 public static class EncryptOrderTestHeader extends Header{ 372 373 long seqno = -1; 375 public EncryptOrderTestHeader(){} 376 377 public EncryptOrderTestHeader(long seqno){ 378 379 this.seqno = seqno; 380 } 381 382 public int size(){ 383 return 512; 384 } 385 386 public void writeExternal(ObjectOutput out) throws IOException { 387 388 out.writeLong(seqno); 389 } 390 391 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 392 393 seqno = in.readLong(); 394 395 } 396 397 public EncryptOrderTestHeader copy(){ 398 EncryptOrderTestHeader ret = new EncryptOrderTestHeader(seqno); 399 return ret; 400 } 401 402 public String toString(){ 403 StringBuffer ret = new StringBuffer (); 404 ret.append("[ENCRYPT_ORDER_TEST: seqno=" + seqno); 405 ret.append(']'); 406 407 return ret.toString(); 408 } 409 } 410 411 } 412 | Popular Tags |