1 package org.jgroups.tests; 2 3 import junit.framework.Test; 4 import junit.framework.TestSuite; 5 import org.jgroups.*; 6 import org.jgroups.mux.MuxChannel; 7 import org.jgroups.stack.IpAddress; 8 import org.jgroups.stack.ProtocolStack; 9 import org.jgroups.stack.Protocol; 10 import org.jgroups.util.Util; 11 12 import java.util.*; 13 import java.io.*; 14 15 20 public class MultiplexerTest extends ChannelTestBase { 21 private Cache c1, c2, c1_repl, c2_repl; 22 private Channel ch1, ch2, ch1_repl, ch2_repl; 23 JChannelFactory factory, factory2; 24 25 public MultiplexerTest(String name) { 26 super(name); 27 } 28 29 30 public void setUp() throws Exception { 31 super.setUp(); 32 factory=new JChannelFactory(); 33 factory.setMultiplexerConfig(MUX_CHANNEL_CONFIG); 34 35 factory2=new JChannelFactory(); 36 factory2.setMultiplexerConfig(MUX_CHANNEL_CONFIG); 37 } 38 39 public void tearDown() throws Exception { 40 if(ch1_repl != null) 41 ch1_repl.close(); 42 if(ch2_repl != null) 43 ch2_repl.close(); 44 if(ch1 != null) 45 ch1.close(); 46 if(ch2 != null) 47 ch2.close(); 48 if(ch1 != null) { 49 assertFalse(((MuxChannel)ch1).getChannel().isOpen()); 50 assertFalse(((MuxChannel)ch1).getChannel().isConnected()); 51 } 52 if(ch2 != null) { 53 assertFalse(((MuxChannel)ch2).getChannel().isOpen()); 54 assertFalse(((MuxChannel)ch2).getChannel().isConnected()); 55 } 56 if(ch1_repl != null) { 57 assertFalse(((MuxChannel)ch1_repl).getChannel().isOpen()); 58 assertFalse(((MuxChannel)ch1_repl).getChannel().isConnected()); 59 } 60 if(ch2_repl != null) { 61 assertFalse(((MuxChannel)ch2_repl).getChannel().isOpen()); 62 assertFalse(((MuxChannel)ch2_repl).getChannel().isConnected()); 63 } 64 65 if(c1 != null) c1.clear(); 66 if(c2 != null) c2.clear(); 67 if(c1_repl != null) c1_repl.clear(); 68 if(c2_repl != null) c2_repl.clear(); 69 70 ch1_repl=ch2_repl=ch1=ch2=null; 71 c1=c2=c1_repl=c2_repl=null; 72 73 super.tearDown(); 74 } 75 76 77 public void testReplicationWithOneChannel() throws Exception { 78 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 79 ch1.connect("bla"); 80 c1=new Cache(ch1, "cache-1"); 81 assertEquals("cache has to be empty initially", 0, c1.size()); 82 c1.put("name", "Bela"); 83 Util.sleep(300); assertEquals(1, c1.size()); 85 assertEquals("Bela", c1.get("name")); 86 } 87 88 89 public void testLifecycle() throws Exception { 90 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 91 assertTrue(ch1.isOpen()); 92 assertFalse(ch1.isConnected()); 93 94 ch1.connect("bla"); 95 assertTrue(ch1.isOpen()); 96 assertTrue(ch1.isConnected()); 97 98 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 99 assertTrue(ch2.isOpen()); 100 assertFalse(ch2.isConnected()); 101 102 ch2.connect("bla"); 103 assertTrue(ch2.isOpen()); 104 assertTrue(ch2.isConnected()); 105 106 ch2.disconnect(); 107 assertTrue(ch2.isOpen()); 108 assertFalse(ch2.isConnected()); 109 110 ch2.connect("bla"); 111 assertTrue(ch2.isOpen()); 112 assertTrue(ch2.isConnected()); 113 114 ch2.disconnect(); 115 assertTrue(ch2.isOpen()); 116 assertFalse(ch2.isConnected()); 117 118 ch2.close(); 119 assertFalse(ch2.isOpen()); 120 assertFalse(ch2.isConnected()); 121 122 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 123 ch2.connect("bla"); 124 assertTrue(ch2.isOpen()); 125 assertTrue(ch2.isConnected()); 126 127 ch2.close(); 128 assertFalse(ch2.isOpen()); 129 assertFalse(ch2.isConnected()); 130 } 131 132 133 public void testDisconnect() throws Exception { 134 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 135 assertTrue(ch1.isOpen()); 136 assertFalse(ch1.isConnected()); 137 assertTrue(((MuxChannel)ch1).getChannel().isOpen()); 138 assertFalse(((MuxChannel)ch1).getChannel().isConnected()); 139 140 ch1.connect("bla"); 141 assertTrue(ch1.isOpen()); 142 assertTrue(ch1.isConnected()); 143 assertTrue(((MuxChannel)ch1).getChannel().isOpen()); 144 assertTrue(((MuxChannel)ch1).getChannel().isConnected()); 145 146 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 147 assertTrue(ch2.isOpen()); 148 assertFalse(ch2.isConnected()); 149 150 ch1.disconnect(); 151 assertTrue(ch1.isOpen()); 152 assertFalse(ch1.isConnected()); 153 154 ch1.connect("bla"); 155 assertTrue(ch1.isOpen()); 156 assertTrue(ch1.isConnected()); 157 158 ch1.close(); 159 assertFalse(ch1.isOpen()); 160 assertFalse(ch1.isConnected()); 161 assertTrue(((MuxChannel)ch1).getChannel().isOpen()); 162 assertTrue(((MuxChannel)ch1).getChannel().isConnected()); 163 164 ch2.close(); 165 assertFalse(ch2.isOpen()); 166 assertFalse(ch2.isConnected()); 167 } 168 169 public void testDisconnect2() throws Exception { 170 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 171 assertTrue(ch1.isOpen()); 172 assertFalse(ch1.isConnected()); 173 174 ch1.connect("bla"); 175 assertTrue(ch1.isOpen()); 176 assertTrue(ch1.isConnected()); 177 178 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 179 assertTrue(ch2.isOpen()); 180 assertFalse(ch2.isConnected()); 181 182 ch1.disconnect(); 183 assertTrue(ch1.isOpen()); 184 assertFalse(ch1.isConnected()); 185 186 assertTrue(ch2.isOpen()); 187 assertFalse(ch2.isConnected()); 188 189 ch1.connect("bla"); 190 assertTrue(ch1.isOpen()); 191 assertTrue(ch1.isConnected()); 192 193 assertTrue(ch2.isOpen()); 194 assertFalse(ch2.isConnected()); 195 } 196 197 198 public void testClose() throws Exception { 199 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 200 ch1.connect("bla"); 201 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 202 ch2.connect("bla"); 203 ch1.close(); 204 ch2.close(); 205 } 206 207 208 public void testReplicationWithTwoChannels() throws Exception { 209 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 210 c1=new Cache(ch1, "cache-1"); 211 assertEquals("cache has to be empty initially", 0, c1.size()); 212 ch1.connect("bla"); 213 214 ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 215 c1_repl=new Cache(ch1_repl, "cache-1-repl"); 216 assertEquals("cache has to be empty initially", 0, c1_repl.size()); 217 ch1_repl.connect("bla"); 218 Util.sleep(200); 219 220 View v=ch1_repl.getView(); 221 assertNotNull(v); 222 assertEquals("view is " + v, 2, v.size()); 223 v=ch1.getView(); 224 assertNotNull(v); 225 assertEquals(2, v.size()); 226 227 c1.put("name", "Bela"); 229 if(ch1.flushSupported()) { 230 boolean success=ch1.startFlush(5000, true); 231 System.out.println("startFlush(): " + success); 232 assertTrue(success); 233 } 234 else 235 Util.sleep(10000); 236 237 System.out.println("c1: " + c1 + ", c1_repl: " + c1_repl); 238 239 assertEquals(1, c1.size()); 240 assertEquals("Bela", c1.get("name")); 241 242 Util.sleep(500); assertEquals(1, c1_repl.size()); 244 assertEquals("Bela", c1_repl.get("name")); 245 246 c1.put("id", new Long (322649)); 247 c1_repl.put("hobbies", "biking"); 248 c1_repl.put("bike", "Centurion"); 249 if(ch1.flushSupported()) { 250 boolean success=ch1.startFlush(5000, true); 251 System.out.println("startFlush(): " + success); 252 assertTrue(success); 253 } 254 else 255 Util.sleep(10000); 256 257 System.out.println("c1: " + c1 + ", c1_repl: " + c1_repl); 258 259 assertEquals("c1: " + c1, 4, c1.size()); 260 assertEquals("c1_repl: " + c1_repl, 4, c1_repl.size()); 261 262 assertEquals(new Long (322649), c1.get("id")); 263 assertEquals(new Long (322649), c1_repl.get("id")); 264 265 assertEquals("biking", c1.get("hobbies")); 266 assertEquals("biking", c1_repl.get("hobbies")); 267 268 assertEquals("Centurion", c1.get("bike")); 269 assertEquals("Centurion", c1_repl.get("bike")); 270 } 271 272 273 public void testVirtualSynchrony() throws Exception { 274 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 275 c1=new Cache(ch1, "cache-1"); 276 ch1.connect("bla"); 277 278 ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 279 c1_repl=new Cache(ch1_repl, "cache-1-repl"); 280 ch1_repl.connect("bla"); 281 assertEquals("view: " + ch1.getView(), 2, ch1.getView().size()); 282 283 flush(ch1, 5000); 286 for(int i=1; i <= 20; i++) { 287 if(i % 2 == 0) { 288 c1.put(i, Boolean.TRUE); } 290 else { 291 c1_repl.put(i, Boolean.TRUE); } 293 } 294 295 flush(ch1, 5000); 296 System.out.println("c1 (" + c1.size() + " elements):\n" + c1.printKeys() + 297 "\nc1_repl (" + c1_repl.size() + " elements):\n" + c1_repl.printKeys()); 298 assertEquals(c1.size(), c1_repl.size()); 299 assertEquals(20, c1.size()); 300 } 301 302 303 private static void flush(Channel channel, long timeout) { 304 if(channel.flushSupported()) { 305 boolean success=channel.startFlush(timeout, true); 306 System.out.println("startFlush(): " + success); 307 assertTrue(success); 308 } 309 else 310 Util.sleep(timeout); 311 } 312 313 public void testReplicationWithReconnect() throws Exception { 314 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 315 ch1.connect("bla"); 316 c1=new Cache(ch1, "cache-1"); 317 assertEquals("cache has to be empty initially", 0, c1.size()); 318 c1.put("name", "Bela"); 319 Util.sleep(300); assertEquals(1, c1.size()); 321 assertEquals("Bela", c1.get("name")); 322 323 ch1.disconnect(); 324 325 ch1.connect("bla"); 326 327 c2=new Cache(ch1, "cache-1"); 328 assertEquals("cache has to be empty initially", 0, c2.size()); 329 c2.put("name", "Bela"); 330 Util.sleep(300); assertEquals(1, c2.size()); 332 assertEquals("Bela", c2.get("name")); 333 334 } 335 336 337 public void testStateTransfer() throws Exception { 338 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 339 ch1.connect("bla"); 340 c1=new Cache(ch1, "cache-1"); 341 assertEquals("cache has to be empty initially", 0, c1.size()); 342 343 ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 344 345 c1.put("name", "Bela"); 346 c1.put("id", new Long (322649)); 347 c1.put("hobbies", "biking"); 348 c1.put("bike", "Centurion"); 349 350 351 ch1_repl.connect("bla"); 352 c1_repl=new Cache(ch1_repl, "cache-1-repl"); 353 boolean rc=ch1_repl.getState(null, 5000); 354 System.out.println("state transfer: " + rc); 355 Util.sleep(500); 356 357 System.out.println("c1_repl: " + c1_repl); 358 assertEquals("initial state should have been transferred", 4, c1_repl.size()); 359 360 assertEquals(new Long (322649), c1.get("id")); 361 assertEquals(new Long (322649), c1_repl.get("id")); 362 363 assertEquals("biking", c1.get("hobbies")); 364 assertEquals("biking", c1_repl.get("hobbies")); 365 366 assertEquals("Centurion", c1.get("bike")); 367 assertEquals("Centurion", c1_repl.get("bike")); 368 } 369 370 371 public void testStateTransferWithTwoApplications() throws Exception { 372 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 373 ch1.connect("bla"); 374 c1=new Cache(ch1, "cache-1"); 375 assertEquals("cache has to be empty initially", 0, c1.size()); 376 377 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 378 ch2.connect("bla"); 379 c2=new Cache(ch2, "cache-2"); 380 assertEquals("cache has to be empty initially", 0, c2.size()); 381 382 ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 383 384 ch2_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 385 386 387 c1.put("name", "cache-1"); 388 c2.put("name", "cache-2"); 389 390 ch1_repl.connect("bla"); 391 c1_repl=new Cache(ch1_repl, "cache-1-repl"); 392 boolean rc=ch1_repl.getState(null, 5000); 393 System.out.println("state transfer: " + rc); 394 395 ch2_repl.connect("bla"); 396 c2_repl=new Cache(ch2_repl, "cache-2-repl"); 397 rc=ch2_repl.getState(null, 5000); 398 System.out.println("state transfer: " + rc); 399 Util.sleep(500); 400 401 System.out.println("Caches after state transfers:"); 402 System.out.println("c1: " + c1); 403 System.out.println("c1_repl: " + c1_repl); 404 System.out.println("c2: " + c2); 405 System.out.println("c2_repl: " + c2_repl); 406 407 assertEquals(1, c1.size()); 408 assertEquals(1, c1_repl.size()); 409 410 assertEquals(1, c2.size()); 411 assertEquals(1, c2_repl.size()); 412 413 assertEquals("cache-1", c1.get("name")); 414 assertEquals("cache-1", c1_repl.get("name")); 415 416 assertEquals("cache-2", c2.get("name")); 417 assertEquals("cache-2", c2_repl.get("name")); 418 } 419 420 421 public void testStateTransferWithRegistration() throws Exception { 422 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 423 ch1.connect("bla"); 424 c1=new Cache(ch1, "cache-1"); 425 assertEquals("cache has to be empty initially", 0, c1.size()); 426 427 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 428 ch2.connect("bla"); 429 c2=new Cache(ch2, "cache-2"); 430 assertEquals("cache has to be empty initially", 0, c2.size()); 431 c1.put("name", "cache-1"); 432 c2.put("name", "cache-2"); 433 434 ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1", true, null); ch2_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2", true, null); 437 ch1_repl.connect("bla"); 438 c1_repl=new Cache(ch1_repl, "cache-1-repl"); 439 boolean rc=ch1_repl.getState(null, 5000); System.out.println("state transfer: " + rc); 441 442 ch2_repl.connect("bla"); 443 c2_repl=new Cache(ch2_repl, "cache-2-repl"); 444 rc=ch2_repl.getState(null, 5000); System.out.println("state transfer: " + rc); 446 Util.sleep(500); 447 448 System.out.println("Caches after state transfers:"); 449 System.out.println("c1: " + c1); 450 System.out.println("c1_repl: " + c1_repl); 451 System.out.println("c2: " + c2); 452 System.out.println("c2_repl: " + c2_repl); 453 454 assertEquals(1, c1.size()); 455 assertEquals(1, c1_repl.size()); 456 457 assertEquals(1, c2.size()); 458 assertEquals(1, c2_repl.size()); 459 460 assertEquals("cache-1", c1.get("name")); 461 assertEquals("cache-1", c1_repl.get("name")); 462 463 assertEquals("cache-2", c2.get("name")); 464 assertEquals("cache-2", c2_repl.get("name")); 465 c1.clear(); 466 c1_repl.clear(); 467 c2.clear(); 468 c2_repl.clear(); 469 } 470 471 472 private void setCorrectPortRange(Channel ch) { 473 ProtocolStack stack=((MuxChannel)ch).getProtocolStack(); 474 Protocol tcpping=stack.findProtocol("TCPPING"); 475 if(tcpping == null) 476 return; 477 478 Properties props=tcpping.getProperties(); 479 String port_range=props.getProperty("port_range"); 480 if(port_range != null) { 481 System.out.println("port_range in TCPPING: " + port_range + ", setting it to 2"); 482 port_range="2"; 483 Properties p=new Properties(); 484 p.setProperty("port_range", port_range); 486 tcpping.setProperties(p); 487 } 488 } 489 490 491 public void testStateTransferWithReconnect() throws Exception { 492 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 493 setCorrectPortRange(ch1); 494 495 assertTrue(ch1.isOpen()); 496 assertFalse(ch1.isConnected()); 497 ch1.connect("bla"); 498 assertTrue(ch1.isOpen()); 499 assertTrue(ch1.isConnected()); 500 assertServiceAndClusterView(ch1, 1, 1); 501 502 c1=new Cache(ch1, "cache-1"); 503 assertEquals("cache has to be empty initially", 0, c1.size()); 504 505 ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 506 setCorrectPortRange(ch1_repl); 507 assertTrue(ch1_repl.isOpen()); 508 assertFalse(ch1_repl.isConnected()); 509 510 c1.put("name", "Bela"); 511 c1.put("id", new Long (322649)); 512 c1.put("hobbies", "biking"); 513 c1.put("bike", "Centurion"); 514 515 ch1_repl.connect("bla"); 516 assertTrue(ch1_repl.isOpen()); 517 assertTrue(ch1_repl.isConnected()); 518 assertServiceAndClusterView(ch1_repl, 2, 2); 519 Util.sleep(500); 520 assertServiceAndClusterView(ch1, 2, 2); 521 522 c1_repl=new Cache(ch1_repl, "cache-1-repl"); 523 boolean rc=ch1_repl.getState(null, 5000); 524 System.out.println("state transfer: " + rc); 525 Util.sleep(500); 526 527 System.out.println("c1_repl: " + c1_repl); 528 assertEquals("initial state should have been transferred", 4, c1_repl.size()); 529 assertEquals(new Long (322649), c1.get("id")); 530 assertEquals(new Long (322649), c1_repl.get("id")); 531 532 assertEquals("biking", c1.get("hobbies")); 533 assertEquals("biking", c1_repl.get("hobbies")); 534 535 assertEquals("Centurion", c1.get("bike")); 536 assertEquals("Centurion", c1_repl.get("bike")); 537 538 ch1_repl.disconnect(); 539 assertTrue(ch1_repl.isOpen()); 540 assertFalse(ch1_repl.isConnected()); 541 Util.sleep(1000); 542 assertServiceAndClusterView(ch1, 1, 1); 543 544 c1_repl.clear(); 545 546 ch1_repl.connect("bla"); 547 assertTrue(ch1_repl.isOpen()); 548 assertTrue(ch1_repl.isConnected()); 549 assertServiceAndClusterView(ch1_repl, 2, 2); 550 Util.sleep(300); 551 assertServiceAndClusterView(ch1, 2, 2); 552 553 assertEquals("cache has to be empty initially", 0, c1_repl.size()); 554 555 rc=ch1_repl.getState(null, 5000); 556 System.out.println("state transfer: " + rc); 557 Util.sleep(500); 558 559 System.out.println("c1_repl: " + c1_repl); 560 assertEquals("initial state should have been transferred", 4, c1_repl.size()); 561 562 assertEquals(new Long (322649), c1.get("id")); 563 assertEquals(new Long (322649), c1_repl.get("id")); 564 565 assertEquals("biking", c1.get("hobbies")); 566 assertEquals("biking", c1_repl.get("hobbies")); 567 568 assertEquals("Centurion", c1.get("bike")); 569 assertEquals("Centurion", c1_repl.get("bike")); 570 571 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 576 setCorrectPortRange(ch2); 577 assertTrue(ch2.isOpen()); 578 assertFalse(ch2.isConnected()); 579 assertServiceAndClusterView(ch1, 2, 2); 580 assertServiceAndClusterView(ch1_repl, 2, 2); 581 582 583 ch1.disconnect(); 584 Util.sleep(500); 586 assertTrue(ch1.isOpen()); 587 assertFalse(ch1.isConnected()); 588 assertServiceAndClusterView(ch1_repl, 1, 1); 589 assertTrue(ch2.isOpen()); 590 assertFalse(ch2.isConnected()); 591 592 c1.clear(); 593 594 ch1.connect("bla"); 595 assertTrue(ch1.isOpen()); 596 assertTrue(ch1.isConnected()); 597 assertServiceAndClusterView(ch1, 2, 2); 598 Util.sleep(500); 599 assertServiceAndClusterView(ch1_repl, 2, 2); 600 assertTrue(ch2.isOpen()); 601 assertFalse(ch2.isConnected()); 602 603 assertEquals("cache has to be empty initially", 0, c1.size()); 604 605 rc=ch1.getState(null, 5000); 606 System.out.println("state transfer: " + rc); 607 Util.sleep(500); 608 609 System.out.println("c1: " + c1); 610 assertEquals("initial state should have been transferred", 4, c1.size()); 611 612 assertEquals(new Long (322649), c1.get("id")); 613 assertEquals(new Long (322649), c1_repl.get("id")); 614 615 assertEquals("biking", c1.get("hobbies")); 616 assertEquals("biking", c1_repl.get("hobbies")); 617 618 assertEquals("Centurion", c1.get("bike")); 619 assertEquals("Centurion", c1_repl.get("bike")); 620 } 621 622 623 private void assertServiceAndClusterView(Channel ch, int num_service_view_mbrs, int num_cluster_view_mbrs) { 624 View service_view, cluster_view; 625 service_view=ch.getView(); 626 cluster_view=((MuxChannel)ch).getClusterView(); 627 628 String msg="cluster view=" + cluster_view + ", service view=" + service_view; 629 630 assertNotNull(service_view); 631 assertNotNull(cluster_view); 632 633 assertEquals(msg, num_service_view_mbrs, service_view.size()); 634 assertEquals(msg, num_cluster_view_mbrs, cluster_view.size()); 635 } 636 637 638 public void testStateTransferFromSelfWithRegularChannel() throws Exception { 639 JChannel ch=new JChannel(); 640 ch.connect("X"); 641 try { 642 boolean rc=ch.getState(null, 2000); 643 assertFalse("getState() on singleton should return false", rc); 644 } 645 finally { 646 ch.close(); 647 } 648 } 649 650 public void testStateTransferFromSelf() throws Exception { 651 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 652 ch1.connect("bla"); 653 boolean rc=ch1.getState(null, 2000); 654 assertFalse("getState() on singleton should return false", rc); 655 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 656 ch2.connect("foo"); 657 rc=ch2.getState(null, 2000); 658 assertFalse("getState() on singleton should return false", rc); 659 } 660 661 662 public void testAdditionalData() throws Exception { 663 byte[] additional_data=new byte[]{'b', 'e', 'l', 'a'}; 664 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 665 Map m=new HashMap(1); 666 m.put("additional_data", additional_data); 667 ch1.down(new Event(Event.CONFIG, m)); 668 ch1.connect("bla"); 669 IpAddress local_addr=(IpAddress)ch1.getLocalAddress(); 670 assertNotNull(local_addr); 671 byte[] tmp=local_addr.getAdditionalData(); 672 assertNotNull(tmp); 673 assertEquals(tmp, additional_data); 674 675 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 676 ch2.connect("foo"); 677 local_addr=(IpAddress)ch2.getLocalAddress(); 678 assertNotNull(local_addr); 679 tmp=local_addr.getAdditionalData(); 680 assertNotNull(tmp); 681 assertEquals(tmp, additional_data); 682 } 683 684 public void testAdditionalData2() throws Exception { 685 byte[] additional_data=new byte[]{'b', 'e', 'l', 'a'}; 686 byte[] additional_data2=new byte[]{'m', 'i', 'c', 'h', 'i'}; 687 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 688 ch1.connect("bla"); 689 IpAddress local_addr=(IpAddress)ch1.getLocalAddress(); 690 assertNotNull(local_addr); 691 byte[] tmp=local_addr.getAdditionalData(); 692 assertNull(tmp); 693 694 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 695 Map m=new HashMap(1); 696 m.put("additional_data", additional_data); 697 ch2.down(new Event(Event.CONFIG, m)); 698 ch2.connect("foo"); 699 local_addr=(IpAddress)ch2.getLocalAddress(); 700 assertNotNull(local_addr); 701 tmp=local_addr.getAdditionalData(); 702 assertNotNull(tmp); 703 assertEquals(tmp, additional_data); 704 705 local_addr=(IpAddress)ch1.getLocalAddress(); 706 assertNotNull(local_addr); 707 tmp=local_addr.getAdditionalData(); 708 assertNotNull(tmp); 709 assertEquals(tmp, additional_data); 710 711 m.clear(); 712 m.put("additional_data", additional_data2); 713 ch2.down(new Event(Event.CONFIG, m)); 714 local_addr=(IpAddress)ch2.getLocalAddress(); 715 assertNotNull(local_addr); 716 tmp=local_addr.getAdditionalData(); 717 assertNotNull(tmp); 718 assertEquals(tmp, additional_data2); 719 assertFalse(Arrays.equals(tmp, additional_data)); 720 } 721 722 public void testGetSubstates() throws Exception { 723 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 724 ch1.connect("bla"); 725 c1=new ExtendedCache(ch1, "cache-1"); 726 assertEquals("cache has to be empty initially", 0, c1.size()); 727 728 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 729 ch2.connect("bla"); 730 c2=new ExtendedCache(ch2, "cache-2"); 731 assertEquals("cache has to be empty initially", 0, c2.size()); 732 733 for(int i=0; i < 10; i++) { 734 c1.put(new Integer (i), new Integer (i)); 735 c2.put(new Integer (i), new Integer (i)); 736 } 737 738 ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 739 ch2_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 740 ch1_repl.connect("bla"); 741 c1_repl=new ExtendedCache(ch1_repl, "cache-1-repl"); 742 boolean rc=ch1_repl.getState(null, "odd", 5000); 743 System.out.println("state transfer: " + rc); 744 745 ch2_repl.connect("bla"); 746 c2_repl=new ExtendedCache(ch2_repl, "cache-2-repl"); 747 rc=ch2_repl.getState(null, "even", 5000); 748 System.out.println("state transfer: " + rc); 749 Util.sleep(500); 750 751 System.out.println("Caches after state transfers:"); 752 System.out.println("c1: " + c1); 753 System.out.println("c2: " + c2); 754 755 System.out.println("c1_repl (removed odd substate): " + c1_repl); 756 System.out.println("c2_repl (removed even substate): " + c2_repl); 757 758 assertEquals(5, c1_repl.size()); 759 assertEquals(5, c2_repl.size()); 760 761 _testEvenNumbersPresent(c1_repl); 762 _testOddNumbersPresent(c2_repl); 763 } 764 765 private void _testEvenNumbersPresent(Cache c) { 766 Integer [] evens=new Integer []{new Integer (0), new Integer (2), new Integer (4), new Integer (6), new Integer (8)}; 767 _testNumbersPresent(c, evens); 768 769 } 770 771 private void _testOddNumbersPresent(Cache c) { 772 Integer [] odds=new Integer []{new Integer (1), new Integer (3), new Integer (5), new Integer (7), new Integer (9)}; 773 _testNumbersPresent(c, odds); 774 } 775 776 private void _testNumbersPresent(Cache c, Integer [] numbers) { 777 int len=numbers.length; 778 assertEquals(len, c.size()); 779 for(int i=0; i < numbers.length; i++) { 780 Integer number=numbers[i]; 781 assertEquals(number, c.get(number)); 782 } 783 } 784 785 786 787 public void testGetSubstatesMultipleTimes() throws Exception { 788 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 789 ch1.connect("bla"); 790 c1=new ExtendedCache(ch1, "cache-1"); 791 assertEquals("cache has to be empty initially", 0, c1.size()); 792 793 ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 794 ch2.connect("bla"); 795 c2=new ExtendedCache(ch2, "cache-2"); 796 assertEquals("cache has to be empty initially", 0, c2.size()); 797 798 for(int i=0; i < 10; i++) { 799 c1.put(new Integer (i), new Integer (i)); 800 c2.put(new Integer (i), new Integer (i)); 801 } 802 803 ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 804 ch2_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2"); 805 ch1_repl.connect("bla"); 806 c1_repl=new ExtendedCache(ch1_repl, "cache-1-repl"); 807 boolean rc=ch1_repl.getState(null, "odd", 5000); 808 System.out.println("state transfer: " + rc); 809 810 ch2_repl.connect("bla"); 811 c2_repl=new ExtendedCache(ch2_repl, "cache-2-repl"); 812 rc=ch2_repl.getState(null, "even", 5000); 813 System.out.println("state transfer: " + rc); 814 Util.sleep(500); 815 _testOddNumbersPresent(c2_repl); 816 817 System.out.println("Caches after state transfers:"); 818 System.out.println("c1: " + c1); 819 System.out.println("c2: " + c2); 820 System.out.println("c1_repl (removed odd substate): " + c1_repl); 821 System.out.println("c2_repl (removed even substate): " + c2_repl); 822 823 assertEquals(5, c2_repl.size()); 824 rc=ch2_repl.getState(null, "odd", 5000); 825 Util.sleep(500); 826 System.out.println("c2_repl (removed odd substate): " + c2_repl); 827 _testEvenNumbersPresent(c2_repl); 828 829 assertEquals(5, c2_repl.size()); 830 rc=ch2_repl.getState(null, "even", 5000); 831 Util.sleep(500); 832 System.out.println("c2_repl (removed even substate): " + c2_repl); 833 _testOddNumbersPresent(c2_repl); 834 835 assertEquals(5, c2_repl.size()); 836 rc=ch2_repl.getState(null, "odd", 5000); 837 Util.sleep(500); 838 System.out.println("c2_repl (removed odd substate): " + c2_repl); 839 _testEvenNumbersPresent(c2_repl); 840 } 841 842 843 public void testOrdering() throws Exception { 844 final int NUM=100; 845 ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1"); 846 ch1.connect("bla"); 847 MyReceiver receiver=new MyReceiver(NUM); 848 ch1.setReceiver(receiver); 849 for(int i=1; i <= NUM; i++) { 850 ch1.send(new Message(null, null, new Integer (i))); 851 System.out.println("-- sent " + i); 852 } 853 854 receiver.waitForCompletion(); 855 856 List<Integer > nums=receiver.getNums(); 857 checkMonotonicallyIncreasingNumbers(nums); 858 System.out.println(NUM + " messages were received in the correct order"); 859 } 860 861 private static void checkMonotonicallyIncreasingNumbers(List<Integer > nums) { 862 int current=-1; 863 for(int num: nums) { 864 if(current < 0) { 865 current=num; 866 } 867 else { 868 assertEquals("list is " + nums, ++current, num); 869 } 870 } 871 } 872 873 874 private static class MyReceiver extends ReceiverAdapter { 875 final List<Integer > nums=new LinkedList<Integer >(); 876 final int expected; 877 878 public MyReceiver(int expected) { 879 this.expected=expected; 880 } 881 882 public List<Integer > getNums() { 883 return nums; 884 } 885 886 public void waitForCompletion() throws InterruptedException { 887 synchronized(nums) { 888 while(nums.size() < expected) { 889 nums.wait(); 890 } 891 } 892 } 893 894 public void receive(Message msg) { 895 Util.sleepRandom(100); 896 Integer num=(Integer )msg.getObject(); 897 synchronized(nums) { 898 System.out.println("-- received " + num); 899 nums.add(num); 900 if(nums.size() >= expected) { 901 nums.notifyAll(); 902 } 903 } 904 Util.sleepRandom(100); 905 } 906 } 907 908 909 public static Test suite() { 910 return new TestSuite(MultiplexerTest.class); 911 } 912 913 public static void main(String [] args) { 914 junit.textui.TestRunner.run(MultiplexerTest.suite()); 915 } 916 917 private static class Cache extends ExtendedReceiverAdapter { 918 protected final Map data ; 919 Channel ch; 920 String name; 921 922 public Cache(Channel ch, String name) { 923 this.data=new TreeMap(); 924 this.ch=ch; 925 this.name=name; 926 this.ch.setReceiver(this); 927 } 928 929 protected Object get(Object key) { 930 synchronized(data) { 931 return data.get(key); 932 } 933 } 934 935 protected void put(Object key, Object val) throws Exception { 936 Object [] buf=new Object [2]; 937 buf[0]=key; buf[1]=val; 938 synchronized(data) { 939 data.put(key, val); 940 } 941 Message msg=new Message(null, null, buf); 942 ch.send(msg); 943 } 944 945 protected int size() { 946 synchronized(data) { 947 return data.size(); 948 } 949 } 950 951 952 public void receive(Message msg) { 953 if(ch.getLocalAddress().equals(msg.getSrc())) 954 return; 955 Object [] modification=(Object [])msg.getObject(); 956 Object key=modification[0]; 957 Object val=modification[1]; 958 synchronized(data) { 959 data.put(key,val); 961 } 962 } 963 964 public byte[] getState() { 965 byte[] state=null; 966 synchronized(data) { 967 try { 968 state=Util.objectToByteBuffer(data); 969 } 970 catch(Exception e) { 971 e.printStackTrace(); 972 return null; 973 } 974 } 975 return state; 976 } 977 978 public byte[] getState(String state_id) { 979 return getState(); 980 } 981 982 983 public void setState(byte[] state) { 984 Map m; 985 try { 986 m=(Map)Util.objectFromByteBuffer(state); 987 synchronized(data) { 988 data.clear(); 989 data.putAll(m); 990 } 991 } 992 catch(Exception e) { 993 e.printStackTrace(); 994 } 995 } 996 997 public void setState(String state_id, byte[] state) { 998 setState(state); 999 } 1000 1001 public void getState(OutputStream ostream){ 1002 ObjectOutputStream oos = null; 1003 try{ 1004 oos = new ObjectOutputStream(ostream); 1005 synchronized(data){ 1006 oos.writeObject(data); 1007 } 1008 oos.flush(); 1009 } 1010 catch (IOException e){} 1011 finally{ 1012 try{ 1013 if(oos != null) 1014 oos.close(); 1015 } 1016 catch (IOException e){ 1017 System.err.println(e); 1018 } 1019 } 1020 } 1021 1022 public void getState(String state_id, OutputStream ostream) { 1023 getState(ostream); 1024 } 1025 1026 public void setState(InputStream istream) { 1027 ObjectInputStream ois = null; 1028 try { 1029 ois = new ObjectInputStream(istream); 1030 Map m = (Map)ois.readObject(); 1031 synchronized (data) 1032 { 1033 data.clear(); 1034 data.putAll(m); 1035 } 1036 1037 } catch (Exception e) {} 1038 finally{ 1039 try { 1040 if(ois != null) 1041 ois.close(); 1042 } catch (IOException e) { 1043 System.err.println(e); 1044 } 1045 } 1046 } 1047 1048 public void setState(String state_id, InputStream istream) { 1049 setState(istream); 1050 } 1051 1052 public void clear() { 1053 synchronized (data){ 1054 data.clear(); 1055 } 1056 } 1057 1058 1059 public void viewAccepted(View new_view) { 1060 log("view is " + new_view); 1061 } 1062 1063 public String toString() { 1064 synchronized(data){ 1065 return data.toString(); 1066 } 1067 } 1068 1069 1070 public String printKeys() { 1071 return data.keySet().toString(); 1072 } 1073 1074 private void log(String msg) { 1075 System.out.println("-- [" + name + "] " + msg); 1076 } 1077 1078 } 1079 1080 1081 static class ExtendedCache extends Cache { 1082 1083 public ExtendedCache(Channel ch, String name) { 1084 super(ch, name); 1085 } 1086 1087 1088 public byte[] getState(String state_id) { 1089 Map copy=null; 1090 synchronized (data){ 1091 copy=new HashMap(data); 1092 } 1093 for(Iterator it=copy.keySet().iterator(); it.hasNext();) { 1094 Integer key=(Integer )it.next(); 1095 if(state_id.equals("odd") && key.intValue() % 2 != 0) 1096 it.remove(); 1097 else if(state_id.equals("even") && key.intValue() % 2 == 0) 1098 it.remove(); 1099 } 1100 try { 1101 return Util.objectToByteBuffer(copy); 1102 } 1103 catch(Exception e) { 1104 e.printStackTrace(); 1105 return null; 1106 } 1107 } 1108 1109 public void getState(String state_id,OutputStream os) { 1110 Map copy=null; 1111 synchronized (data){ 1112 copy=new HashMap(data); 1113 } 1114 for(Iterator it=copy.keySet().iterator(); it.hasNext();) { 1115 Integer key=(Integer )it.next(); 1116 if(state_id.equals("odd") && key.intValue() % 2 != 0) 1117 it.remove(); 1118 else if(state_id.equals("even") && key.intValue() % 2 == 0) 1119 it.remove(); 1120 } 1121 ObjectOutputStream oos = null; 1122 try { 1123 oos=new ObjectOutputStream(os); 1124 oos.writeObject(copy); 1125 oos.flush(); 1126 } 1127 catch (IOException e){} 1128 finally{ 1129 try{ 1130 if(oos != null) 1131 oos.close(); 1132 } 1133 catch (IOException e){ 1134 System.err.println(e); 1135 } 1136 } 1137 } 1138 1139 public void setState(String state_id, InputStream is){ 1140 setState(is); 1141 } 1142 1143 public void setState(String state_id, byte[] state) { 1144 setState(state); 1145 } 1146 1147 public String toString() { 1148 synchronized(data) { 1149 Set keys=new TreeSet(data.keySet()); 1150 StringBuilder sb=new StringBuilder (); 1151 for(Iterator it=keys.iterator(); it.hasNext();) { 1152 Object o=it.next(); 1153 sb.append(o).append("=").append(data.get(o)).append(" "); 1154 } 1155 return sb.toString(); 1156 } 1157 } 1158 } 1159 1160} 1161 | Popular Tags |