1 package com.tc.memorydatastore.server; 2 3 import com.tc.async.api.ConfigurationContext; 4 import com.tc.async.api.Stage; 5 import com.tc.async.api.StageManager; 6 import com.tc.async.impl.StageManagerImpl; 7 import com.tc.exception.TCRuntimeException; 8 import com.tc.lang.TCThreadGroup; 9 import com.tc.lang.ThrowableHandler; 10 import com.tc.logging.TCLogger; 11 import com.tc.logging.TCLogging; 12 import com.tc.memorydatastore.message.MemoryDataStoreRequestMessage; 13 import com.tc.memorydatastore.message.MemoryDataStoreResponseMessage; 14 import com.tc.memorydatastore.server.handler.MemoryDataStoreRequestHandler; 15 import com.tc.net.TCSocketAddress; 16 import com.tc.net.protocol.PlainNetworkStackHarnessFactory; 17 import com.tc.net.protocol.tcm.CommunicationsManager; 18 import com.tc.net.protocol.tcm.CommunicationsManagerImpl; 19 import com.tc.net.protocol.tcm.HydrateHandler; 20 import com.tc.net.protocol.tcm.NetworkListener; 21 import com.tc.net.protocol.tcm.NullMessageMonitor; 22 import com.tc.net.protocol.tcm.TCMessageType; 23 import com.tc.net.protocol.transport.DefaultConnectionIdFactory; 24 import com.tc.net.protocol.transport.NullConnectionPolicy; 25 import com.tc.object.session.NullSessionManager; 26 import com.tc.util.TCTimeoutException; 27 28 import java.io.IOException ; 29 import java.util.Collections ; 30 31 public class MemoryDataStoreServer { 32 public static final int DEFAULT_PORT = 9001; 33 34 private static final String MEMORY_DATA_STORE_REQUEST_STAGE = "memory_data_store_request_stage"; 35 private final static int STARTED = 1; 36 private final static int STOPPED = 2; 37 38 private int serverPort; 39 private int state; 40 private NetworkListener lsnr; 41 private CommunicationsManager communicationManager; 42 43 public static MemoryDataStoreServer createInstance() { 44 return new MemoryDataStoreServer(DEFAULT_PORT); 45 } 46 47 public static MemoryDataStoreServer createInstance(int port) { 48 return new MemoryDataStoreServer(port); 49 } 50 51 private MemoryDataStoreServer(int serverPort) { 52 super(); 53 this.serverPort = serverPort; 54 } 55 56 public int getListenPort() { 57 return lsnr.getBindPort(); 58 } 59 60 private StageManager getStageManager() { 61 ThrowableHandler throwableHandler = new ThrowableHandler(TCLogging.getLogger(MemoryDataStoreServer.class)); 62 TCThreadGroup threadGroup = new TCThreadGroup(throwableHandler); 63 return new StageManagerImpl(threadGroup); 64 } 65 66 private void setupListener(int serverPort) { 67 this.communicationManager = new CommunicationsManagerImpl(new NullMessageMonitor(), 68 new PlainNetworkStackHarnessFactory(), new NullConnectionPolicy()); 69 this.lsnr = communicationManager.createListener(new NullSessionManager(), new TCSocketAddress( 70 TCSocketAddress.WILDCARD_ADDR, serverPort), true, new DefaultConnectionIdFactory()); 71 } 72 73 public void start() throws IOException { 74 StageManager stageManager = getStageManager(); 75 setupListener(serverPort); 76 77 lsnr.addClassMapping(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE, MemoryDataStoreRequestMessage.class); 78 lsnr.addClassMapping(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE, MemoryDataStoreResponseMessage.class); 79 80 Stage hydrateStage = stageManager.createStage("hydrate_message_stage", new HydrateHandler(), 1, 500); 83 MemoryDataStoreRequestHandler memoryDataStoreRequestHandler = new MemoryDataStoreRequestHandler(); 84 Stage memoryDataStoreRequestStage = stageManager.createStage(MEMORY_DATA_STORE_REQUEST_STAGE, 85 memoryDataStoreRequestHandler, 1, 1); 86 lsnr.routeMessageType(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE, memoryDataStoreRequestStage.getSink(), 87 hydrateStage.getSink()); 88 89 stageManager.startAll(new NullContext(stageManager)); lsnr.start(Collections.EMPTY_SET); 92 this.state = STARTED; 93 } 94 95 public void shutdown() throws TCTimeoutException { 96 this.lsnr.stop(5000); 97 this.communicationManager.shutdown(); 98 this.state = STOPPED; 99 } 100 101 public int getState() { 102 return this.state; 103 } 104 105 private static class NullContext implements ConfigurationContext { 107 108 private final StageManager manager; 109 110 public NullContext(StageManager manager) { 111 this.manager = manager; 112 } 113 114 public TCLogger getLogger(Class clazz) { 115 return TCLogging.getLogger(clazz); 116 } 117 118 public Stage getStage(String name) { 119 return manager.getStage(name); 120 } 121 122 } 123 124 public static void main(String [] args) { 125 MemoryDataStoreServer server = createInstance(0); 126 try { 127 server.start(); 128 129 while (server.getState() == STARTED) { 130 Thread.sleep(Long.MAX_VALUE); 131 } 132 } catch (InterruptedException e) { 133 throw new TCRuntimeException(e); 134 } catch (IOException e) { 135 throw new TCRuntimeException(e); 136 } 137 } 138 } 139 | Popular Tags |