1 21 22 package org.continuent.sequoia.controller.virtualdatabase; 23 24 import java.io.Serializable ; 25 import java.util.HashMap ; 26 import java.util.Iterator ; 27 import java.util.Map ; 28 import java.util.Map.Entry; 29 30 import org.continuent.sequoia.common.log.Trace; 31 import org.continuent.sequoia.controller.requests.AbstractRequest; 32 33 46 public class RequestResultFailoverCache implements Runnable 47 { 48 49 52 private static final long CACHE_CLEANUP_TIMEOUT = 10000; 53 54 58 private long entryTimeout; 59 60 61 private Trace logger; 62 63 private Map requestIdResult = new HashMap (); 66 67 private Map transactionIdRequestId = new HashMap (); 70 71 private Map connectionIdRequestId = new HashMap (); 74 75 private boolean isKilled = false; 76 77 private class CachedResult 80 { 81 private Serializable result; 82 private long expirationDate; 83 84 90 public CachedResult(Serializable result, long entryTimeout) 91 { 92 this.result = result; 93 expirationDate = System.currentTimeMillis() + entryTimeout; 94 } 95 96 101 public Serializable getResult() 102 { 103 return result; 104 } 105 106 111 public long getExpirationDate() 112 { 113 return expirationDate; 114 } 115 } 116 117 125 public RequestResultFailoverCache(Trace logger, long entryTimeout) 126 { 127 this.logger = logger; 128 this.entryTimeout = entryTimeout; 129 (new Thread (this, "RequestResultFailoverCacheCleanupThread")).start(); 130 } 131 132 138 public synchronized void store(AbstractRequest request, Serializable result) 139 { 140 Long requestId = new Long (request.getId()); 141 142 synchronized (requestIdResult) 145 { 146 if (requestIdResult.isEmpty()) 147 { 148 requestIdResult.notify(); 150 } 151 requestIdResult.put(requestId, new CachedResult(result, entryTimeout)); 152 } 153 154 if (!request.isAutoCommit()) 155 { Long transactionId = new Long (request.getTransactionId()); 157 if (transactionIdRequestId.containsKey(transactionId)) 158 { 159 requestIdResult.remove(transactionIdRequestId.get(transactionId)); 160 } 161 transactionIdRequestId.put(transactionId, requestId); 162 } 163 164 if (request.isPersistentConnection()) 165 { Long connectionId = new Long (request.getPersistentConnectionId()); 167 if (connectionIdRequestId.containsKey(connectionId)) 168 { 169 requestIdResult.remove(connectionIdRequestId.get(connectionId)); 170 } 171 connectionIdRequestId.put(connectionId, requestId); 172 } 173 174 if (logger.isDebugEnabled()) 175 logger.debug("Stored result for request ID: " + request.getId() + " -> " 176 + result); 177 } 178 179 185 public synchronized Serializable retrieve(long requestId) 186 { 187 Serializable res = null; 188 Long requestIdLong = new Long (requestId); 189 if (requestIdResult.containsKey(requestIdLong)) 190 { 191 res = ((CachedResult) requestIdResult.get(requestIdLong)).getResult(); 192 if (logger.isDebugEnabled()) 193 logger.debug("Retrieved result for request ID: " + requestId + " -> " 194 + res); 195 } 196 else 197 { if (logger.isDebugEnabled()) 199 logger.debug("No result found in failover cache for request " 200 + requestId); 201 } 202 return res; 203 } 204 205 210 public void run() 211 { 212 while (!isKilled) 214 { 215 try 216 { 217 synchronized (requestIdResult) 218 { 219 if (requestIdResult.isEmpty()) 222 requestIdResult.wait(); 223 else 224 requestIdResult.wait(CACHE_CLEANUP_TIMEOUT); 225 } 226 } 227 catch (InterruptedException e) 228 { 229 } 231 removeOldEntries(); 232 } 233 } 234 235 240 public void shutdown() 241 { 242 isKilled = true; 243 synchronized (requestIdResult) 244 { 245 requestIdResult.notifyAll(); 246 } 247 } 248 249 private void removeOldEntries() 250 { 251 if (logger.isDebugEnabled()) 252 logger.debug("Cleaning-up request result failover cache..."); 253 synchronized (this) 254 { 255 long currentTimeMillis = System.currentTimeMillis(); 256 257 for (Iterator iter = requestIdResult.entrySet().iterator(); iter 259 .hasNext();) 260 { 261 CachedResult cachedResult = (CachedResult) ((Entry) iter.next()) 262 .getValue(); 263 if ((currentTimeMillis > cachedResult.getExpirationDate())) 264 { 265 iter.remove(); 266 if (logger.isDebugEnabled()) 267 logger.debug("Removed result from failover cache: " 268 + cachedResult.getResult()); 269 } 270 } 271 272 for (Iterator iter = transactionIdRequestId.entrySet().iterator(); iter 274 .hasNext();) 275 { 276 Entry entry = (Entry) iter.next(); 277 if (requestIdResult.get(entry.getValue()) == null) 278 { iter.remove(); 280 if (logger.isDebugEnabled()) 281 logger.debug("Removed transaction " + entry.getKey() 282 + " from failover cache"); 283 } 284 } 285 286 for (Iterator iter = connectionIdRequestId.entrySet().iterator(); iter 288 .hasNext();) 289 { 290 Entry entry = (Entry) iter.next(); 291 if (requestIdResult.get(entry.getValue()) == null) 292 { iter.remove(); 294 if (logger.isDebugEnabled()) 295 logger.debug("Removed persistent connection " + entry.getKey() 296 + " from failover cache"); 297 } 298 } 299 300 } 301 } 302 } 303 | Popular Tags |