KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > memorydatastore > client > MemoryDataStoreClient


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tc.memorydatastore.client;
5
6 import com.tc.config.schema.dynamic.FixedValueConfigItem;
7 import com.tc.exception.TCRuntimeException;
8 import com.tc.memorydatastore.message.MemoryDataStoreRequestMessage;
9 import com.tc.memorydatastore.message.MemoryDataStoreResponseMessage;
10 import com.tc.memorydatastore.server.MemoryDataStoreServer;
11 import com.tc.net.MaxConnectionsExceededException;
12 import com.tc.net.core.ConnectionInfo;
13 import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
14 import com.tc.net.protocol.tcm.ClientMessageChannel;
15 import com.tc.net.protocol.tcm.CommunicationsManager;
16 import com.tc.net.protocol.tcm.CommunicationsManagerImpl;
17 import com.tc.net.protocol.tcm.NullMessageMonitor;
18 import com.tc.net.protocol.tcm.TCMessage;
19 import com.tc.net.protocol.tcm.TCMessageType;
20 import com.tc.net.protocol.transport.NullConnectionPolicy;
21 import com.tc.object.lockmanager.api.ThreadID;
22 import com.tc.object.session.NullSessionManager;
23 import com.tc.util.Assert;
24 import com.tc.util.TCTimeoutException;
25 import com.tc.util.concurrent.ThreadUtil;
26
27 import java.io.IOException JavaDoc;
28 import java.net.ConnectException JavaDoc;
29 import java.util.Collection JavaDoc;
30 import java.util.HashMap JavaDoc;
31 import java.util.Map JavaDoc;
32
33 public class MemoryDataStoreClient implements MemoryDataMap {
34   private final static CommunicationsManager communicationsManager = new CommunicationsManagerImpl(
35                                                                        new NullMessageMonitor(),
36                                                                        new PlainNetworkStackHarnessFactory(),
37                                                                        new NullConnectionPolicy());
38
39   private ClientMessageChannel channel;
40   private final Map JavaDoc pendingRequests = new HashMap JavaDoc();
41   private final Map JavaDoc waitObjectMap = new HashMap JavaDoc();
42   private final Map JavaDoc pendingResponses = new HashMap JavaDoc();
43   private final String JavaDoc storeName;
44   private final ThreadLocal JavaDoc threadID = new ThreadLocal JavaDoc();
45
46   private long threadIDSequence;
47
48   public MemoryDataStoreClient(String JavaDoc storeName, String JavaDoc serverHost, int serverPort) {
49     super();
50     this.storeName = storeName;
51     setupClient(serverHost, serverPort);
52   }
53
54   public MemoryDataStoreClient(String JavaDoc storeName) {
55     this(storeName, "localhost", MemoryDataStoreServer.DEFAULT_PORT);
56   }
57
58   private void setupClient(String JavaDoc serverHost, int serverPort) {
59
60     this.channel = communicationsManager.createClientChannel(new NullSessionManager(), -1, serverHost, serverPort,
61         10000, new FixedValueConfigItem(new ConnectionInfo[] { new ConnectionInfo(serverHost, serverPort) }));
62
63     channel.addClassMapping(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE, MemoryDataStoreResponseMessage.class);
64     channel.addClassMapping(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE, MemoryDataStoreRequestMessage.class);
65     channel.routeMessageType(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE, new MemoryDataStoreResponseSink(this));
66
67     while (true) {
68       try {
69         channel.open();
70         break;
71       } catch (TCTimeoutException tcte) {
72         ThreadUtil.reallySleep(5000);
73       } catch (ConnectException JavaDoc e) {
74         ThreadUtil.reallySleep(5000);
75       } catch (MaxConnectionsExceededException e) {
76         ThreadUtil.reallySleep(5000);
77       } catch (IOException JavaDoc ioe) {
78         ioe.printStackTrace();
79         throw new RuntimeException JavaDoc(ioe);
80       }
81     }
82   }
83
84   public void close() {
85     channel.close();
86   }
87
88   private synchronized long nextThreadID() {
89     return ++threadIDSequence;
90   }
91
92   private ThreadID getThreadID() {
93     ThreadID rv = (ThreadID) threadID.get();
94     if (rv == null) {
95       rv = new ThreadID(nextThreadID());
96       threadID.set(rv);
97     }
98
99     return rv;
100   }
101
102   public void put(byte[] key, byte[] value) {
103     ThreadID threadID = getThreadID();
104     MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel
105         .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE);
106     request.initializePut(threadID, this.storeName, key, value);
107     request.send();
108     // MemoryDataStoreResponseMessage responseMessage =
109
// waitForResponse(threadID, request);
110
// Assert.assertTrue(responseMessage.isRequestCompletedFlag());
111
}
112
113   public byte[] get(byte[] key) {
114     ThreadID threadID = getThreadID();
115     MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel
116         .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE);
117     request.initializeGet(threadID, this.storeName, key, false);
118
119     Object JavaDoc waitObject = getWaitObject(threadID, request);
120
121     request.send();
122     MemoryDataStoreResponseMessage responseMessage = waitForResponse(threadID, waitObject);
123     Assert.assertTrue(responseMessage.isRequestCompletedFlag());
124     return responseMessage.getValue();
125   }
126
127   public Collection JavaDoc getAll(byte[] key) {
128     ThreadID threadID = getThreadID();
129     MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel
130         .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE);
131     request.initializeGet(threadID, this.storeName, key, true);
132
133     Object JavaDoc waitObject = getWaitObject(threadID, request);
134
135     request.send();
136     MemoryDataStoreResponseMessage responseMessage = waitForResponse(threadID, waitObject);
137     Assert.assertTrue(responseMessage.isRequestCompletedFlag());
138     return responseMessage.getValues();
139   }
140
141   public void remove(byte[] key) {
142     ThreadID threadID = getThreadID();
143     MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel
144         .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE);
145     request.initializeRemove(threadID, this.storeName, key, false);
146     request.send();
147     // MemoryDataStoreResponseMessage responseMessage =
148
// waitForResponse(threadID, request);
149
// Assert.assertTrue(responseMessage.isRequestCompletedFlag());
150
// return responseMessage.getValue();
151
}
152
153   public void removeAll(byte[] key) {
154     ThreadID threadID = getThreadID();
155     MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel
156         .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE);
157     request.initializeRemove(threadID, this.storeName, key, true);
158     request.send();
159     // MemoryDataStoreResponseMessage responseMessage =
160
// waitForResponse(threadID, request);
161
// Assert.assertTrue(responseMessage.isRequestCompletedFlag());
162
// return responseMessage.getNumOfRemove();
163
}
164
165   void notifyResponse(ThreadID threadID, MemoryDataStoreResponseMessage response) {
166     Object JavaDoc waitObject = null;
167     synchronized (this) {
168       waitObject = this.waitObjectMap.get(threadID);
169       Object JavaDoc pendingRequest = this.pendingRequests.remove(threadID);
170       this.pendingResponses.put(threadID, response);
171       Assert.assertNotNull(waitObject);
172       Assert.assertNotNull(pendingRequest);
173     }
174     synchronized (waitObject) {
175       waitObject.notifyAll();
176     }
177   }
178
179   private MemoryDataStoreResponseMessage waitForResponse(ThreadID threadID, Object JavaDoc waitObject) {
180     synchronized (waitObject) {
181       while (hasPendingRequest(threadID)) {
182         try {
183           waitObject.wait();
184         } catch (InterruptedException JavaDoc e) {
185           throw new TCRuntimeException(e);
186         }
187       }
188     }
189     synchronized (this) {
190       MemoryDataStoreResponseMessage responseMessage = (MemoryDataStoreResponseMessage) this.pendingResponses
191           .remove(threadID);
192       Assert.assertNotNull(responseMessage);
193       return responseMessage;
194     }
195   }
196
197   private boolean hasPendingRequest(ThreadID threadID) {
198     return this.pendingRequests.get(threadID) != null;
199   }
200
201   private synchronized Object JavaDoc getWaitObject(ThreadID threadID, TCMessage message) {
202     Object JavaDoc waitObject = new Object JavaDoc();
203     this.waitObjectMap.put(threadID, waitObject);
204     this.pendingRequests.put(threadID, message);
205     return waitObject;
206   }
207 }
208
Popular Tags