1 4 package com.tc.net.protocol.tcm; 5 6 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef; 7 8 import com.tc.net.protocol.tcm.msgs.PingMessage; 9 import com.tc.util.concurrent.SetOnceFlag; 10 11 import java.security.SecureRandom ; 12 import java.util.Random ; 13 14 import junit.framework.TestCase; 15 16 public class TCMessageRouterTest extends TestCase { 17 18 public void testDefaultRoute() { 19 20 try { 21 TCMessageRouter router = new TCMessageRouterImpl(); 22 router.putMessage(createMessage()); 23 fail(); 24 } catch (UnsupportedMessageTypeException umte) { 25 } 27 28 final SynchronizedRef msg = new SynchronizedRef(null); 29 TCMessageRouter router = new TCMessageRouterImpl(new TCMessageSink() { 30 public void putMessage(TCMessage message) { 31 msg.set(message); 32 } 33 }); 34 TCMessage message = createMessage(); 35 router.putMessage(message); 36 assertSame(message, msg.get()); 37 38 msg.set(null); 39 router.routeMessageType(TCMessageType.PING_MESSAGE, new TCMessageSink() { 40 public void putMessage(TCMessage m) { 41 } 43 }); 44 router.putMessage(createMessage()); 45 assertNull(msg.get()); 46 } 47 48 public void testRouteByType() { 49 final SynchronizedRef defmsg = new SynchronizedRef(null); 50 TCMessageRouter router = new TCMessageRouterImpl(new TCMessageSink() { 51 public void putMessage(TCMessage m) { 52 defmsg.set(m); 53 } 54 }); 55 56 final SynchronizedRef msg = new SynchronizedRef(null); 57 router.routeMessageType(TCMessageType.PING_MESSAGE, new TCMessageSink() { 58 public void putMessage(TCMessage m) { 59 msg.set(m); 60 } 61 }); 62 TCMessage message = createMessage(); 63 router.putMessage(message); 64 assertSame(message, msg.get()); 65 assertNull(defmsg.get()); 66 67 msg.set(null); 68 defmsg.set(null); 69 router.unrouteMessageType(TCMessageType.PING_MESSAGE); 70 router.putMessage(message); 71 assertNull(msg.get()); 72 assertSame(message, defmsg.get()); 73 } 74 75 public void testConcurrency() throws Exception { 76 final Random random = new SecureRandom (); 77 final SynchronizedRef error = new SynchronizedRef(null); 78 final SetOnceFlag stop = new SetOnceFlag(); 79 final TCMessageSink nullSink = new TCMessageSink() { 80 public void putMessage(TCMessage message) { 81 } 83 }; 84 final TCMessageRouter router = new TCMessageRouterImpl(nullSink); 85 86 final Runnable putter = new Runnable () { 87 public void run() { 88 TCMessage msg = createMessage(); 89 try { 90 while (true) { 91 for (int i = 0; i < 100; i++) { 92 router.putMessage(msg); 93 } 94 if (stop.isSet()) { return; } 95 } 96 } catch (Throwable t) { 97 setError(t, error); 98 } 99 } 100 }; 101 102 final Runnable changer = new Runnable () { 103 public void run() { 104 try { 105 while (true) { 106 for (int i = 0; i < 100; i++) { 107 if (random.nextBoolean()) { 108 router.routeMessageType(TCMessageType.PING_MESSAGE, nullSink); 109 } else { 110 router.unrouteMessageType(TCMessageType.PING_MESSAGE); 111 } 112 } 113 if (stop.isSet()) { return; } 114 } 115 } catch (Throwable t) { 116 setError(t, error); 117 } 118 } 119 }; 120 121 Thread [] threads = new Thread [10]; 122 for (int i = 0; i < 5; i++) { 123 threads[i] = new Thread (putter); 124 threads[5+i] = new Thread (changer); 125 } 126 127 for (int i = 0; i < threads.length; i++) { 128 threads[i].setDaemon(true); 129 threads[i].start(); 130 } 131 132 Thread.sleep(5000); 133 stop.set(); 134 135 for (int i = 0; i < threads.length; i++) { 136 threads[i].join(5000); 137 } 138 139 assertNull(error.get()); 140 } 141 142 private static void setError(Throwable t, SynchronizedRef error) { 143 t.printStackTrace(); 144 error.set(t); 145 } 146 147 private PingMessage createMessage() { 148 PingMessage rv = new PingMessage(new NullMessageMonitor()); 149 rv.dehydrate(); 150 return rv; 151 } 152 153 } | Popular Tags |