KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > memorydatastore > server > MemoryDataStoreServer


1 package com.tc.memorydatastore.server;
2
3 import com.tc.async.api.ConfigurationContext;
4 import com.tc.async.api.Stage;
5 import com.tc.async.api.StageManager;
6 import com.tc.async.impl.StageManagerImpl;
7 import com.tc.exception.TCRuntimeException;
8 import com.tc.lang.TCThreadGroup;
9 import com.tc.lang.ThrowableHandler;
10 import com.tc.logging.TCLogger;
11 import com.tc.logging.TCLogging;
12 import com.tc.memorydatastore.message.MemoryDataStoreRequestMessage;
13 import com.tc.memorydatastore.message.MemoryDataStoreResponseMessage;
14 import com.tc.memorydatastore.server.handler.MemoryDataStoreRequestHandler;
15 import com.tc.net.TCSocketAddress;
16 import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
17 import com.tc.net.protocol.tcm.CommunicationsManager;
18 import com.tc.net.protocol.tcm.CommunicationsManagerImpl;
19 import com.tc.net.protocol.tcm.HydrateHandler;
20 import com.tc.net.protocol.tcm.NetworkListener;
21 import com.tc.net.protocol.tcm.NullMessageMonitor;
22 import com.tc.net.protocol.tcm.TCMessageType;
23 import com.tc.net.protocol.transport.DefaultConnectionIdFactory;
24 import com.tc.net.protocol.transport.NullConnectionPolicy;
25 import com.tc.object.session.NullSessionManager;
26 import com.tc.util.TCTimeoutException;
27
28 import java.io.IOException JavaDoc;
29 import java.util.Collections JavaDoc;
30
31 public class MemoryDataStoreServer {
32   public static final int DEFAULT_PORT = 9001;
33
34   private static final String JavaDoc MEMORY_DATA_STORE_REQUEST_STAGE = "memory_data_store_request_stage";
35   private final static int STARTED = 1;
36   private final static int STOPPED = 2;
37
38   private int serverPort;
39   private int state;
40   private NetworkListener lsnr;
41   private CommunicationsManager communicationManager;
42
43   public static MemoryDataStoreServer createInstance() {
44     return new MemoryDataStoreServer(DEFAULT_PORT);
45   }
46
47   public static MemoryDataStoreServer createInstance(int port) {
48     return new MemoryDataStoreServer(port);
49   }
50
51   private MemoryDataStoreServer(int serverPort) {
52     super();
53     this.serverPort = serverPort;
54   }
55
56   public int getListenPort() {
57     return lsnr.getBindPort();
58   }
59
60   private StageManager getStageManager() {
61     ThrowableHandler throwableHandler = new ThrowableHandler(TCLogging.getLogger(MemoryDataStoreServer.class));
62     TCThreadGroup threadGroup = new TCThreadGroup(throwableHandler);
63     return new StageManagerImpl(threadGroup);
64   }
65
66   private void setupListener(int serverPort) {
67     this.communicationManager = new CommunicationsManagerImpl(new NullMessageMonitor(),
68         new PlainNetworkStackHarnessFactory(), new NullConnectionPolicy());
69     this.lsnr = communicationManager.createListener(new NullSessionManager(), new TCSocketAddress(
70         TCSocketAddress.WILDCARD_ADDR, serverPort), true, new DefaultConnectionIdFactory());
71   }
72
73   public void start() throws IOException JavaDoc {
74     StageManager stageManager = getStageManager();
75     setupListener(serverPort);
76
77     lsnr.addClassMapping(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE, MemoryDataStoreRequestMessage.class);
78     lsnr.addClassMapping(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE, MemoryDataStoreResponseMessage.class);
79
80     Stage hydrateStage = stageManager.createStage("hydrate_message_stage", new HydrateHandler(), 1, 500); // temporary
81
// hardcoded
82

83     MemoryDataStoreRequestHandler memoryDataStoreRequestHandler = new MemoryDataStoreRequestHandler();
84     Stage memoryDataStoreRequestStage = stageManager.createStage(MEMORY_DATA_STORE_REQUEST_STAGE,
85         memoryDataStoreRequestHandler, 1, 1);
86     lsnr.routeMessageType(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE, memoryDataStoreRequestStage.getSink(),
87         hydrateStage.getSink());
88
89     stageManager.startAll(new NullContext(stageManager)); // temporary hack to
90
// start the stage
91
lsnr.start(Collections.EMPTY_SET);
92     this.state = STARTED;
93   }
94
95   public void shutdown() throws TCTimeoutException {
96     this.lsnr.stop(5000);
97     this.communicationManager.shutdown();
98     this.state = STOPPED;
99   }
100
101   public int getState() {
102     return this.state;
103   }
104
105   // Temporary hack
106
private static class NullContext implements ConfigurationContext {
107
108     private final StageManager manager;
109
110     public NullContext(StageManager manager) {
111       this.manager = manager;
112     }
113
114     public TCLogger getLogger(Class JavaDoc clazz) {
115       return TCLogging.getLogger(clazz);
116     }
117
118     public Stage getStage(String JavaDoc name) {
119       return manager.getStage(name);
120     }
121
122   }
123
124   public static void main(String JavaDoc[] args) {
125     MemoryDataStoreServer server = createInstance(0);
126     try {
127       server.start();
128
129       while (server.getState() == STARTED) {
130         Thread.sleep(Long.MAX_VALUE);
131       }
132     } catch (InterruptedException JavaDoc e) {
133       throw new TCRuntimeException(e);
134     } catch (IOException JavaDoc e) {
135       throw new TCRuntimeException(e);
136     }
137   }
138 }
139
Popular Tags