1 4 package com.tc.memorydatastore.server.handler; 5 6 import com.tc.async.api.AbstractEventHandler; 7 import com.tc.async.api.EventContext; 8 import com.tc.async.api.EventHandlerException; 9 import com.tc.memorydatastore.message.MemoryDataStoreRequestMessage; 10 import com.tc.memorydatastore.message.MemoryDataStoreResponseMessage; 11 import com.tc.memorydatastore.server.MemoryDataStore; 12 import com.tc.net.protocol.tcm.MessageChannel; 13 import com.tc.net.protocol.tcm.TCMessageType; 14 import com.tc.object.lockmanager.api.ThreadID; 15 import com.tc.util.Assert; 16 17 import java.util.Collection ; 18 19 public class MemoryDataStoreRequestHandler extends AbstractEventHandler { 20 private final static String DATA_STORE_NAME_ATTACHMENT_KEY = "DataStoreName"; 21 22 private final MemoryDataStore store = new MemoryDataStore(); 23 24 private long numOfRequestsProcessed = 0; 25 private long totalTimeProcessed = 0; 26 27 28 public void handleEvent(EventContext context) throws EventHandlerException { 29 long startTime = System.currentTimeMillis(); 30 31 MemoryDataStoreRequestMessage message = (MemoryDataStoreRequestMessage)context; 32 MemoryDataStoreRequestMessage dataStoreRequestMessage = (MemoryDataStoreRequestMessage) message; 33 34 MessageChannel channel = message.getChannel(); 35 36 serviceRequest(channel, dataStoreRequestMessage); 37 38 long endTime = System.currentTimeMillis(); 39 40 synchronized(this) { 41 totalTimeProcessed += (endTime - startTime); 42 numOfRequestsProcessed++; 43 if (numOfRequestsProcessed % 10000 == 0) { 44 System.err.println(numOfRequestsProcessed + " requests processed with average processing time: " + (totalTimeProcessed*1.0/numOfRequestsProcessed) + "ms"); 45 } 46 } 47 } 48 49 private void serviceRequest(MessageChannel channel, MemoryDataStoreRequestMessage requestMessage) { 50 int type = requestMessage.getType(); 51 String dataStoreName = getDataStoreName(channel, requestMessage); 52 53 byte[] key = requestMessage.getKey(); 54 byte[] value = requestMessage.getValue(); 55 56 switch (type) { 57 case MemoryDataStoreRequestMessage.PUT: 58 Assert.assertNotNull(key); 59 Assert.assertNotNull(value); 60 61 64 store.put(dataStoreName, key, value); 65 sendPutResponseMessage(channel, requestMessage.getThreadID(), true); 66 break; 67 case MemoryDataStoreRequestMessage.GET: 68 Assert.assertNotNull(key); 69 70 if (requestMessage.isGetAll()) { 71 Collection values = store.getAll(dataStoreName, key); 72 sendGetAllResponseMessage(channel, requestMessage.getThreadID(), values, true); 74 } else { 75 value = store.get(dataStoreName, key); 76 sendGetResponseMessage(channel, requestMessage.getThreadID(), value, true); 79 } 80 break; 81 case MemoryDataStoreRequestMessage.REMOVE: 82 Assert.assertNotNull(key); 83 84 if (requestMessage.isRemoveAll()) { 85 int numOfRemove = store.removeAll(dataStoreName, key); 86 sendRemoveAllResponseMessage(channel, requestMessage.getThreadID(), numOfRemove, true); 88 } else { 89 value = store.remove(dataStoreName, key); 90 sendRemoveResponseMessage(channel, requestMessage.getThreadID(), value, true); 93 } 94 break; 95 } 96 97 } 98 99 private String getDataStoreName(MessageChannel channel, MemoryDataStoreRequestMessage requestMessage) { 100 String dataStoreName = (String ) channel.getAttachment(DATA_STORE_NAME_ATTACHMENT_KEY); 101 if (dataStoreName == null) { 102 dataStoreName = requestMessage.getDataStoreName(); 103 channel.addAttachment(DATA_STORE_NAME_ATTACHMENT_KEY, dataStoreName, false); 104 } 105 return dataStoreName; 106 } 107 108 private void sendRemoveResponseMessage(MessageChannel channel, ThreadID threadID, byte[] value, 109 boolean requestCompletedStatus) { 110 MemoryDataStoreResponseMessage response = (MemoryDataStoreResponseMessage) channel 111 .createMessage(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE); 112 response.initializeRemoveResponse(threadID, value, requestCompletedStatus); 113 response.send(); 114 } 115 116 private void sendRemoveAllResponseMessage(MessageChannel channel, ThreadID threadID, int numOfRemove, 117 boolean requestCompletedStatus) { 118 MemoryDataStoreResponseMessage response = (MemoryDataStoreResponseMessage) channel 119 .createMessage(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE); 120 response.initializeRemoveAllResponse(threadID, numOfRemove, requestCompletedStatus); 121 response.send(); 122 } 123 124 private void sendGetResponseMessage(MessageChannel channel, ThreadID threadID, byte[] value, 125 boolean requestCompletedStatus) { 126 MemoryDataStoreResponseMessage response = (MemoryDataStoreResponseMessage) channel 127 .createMessage(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE); 128 response.initializeGetResponse(threadID, value, requestCompletedStatus); 129 response.send(); 130 } 131 132 private void sendGetAllResponseMessage(MessageChannel channel, ThreadID threadID, Collection values, 133 boolean requestCompletedStatus) { 134 MemoryDataStoreResponseMessage response = (MemoryDataStoreResponseMessage) channel 135 .createMessage(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE); 136 response.initializeGetAllResponse(threadID, values, requestCompletedStatus); 137 response.send(); 138 } 139 140 private void sendPutResponseMessage(MessageChannel channel, ThreadID threadID, boolean requestCompletedStatus) { 141 MemoryDataStoreResponseMessage response = (MemoryDataStoreResponseMessage) channel 142 .createMessage(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE); 143 response.initializePutResponse(threadID, requestCompletedStatus); 144 response.send(); 145 } 146 147 148 } 149 | Popular Tags |