package com.tc.net.core;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;

import com.tc.bytes.TCByteBuffer;
import com.tc.bytes.TCByteBufferFactory;
import com.tc.net.TCSocketAddress;
import com.tc.net.protocol.GenericNetworkMessage;
import com.tc.net.protocol.GenericNetworkMessageSink;
import com.tc.net.protocol.GenericProtocolAdaptor;
import com.tc.util.Assert;
import com.tc.util.concurrent.SetOnceFlag;
import com.tc.util.concurrent.ThreadUtil;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Random;

/**
 * Client that connects to a given address, sends a fixed number of generated messages and then blocks until the
 * same number of messages has been delivered to its sink. Every sent message is recorded with a send-side
 * {@link Verifier}; every received message is handed to a receive-side {@link Verifier}. Any {@link Throwable}
 * raised along the way is stored and later rethrown wrapped in a {@link RuntimeException}.
 */
public class VerifierClient implements Runnable {
  /** Timeout passed to both {@code connect} and {@code close} on the connection. */
  private static final int       TIMEOUT       = 60000 * 2;
  /** Source of per-instance client numbers; only touched through {@link #getNextClientNum()}. */
  private static int             clientCounter = 0;

  private final TCConnectionManager connMgr;
  private final TCSocketAddress     addr;
  private final int                 clientNum;
  private final int                 dataSize;
  private final int                 numToSend;
  private final int                 maxDelay;
  private final int                 minDelay;
  /** Holds the most recently recorded failure, or null when none has been recorded. */
  private final SynchronizedRef     error        = new SynchronizedRef(null);
  private final Random              random;
  /** Receives every message delivered to the sink. */
  private final Verifier            verifier;
  /** Receives every message just before it is sent. */
  private final Verifier            sendVerifier;
  private int                       sendSequence = 0;
  private int                       sendCounter  = 0;
  /** Number of messages still expected by the sink; guarded by {@code this}. */
  private int                       numToRecv;

  /**
   * @param connMgr used to create the connection in {@link #run0()}
   * @param addr address the connection is opened to
   * @param dataSize multiplier (in units of 4096 bytes) for the payload length of each message
   * @param numToSend number of messages to send; the same number is expected back
   * @param minDelay lower bound of the pause before each send, must be &gt;= 0
   * @param maxDelay upper bound of the pause before each send, must be &gt;= minDelay
   * @throws IllegalArgumentException if a delay is negative or maxDelay is less than minDelay
   */
  public VerifierClient(TCConnectionManager connMgr, TCSocketAddress addr, int dataSize, int numToSend, int minDelay,
                        int maxDelay) {

    if ((maxDelay < 0) || (minDelay < 0)) {
      throw new IllegalArgumentException("delay values must be greater than or equal to zero");
    }

    if (maxDelay < minDelay) { throw new IllegalArgumentException("max cannot be less than min"); }

    this.clientNum = getNextClientNum();
    this.connMgr = connMgr;
    this.addr = addr;
    this.dataSize = dataSize;
    this.minDelay = minDelay;
    this.maxDelay = maxDelay;
    this.numToSend = numToSend;
    this.random = new Random();
    this.verifier = new Verifier(this.clientNum);
    this.sendVerifier = new Verifier(this.clientNum);
    this.numToRecv = numToSend;
  }

  /** Returns the next client number, starting at 1. */
  private static synchronized int getNextClientNum() {
    return ++clientCounter;
  }

  /**
   * Pauses the calling thread for a duration in [minDelay, maxDelay): minDelay plus a random offset below
   * (maxDelay - minDelay). When both bounds are equal the pause is exactly minDelay; when that is zero no sleep
   * happens at all.
   */
  private void delay() {
    // The constructor guarantees 0 <= minDelay <= maxDelay, so this difference is never negative.
    final int range = maxDelay - minDelay;

    long sleepFor = minDelay;
    if (range > 0) {
      // Random.nextInt(int) requires a strictly positive bound, hence the guard.
      sleepFor += random.nextInt(range);
    }

    if (sleepFor > 0) {
      ThreadUtil.reallySleep(sleepFor);
    }
  }

  /** Sink handed to the protocol adaptor; forwards each message to the receive-side verifier. */
  private class Sink implements GenericNetworkMessageSink {
    public void putMessage(GenericNetworkMessage msg) {
      try {
        verifier.putMessage(msg);
      } catch (Throwable t) {
        setError(t);
      } finally {
        // Count the message even on failure so the waiter in run0() is always released.
        msgRecv();
      }
    }
  }

  /** Decrements the outstanding-receive counter and wakes a thread waiting on this object. */
  private void msgRecv() {
    synchronized (this) {
      numToRecv--;
      notify();
    }
  }

  /** Prints the failure's stack trace and stores it for {@link #checkForError()}. */
  private void setError(Throwable t) {
    t.printStackTrace();
    error.set(t);
  }

  /**
   * Runs {@link #run0()}, recording anything it throws, and then rethrows the recorded failure (if any) as a
   * {@link RuntimeException}.
   */
  public void run() {
    try {
      run0();
    } catch (Throwable t) {
      setError(t);
    } finally {
      checkForError();
    }
  }

  /**
   * Connects, sends {@code numToSend} messages, waits for the sink to receive as many, closes the connection and
   * finally asserts that the sent callback of every message has fired.
   *
   * @throws Throwable anything raised by the connection, the verifiers, the wait or the assertions
   */
  public void run0() throws Throwable {
    // Maps each sent message to a flag that its sent callback sets; guarded by the map itself.
    final HashMap sentCallbacks = new HashMap();
    final TCConnection conn = connMgr.createConnection(new GenericProtocolAdaptor(new Sink()));
    conn.connect(addr, TIMEOUT);

    for (int i = 0; i < numToSend; i++) {
      checkForError();
      delay();

      final GenericNetworkMessage msg = makeNextMessage(conn);
      sendVerifier.putMessage(msg);

      // Register the flag before the callback is installed so the callback always finds it.
      synchronized (sentCallbacks) {
        sentCallbacks.put(msg, new SetOnceFlag());
      }

      msg.setSentCallback(new Runnable() {
        public void run() {
          synchronized (sentCallbacks) {
            ((SetOnceFlag) sentCallbacks.get(msg)).set();
          }
        }
      });

      conn.putMessage(msg);
    }

    checkForError();

    // Block until the sink has counted down every expected message.
    synchronized (this) {
      while (numToRecv > 0) {
        wait();
      }
    }

    checkForError();

    conn.close(TIMEOUT);

    // Hold the same lock the put and the sent callbacks use while the map is iterated and emptied.
    synchronized (sentCallbacks) {
      for (final Iterator iter = sentCallbacks.values().iterator(); iter.hasNext();) {
        SetOnceFlag sent = (SetOnceFlag) iter.next();
        Assert.eval(sent.isSet());
        iter.remove();
      }
    }

    checkForError();
  }

  /**
   * Builds the next outgoing message. The payload length is {@code 4096 * dataSize} plus a random extra of 8 to 104
   * bytes (always a multiple of 8), and is filled with repeated int pairs of (clientNum, running send counter).
   *
   * @param conn connection the message is created for
   * @return the message, stamped with the next send sequence and this client's number
   */
  private GenericNetworkMessage makeNextMessage(TCConnection conn) {
    // 8 + 8 * [0..12]: keeps the total length a multiple of the 8-byte record written below.
    int extra = 8 + (8 * random.nextInt(13));

    // NOTE(review): the meaning of the 'false' flag is defined by TCByteBufferFactory - confirm there.
    TCByteBuffer data[] = TCByteBufferFactory.getFixedSizedInstancesForLength(false, 4096 * dataSize + extra);

    if (this.dataSize == 0) {
      Assert.assertEquals(1, data.length);
    }

    for (int d = 0; d < data.length; d++) {
      TCByteBuffer buf = data[d];
      // Each buffer must hold a whole number of 8-byte (int, int) records.
      Assert.eval((buf.limit() % 8) == 0);

      while (buf.hasRemaining()) {
        buf.putInt(clientNum);
        buf.putInt(sendCounter++);
      }

      buf.flip();
    }

    GenericNetworkMessage msg = new GenericNetworkMessage(conn, data);
    msg.setSequence(sendSequence++);
    msg.setClientNum(this.clientNum);
    return msg;
  }

  /** Throws a {@link RuntimeException} wrapping the recorded failure, if one has been recorded. */
  private void checkForError() {
    final Throwable t = (Throwable) error.get();
    if (t != null) { throw new RuntimeException(t); }
  }
}