1 2 3 4 package net.nutch.ipc; 5 6 import net.nutch.io.Writable; 7 import net.nutch.io.LongWritable; 8 9 import java.util.Random ; 10 import java.io.IOException ; 11 import java.net.InetSocketAddress ; 12 13 import junit.framework.TestCase; 14 15 import java.util.logging.Logger ; 16 import java.util.logging.Level ; 17 18 import net.nutch.util.LogFormatter; 19 20 21 public class TestIPC extends TestCase { 22 public static final Logger LOG = 23 LogFormatter.getLogger("net.nutch.ipc.TestIPC"); 24 25 static { 27 LOG.setLevel(Level.WARNING); 28 Client.LOG.setLevel(Level.WARNING); 29 Server.LOG.setLevel(Level.WARNING); 30 } 31 32 public TestIPC(String name) { super(name); } 33 34 private static final Random RANDOM = new Random (); 35 36 private static final int PORT = 1234; 37 38 private static class TestServer extends Server { 39 private boolean sleep; 40 41 public TestServer(int port, int handlerCount, boolean sleep) { 42 super(port, LongWritable.class, handlerCount); 43 this.setTimeout(1000); 44 this.sleep = sleep; 45 } 46 47 public Writable call(Writable param) throws IOException { 48 if (sleep) { 49 try { 50 Thread.sleep(RANDOM.nextInt(200)); } catch (InterruptedException e) {} 52 } 53 return param; } 55 } 56 57 private static class SerialCaller extends Thread { 58 private Client client; 59 private int count; 60 private boolean failed; 61 62 public SerialCaller(Client client, int count) { 63 this.client = client; 64 this.count = count; 65 client.setTimeout(1000); 66 } 67 68 public void run() { 69 for (int i = 0; i < count; i++) { 70 try { 71 LongWritable param = new LongWritable(RANDOM.nextLong()); 72 LongWritable value = 73 (LongWritable)client.call(param, new InetSocketAddress (PORT)); 74 if (!param.equals(value)) { 75 LOG.severe("Call failed!"); 76 failed = true; 77 break; 78 } 79 } catch (Exception e) { 80 LOG.severe("Caught: " + e); 81 failed = true; 82 } 83 } 84 } 85 } 86 87 private static class ParallelCaller extends Thread { 88 private Client client; 89 private int count; 90 private InetSocketAddress [] addresses; 91 private boolean failed; 92 93 public ParallelCaller(Client client, InetSocketAddress [] addresses, 94 int count) { 95 this.client = client; 96 this.addresses = addresses; 97 this.count = count; 98 client.setTimeout(1000); 99 } 100 101 public void run() { 102 for (int i = 0; i < count; i++) { 103 try { 104 Writable[] params = new Writable[addresses.length]; 105 for (int j = 0; j < addresses.length; j++) 106 params[j] = new LongWritable(RANDOM.nextLong()); 107 Writable[] values = client.call(params, addresses); 108 for (int j = 0; j < addresses.length; j++) { 109 if (!params[j].equals(values[j])) { 110 LOG.severe("Call failed!"); 111 failed = true; 112 break; 113 } 114 } 115 } catch (Exception e) { 116 LOG.severe("Caught: " + e); 117 failed = true; 118 } 119 } 120 } 121 } 122 123 public void testSerial() throws Exception { 124 testSerial(3, false, 2, 5, 100); 125 } 126 127 public void testSerial(int handlerCount, boolean handlerSleep, 128 int clientCount, int callerCount, int callCount) 129 throws Exception { 130 Server server = new TestServer(PORT, handlerCount, handlerSleep); 131 server.start(); 132 133 Client[] clients = new Client[clientCount]; 134 for (int i = 0; i < clientCount; i++) { 135 clients[i] = new Client(LongWritable.class); 136 } 137 138 SerialCaller[] callers = new SerialCaller[callerCount]; 139 for (int i = 0; i < callerCount; i++) { 140 callers[i] = new SerialCaller(clients[i%clientCount], callCount); 141 callers[i].start(); 142 } 143 for (int i = 0; i < callerCount; i++) { 144 callers[i].join(); 145 assertFalse(callers[i].failed); 146 } 147 for (int i = 0; i < clientCount; i++) { 148 clients[i].stop(); 149 } 150 server.stop(); 151 } 152 153 public void testParallel() throws Exception { 154 testParallel(10, false, 2, 4, 2, 4, 100); 155 } 156 157 public void testParallel(int handlerCount, boolean handlerSleep, 158 int serverCount, int addressCount, 159 int clientCount, int callerCount, int callCount) 160 throws Exception { 161 Server[] servers = new Server[serverCount]; 162 for (int i = 0; i < serverCount; i++) { 163 servers[i] = new TestServer(PORT+i, handlerCount, handlerSleep); 164 servers[i].start(); 165 } 166 167 InetSocketAddress [] addresses = new InetSocketAddress [addressCount]; 168 for (int i = 0; i < addressCount; i++) { 169 addresses[i] = new InetSocketAddress (PORT+(i%serverCount)); 170 } 171 172 Client[] clients = new Client[clientCount]; 173 for (int i = 0; i < clientCount; i++) { 174 clients[i] = new Client(LongWritable.class); 175 } 176 177 ParallelCaller[] callers = new ParallelCaller[callerCount]; 178 for (int i = 0; i < callerCount; i++) { 179 callers[i] = 180 new ParallelCaller(clients[i%clientCount], addresses, callCount); 181 callers[i].start(); 182 } 183 for (int i = 0; i < callerCount; i++) { 184 callers[i].join(); 185 assertFalse(callers[i].failed); 186 } 187 for (int i = 0; i < clientCount; i++) { 188 clients[i].stop(); 189 } 190 for (int i = 0; i < serverCount; i++) { 191 servers[i].stop(); 192 } 193 } 194 195 public static void main(String [] args) throws Exception { 196 LOG.setLevel(Level.FINE); 198 Client.LOG.setLevel(Level.FINE); 199 Server.LOG.setLevel(Level.FINE); 200 LogFormatter.setShowThreadIDs(true); 201 202 204 new TestIPC("test").testParallel(10, false, 2, 4, 2, 4, 1000); 205 206 } 207 208 } 209 | Popular Tags |