package org.jgroups.tests;

import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.jgroups.ChannelException;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.protocols.UDP;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;

import java.util.Properties;


/**
 * Tests {@link MessageDispatcher#castMessage} with null and byte[] response values of
 * various sizes, first with a single group member (the sender itself) and then with
 * two members connected to the same group.
 * <p>
 * Every test starts with one connected channel/dispatcher pair ({@code ch}/{@code disp});
 * the "ToAll" tests additionally create a second pair ({@code ch2}/{@code disp2}).
 * Both pairs are released in {@link #tearDown()}.
 */
public class MessageDispatcherUnitTest extends TestCase {
    MessageDispatcher disp, disp2;
    JChannel ch, ch2;
    static final String props="udp.xml";


    public MessageDispatcherUnitTest(String name) {
        super(name);
    }


    /**
     * Creates the first channel with bundling disabled, attaches a dispatcher without
     * a request handler (each test installs its own) and connects to group "x".
     */
    protected void setUp() throws Exception {
        super.setUp();
        ch=new JChannel(props);
        disableBundling(ch);
        disp=new MessageDispatcher(ch, null, null, null);
        ch.connect("x");
    }


    /**
     * Stops the dispatchers and closes the channels created by the test.
     * <p>
     * Each reference is checked for null individually and the cleanup steps are chained
     * with try/finally, so a test that failed half-way through creating the second
     * member (channel created, dispatcher not yet assigned), or a failure while stopping
     * the first member, still releases everything that was created.
     */
    protected void tearDown() throws Exception {
        try {
            shutdown(disp, ch);
        }
        finally {
            try {
                shutdown(disp2, ch2);
            }
            finally {
                // NOTE(review): presumably gives the transport time to shut down
                // before the next test reconnects - confirm.
                Util.sleep(500);
                super.tearDown();
            }
        }
    }


    public void testNullMessageToSelf() {
        MyHandler handler=new MyHandler(null);
        disp.setRequestHandler(handler);
        RspList rsps=disp.castMessage(null, new Message(), GroupRequest.GET_ALL, 0);
        System.out.println("rsps:\n" + rsps);
        assertNotNull(rsps);
        assertEquals(1, rsps.size());
        Object obj=rsps.getFirst();
        assertNull(obj);
    }


    public void test200ByteMessageToSelf() {
        sendMessage(200);
    }


    public void test2000ByteMessageToSelf() {
        sendMessage(2000);
    }

    public void test20000ByteMessageToSelf() {
        sendMessage(20000);
    }


    public void testNullMessageToAll() throws ChannelException {
        disp.setRequestHandler(new MyHandler(null));
        createSecondMember(null);
        System.out.println("view: " + ch2.getView());

        System.out.println("casting message");
        RspList rsps=castToGroup();
        assertNotNull(rsps);
        assertEquals(2, rsps.size());

        Rsp rsp=responseFrom(rsps, ch);
        assertNotNull(rsp);
        assertNull(rsp.getValue());

        rsp=responseFrom(rsps, ch2);
        assertNotNull(rsp);
        assertNull(rsp.getValue());
    }

    public void test200ByteMessageToAll() throws ChannelException {
        sendMessageToBothChannels(200);
    }

    public void test2000ByteMessageToAll() throws ChannelException {
        sendMessageToBothChannels(2000);
    }

    public void test20000ByteMessageToAll() throws ChannelException {
        sendMessageToBothChannels(20000);
    }


    /**
     * Casts a message to the single-member group and verifies that exactly one
     * response comes back, carrying a byte[] of the requested size.
     *
     * @param size length of the byte[] the request handler returns
     */
    private void sendMessage(int size) {
        disp.setRequestHandler(new MyHandler(new byte[size]));
        RspList rsps=castToGroup();
        assertNotNull(rsps);
        assertEquals(1, rsps.size());
        byte[] buf=(byte[])rsps.getFirst();
        assertNotNull(buf);
        assertEquals(size, buf.length);
    }


    /**
     * Adds a second member to the group, casts a message from the first dispatcher and
     * verifies that both members answered with a byte[] of the requested size.
     *
     * @param size length of the byte[] both request handlers return
     * @throws ChannelException if the second channel cannot be created or connected
     */
    private void sendMessageToBothChannels(int size) throws ChannelException {
        disp.setRequestHandler(new MyHandler(new byte[size]));
        createSecondMember(new byte[size]);

        System.out.println("casting message");
        RspList rsps=castToGroup();
        assertNotNull(rsps);
        assertEquals(2, rsps.size());

        assertResponseSize(size, responseFrom(rsps, ch));
        assertResponseSize(size, responseFrom(rsps, ch2));
    }


    /**
     * Creates {@code ch2}/{@code disp2} with a handler returning {@code retval},
     * connects to group "x" and asserts that the resulting view has two members.
     */
    private void createSecondMember(byte[] retval) throws ChannelException {
        ch2=new JChannel(props);
        disableBundling(ch2);
        disp2=new MessageDispatcher(ch2, null, null, new MyHandler(retval));
        ch2.connect("x");
        assertEquals(2, ch2.getView().size());
    }


    /**
     * Casts an empty message from {@code disp} to the whole group in GET_ALL mode,
     * prints the responses and the elapsed wall-clock time, and returns the responses.
     */
    private RspList castToGroup() {
        long start=System.currentTimeMillis();
        RspList rsps=disp.castMessage(null, new Message(), GroupRequest.GET_ALL, 0);
        long stop=System.currentTimeMillis();
        System.out.println("rsps:\n" + rsps);
        System.out.println("call took " + (stop-start) + " ms");
        return rsps;
    }


    /** Looks up the response entry keyed by the local address of {@code member}. */
    private static Rsp responseFrom(RspList rsps, JChannel member) {
        return (Rsp)rsps.get(member.getLocalAddress());
    }


    /**
     * Asserts that {@code rsp} exists and carries a non-null byte[] of length
     * {@code size}; a missing value is reported as an assertion failure rather than
     * surfacing as a NullPointerException.
     */
    private static void assertResponseSize(int size, Rsp rsp) {
        assertNotNull(rsp);
        byte[] ret=(byte[])rsp.getValue();
        assertNotNull(ret);
        assertEquals(size, ret.length);
    }


    /**
     * Stops {@code dispatcher} and closes {@code channel}, skipping whichever is null.
     * The channel is closed even if stopping the dispatcher fails.
     */
    private static void shutdown(MessageDispatcher dispatcher, JChannel channel) throws Exception {
        try {
            if(dispatcher != null)
                dispatcher.stop();
        }
        finally {
            if(channel != null)
                channel.close();
        }
    }


    /**
     * Sets {@code enable_bundling=false} on the channel's UDP protocol, if the stack
     * contains a protocol named "UDP"; otherwise does nothing.
     */
    private void disableBundling(JChannel channel) {
        ProtocolStack stack=channel.getProtocolStack();
        UDP transport=(UDP)stack.findProtocol("UDP");
        if(transport != null) {
            Properties tmp=new Properties();
            tmp.setProperty("enable_bundling", "false");
            transport.setProperties(tmp);
        }
    }


    public static junit.framework.Test suite() {
        return new TestSuite(MessageDispatcherUnitTest.class);
    }

    public static void main(String[] args) {
        junit.textui.TestRunner.run(suite());
    }


    /** Request handler that answers every request with the fixed value given at construction. */
    private static class MyHandler implements RequestHandler {
        private final byte[] retval;


        public MyHandler(byte[] retval) {
            this.retval=retval;
        }

        public Object handle(Message msg) {
            return retval;
        }
    }
}