package org.jgroups.tests;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Semaphore;

import junit.framework.Test;
import junit.framework.TestSuite;

import org.jgroups.Channel;
import org.jgroups.JChannelFactory;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.util.Util;

/**
 * Starts several channel applications that connect to the same group
 * ("test"), fetch state and each send their own local address as a message.
 * Every application records the payloads it receives in a list; at the end
 * of a test each list is expected to hold exactly one entry per application.
 *
 * <p>Two scenarios are covered: applications that connect and fetch state
 * from their own thread ({@link #concurrentStartupHelper(boolean, boolean)})
 * and applications that connect in their constructor and only fetch state
 * once released ({@link #concurrentStateTranferHelper(boolean, boolean)}).
 */
public class ConcurrentStartupTest extends ChannelTestBase {

    /** Next key handed out by {@link #getMod()}; reset to 1 before each test. */
    private int mod = 1;

    public void setUp() throws Exception {
        super.setUp();
        mod = 1;
        CHANNEL_CONFIG = System.getProperty("channel.conf.flush", "flush-udp.xml");
    }

    public boolean useBlocking() {
        return true;
    }

    public void testConcurrentStartupLargeState() {
        concurrentStartupHelper(true, false);
    }

    public void testConcurrentStartupSmallState() {
        concurrentStartupHelper(false, true);
    }

    /**
     * Starts the applications one after another, releasing one semaphore
     * permit per started application, then verifies the received messages.
     *
     * @param largeState    if {@code true}, use the application variant whose
     *                      state callbacks sleep for 5 seconds
     * @param useDispatcher passed through to the application constructor
     *                      (ignored when a mux channel is used)
     */
    protected void concurrentStartupHelper(boolean largeState, boolean useDispatcher) {
        String[] names = isMuxChannelUsed()
                ? createMuxApplicationNames(1)
                : new String[]{"A", "B", "C", "D"};
        int count = names.length;

        ConcurrentStartupChannel[] channels = new ConcurrentStartupChannel[count];
        try {
            // Start with no permits available; one is released per started application.
            Semaphore semaphore = new Semaphore(count);
            takeAllPermits(semaphore, count);

            for (int i = 0; i < count; i++) {
                if (largeState) {
                    if (isMuxChannelUsed()) {
                        channels[i] = new ConcurrentStartupChannelWithLargeState(
                                names[i], muxFactory[i % getMuxFactoryCount()], semaphore);
                    } else {
                        channels[i] = new ConcurrentStartupChannelWithLargeState(
                                semaphore, names[i], useDispatcher);
                    }
                } else {
                    if (isMuxChannelUsed()) {
                        channels[i] = new ConcurrentStartupChannel(
                                names[i], muxFactory[i % getMuxFactoryCount()], semaphore);
                    } else {
                        channels[i] = new ConcurrentStartupChannel(
                                names[i], semaphore, useDispatcher);
                    }
                }

                channels[i].start();
                semaphore.release(1);
                sleepRandom(1500);
            }

            if (isMuxChannelUsed()) {
                blockUntilViewsReceived(channels, getMuxFactoryCount(), 60000);
            } else {
                blockUntilViewsReceived(channels, 60000);
            }

            Util.sleep(1000);

            // NOTE(review): presumably waits until every application has handed
            // its permit back, i.e. has finished useChannel() - confirm in base class.
            acquireSemaphore(semaphore, 60000, count);

            // Leave time for in-flight messages to be delivered before checking.
            Util.sleep(6000);

            verifyLists(channels);
        } catch (Exception ex) {
            log.warn("Exception encountered during test", ex);
            // Without this the test would pass without ever reaching its assertions.
            fail("Exception encountered during test: " + ex);
        } finally {
            cleanupChannels(channels);
        }
    }

    public void testConcurrentLargeStateTranfer() {
        concurrentStateTranferHelper(true, false);
    }

    public void testConcurrentSmallStateTranfer() {
        concurrentStateTranferHelper(false, true);
    }

    /**
     * Creates and starts all applications (which connect in their
     * constructor), waits for the views, then releases all permits at once
     * and verifies the received messages.
     *
     * @param largeState    if {@code true}, use the application variant whose
     *                      state callbacks sleep for 5 seconds
     * @param useDispatcher passed through to the application constructor
     *                      (ignored when a mux channel is used)
     */
    protected void concurrentStateTranferHelper(boolean largeState, boolean useDispatcher) {
        String[] names = isMuxChannelUsed()
                ? createMuxApplicationNames(1)
                : new String[]{"A", "B", "C", "D"};
        int count = names.length;
        ConcurrentStateTransfer[] channels = new ConcurrentStateTransfer[count];

        // Start with no permits available; all are released together below.
        Semaphore semaphore = new Semaphore(count);
        takeAllPermits(semaphore, count);

        try {
            for (int i = 0; i < count; i++) {
                if (largeState) {
                    if (isMuxChannelUsed()) {
                        channels[i] = new ConcurrentLargeStateTransfer(
                                names[i], muxFactory[i % getMuxFactoryCount()], semaphore);
                    } else {
                        channels[i] = new ConcurrentLargeStateTransfer(
                                names[i], semaphore, useDispatcher);
                    }
                } else {
                    if (isMuxChannelUsed()) {
                        channels[i] = new ConcurrentStateTransfer(
                                names[i], muxFactory[i % getMuxFactoryCount()], semaphore);
                    } else {
                        channels[i] = new ConcurrentStateTransfer(
                                names[i], semaphore, useDispatcher);
                    }
                }

                channels[i].start();
                Util.sleep(2000);
            }

            if (isMuxChannelUsed()) {
                blockUntilViewsReceived(channels, getMuxFactoryCount(), 60000);
            } else {
                blockUntilViewsReceived(channels, 60000);
            }

            Util.sleep(2000);
            // Release every permit at once so the applications proceed together.
            semaphore.release(count);

            Util.sleep(2000);

            // NOTE(review): presumably waits until every application has handed
            // its permit back, i.e. has finished useChannel() - confirm in base class.
            acquireSemaphore(semaphore, 60000, count);

            // Leave time for in-flight messages to be delivered before checking.
            Util.sleep(6000);

            verifyLists(channels);
        } catch (Exception ex) {
            log.warn("Exception encountered during test", ex);
            // Without this the test would pass without ever reaching its assertions.
            fail("Exception encountered during test: " + ex);
        } finally {
            cleanupChannels(channels);
        }
    }

    /**
     * Logs the list and modification map of every application and asserts
     * that each list holds exactly one element per application.
     */
    private void verifyLists(ConcurrentStartupChannel[] channels) {
        int count = channels.length;

        List[] lists = new List[count];
        for (int i = 0; i < count; i++) {
            lists[i] = channels[i].getList();
        }

        Map[] mods = new Map[count];
        for (int i = 0; i < count; i++) {
            mods[i] = channels[i].getModifications();
        }

        printLists(lists);
        printModifications(mods);

        for (int i = 0; i < count; i++) {
            assertEquals("list #" + i + " should have " + count + " elements",
                    count, lists[i].size());
        }
    }

    /**
     * Cleans up every application that was actually created. Entries are
     * {@code null} when a constructor failed part-way through a test.
     */
    private void cleanupChannels(ConcurrentStartupChannel[] channels) {
        for (int i = 0; i < channels.length; i++) {
            if (channels[i] == null) {
                continue;
            }
            Util.sleep(500);
            channels[i].cleanup();
        }
    }

    /**
     * Returns the current modification key and advances it by one.
     *
     * @return the key value before the increment
     */
    protected int getMod() {
        synchronized (this) {
            int retval = mod;
            mod++;
            return retval;
        }
    }

    /** Logs each modification map together with its array index. */
    protected void printModifications(Map[] modifications) {
        for (int i = 0; i < modifications.length; i++) {
            Map modification = modifications[i];
            log.info("modifications for #" + i + ": " + modification);
        }
    }

    /** Logs each list together with its array index. */
    protected void printLists(List[] lists) {
        for (int i = 0; i < lists.length; i++) {
            List l = lists[i];
            log.info(i + ": " + l);
        }
    }

    /**
     * Application that connects to the group in its constructor and, in
     * {@link #useChannel()}, only fetches state and sends its local address.
     */
    protected class ConcurrentStateTransfer extends ConcurrentStartupChannel {

        public ConcurrentStateTransfer(String name, Semaphore semaphore, boolean useDispatcher) throws Exception {
            super(name, semaphore, useDispatcher);
            channel.connect("test");
        }

        public ConcurrentStateTransfer(String name, JChannelFactory factory, Semaphore semaphore) throws Exception {
            super(name, factory, semaphore);
            channel.connect("test");
        }

        public void useChannel() throws Exception {
            boolean success = channel.getState(null, 30000);
            log.info("channel.getState at " + getName() + getLocalAddress() + " returned " + success);
            channel.send(null, null, channel.getLocalAddress());
        }
    }

    /**
     * {@link ConcurrentStateTransfer} variant whose state callbacks sleep for
     * 5 seconds, making each state transfer slow.
     */
    protected class ConcurrentLargeStateTransfer extends ConcurrentStateTransfer {

        public ConcurrentLargeStateTransfer(String name, Semaphore semaphore, boolean useDispatcher) throws Exception {
            super(name, semaphore, useDispatcher);
        }

        public ConcurrentLargeStateTransfer(String name, JChannelFactory factory, Semaphore semaphore) throws Exception {
            super(name, factory, semaphore);
        }

        public void setState(byte[] state) {
            super.setState(state);
            Util.sleep(5000);
        }

        public byte[] getState() {
            Util.sleep(5000);
            return super.getState();
        }

        public void getState(OutputStream ostream) {
            super.getState(ostream);
            Util.sleep(5000);
        }

        public void setState(InputStream istream) {
            super.setState(istream);
            Util.sleep(5000);
        }
    }

    /**
     * {@link ConcurrentStartupChannel} variant whose state callbacks sleep for
     * 5 seconds, making each state transfer slow.
     */
    protected class ConcurrentStartupChannelWithLargeState extends ConcurrentStartupChannel {

        public ConcurrentStartupChannelWithLargeState(Semaphore semaphore, String name, boolean useDispatcher) throws Exception {
            super(name, semaphore, useDispatcher);
        }

        public ConcurrentStartupChannelWithLargeState(String name, JChannelFactory f, Semaphore semaphore) throws Exception {
            super(name, f, semaphore);
        }

        public void setState(byte[] state) {
            super.setState(state);
            Util.sleep(5000);
        }

        public byte[] getState() {
            Util.sleep(5000);
            return super.getState();
        }

        public void getState(OutputStream ostream) {
            super.getState(ostream);
            Util.sleep(5000);
        }

        public void setState(InputStream istream) {
            super.setState(istream);
            Util.sleep(5000);
        }
    }

    /**
     * Application that connects to group "test", fetches state and sends its
     * own local address. Received payloads are appended to a list; payloads,
     * view ids and installed states are also recorded in a map keyed by the
     * test-wide counter from {@link ConcurrentStartupTest#getMod()}.
     * The list and map are guarded by this object's monitor when written.
     */
    protected class ConcurrentStartupChannel extends PushChannelApplicationWithSemaphore {

        /** Payloads received so far (replaced wholesale when state is installed). */
        final List l = new LinkedList();

        // Not referenced in this file; kept because subclasses elsewhere may use it.
        Channel ch;

        // Not referenced in this file; kept because subclasses elsewhere may use it.
        int modCount = 1;

        /** Ordered record of received payloads, view ids and installed states. */
        final Map mods = new TreeMap();

        public ConcurrentStartupChannel(String name, Semaphore semaphore) throws Exception {
            super(name, semaphore, true);
        }

        public ConcurrentStartupChannel(String name, JChannelFactory f, Semaphore semaphore) throws Exception {
            super(name, f, semaphore);
        }

        public ConcurrentStartupChannel(String name, Semaphore semaphore, boolean useDispatcher) throws Exception {
            super(name, semaphore, useDispatcher);
        }

        public void useChannel() throws Exception {
            channel.connect("test");
            channel.getState(null, 25000);
            channel.send(null, null, channel.getLocalAddress());
        }

        List getList() {
            return l;
        }

        Map getModifications() {
            return mods;
        }

        /** Records the message payload; messages without a buffer are ignored. */
        public void receive(Message msg) {
            if (msg.getBuffer() == null) {
                return;
            }
            Object obj = msg.getObject();
            synchronized (this) {
                l.add(obj);
                mods.put(Integer.valueOf(getMod()), obj);
            }
        }

        /** Records the id of the newly accepted view. */
        public void viewAccepted(View new_view) {
            super.viewAccepted(new_view);
            synchronized (this) {
                mods.put(Integer.valueOf(getMod()), new_view.getVid());
            }
        }

        /** Replaces the local list with the list deserialized from {@code state}. */
        public void setState(byte[] state) {
            super.setState(state);
            try {
                List tmp = (List) Util.objectFromByteBuffer(state);
                installState(tmp);
            } catch (Exception e) {
                log.warn("failed to set state from byte buffer", e);
            }
        }

        /**
         * Serializes a snapshot of the local list.
         *
         * @return the serialized list, or {@code null} if serialization failed
         */
        public byte[] getState() {
            super.getState();
            List tmp = snapshot();
            try {
                return Util.objectToByteBuffer(tmp);
            } catch (Exception e) {
                log.warn("failed to serialize state", e);
                return null;
            }
        }

        /** Writes a snapshot of the local list to {@code ostream} and closes the stream. */
        public void getState(OutputStream ostream) {
            super.getState(ostream);
            ObjectOutputStream oos = null;
            try {
                oos = new ObjectOutputStream(ostream);
                oos.writeObject(snapshot());
                oos.flush();
            } catch (IOException e) {
                log.warn("failed to write state to stream", e);
            } finally {
                Util.close(oos);
            }
        }

        /** Replaces the local list with the list read from {@code istream} and closes the stream. */
        public void setState(InputStream istream) {
            super.setState(istream);
            ObjectInputStream ois = null;
            try {
                ois = new ObjectInputStream(istream);
                List tmp = (List) ois.readObject();
                installState(tmp);
            } catch (Exception e) {
                log.warn("failed to read state from stream", e);
            } finally {
                Util.close(ois);
            }
        }

        /** Returns a copy of the list taken while holding this object's monitor. */
        private List snapshot() {
            synchronized (this) {
                return new LinkedList(l);
            }
        }

        /** Replaces the list contents with {@code tmp} and records the installed state. */
        private void installState(List tmp) {
            synchronized (this) {
                l.clear();
                l.addAll(tmp);
                log.info("-- [#" + getName() + " (" + channel.getLocalAddress() + ")]: state is " + l);
                mods.put(Integer.valueOf(getMod()), tmp);
            }
        }
    }

    public static Test suite() {
        return new TestSuite(ConcurrentStartupTest.class);
    }

    public static void main(String[] args) {
        String[] testCaseName = {ConcurrentStartupTest.class.getName()};
        junit.textui.TestRunner.main(testCaseName);
    }
}