1 4 package com.tctest; 5 6 import EDU.oswego.cs.dl.util.concurrent.BrokenBarrierException; 7 import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier; 8 9 import com.tc.exception.TCRuntimeException; 10 import com.tc.memorydatastore.client.MemoryDataStoreClient; 11 import com.tc.memorydatastore.message.TCByteArrayKeyValuePair; 12 import com.tc.memorydatastore.server.MemoryDataStoreServer; 13 import com.tc.util.Assert; 14 15 import java.util.Collection ; 16 import java.util.Iterator ; 17 18 import junit.framework.TestCase; 19 20 public class MemoryDataMapTest extends TestCase { 21 private MemoryDataStoreServer server = null; 22 23 protected void setUp() throws Exception { 24 super.setUp(); 25 server = MemoryDataStoreServer.createInstance(0); 26 server.start(); 27 System.err.println("Server started on port " + server.getListenPort()); 28 } 29 30 protected void tearDown() throws Exception { 31 super.tearDown(); 32 server.shutdown(); 33 server = null; 34 Thread.sleep(5000); 35 } 36 37 public void testPerf() throws Exception { 38 int numOfPut = 5000; 39 MemoryDataStoreClient client = new MemoryDataStoreClient("storePerf", "localhost", server.getListenPort()); 40 for (int i=0; i<numOfPut; i++) { 41 String key = "key"+i; 42 String value = "value"+i; 43 client.put(key.getBytes(), value.getBytes()); 44 if (i%100 == 0) { 45 Thread.sleep(1); 46 } 47 if (i%100000 == 0) { 48 System.err.println("Send out: " + i + " requests"); 49 } 50 } 51 long start = System.currentTimeMillis(); 52 Object o = client.get("key10".getBytes()); 53 Assert.assertNotNull(o); 54 long end = System.currentTimeMillis(); 55 System.err.println("Time to get 1 item: " + (end-start) + "ms"); 56 57 byte[] bytes = new byte[]{107, 101}; 58 start = System.currentTimeMillis(); 59 Collection allValues = client.getAll(bytes); 60 end = System.currentTimeMillis(); 61 Assert.assertEquals(numOfPut, allValues.size()); 62 System.err.println("Time to get " + numOfPut + " items: " + (end-start) + "ms"); 63 64 client.close(); 65 } 66 67 public void testBasic() throws Exception { 68 CyclicBarrier barrier = new CyclicBarrier(16); 69 70 MemoryDataStoreClient client1 = new MemoryDataStoreClient("store1", "localhost", server.getListenPort()); 71 MemoryDataStoreClient client2 = new MemoryDataStoreClient("store2", "localhost", server.getListenPort()); 72 73 Runnable [] putClients = new Runnable [15]; 74 putClients[0] = new TestPutClient(barrier, client1, "key1".getBytes(), "value1".getBytes()); 75 putClients[1] = new TestPutClient(barrier, client1, "key2".getBytes(), "value2".getBytes()); 76 putClients[2] = new TestPutClient(barrier, client1, "key3".getBytes(), "value3".getBytes()); 77 putClients[3] = new TestPutClient(barrier, client1, "key4".getBytes(), "value4".getBytes()); 78 putClients[4] = new TestPutClient(barrier, client1, "key5".getBytes(), "value5".getBytes()); 79 putClients[5] = new TestPutClient(barrier, client1, "key6".getBytes(), "value6".getBytes()); 80 putClients[6] = new TestPutClient(barrier, client1, "key7".getBytes(), "value7".getBytes()); 81 putClients[7] = new TestPutClient(barrier, client1, "key8".getBytes(), "value8".getBytes()); 82 putClients[8] = new TestPutClient(barrier, client1, "key9".getBytes(), "value9".getBytes()); 83 putClients[9] = new TestPutClient(barrier, client1, "key10".getBytes(), "value10".getBytes()); 84 putClients[10] = new TestPutClient(barrier, client2, "key1".getBytes(), "value1".getBytes()); 85 putClients[11] = new TestPutClient(barrier, client2, "key2".getBytes(), "value2".getBytes()); 86 putClients[12] = new TestPutClient(barrier, client2, "key3".getBytes(), "value3".getBytes()); 87 putClients[13] = new TestPutClient(barrier, client2, "key4".getBytes(), "value4".getBytes()); 88 putClients[14] = new TestPutClient(barrier, client2, "key5".getBytes(), "value5".getBytes()); 89 90 runAllClients(putClients); 91 92 barrier.barrier(); 93 94 Runnable [] getClients = new Runnable [17]; 95 barrier = new CyclicBarrier(18); 96 97 getClients[0] = new TestGetClient(barrier, client1, "key1".getBytes(), "value1".getBytes()); 98 getClients[1] = new TestGetClient(barrier, client1, "key2".getBytes(), "value2".getBytes()); 99 getClients[2] = new TestGetClient(barrier, client1, "key3".getBytes(), "value3".getBytes()); 100 getClients[3] = new TestGetClient(barrier, client1, "key4".getBytes(), "value4".getBytes()); 101 getClients[4] = new TestGetClient(barrier, client1, "key5".getBytes(), "value5".getBytes()); 102 getClients[5] = new TestGetClient(barrier, client1, "key6".getBytes(), "value6".getBytes()); 103 getClients[6] = new TestGetClient(barrier, client1, "key7".getBytes(), "value7".getBytes()); 104 getClients[7] = new TestGetClient(barrier, client1, "key8".getBytes(), "value8".getBytes()); 105 getClients[8] = new TestGetClient(barrier, client1, "key9".getBytes(), "value9".getBytes()); 106 getClients[9] = new TestGetClient(barrier, client1, "key10".getBytes(), "value10".getBytes()); 107 getClients[10] = new TestGetClient(barrier, client2, "key1".getBytes(), "value1".getBytes()); 108 getClients[11] = new TestGetClient(barrier, client2, "key2".getBytes(), "value2".getBytes()); 109 getClients[12] = new TestGetClient(barrier, client2, "key3".getBytes(), "value3".getBytes()); 110 getClients[13] = new TestGetClient(barrier, client2, "key4".getBytes(), "value4".getBytes()); 111 getClients[14] = new TestGetClient(barrier, client2, "key5".getBytes(), "value5".getBytes()); 112 getClients[15] = new TestGetClient(barrier, client2, "key6".getBytes(), null); 113 getClients[16] = new TestGetClient(barrier, client1, "key11".getBytes(), null); 114 115 runAllClients(getClients); 116 117 barrier.barrier(); 118 119 client1.close(); 120 client2.close(); 121 122 MemoryDataStoreClient client = new MemoryDataStoreClient("store2", "localhost", server.getListenPort()); 123 byte[] bytes = new byte[]{107, 101}; 124 Collection allValues = client.getAll(bytes); 125 System.err.println("Getting all of [107 101]:"); 126 printByteArrayCollection(allValues); 127 128 client = new MemoryDataStoreClient("store1", "localhost", server.getListenPort()); 129 client.remove("key1".getBytes()); 130 131 Object o = client.get("key1".getBytes()); 132 Assert.assertNull(o); 133 134 bytes = new byte[]{107, 101}; 135 client.removeAll(bytes); 136 137 o = client.get("key5".getBytes()); 138 Assert.assertNull(o); 139 140 } 141 142 private void printByteArrayCollection(Collection allValues) { 143 int size = allValues.size(); 144 145 System.err.println("No of values: " + size); 146 for (Iterator i=allValues.iterator(); i.hasNext(); ) { 147 TCByteArrayKeyValuePair keyValuePair = (TCByteArrayKeyValuePair)i.next(); 148 149 byte[] key = keyValuePair.getKey(); 150 byte[] value = keyValuePair.getValue(); 151 152 System.err.print("key: ["); 153 for (int j=0; j<key.length; j++) { 154 System.err.print(key[j]); 155 System.err.print(" "); 156 } 157 System.err.print("], value: ["); 158 for (int j=0; j<value.length; j++) { 159 System.err.print(value[j]); 160 System.err.print(" "); 161 } 162 System.err.println("]"); 163 } 164 } 165 166 private static void runAllClients(Runnable [] runnableClients) { 167 Thread [] allClients = new Thread [runnableClients.length]; 168 for (int i=0; i<runnableClients.length; i++) { 169 allClients[i] = new Thread (runnableClients[i]); 170 allClients[i].start(); 171 } 172 } 173 174 private static class TestGetClient implements Runnable { 175 private final CyclicBarrier barrier; 176 private final byte[] key; 177 private final byte[] expectedValue; 178 private final MemoryDataStoreClient client; 179 180 public TestGetClient(CyclicBarrier barrier, MemoryDataStoreClient client, byte[] key, byte[] expectedValue) { 181 this.barrier = barrier; 182 this.key = key; 183 this.expectedValue = expectedValue; 184 this.client = client; 185 } 186 187 public void run() { 188 byte[] value = client.get(key); 189 Assert.assertEquals(expectedValue, value); 190 191 try { 192 barrier.barrier(); 193 } catch (InterruptedException e) { 194 throw new TCRuntimeException(e); 195 } catch (BrokenBarrierException e) { 196 throw new TCRuntimeException(e); 197 } 198 System.err.println("Finish running get client"); 199 } 200 } 201 202 private static class TestPutClient implements Runnable { 203 private final CyclicBarrier barrier; 204 private final byte[] key; 205 private final byte[] value; 206 private final MemoryDataStoreClient client; 207 208 public TestPutClient(CyclicBarrier barrier, MemoryDataStoreClient client, byte[] key, byte[] value) { 209 this.barrier = barrier; 210 this.key = key; 211 this.value = value; 212 this.client = client; 213 } 214 215 public void run() { 216 client.put(key, value); 217 218 try { 219 barrier.barrier(); 220 } catch (InterruptedException e) { 221 throw new TCRuntimeException(e); 222 } catch (BrokenBarrierException e) { 223 throw new TCRuntimeException(e); 224 } 225 } 226 } 227 228 } 229 | Popular Tags |