1 4 package com.tc.memorydatastore.client; 5 6 import com.tc.config.schema.dynamic.FixedValueConfigItem; 7 import com.tc.exception.TCRuntimeException; 8 import com.tc.memorydatastore.message.MemoryDataStoreRequestMessage; 9 import com.tc.memorydatastore.message.MemoryDataStoreResponseMessage; 10 import com.tc.memorydatastore.server.MemoryDataStoreServer; 11 import com.tc.net.MaxConnectionsExceededException; 12 import com.tc.net.core.ConnectionInfo; 13 import com.tc.net.protocol.PlainNetworkStackHarnessFactory; 14 import com.tc.net.protocol.tcm.ClientMessageChannel; 15 import com.tc.net.protocol.tcm.CommunicationsManager; 16 import com.tc.net.protocol.tcm.CommunicationsManagerImpl; 17 import com.tc.net.protocol.tcm.NullMessageMonitor; 18 import com.tc.net.protocol.tcm.TCMessage; 19 import com.tc.net.protocol.tcm.TCMessageType; 20 import com.tc.net.protocol.transport.NullConnectionPolicy; 21 import com.tc.object.lockmanager.api.ThreadID; 22 import com.tc.object.session.NullSessionManager; 23 import com.tc.util.Assert; 24 import com.tc.util.TCTimeoutException; 25 import com.tc.util.concurrent.ThreadUtil; 26 27 import java.io.IOException ; 28 import java.net.ConnectException ; 29 import java.util.Collection ; 30 import java.util.HashMap ; 31 import java.util.Map ; 32 33 public class MemoryDataStoreClient implements MemoryDataMap { 34 private final static CommunicationsManager communicationsManager = new CommunicationsManagerImpl( 35 new NullMessageMonitor(), 36 new PlainNetworkStackHarnessFactory(), 37 new NullConnectionPolicy()); 38 39 private ClientMessageChannel channel; 40 private final Map pendingRequests = new HashMap (); 41 private final Map waitObjectMap = new HashMap (); 42 private final Map pendingResponses = new HashMap (); 43 private final String storeName; 44 private final ThreadLocal threadID = new ThreadLocal (); 45 46 private long threadIDSequence; 47 48 public MemoryDataStoreClient(String storeName, String serverHost, int serverPort) { 49 super(); 50 this.storeName = storeName; 51 setupClient(serverHost, serverPort); 52 } 53 54 public MemoryDataStoreClient(String storeName) { 55 this(storeName, "localhost", MemoryDataStoreServer.DEFAULT_PORT); 56 } 57 58 private void setupClient(String serverHost, int serverPort) { 59 60 this.channel = communicationsManager.createClientChannel(new NullSessionManager(), -1, serverHost, serverPort, 61 10000, new FixedValueConfigItem(new ConnectionInfo[] { new ConnectionInfo(serverHost, serverPort) })); 62 63 channel.addClassMapping(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE, MemoryDataStoreResponseMessage.class); 64 channel.addClassMapping(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE, MemoryDataStoreRequestMessage.class); 65 channel.routeMessageType(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE, new MemoryDataStoreResponseSink(this)); 66 67 while (true) { 68 try { 69 channel.open(); 70 break; 71 } catch (TCTimeoutException tcte) { 72 ThreadUtil.reallySleep(5000); 73 } catch (ConnectException e) { 74 ThreadUtil.reallySleep(5000); 75 } catch (MaxConnectionsExceededException e) { 76 ThreadUtil.reallySleep(5000); 77 } catch (IOException ioe) { 78 ioe.printStackTrace(); 79 throw new RuntimeException (ioe); 80 } 81 } 82 } 83 84 public void close() { 85 channel.close(); 86 } 87 88 private synchronized long nextThreadID() { 89 return ++threadIDSequence; 90 } 91 92 private ThreadID getThreadID() { 93 ThreadID rv = (ThreadID) threadID.get(); 94 if (rv == null) { 95 rv = new ThreadID(nextThreadID()); 96 threadID.set(rv); 97 } 98 99 return rv; 100 } 101 102 public void put(byte[] key, byte[] value) { 103 ThreadID threadID = getThreadID(); 104 MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel 105 .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE); 106 request.initializePut(threadID, this.storeName, key, value); 107 request.send(); 108 } 112 113 public byte[] get(byte[] key) { 114 ThreadID threadID = getThreadID(); 115 MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel 116 .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE); 117 request.initializeGet(threadID, this.storeName, key, false); 118 119 Object waitObject = getWaitObject(threadID, request); 120 121 request.send(); 122 MemoryDataStoreResponseMessage responseMessage = waitForResponse(threadID, waitObject); 123 Assert.assertTrue(responseMessage.isRequestCompletedFlag()); 124 return responseMessage.getValue(); 125 } 126 127 public Collection getAll(byte[] key) { 128 ThreadID threadID = getThreadID(); 129 MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel 130 .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE); 131 request.initializeGet(threadID, this.storeName, key, true); 132 133 Object waitObject = getWaitObject(threadID, request); 134 135 request.send(); 136 MemoryDataStoreResponseMessage responseMessage = waitForResponse(threadID, waitObject); 137 Assert.assertTrue(responseMessage.isRequestCompletedFlag()); 138 return responseMessage.getValues(); 139 } 140 141 public void remove(byte[] key) { 142 ThreadID threadID = getThreadID(); 143 MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel 144 .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE); 145 request.initializeRemove(threadID, this.storeName, key, false); 146 request.send(); 147 } 152 153 public void removeAll(byte[] key) { 154 ThreadID threadID = getThreadID(); 155 MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel 156 .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE); 157 request.initializeRemove(threadID, this.storeName, key, true); 158 request.send(); 159 } 164 165 void notifyResponse(ThreadID threadID, MemoryDataStoreResponseMessage response) { 166 Object waitObject = null; 167 synchronized (this) { 168 waitObject = this.waitObjectMap.get(threadID); 169 Object pendingRequest = this.pendingRequests.remove(threadID); 170 this.pendingResponses.put(threadID, response); 171 Assert.assertNotNull(waitObject); 172 Assert.assertNotNull(pendingRequest); 173 } 174 synchronized (waitObject) { 175 waitObject.notifyAll(); 176 } 177 } 178 179 private MemoryDataStoreResponseMessage waitForResponse(ThreadID threadID, Object waitObject) { 180 synchronized (waitObject) { 181 while (hasPendingRequest(threadID)) { 182 try { 183 waitObject.wait(); 184 } catch (InterruptedException e) { 185 throw new TCRuntimeException(e); 186 } 187 } 188 } 189 synchronized (this) { 190 MemoryDataStoreResponseMessage responseMessage = (MemoryDataStoreResponseMessage) this.pendingResponses 191 .remove(threadID); 192 Assert.assertNotNull(responseMessage); 193 return responseMessage; 194 } 195 } 196 197 private boolean hasPendingRequest(ThreadID threadID) { 198 return this.pendingRequests.get(threadID) != null; 199 } 200 201 private synchronized Object getWaitObject(ThreadID threadID, TCMessage message) { 202 Object waitObject = new Object (); 203 this.waitObjectMap.put(threadID, waitObject); 204 this.pendingRequests.put(threadID, message); 205 return waitObject; 206 } 207 } 208 | Popular Tags |