1 5 package com.tc.object; 6 7 import com.tc.exception.ImplementMe; 8 import com.tc.logging.NullTCLogger; 9 import com.tc.net.protocol.tcm.ChannelID; 10 import com.tc.net.protocol.tcm.MessageChannel; 11 import com.tc.net.protocol.tcm.TestChannelIDProvider; 12 import com.tc.object.msg.RequestManagedObjectMessage; 13 import com.tc.object.msg.RequestManagedObjectMessageFactory; 14 import com.tc.object.msg.RequestRootMessage; 15 import com.tc.object.msg.RequestRootMessageFactory; 16 import com.tc.object.session.NullSessionManager; 17 import com.tc.objectserver.core.api.TestDNA; 18 import com.tc.test.TCTestCase; 19 import com.tc.util.concurrent.NoExceptionLinkedQueue; 20 21 import java.util.Collection ; 22 import java.util.HashMap ; 23 import java.util.HashSet ; 24 import java.util.Iterator ; 25 import java.util.LinkedList ; 26 import java.util.Map ; 27 import java.util.Set ; 28 29 public class RemoteObjectManagerImplTest extends TCTestCase { 30 31 RemoteObjectManagerImpl manager; 32 ThreadGroup threadGroup; 33 private TestChannelIDProvider channelIDProvider; 34 private TestRequestRootMessageFactory rrmf; 35 private TestRequestManagedObjectMessageFactory rmomf; 36 private RetrieverThreads rt; 37 38 protected void setUp() throws Exception { 39 super.setUp(); 40 this.channelIDProvider = new TestChannelIDProvider(); 41 this.channelIDProvider.channelID = new ChannelID(1); 42 this.rmomf = new TestRequestManagedObjectMessageFactory(); 43 newRmom(); 44 this.rrmf = new TestRequestRootMessageFactory(); 45 newRrm(); 46 47 this.threadGroup = new ThreadGroup (getClass().getName()); 48 manager = new RemoteObjectManagerImpl(new NullTCLogger(), channelIDProvider, rrmf, rmomf, 49 new NullObjectRequestMonitor(), 500, new NullSessionManager()); 50 rt = new RetrieverThreads(Thread.currentThread().getThreadGroup(), manager); 51 } 52 53 public void testRequestOutstandingRequestRootMessages() throws Exception { 54 final Map expectedResent = new HashMap (); 55 final Map expectedNotResent = new HashMap (); 56 TestRequestRootMessage rrm = newRrm(); 57 assertNoMessageSent(rrm); 58 pauseAndStart(); 59 manager.requestOutstanding(); 60 manager.unpause(); 61 assertNoMessageSent(rrm); 62 63 int count = 100; 64 for (int i = 0; i < count; i++) { 65 newRrm(); 66 String rootID = "root" + i; 67 rt.startNewRootRetriever(rootID); 68 Object tmp = rrmf.newMessageQueue.take(); 69 assertFalse(tmp == rrm); 70 rrm = (TestRequestRootMessage) tmp; 71 assertTrue(rrmf.newMessageQueue.isEmpty()); 72 rrm.sendQueue.take(); 73 assertTrue(rrm.sendQueue.isEmpty()); 74 if (i % 2 == 0) { 75 expectedResent.put(rootID, rrm); 76 } else { 77 expectedNotResent.put(rootID, rrm); 78 } 79 } 80 log("rt.getAliveCount() = " + rt.getAliveCount() + " expectedResent.size() = " + expectedResent.size() 81 + " expectedNotResent.size() = " + expectedNotResent.size()); 82 assertEquals(count, rt.getAliveCount()); 83 int objectIDCount = 1; 85 for (Iterator i = expectedNotResent.keySet().iterator(); i.hasNext();) { 86 String rootID = (String ) i.next(); 87 log("Adding Root = " + rootID); 88 manager.addRoot(rootID, new ObjectID(objectIDCount++)); 89 } 90 rt.waitForLowWatermark(count - expectedNotResent.size()); 92 assertEquals(count - expectedResent.size(), rt.getAliveCount()); 93 94 pauseAndStart(); 96 manager.requestOutstanding(); 97 manager.unpause(); 98 99 assertFalse(rrmf.newMessageQueue.isEmpty()); 100 101 for (Iterator i = expectedResent.values().iterator(); i.hasNext(); i.next()) { 103 rrm = (TestRequestRootMessage) rrmf.newMessageQueue.take(); 104 assertNotNull(rrm.sendQueue.poll(1)); 105 } 106 107 for (Iterator i = expectedNotResent.values().iterator(); i.hasNext();) { 108 rrm = (TestRequestRootMessage) i.next(); 109 assertTrue(rrm.sendQueue.isEmpty()); 110 } 111 112 assertTrue(rrmf.newMessageQueue.isEmpty()); 113 114 for (Iterator i = expectedResent.keySet().iterator(); i.hasNext();) { 116 String rootID = (String ) i.next(); 117 log("Adding Root = " + rootID); 118 manager.addRoot(rootID, new ObjectID(objectIDCount++)); 119 } 120 121 rt.waitForLowWatermark(0); 123 124 } 125 126 private static void log(String s) { 127 if (false) System.err.println(Thread.currentThread().getName() + " :: " + s); 128 } 129 130 private void pauseAndStart() { 131 manager.pause(); 132 manager.starting(); 134 } 135 136 public void testRequestOutstandingRequestManagedObjectMessages() throws Exception { 137 138 final Map expectedResent = new HashMap (); 139 final Map secondaryResent = new HashMap (); 140 final Map expectedNotResent = new HashMap (); 141 142 TestRequestManagedObjectMessage rmom = newRmom(); 143 assertNoMessageSent(rmom); 144 pauseAndStart(); 145 manager.requestOutstanding(); 146 manager.unpause(); 147 assertNoMessageSent(rmom); 148 149 int count = 50; 150 151 for (int i = 0; i < count; i++) { 152 newRmom(); 153 ObjectID id = new ObjectID(i); 154 assertTrue(rmomf.newMessageQueue.isEmpty()); 155 rt.startNewObjectRetriever(id); 156 Object tmp = rmomf.newMessageQueue.take(); 157 assertTrue(rmomf.newMessageQueue.isEmpty()); 158 assertFalse(rmom == tmp); 160 rmom = (TestRequestManagedObjectMessage) tmp; 161 rmom.sendQueue.take(); 162 assertEquals(i + 1, rt.getAliveCount()); 163 if (i % 2 == 0) { 164 expectedResent.put(id, rmom); 165 } else { 166 expectedNotResent.put(id, rmom); 167 } 168 } 169 170 for (int i = 0; i < count; i++) { 172 newRmom(); 173 ObjectID id = new ObjectID(i); 174 assertTrue(rmomf.newMessageQueue.isEmpty()); 175 rt.startNewObjectRetriever(id); 176 assertTrue(rmomf.newMessageQueue.isEmpty()); 177 } 178 179 assertTrue(rmomf.newMessageQueue.isEmpty()); 180 181 for (Iterator i = expectedNotResent.keySet().iterator(); i.hasNext();) { 183 newRmom(); 184 assertTrue(rmomf.newMessageQueue.isEmpty()); 185 manager.addObject(new TestDNA((ObjectID) i.next())); 186 Object tmp = rmomf.newMessageQueue.take(); 188 assertFalse(rmom == tmp); 189 rmom = (TestRequestManagedObjectMessage) tmp; 190 rmom.sendQueue.take(); 191 assertTrue(rmom.sendQueue.isEmpty()); 192 secondaryResent.put(rmom.objectIDs.iterator().next(), rmom); 193 } 194 195 pauseAndStart(); 197 manager.requestOutstanding(); 198 manager.unpause(); 199 200 final Collection c = new LinkedList (); 201 c.addAll(expectedResent.values()); 202 c.addAll(secondaryResent.values()); 203 for (Iterator i = c.iterator(); i.hasNext(); i.next()) { 206 rmom = (TestRequestManagedObjectMessage) rmomf.newMessageQueue.take(); 207 assertFalse(rmom.sendQueue.isEmpty()); 208 assertNotNull(rmom.sendQueue.poll(1)); 209 } 210 211 c.clear(); 212 213 215 c.addAll(expectedNotResent.values()); 216 for (Iterator i = c.iterator(); i.hasNext();) { 217 rmom = (TestRequestManagedObjectMessage) i.next(); 218 assertTrue(rmom.sendQueue.isEmpty()); 219 } 220 } 221 222 public void testBasics() throws Exception { 223 224 final ObjectID id1 = new ObjectID(1); 225 final ObjectID id2 = new ObjectID(2); 226 final ObjectID id200 = new ObjectID(200); 227 final ObjectID id201 = new ObjectID(201); 228 final Set removed = new HashSet (); 229 removed.add(id1); 230 removed.add(id200); 231 removed.add(id201); 232 for (Iterator i = removed.iterator(); i.hasNext();) { 234 this.manager.removed((ObjectID) i.next()); 235 } 236 237 TestRequestManagedObjectMessage rmom = this.rmomf.message; 238 assertNoMessageSent(rmom); 239 240 rt.startNewObjectRetriever(id1); 241 242 waitForMessageSend(rmom); 243 244 assertNoMessageSent(rmom); 245 246 verifyRmomInit(id1, removed, rmom); 249 250 assertEquals(1, rt.getAliveCount()); 251 252 rmom = newRmom(); 253 254 rt.startNewObjectRetriever(id1); 256 257 assertTrue(rmom.sendQueue.isEmpty()); 259 260 assertEquals(2, rt.getAliveCount()); 261 262 rt.startNewObjectRetriever(id2); 264 265 waitForMessageSend(rmom); 269 verifyRmomInit(id2, new HashSet (), rmom); 270 271 assertEquals(3, rt.getAliveCount()); 272 273 assertNoMessageSent(rmom); 274 rmom = newRmom(); 275 276 285 manager.addObject(new TestDNA(id1)); 286 rt.waitForLowWatermark(2); 287 288 waitForMessageSend(rmom); 289 verifyRmomInit(id1, new HashSet (), rmom); 290 291 rmom = newRmom(); 292 293 manager.addObject(new TestDNA(id1)); 295 rt.waitForLowWatermark(1); 296 297 manager.addObject(new TestDNA(id2)); 299 rt.waitForLowWatermark(0); 300 301 assertNoMessageSent(rmom); 303 } 304 305 private void assertNoMessageSent(TestRequestManagedObjectMessage rmom) { 306 assertTrue(rmomf.newMessageQueue.isEmpty()); 307 assertTrue(rmom.sendQueue.isEmpty()); 308 } 309 310 private void assertNoMessageSent(TestRequestRootMessage rrm) { 311 assertTrue(rrmf.newMessageQueue.isEmpty()); 312 assertTrue(rrm.sendQueue.isEmpty()); 313 } 314 315 private void waitForMessageSend(TestRequestManagedObjectMessage rmom) { 316 rmomf.newMessageQueue.take(); 317 rmom.sendQueue.take(); 318 } 319 320 323 private void verifyRmomInit(final ObjectID objectID, final Set removed, TestRequestManagedObjectMessage rmom) { 324 Object [] initArgs = (Object []) rmom.initializeQueue.take(); 325 Set oids = new HashSet (); 326 oids.add(objectID); 327 assertTrue(rmom.initializeQueue.isEmpty()); 328 ObjectRequestContext ctxt = (ObjectRequestContext) initArgs[0]; 329 assertEquals(this.channelIDProvider.channelID, ctxt.getChannelID()); 330 assertEquals(oids, ctxt.getObjectIDs()); 331 assertEquals(oids, initArgs[1]); 333 assertEquals(removed, initArgs[2]); 335 } 336 337 private TestRequestRootMessage newRrm() { 338 TestRequestRootMessage rv = new TestRequestRootMessage(); 339 this.rrmf.message = rv; 340 return rv; 341 } 342 343 private TestRequestManagedObjectMessage newRmom() { 344 TestRequestManagedObjectMessage rmom; 345 rmom = new TestRequestManagedObjectMessage(); 346 this.rmomf.message = rmom; 347 return rmom; 348 } 349 350 private static class RetrieverThreads { 351 private int threadCount; 352 353 private final RemoteObjectManager manager; 354 355 private final Set inProgress = new HashSet (); 356 357 private final ThreadGroup tg; 358 359 public RetrieverThreads(ThreadGroup tg, RemoteObjectManager manager) { 360 this.manager = manager; 361 this.tg = tg; 362 } 363 364 public int getAliveCount() { 365 synchronized (inProgress) { 366 return inProgress.size(); 367 } 368 } 369 370 public void waitForLowWatermark(int max) throws InterruptedException { 371 if (getAliveCount() <= max) return; 372 synchronized (inProgress) { 373 while (getAliveCount() > max) { 374 inProgress.wait(); 375 } 376 } 377 } 378 379 public Thread startNewRootRetriever(final String rootID) { 380 Thread t = new Thread (tg, new Runnable () { 381 382 public void run() { 383 log("Starting .. " + rootID); 384 manager.retrieveRootID(rootID); 385 log("Retrieved rootID.. " + rootID); 386 synchronized (inProgress) { 387 if (!inProgress.remove(Thread.currentThread())) throw new RuntimeException ("Thread not removed!"); 388 log("Removed from inProgress .. size = " + inProgress.size()); 389 inProgress.notifyAll(); 390 } 391 } 392 }, "Root retriever thread " + threadCount++); 393 synchronized (inProgress) { 394 inProgress.add(t); 395 log("Added : inProgress size = " + inProgress.size()); 396 } 397 t.start(); 398 return t; 399 } 400 401 public Thread startNewObjectRetriever(final ObjectID id) { 402 Thread t = new Thread (tg, new Runnable () { 403 404 public void run() { 405 manager.retrieve(id); 406 synchronized (inProgress) { 407 if (!inProgress.remove(Thread.currentThread())) throw new RuntimeException ("Thread not removed!"); 408 inProgress.notifyAll(); 409 } 410 } 411 }, "Object retriever thread " + threadCount++); 412 synchronized (inProgress) { 413 inProgress.add(t); 414 } 415 t.start(); 416 return t; 417 } 418 } 419 420 private static class TestRequestRootMessageFactory implements RequestRootMessageFactory { 421 public final NoExceptionLinkedQueue newMessageQueue = new NoExceptionLinkedQueue(); 422 public TestRequestRootMessage message; 423 424 public RequestRootMessage newRequestRootMessage() { 425 newMessageQueue.put(message); 426 return this.message; 427 } 428 429 } 430 431 private static class TestRequestRootMessage implements RequestRootMessage { 432 433 public final NoExceptionLinkedQueue sendQueue = new NoExceptionLinkedQueue(); 434 435 public String getRootName() { 436 throw new ImplementMe(); 437 } 438 439 public void initialize(String name) { 440 return; 441 } 442 443 public void send() { 444 sendQueue.put(new Object ()); 445 } 446 447 public ChannelID getChannelID() { 448 throw new ImplementMe(); 449 } 450 451 public void recycle() { 452 return; 453 } 454 455 } 456 457 private static class TestRequestManagedObjectMessageFactory implements RequestManagedObjectMessageFactory { 458 459 public final NoExceptionLinkedQueue newMessageQueue = new NoExceptionLinkedQueue(); 460 461 public TestRequestManagedObjectMessage message; 462 463 public RequestManagedObjectMessage newRequestManagedObjectMessage() { 464 newMessageQueue.put(message); 465 return message; 466 } 467 468 } 469 470 private static class TestRequestManagedObjectMessage implements RequestManagedObjectMessage { 471 472 public final NoExceptionLinkedQueue initializeQueue = new NoExceptionLinkedQueue(); 473 public final NoExceptionLinkedQueue sendQueue = new NoExceptionLinkedQueue(); 474 public Set objectIDs; 475 476 public ObjectRequestID getRequestID() { 477 throw new ImplementMe(); 478 } 479 480 public Set getObjectIDs() { 481 throw new ImplementMe(); 482 } 483 484 public Set getRemoved() { 485 throw new ImplementMe(); 486 } 487 488 public void initialize(ObjectRequestContext ctxt, Set oids, Set removedIDs) { 489 this.objectIDs = oids; 490 this.initializeQueue.put(new Object [] { ctxt, oids, removedIDs }); 491 } 492 493 public void send() { 494 sendQueue.put(new Object ()); 495 } 496 497 public MessageChannel getChannel() { 498 throw new ImplementMe(); 499 } 500 501 public ChannelID getChannelID() { 502 throw new ImplementMe(); 503 } 504 505 public int getRequestDepth() { 506 return 400; 507 } 508 509 public void recycle() { 510 return; 511 } 512 513 } 514 515 } 516 | Popular Tags |