KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > object > RemoteObjectManagerImplTest


/*
 * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
 * notice. All rights reserved.
 */

package com.tc.object;

import com.tc.exception.ImplementMe;
import com.tc.logging.NullTCLogger;
import com.tc.net.protocol.tcm.ChannelID;
import com.tc.net.protocol.tcm.MessageChannel;
import com.tc.net.protocol.tcm.TestChannelIDProvider;
import com.tc.object.msg.RequestManagedObjectMessage;
import com.tc.object.msg.RequestManagedObjectMessageFactory;
import com.tc.object.msg.RequestRootMessage;
import com.tc.object.msg.RequestRootMessageFactory;
import com.tc.object.session.NullSessionManager;
import com.tc.objectserver.core.api.TestDNA;
import com.tc.test.TCTestCase;
import com.tc.util.concurrent.NoExceptionLinkedQueue;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
28
29 public class RemoteObjectManagerImplTest extends TCTestCase {
30
31   RemoteObjectManagerImpl manager;
32   ThreadGroup JavaDoc threadGroup;
33   private TestChannelIDProvider channelIDProvider;
34   private TestRequestRootMessageFactory rrmf;
35   private TestRequestManagedObjectMessageFactory rmomf;
36   private RetrieverThreads rt;
37
38   protected void setUp() throws Exception JavaDoc {
39     super.setUp();
40     this.channelIDProvider = new TestChannelIDProvider();
41     this.channelIDProvider.channelID = new ChannelID(1);
42     this.rmomf = new TestRequestManagedObjectMessageFactory();
43     newRmom();
44     this.rrmf = new TestRequestRootMessageFactory();
45     newRrm();
46
47     this.threadGroup = new ThreadGroup JavaDoc(getClass().getName());
48     manager = new RemoteObjectManagerImpl(new NullTCLogger(), channelIDProvider, rrmf, rmomf,
49                                           new NullObjectRequestMonitor(), 500, new NullSessionManager());
50     rt = new RetrieverThreads(Thread.currentThread().getThreadGroup(), manager);
51   }
52
53   public void testRequestOutstandingRequestRootMessages() throws Exception JavaDoc {
54     final Map JavaDoc expectedResent = new HashMap JavaDoc();
55     final Map JavaDoc expectedNotResent = new HashMap JavaDoc();
56     TestRequestRootMessage rrm = newRrm();
57     assertNoMessageSent(rrm);
58     pauseAndStart();
59     manager.requestOutstanding();
60     manager.unpause();
61     assertNoMessageSent(rrm);
62
63     int count = 100;
64     for (int i = 0; i < count; i++) {
65       newRrm();
66       String JavaDoc rootID = "root" + i;
67       rt.startNewRootRetriever(rootID);
68       Object JavaDoc tmp = rrmf.newMessageQueue.take();
69       assertFalse(tmp == rrm);
70       rrm = (TestRequestRootMessage) tmp;
71       assertTrue(rrmf.newMessageQueue.isEmpty());
72       rrm.sendQueue.take();
73       assertTrue(rrm.sendQueue.isEmpty());
74       if (i % 2 == 0) {
75         expectedResent.put(rootID, rrm);
76       } else {
77         expectedNotResent.put(rootID, rrm);
78       }
79     }
80     log("rt.getAliveCount() = " + rt.getAliveCount() + " expectedResent.size() = " + expectedResent.size()
81         + " expectedNotResent.size() = " + expectedNotResent.size());
82     assertEquals(count, rt.getAliveCount());
83     // respond to some of the requests
84
int objectIDCount = 1;
85     for (Iterator JavaDoc i = expectedNotResent.keySet().iterator(); i.hasNext();) {
86       String JavaDoc rootID = (String JavaDoc) i.next();
87       log("Adding Root = " + rootID);
88       manager.addRoot(rootID, new ObjectID(objectIDCount++));
89     }
90     // the threads waiting for the roots we just added should fall through.
91
rt.waitForLowWatermark(count - expectedNotResent.size());
92     assertEquals(count - expectedResent.size(), rt.getAliveCount());
93
94     // TEST REQUEST OUTSTANDING
95
pauseAndStart();
96     manager.requestOutstanding();
97     manager.unpause();
98
99     assertFalse(rrmf.newMessageQueue.isEmpty());
100
101     // Check the messages we expect to have been resent
102
for (Iterator JavaDoc i = expectedResent.values().iterator(); i.hasNext(); i.next()) {
103       rrm = (TestRequestRootMessage) rrmf.newMessageQueue.take();
104       assertNotNull(rrm.sendQueue.poll(1));
105     }
106
107     for (Iterator JavaDoc i = expectedNotResent.values().iterator(); i.hasNext();) {
108       rrm = (TestRequestRootMessage) i.next();
109       assertTrue(rrm.sendQueue.isEmpty());
110     }
111
112     assertTrue(rrmf.newMessageQueue.isEmpty());
113
114     // respond to the rest of the requests
115
for (Iterator JavaDoc i = expectedResent.keySet().iterator(); i.hasNext();) {
116       String JavaDoc rootID = (String JavaDoc) i.next();
117       log("Adding Root = " + rootID);
118       manager.addRoot(rootID, new ObjectID(objectIDCount++));
119     }
120
121     // all the threads should now be able to complete.
122
rt.waitForLowWatermark(0);
123
124   }
125
126   private static void log(String JavaDoc s) {
127     if (false) System.err.println(Thread.currentThread().getName() + " :: " + s);
128   }
129
130   private void pauseAndStart() {
131     manager.pause();
132     // manager.clearCache();
133
manager.starting();
134   }
135
136   public void testRequestOutstandingRequestManagedObjectMessages() throws Exception JavaDoc {
137
138     final Map JavaDoc expectedResent = new HashMap JavaDoc();
139     final Map JavaDoc secondaryResent = new HashMap JavaDoc();
140     final Map JavaDoc expectedNotResent = new HashMap JavaDoc();
141
142     TestRequestManagedObjectMessage rmom = newRmom();
143     assertNoMessageSent(rmom);
144     pauseAndStart();
145     manager.requestOutstanding();
146     manager.unpause();
147     assertNoMessageSent(rmom);
148
149     int count = 50;
150
151     for (int i = 0; i < count; i++) {
152       newRmom();
153       ObjectID id = new ObjectID(i);
154       assertTrue(rmomf.newMessageQueue.isEmpty());
155       rt.startNewObjectRetriever(id);
156       Object JavaDoc tmp = rmomf.newMessageQueue.take();
157       assertTrue(rmomf.newMessageQueue.isEmpty());
158       // make sure we aren't mistakenly using the same message all the time
159
assertFalse(rmom == tmp);
160       rmom = (TestRequestManagedObjectMessage) tmp;
161       rmom.sendQueue.take();
162       assertEquals(i + 1, rt.getAliveCount());
163       if (i % 2 == 0) {
164         expectedResent.put(id, rmom);
165       } else {
166         expectedNotResent.put(id, rmom);
167       }
168     }
169
170     // request the same objects again
171
for (int i = 0; i < count; i++) {
172       newRmom();
173       ObjectID id = new ObjectID(i);
174       assertTrue(rmomf.newMessageQueue.isEmpty());
175       rt.startNewObjectRetriever(id);
176       assertTrue(rmomf.newMessageQueue.isEmpty());
177     }
178
179     assertTrue(rmomf.newMessageQueue.isEmpty());
180
181     // now go through all of the messages we don't expect to be resent and respond to their requests
182
for (Iterator JavaDoc i = expectedNotResent.keySet().iterator(); i.hasNext();) {
183       newRmom();
184       assertTrue(rmomf.newMessageQueue.isEmpty());
185       manager.addObject(new TestDNA((ObjectID) i.next()));
186       // collect the messages sent for the secondary threads...
187
Object JavaDoc tmp = rmomf.newMessageQueue.take();
188       assertFalse(rmom == tmp);
189       rmom = (TestRequestManagedObjectMessage) tmp;
190       rmom.sendQueue.take();
191       assertTrue(rmom.sendQueue.isEmpty());
192       secondaryResent.put(rmom.objectIDs.iterator().next(), rmom);
193     }
194
195     // now tell it to resend outstanding
196
pauseAndStart();
197     manager.requestOutstanding();
198     manager.unpause();
199
200     final Collection JavaDoc c = new LinkedList JavaDoc();
201     c.addAll(expectedResent.values());
202     c.addAll(secondaryResent.values());
203     // now go through all of the messages we DO expect to be resent and make sure that
204
// they WERE resent
205
for (Iterator JavaDoc i = c.iterator(); i.hasNext(); i.next()) {
206       rmom = (TestRequestManagedObjectMessage) rmomf.newMessageQueue.take();
207       assertFalse(rmom.sendQueue.isEmpty());
208       assertNotNull(rmom.sendQueue.poll(1));
209     }
210
211     c.clear();
212
213     // go through all of the messages we DON'T expect to be resent and make sure they WEREN'T resent
214

215     c.addAll(expectedNotResent.values());
216     for (Iterator JavaDoc i = c.iterator(); i.hasNext();) {
217       rmom = (TestRequestManagedObjectMessage) i.next();
218       assertTrue(rmom.sendQueue.isEmpty());
219     }
220   }
221
222   public void testBasics() throws Exception JavaDoc {
223
224     final ObjectID id1 = new ObjectID(1);
225     final ObjectID id2 = new ObjectID(2);
226     final ObjectID id200 = new ObjectID(200);
227     final ObjectID id201 = new ObjectID(201);
228     final Set JavaDoc removed = new HashSet JavaDoc();
229     removed.add(id1);
230     removed.add(id200);
231     removed.add(id201);
232     // set up some removed objects.
233
for (Iterator JavaDoc i = removed.iterator(); i.hasNext();) {
234       this.manager.removed((ObjectID) i.next());
235     }
236
237     TestRequestManagedObjectMessage rmom = this.rmomf.message;
238     assertNoMessageSent(rmom);
239
240     rt.startNewObjectRetriever(id1);
241
242     waitForMessageSend(rmom);
243
244     assertNoMessageSent(rmom);
245
246     // Check to see that the message was initialized with the expected
247
// values
248
verifyRmomInit(id1, removed, rmom);
249
250     assertEquals(1, rt.getAliveCount());
251
252     rmom = newRmom();
253
254     // now request the same object id with a different thread.
255
rt.startNewObjectRetriever(id1);
256
257     // but, no message should have been sent.
258
assertTrue(rmom.sendQueue.isEmpty());
259
260     assertEquals(2, rt.getAliveCount());
261
262     // now request a different object id on a different thread
263
rt.startNewObjectRetriever(id2);
264
265     // this thread should send a message with id2, an empty set for the
266
// removed
267
// ids
268
waitForMessageSend(rmom);
269     verifyRmomInit(id2, new HashSet JavaDoc(), rmom);
270
271     assertEquals(3, rt.getAliveCount());
272
273     assertNoMessageSent(rmom);
274     rmom = newRmom();
275
276     // this should allow the first two threads to fall through
277
// XXX: Actually, it doesn't. The way it's implemented, each object
278
// request
279
// will result in its own message send.
280
//
281
// This is sub-optimal, but it works so we're not changing it right now,
282
// especially since we're going to have to optimize this stuff soon
283
// anyway. --Orion 8/24/05
284

285     manager.addObject(new TestDNA(id1));
286     rt.waitForLowWatermark(2);
287
288     waitForMessageSend(rmom);
289     verifyRmomInit(id1, new HashSet JavaDoc(), rmom);
290
291     rmom = newRmom();
292
293     // the second thread should now create and send a new message
294
manager.addObject(new TestDNA(id1));
295     rt.waitForLowWatermark(1);
296
297     // now, allow the third thread to fall through
298
manager.addObject(new TestDNA(id2));
299     rt.waitForLowWatermark(0);
300
301     // no-one should have sent any messages
302
assertNoMessageSent(rmom);
303   }
304
305   private void assertNoMessageSent(TestRequestManagedObjectMessage rmom) {
306     assertTrue(rmomf.newMessageQueue.isEmpty());
307     assertTrue(rmom.sendQueue.isEmpty());
308   }
309
310   private void assertNoMessageSent(TestRequestRootMessage rrm) {
311     assertTrue(rrmf.newMessageQueue.isEmpty());
312     assertTrue(rrm.sendQueue.isEmpty());
313   }
314
315   private void waitForMessageSend(TestRequestManagedObjectMessage rmom) {
316     rmomf.newMessageQueue.take();
317     rmom.sendQueue.take();
318   }
319
320   /**
321    * Verifies that the object request message initialization was done according to the given arguments.
322    */

323   private void verifyRmomInit(final ObjectID objectID, final Set JavaDoc removed, TestRequestManagedObjectMessage rmom) {
324     Object JavaDoc[] initArgs = (Object JavaDoc[]) rmom.initializeQueue.take();
325     Set JavaDoc oids = new HashSet JavaDoc();
326     oids.add(objectID);
327     assertTrue(rmom.initializeQueue.isEmpty());
328     ObjectRequestContext ctxt = (ObjectRequestContext) initArgs[0];
329     assertEquals(this.channelIDProvider.channelID, ctxt.getChannelID());
330     assertEquals(oids, ctxt.getObjectIDs());
331     // The object id in the request
332
assertEquals(oids, initArgs[1]);
333     // The proper set of removed object ids
334
assertEquals(removed, initArgs[2]);
335   }
336
337   private TestRequestRootMessage newRrm() {
338     TestRequestRootMessage rv = new TestRequestRootMessage();
339     this.rrmf.message = rv;
340     return rv;
341   }
342
343   private TestRequestManagedObjectMessage newRmom() {
344     TestRequestManagedObjectMessage rmom;
345     rmom = new TestRequestManagedObjectMessage();
346     this.rmomf.message = rmom;
347     return rmom;
348   }
349
350   private static class RetrieverThreads {
351     private int threadCount;
352
353     private final RemoteObjectManager manager;
354
355     private final Set JavaDoc inProgress = new HashSet JavaDoc();
356
357     private final ThreadGroup JavaDoc tg;
358
359     public RetrieverThreads(ThreadGroup JavaDoc tg, RemoteObjectManager manager) {
360       this.manager = manager;
361       this.tg = tg;
362     }
363
364     public int getAliveCount() {
365       synchronized (inProgress) {
366         return inProgress.size();
367       }
368     }
369
370     public void waitForLowWatermark(int max) throws InterruptedException JavaDoc {
371       if (getAliveCount() <= max) return;
372       synchronized (inProgress) {
373         while (getAliveCount() > max) {
374           inProgress.wait();
375         }
376       }
377     }
378
379     public Thread JavaDoc startNewRootRetriever(final String JavaDoc rootID) {
380       Thread JavaDoc t = new Thread JavaDoc(tg, new Runnable JavaDoc() {
381
382         public void run() {
383           log("Starting .. " + rootID);
384           manager.retrieveRootID(rootID);
385           log("Retrieved rootID.. " + rootID);
386           synchronized (inProgress) {
387             if (!inProgress.remove(Thread.currentThread())) throw new RuntimeException JavaDoc("Thread not removed!");
388             log("Removed from inProgress .. size = " + inProgress.size());
389             inProgress.notifyAll();
390           }
391         }
392       }, "Root retriever thread " + threadCount++);
393       synchronized (inProgress) {
394         inProgress.add(t);
395         log("Added : inProgress size = " + inProgress.size());
396       }
397       t.start();
398       return t;
399     }
400
401     public Thread JavaDoc startNewObjectRetriever(final ObjectID id) {
402       Thread JavaDoc t = new Thread JavaDoc(tg, new Runnable JavaDoc() {
403
404         public void run() {
405           manager.retrieve(id);
406           synchronized (inProgress) {
407             if (!inProgress.remove(Thread.currentThread())) throw new RuntimeException JavaDoc("Thread not removed!");
408             inProgress.notifyAll();
409           }
410         }
411       }, "Object retriever thread " + threadCount++);
412       synchronized (inProgress) {
413         inProgress.add(t);
414       }
415       t.start();
416       return t;
417     }
418   }
419
420   private static class TestRequestRootMessageFactory implements RequestRootMessageFactory {
421     public final NoExceptionLinkedQueue newMessageQueue = new NoExceptionLinkedQueue();
422     public TestRequestRootMessage message;
423
424     public RequestRootMessage newRequestRootMessage() {
425       newMessageQueue.put(message);
426       return this.message;
427     }
428
429   }
430
431   private static class TestRequestRootMessage implements RequestRootMessage {
432
433     public final NoExceptionLinkedQueue sendQueue = new NoExceptionLinkedQueue();
434
435     public String JavaDoc getRootName() {
436       throw new ImplementMe();
437     }
438
439     public void initialize(String JavaDoc name) {
440       return;
441     }
442
443     public void send() {
444       sendQueue.put(new Object JavaDoc());
445     }
446
447     public ChannelID getChannelID() {
448       throw new ImplementMe();
449     }
450
451     public void recycle() {
452       return;
453     }
454
455   }
456
457   private static class TestRequestManagedObjectMessageFactory implements RequestManagedObjectMessageFactory {
458
459     public final NoExceptionLinkedQueue newMessageQueue = new NoExceptionLinkedQueue();
460
461     public TestRequestManagedObjectMessage message;
462
463     public RequestManagedObjectMessage newRequestManagedObjectMessage() {
464       newMessageQueue.put(message);
465       return message;
466     }
467
468   }
469
470   private static class TestRequestManagedObjectMessage implements RequestManagedObjectMessage {
471
472     public final NoExceptionLinkedQueue initializeQueue = new NoExceptionLinkedQueue();
473     public final NoExceptionLinkedQueue sendQueue = new NoExceptionLinkedQueue();
474     public Set JavaDoc objectIDs;
475
476     public ObjectRequestID getRequestID() {
477       throw new ImplementMe();
478     }
479
480     public Set JavaDoc getObjectIDs() {
481       throw new ImplementMe();
482     }
483
484     public Set JavaDoc getRemoved() {
485       throw new ImplementMe();
486     }
487
488     public void initialize(ObjectRequestContext ctxt, Set JavaDoc oids, Set JavaDoc removedIDs) {
489       this.objectIDs = oids;
490       this.initializeQueue.put(new Object JavaDoc[] { ctxt, oids, removedIDs });
491     }
492
493     public void send() {
494       sendQueue.put(new Object JavaDoc());
495     }
496
497     public MessageChannel getChannel() {
498       throw new ImplementMe();
499     }
500
501     public ChannelID getChannelID() {
502       throw new ImplementMe();
503     }
504
505     public int getRequestDepth() {
506       return 400;
507     }
508
509     public void recycle() {
510       return;
511     }
512
513   }
514
515 }
516
Popular Tags