KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > protocol > tcm > MessageChannelTest


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.net.protocol.tcm;
6
7 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
8
9 import com.tc.config.schema.dynamic.FixedValueConfigItem;
10 import com.tc.logging.TCLogger;
11 import com.tc.logging.TCLogging;
12 import com.tc.net.TCSocketAddress;
13 import com.tc.net.core.ConnectionInfo;
14 import com.tc.net.protocol.NetworkStackHarnessFactory;
15 import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
16 import com.tc.net.protocol.tcm.msgs.PingMessage;
17 import com.tc.net.protocol.transport.DefaultConnectionIdFactory;
18 import com.tc.net.protocol.transport.NullConnectionPolicy;
19 import com.tc.object.session.NullSessionManager;
20 import com.tc.test.TCTestCase;
21 import com.tc.util.SequenceGenerator;
22 import com.tc.util.TCTimeoutException;
23 import com.tc.util.concurrent.ThreadUtil;
24
25 import gnu.trove.TLongHashSet;
26
27 import java.io.IOException JavaDoc;
28 import java.net.UnknownHostException JavaDoc;
29 import java.text.SimpleDateFormat JavaDoc;
30 import java.util.Date JavaDoc;
31 import java.util.HashSet JavaDoc;
32 import java.util.LinkedList JavaDoc;
33 import java.util.List JavaDoc;
34 import java.util.Random JavaDoc;
35
36 /**
37  * This is a test case for MessageChannel. XXX: This test could use some work. It's not very coherent and uses sleeps.
38  * --Orion 12/19/2005
39  */

40 public class MessageChannelTest extends TCTestCase {
41   static final int ITERATIONS = 100;
42   static final int WAIT_PERIOD = 100;
43   static final int WAIT = ITERATIONS * WAIT_PERIOD;
44   static final int MESSAGE_COUNT = 250;
45
46   TCLogger logger = TCLogging.getLogger(getClass());
47   NetworkListener lsnr;
48   CommunicationsManager clientComms;
49   CommunicationsManager serverComms;
50   ClientMessageChannel clientChannel;
51   SequenceGenerator sequence;
52   MessageSendAndReceiveWatcher clientWatcher;
53   MessageSendAndReceiveWatcher serverWatcher;
54   SynchronizedRef error = new SynchronizedRef(null);
55   SequenceGenerator sq = new SequenceGenerator();
56
57   private int port = 0;
58
59   // public MessageChannelTest() {
60
// disableAllUntil("2006-02-15");
61
// }
62

63   protected void setUp(int maxReconnectTries) throws Exception JavaDoc {
64     setUp(maxReconnectTries, new PlainNetworkStackHarnessFactory(), new PlainNetworkStackHarnessFactory());
65   }
66
67   protected void setUp(int maxReconnectTries, NetworkStackHarnessFactory clientStackHarnessFactory,
68                        NetworkStackHarnessFactory serverStackHarnessFactory) throws Exception JavaDoc {
69     super.setUp();
70
71     clientWatcher = new MessageSendAndReceiveWatcher();
72     serverWatcher = new MessageSendAndReceiveWatcher();
73
74     this.sequence = new SequenceGenerator();
75
76     MessageMonitor mm = new NullMessageMonitor();
77     clientComms = new CommunicationsManagerImpl(mm, clientStackHarnessFactory, new NullConnectionPolicy());
78     serverComms = new CommunicationsManagerImpl(mm, serverStackHarnessFactory, new NullConnectionPolicy());
79
80     initListener(clientWatcher, serverWatcher);
81     this.clientChannel = createClientMessageChannel(maxReconnectTries);
82     this.setUpClientReceiveSink();
83   }
84
85   private void initListener(final MessageSendAndReceiveWatcher myClientSenderWatcher,
86                             final MessageSendAndReceiveWatcher myServerSenderWatcher) throws IOException JavaDoc,
87       TCTimeoutException {
88     if (lsnr != null) {
89       lsnr.stop(WAIT);
90     }
91
92     lsnr = serverComms.createListener(new NullSessionManager(), new TCSocketAddress(port), false,
93                                       new DefaultConnectionIdFactory());
94     lsnr.addClassMapping(TCMessageType.PING_MESSAGE, PingMessage.class);
95     lsnr.routeMessageType(TCMessageType.PING_MESSAGE, new TCMessageSink() {
96       public void putMessage(TCMessage message) throws UnsupportedMessageTypeException {
97         // System.out.println(message);
98

99         PingMessage ping = (PingMessage) message;
100         try {
101           message.hydrate();
102         } catch (Exception JavaDoc e) {
103           setError(e);
104         }
105         myClientSenderWatcher.addMessageReceived(ping);
106
107         PingMessage pong = ping.createResponse();
108         pong.send();
109         myServerSenderWatcher.addMessageSent(pong);
110       }
111     });
112     lsnr.start(new HashSet());
113     this.port = lsnr.getBindPort();
114   }
115
116   protected void tearDown() throws Exception JavaDoc {
117     super.tearDown();
118
119     final Throwable JavaDoc lastError = (Throwable JavaDoc) error.get();
120     if (lastError != null) { throw new Exception JavaDoc(lastError); }
121
122     if (lsnr != null) lsnr.stop(WAIT);
123     if (this.clientChannel != null) this.clientChannel.close();
124     if (clientComms != null) clientComms.shutdown();
125     if (serverComms != null) serverComms.shutdown();
126   }
127
128   public void testAttachments() throws Exception JavaDoc {
129     setUp(10);
130     String JavaDoc key = "key";
131     MessageChannel channel = createClientMessageChannel(10);
132     assertNull(channel.getAttachment(key));
133     assertNull(channel.removeAttachment(key));
134
135     Object JavaDoc attachment = new Object JavaDoc();
136     Object JavaDoc attachment2 = new Object JavaDoc();
137     channel.addAttachment(key, attachment, false);
138     assertSame(attachment, channel.getAttachment(key));
139     channel.addAttachment(key, attachment, false);
140     assertSame(attachment, channel.getAttachment(key));
141
142     channel.addAttachment(key, attachment2, true);
143     assertSame(attachment2, channel.getAttachment(key));
144
145     Object JavaDoc removed = channel.removeAttachment(key);
146     assertSame(attachment2, removed);
147
148     removed = channel.removeAttachment(key);
149     assertNull(removed);
150     assertNull(channel.getAttachment(key));
151   }
152
153   public void testAutomaticReconnect() throws Exception JavaDoc {
154     setUp(10);
155     assertEquals(0, clientChannel.getConnectCount());
156     assertEquals(0, clientChannel.getConnectAttemptCount());
157     clientChannel.open();
158     assertEquals(1, clientChannel.getConnectCount());
159     assertEquals(1, clientChannel.getConnectAttemptCount());
160
161     final int closeCount = new Random JavaDoc().nextInt(MESSAGE_COUNT);
162
163     for (int i = 0; i < MESSAGE_COUNT; i++) {
164       if (i == closeCount) {
165         waitForArrivalOrFail(clientWatcher, i);
166         waitForArrivalOrFail(serverWatcher, i);
167         clientComms.getConnectionManager().closeAllConnections(WAIT);
168         if (!waitUntilReconnected()) {
169           fail("Didn't reconnect");
170         }
171       }
172       createAndSendMessage();
173     }
174     assertTrue(clientChannel.getConnectAttemptCount() > 1);
175     assertTrue(clientChannel.getConnectCount() > 1);
176
177     waitForMessages(MESSAGE_COUNT);
178   }
179
180   private void waitForMessages(int count) throws InterruptedException JavaDoc {
181     waitForArrivalOrFail(clientWatcher, count);
182     waitForArrivalOrFail(serverWatcher, count);
183
184     String JavaDoc msg = "expected: " + count + ", client sent: " + clientWatcher.sent() + ", client received: "
185                  + clientWatcher.received() + ", server sent: " + serverWatcher.sent() + ", server received: "
186                  + serverWatcher.received();
187
188     assertEquals(msg, count, clientWatcher.sent());
189     assertEquals(msg, count, clientWatcher.received());
190     assertEquals(msg, count, serverWatcher.sent());
191     assertEquals(msg, count, serverWatcher.received());
192   }
193
194   public void testManualReconnectAfterFailure() throws Exception JavaDoc {
195     setUp(0);
196
197     lsnr.stop(WAIT);
198     serverComms.getConnectionManager().closeAllConnections(WAIT);
199     clientComms.getConnectionManager().closeAllConnections(WAIT);
200
201     for (int i = 0; i < 10; i++) {
202       try {
203         clientChannel.open();
204         fail("Should have thrown an exception");
205       } catch (TCTimeoutException e) {
206         // expected
207
} catch (UnknownHostException JavaDoc e) {
208         fail(e.getMessage());
209       } catch (IOException JavaDoc e) {
210         // expected
211
}
212
213       assertFalse(clientChannel.isConnected());
214     }
215
216     initListener(this.clientWatcher, this.serverWatcher);
217     clientChannel.open();
218     assertTrue(clientChannel.isConnected());
219   }
220
221   public void testSendAfterDisconnect() throws Exception JavaDoc {
222     setUp(0);
223     clientChannel.open();
224
225     createAndSendMessage();
226     waitForArrivalOrFail(clientWatcher, 1);
227     waitForArrivalOrFail(serverWatcher, 1);
228
229     sendMessagesWhileDisconnected(MESSAGE_COUNT, 25);
230
231     // don't explicitly need to do this, but if we wait, it's possible an error will happen on another thread
232
ThreadUtil.reallySleep(5000);
233   }
234
235   public void testZeroMaxRetriesDoesntAutoreconnect() throws Exception JavaDoc {
236     setUp(0);
237     assertEquals(0, clientChannel.getConnectAttemptCount());
238     assertEquals(0, clientChannel.getConnectCount());
239
240     clientChannel.open();
241     assertEquals(1, clientChannel.getConnectAttemptCount());
242     assertEquals(1, clientChannel.getConnectCount());
243     clientComms.getConnectionManager().closeAllConnections(WAIT);
244     ThreadUtil.reallySleep(5000);
245     assertEquals(1, clientChannel.getConnectAttemptCount());
246     assertEquals(1, clientChannel.getConnectCount());
247   }
248
249   public void testNegativeMaxRetriesAlwaysReconnects() throws Exception JavaDoc {
250     setUp(-1);
251
252     assertEquals(0, clientChannel.getConnectCount());
253     assertEquals(0, clientChannel.getConnectAttemptCount());
254
255     clientChannel.open();
256
257     assertEquals(1, clientChannel.getConnectCount());
258     assertEquals(1, clientChannel.getConnectAttemptCount());
259
260     lsnr.stop(WAIT);
261     assertEquals(0, serverComms.getAllListeners().length);
262
263     clientComms.getConnectionManager().closeAllConnections(5000);
264     int count = clientChannel.getConnectAttemptCount();
265     ThreadUtil.reallySleep(WAIT * 4);
266     assertTrue(clientChannel.getConnectAttemptCount() + " vs " + count, clientChannel.getConnectAttemptCount() > count);
267     assertEquals(1, clientChannel.getConnectCount());
268   }
269
270   // public void testSendBeforeOpen() throws Exception {
271
// setUp(0);
272
// PingMessage ping = createMessage();
273
// assertTrue(clientChannel.getStatus().isClosed());
274
// try {
275
// ping.send();
276
// fail("Should have thrown an assertion error");
277
// } catch (TCAssertionError e) {
278
// // expected
279
// }
280
// }
281
//
282
// public void testSendAfterClose() throws Exception {
283
// setUp(0);
284
// clientChannel.open();
285
// assertTrue(clientChannel.getStatus().isOpen());
286
//
287
// PingMessage ping = createMessage();
288
// clientChannel.close();
289
// assertTrue(clientChannel.isClosed());
290
//
291
// try {
292
// // send should fail
293
// ping.send();
294
// fail("should have thrown an exception");
295
// } catch (TCAssertionError err) {
296
// // expected
297
// }
298
// }
299

300   public void testGetStatus() throws Exception JavaDoc {
301     setUp(0);
302     clientChannel.open();
303     assertTrue(clientChannel.isOpen());
304     clientChannel.close();
305     assertTrue(clientChannel.isClosed());
306   }
307
308   public void testSend() throws Exception JavaDoc {
309     setUp(0);
310     clientChannel.open();
311     int count = 100;
312     List JavaDoc messages = new LinkedList JavaDoc();
313     for (int i = 0; i < count; i++) {
314       messages.add(createAndSendMessage());
315     }
316     waitForMessages(count);
317
318   }
319
320   public void testSocketInfo() throws Exception JavaDoc {
321     setUp(0);
322
323     assertNull(clientChannel.getRemoteAddress());
324     assertNull(clientChannel.getLocalAddress());
325
326     clientChannel.open();
327     createAndSendMessage();
328     waitForMessages(1);
329
330     TCSocketAddress clientRemote = clientChannel.getRemoteAddress();
331     TCSocketAddress clientLocal = clientChannel.getLocalAddress();
332
333     MessageChannelInternal[] serverChannels = lsnr.getChannelManager().getChannels();
334     assertEquals(1, serverChannels.length);
335     MessageChannelInternal serverChannel = serverChannels[0];
336
337     TCSocketAddress serverRemote = serverChannel.getRemoteAddress();
338     TCSocketAddress serverLocal = serverChannel.getLocalAddress();
339
340     assertEquals(clientRemote, serverLocal);
341     assertEquals(clientLocal, serverRemote);
342   }
343
344   private PingMessage createAndSendMessage() {
345     PingMessage ping = createMessage();
346     clientWatcher.addMessageSent(ping);
347     ping.send();
348     return ping;
349   }
350
351   private static void waitForArrivalOrFail(MessageSendAndReceiveWatcher watcher, int count) throws InterruptedException JavaDoc {
352     int i = 0;
353     while (!watcher.allReceived() || (watcher.sent() < count) || (watcher.received() < count)) {
354       if (i == ITERATIONS) {
355         fail((watcher.sent() - watcher.received()) + " messages of " + watcher.sent()
356              + " messages total failed to arrive in " + ITERATIONS + " iterations of " + WAIT_PERIOD + " ms. waiting.");
357       }
358
359       Thread.sleep(WAIT_PERIOD);
360       i++;
361     }
362   }
363
364   private ClientMessageChannel createClientMessageChannel(int maxReconnectTries) {
365     ClientMessageChannel ch = clientComms
366         .createClientChannel(new NullSessionManager(), maxReconnectTries, TCSocketAddress.LOOPBACK_IP, lsnr
367             .getBindPort(), WAIT, new FixedValueConfigItem(new ConnectionInfo[] { new ConnectionInfo("localhost", lsnr
368             .getBindPort()) }));
369     ch.addClassMapping(TCMessageType.PING_MESSAGE, PingMessage.class);
370     return ch;
371   }
372
373   private PingMessage createMessage() {
374     PingMessage ping = (PingMessage) clientChannel.createMessage(TCMessageType.PING_MESSAGE);
375     ping.initialize(sq);
376     return ping;
377   }
378
379   private void sendMessagesWhileDisconnected(int count, int afterCount) throws InterruptedException JavaDoc {
380     Random JavaDoc rnd = new Random JavaDoc();
381     final int closeCount = rnd.nextInt(count);
382     final boolean serverClose = rnd.nextBoolean();
383
384     Thread JavaDoc thread = null;
385
386     for (int i = 0; i < count; i++) {
387       if (i == closeCount) {
388         // close down the connection in a seperate thread to increase the timing randomness
389
thread = new Thread JavaDoc("Connection closer thread") {
390           public void run() {
391             try {
392               if (serverClose) {
393                 logger.info("Initiating close on the SERVER side...");
394                 serverComms.getConnectionManager().asynchCloseAllConnections();
395               } else {
396                 logger.info("Initiating close on the CLIENT side...");
397                 clientComms.getConnectionManager().asynchCloseAllConnections();
398               }
399             } catch (Throwable JavaDoc t) {
400               setError(t);
401             }
402           }
403         };
404         Thread.sleep(rnd.nextInt(25) + 10);
405         thread.setDaemon(true);
406         thread.start();
407       }
408
409       createAndSendMessage();
410     }
411
412     thread.join(WAIT);
413     assertFalse(thread.isAlive());
414
415     // make sure we send messages after the connection has actually closed for good measure
416
for (int i = 0; i < afterCount; i++) {
417       createAndSendMessage();
418     }
419   }
420
421   private boolean waitUntilReconnected() {
422     final long start = System.currentTimeMillis();
423     while (System.currentTimeMillis() - start < WAIT) {
424       if (clientChannel.isConnected()) return true;
425       try {
426         Thread.sleep(WAIT_PERIOD);
427       } catch (InterruptedException JavaDoc e) {
428         e.printStackTrace();
429       }
430     }
431     return false;
432   }
433
434   private void setUpClientReceiveSink() {
435     final MessageSendAndReceiveWatcher myServerSenderWatcher = this.serverWatcher;
436     clientChannel.routeMessageType(TCMessageType.PING_MESSAGE, new TCMessageSink() {
437       public void putMessage(TCMessage message) throws UnsupportedMessageTypeException {
438         try {
439           PingMessage ping = (PingMessage) message;
440           ping.hydrate();
441           // System.out.println("CLIENT RECEIVE: " + ping.getSequence());
442
} catch (Exception JavaDoc e) {
443           setError(e);
444         }
445         PingMessage ping = (PingMessage) message;
446         myServerSenderWatcher.addMessageReceived(ping);
447       }
448     });
449   }
450
451   private void setError(Throwable JavaDoc t) {
452     synchronized (System.err) {
453       System.err.println(new SimpleDateFormat JavaDoc("yyyy-MM-dd HH:mm:ss,S").format(new Date JavaDoc())
454                          + ": Exception Thrown in thread [" + Thread.currentThread().getName() + "]");
455       t.printStackTrace(System.err);
456     }
457     error.set(t);
458   }
459
460   public class MessageSendAndReceiveWatcher {
461
462     private TLongHashSet sentSequences = new TLongHashSet();
463     private TLongHashSet receivedSequences = new TLongHashSet();
464
465     public synchronized void addMessageSent(PingMessage sent) {
466       sentSequences.add(sent.getSequence());
467     }
468
469     public synchronized void addMessageReceived(PingMessage received) {
470       receivedSequences.add(received.getSequence());
471     }
472
473     public int sent() {
474       return sentSequences.size();
475     }
476
477     public int received() {
478       return receivedSequences.size();
479     }
480
481     public synchronized boolean allReceived() {
482       return receivedSequences.containsAll(sentSequences.toArray());
483     }
484   }
485 }
486
Popular Tags