1 21 22 package org.continuent.sequoia.controller.requestmanager.distributed; 23 24 import java.sql.SQLException ; 25 import java.util.ArrayList ; 26 import java.util.Collection ; 27 import java.util.ConcurrentModificationException ; 28 import java.util.HashMap ; 29 import java.util.Hashtable ; 30 import java.util.Iterator ; 31 import java.util.LinkedList ; 32 import java.util.List ; 33 import java.util.Map ; 34 35 import org.continuent.hedera.adapters.MulticastRequestAdapter; 36 import org.continuent.sequoia.common.exceptions.VirtualDatabaseException; 37 import org.continuent.sequoia.common.log.Trace; 38 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData; 39 import org.continuent.sequoia.controller.requests.AbstractRequest; 40 import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase; 41 import org.continuent.sequoia.controller.virtualdatabase.protocol.FlushGroupCommunicationMessages; 42 43 49 public class ControllerFailureCleanupThread extends Thread  50 { 51 private Hashtable cleanupThreadsList; 52 private HashMap writesFlushed; 53 private DistributedVirtualDatabase dvdb; 54 private DistributedRequestManager drm; 55 private long failedControllerId; 56 private long failoverTimeoutInMs; 57 private List persistentConnectionsToRecover; 58 private List transactionsToRecover; 59 private Trace logger = Trace 60 .getLogger("org.continuent.sequoia.controller.requestmanager.cleanup"); 61 62 73 public ControllerFailureCleanupThread( 74 DistributedVirtualDatabase distributedVirtualDatabase, 75 long failedControllerId, long failoverTimeoutInMs, 76 Hashtable cleanupThreads, HashMap writesFlushed) 77 { 78 super("ControllerFailureCleanupThread for controller " + failedControllerId); 79 this.dvdb = distributedVirtualDatabase; 80 drm = (DistributedRequestManager) dvdb.getRequestManager(); 81 this.failedControllerId = failedControllerId; 82 this.failoverTimeoutInMs = failoverTimeoutInMs; 83 this.cleanupThreadsList = cleanupThreads; 84 this.writesFlushed = writesFlushed; 85 } 86 87 92 public void run() 93 { 94 try 95 { 96 doRun(); 97 } 98 catch (Throwable t) 99 { 100 logger.fatal("Cleanup failed", t); 101 } 102 } 103 104 107 public void doRun() 108 { 109 Long controllerIdKey = new Long (failedControllerId); 110 try 111 { 112 if (failoverTimeoutInMs == 0) 113 { 114 synchronized (writesFlushed) 115 { 116 writesFlushed.put(controllerIdKey, Boolean.TRUE); 117 writesFlushed.notifyAll(); 118 } 119 return; 120 } 121 122 synchronized (writesFlushed) 124 { 125 if (!writesFlushed.containsKey(controllerIdKey)) 126 writesFlushed.put(controllerIdKey, Boolean.FALSE); 127 } 128 129 synchronized (this) 130 { 131 132 try 133 { 134 139 MulticastRequestAdapter multicastRequestAdapter = dvdb 140 .getMulticastRequestAdapter(); 141 ArrayList dest = new ArrayList (); 142 143 if ((multicastRequestAdapter != null) 144 && (multicastRequestAdapter.getChannel() != null)) 145 { 146 dest.add(multicastRequestAdapter.getChannel().getLocalMembership()); 147 Object ret = multicastRequestAdapter.multicastMessage(dest, 148 new FlushGroupCommunicationMessages(failedControllerId), 149 MulticastRequestAdapter.WAIT_ALL, 0); 150 if (ret instanceof Exception ) 151 throw (Exception ) ret; 152 } 153 } 154 catch (Throwable e) 155 { 156 String errorMessage = "Failed to send flush message in ControllerFailureCleanupThread"; 157 logger.error(errorMessage, e); 158 } 159 160 synchronized (writesFlushed) 163 { 164 writesFlushed.put(controllerIdKey, Boolean.TRUE); 165 writesFlushed.notifyAll(); 166 } 167 168 notifyAll(); 169 } 170 171 try 173 { 174 synchronized (this) 175 { 176 if (logger.isInfoEnabled()) 177 logger.info("Waiting " + failoverTimeoutInMs 178 + "ms for client of controller " + controllerIdKey 179 + " to failover"); 180 wait(failoverTimeoutInMs); 181 } 182 } 183 catch (InterruptedException ignore) 184 { 185 } 186 187 191 drm.cleanupAllFailedQueriesFromController(controllerIdKey.longValue()); 192 193 198 transactionsToRecover = parseTransactionMetadataListForControllerId(drm 199 .getScheduler().getActiveTransactions()); 200 rollbackInactiveTransactions(controllerIdKey); 201 persistentConnectionsToRecover = parsePersistentConnections(drm 202 .getScheduler().getOpenPersistentConnections()); 203 getTransactionAndPersistentConnectionsFromRequests(drm.getScheduler() 206 .getActiveWriteRequests()); 207 208 if ((transactionsToRecover.isEmpty()) 210 && (persistentConnectionsToRecover.isEmpty())) 211 return; 212 213 219 closeRemainingPersistentConnections(controllerIdKey); 221 } 222 finally 223 { 224 logger.info("Cleanup for controller " + failedControllerId 225 + " failure is completed."); 226 227 synchronized (writesFlushed) 230 { 231 writesFlushed.put(controllerIdKey, Boolean.TRUE); 232 writesFlushed.notifyAll(); 233 } 234 235 cleanupThreadsList.remove(this); 237 } 238 } 239 240 257 private void rollbackInactiveTransactions(Long controllerIdKey) 258 { 259 List transactionsRecovered = dvdb.getTransactionsRecovered(controllerIdKey); 260 Map readRequests = drm.getScheduler().getActiveReadRequests(); 261 Map writeRequests = drm.getScheduler().getActiveWriteRequests(); 262 while (!transactionsToRecover.isEmpty()) 263 { 264 int waitingForCompletion = 0; 265 for (Iterator iter = transactionsToRecover.iterator(); iter.hasNext();) 268 { 269 Long lTid = (Long ) iter.next(); 270 271 if ((transactionsRecovered == null) 272 || !transactionsRecovered.contains(lTid)) 273 { 274 if (!hasRequestForTransaction(lTid.longValue(), readRequests) 275 && !hasRequestForTransaction(lTid.longValue(), writeRequests)) 276 { 277 if (logger.isInfoEnabled()) 278 logger.info("Rollingback transaction " + lTid 279 + " started by dead controller " + failedControllerId 280 + " since client did not ask for failover"); 281 282 try 283 { 284 boolean logRollback = dvdb.getRecoveryLog() 285 .hasLoggedBeginForTransaction(lTid); 286 dvdb.rollback(lTid.longValue(), logRollback); 287 } 288 catch (SQLException e) 289 { 290 logger.error("Failed to rollback transaction " + lTid 291 + " started by dead controller " + failedControllerId, e); 292 } 293 catch (VirtualDatabaseException e) 294 { 295 logger.error("Failed to rollback transaction " + lTid 296 + " started by dead controller " + failedControllerId, e); 297 } 298 299 iter.remove(); 300 } 301 else 302 { 303 waitingForCompletion++; 304 if (logger.isDebugEnabled()) 305 logger.debug("Waiting for activity to complete for " + lTid 306 + " started by dead controller " + failedControllerId 307 + " since client did not ask for failover"); 308 } 309 } 310 } 311 312 if (waitingForCompletion == 0) 313 break; 314 try 315 { 316 synchronized (writeRequests) 317 { 318 if (!writeRequests.isEmpty()) 319 { 320 writeRequests.wait(500); 321 continue; 322 } 323 } 324 synchronized (readRequests) 325 { 326 if (!readRequests.isEmpty()) 327 { 328 readRequests.wait(500); 329 continue; 330 } 331 } 332 } 333 catch (InterruptedException e) 334 { 335 } 336 } } 338 339 347 private boolean hasRequestForTransaction(long transactionId, Map map) 348 { 349 synchronized (map) 350 { 351 for (Iterator iter = map.values().iterator(); iter.hasNext();) 352 { 353 AbstractRequest request = (AbstractRequest) iter.next(); 354 if (transactionId == request.getTransactionId()) 355 return true; 356 } 357 } 358 359 return false; 360 } 361 362 366 public synchronized void shutdown() 367 { 368 notifyAll(); 369 try 370 { 371 this.join(); 372 } 373 catch (InterruptedException e) 374 { 375 logger 376 .warn("Controller cleanup thread may not have completed before it was terminated"); 377 } 378 } 379 380 private void closeRemainingPersistentConnections(Long controllerId) 381 { 382 List persistentConnectionsRecovered = dvdb 383 .getControllerPersistentConnectionsRecovered(controllerId); 384 for (Iterator iter = persistentConnectionsToRecover.iterator(); iter 385 .hasNext();) 386 { 387 Long lConnectionId = (Long ) iter.next(); 388 389 if ((persistentConnectionsRecovered == null) 390 || !persistentConnectionsRecovered.contains(lConnectionId)) 391 { 392 if (logger.isInfoEnabled()) 393 logger.info("Closing persistent connection " + lConnectionId 394 + " started by dead controller " + failedControllerId 395 + " since client did not ask for failover"); 396 397 drm.closePersistentConnection(lConnectionId); 398 } 399 } 400 } 401 402 409 private void getTransactionAndPersistentConnectionsFromRequests(Map map) 410 { 411 synchronized (map) 412 { 413 for (Iterator iter = map.keySet().iterator(); iter.hasNext();) 414 { 415 Long lTid = (Long ) iter.next(); 416 if ((lTid.longValue() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId) 417 { AbstractRequest request = (AbstractRequest) map.get(lTid); 419 if (!request.isAutoCommit()) 420 { 421 425 if ((request.getTransactionId() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId) 426 { 427 Long tidLong = new Long (request.getTransactionId()); 428 if (!transactionsToRecover.contains(tidLong)) 429 { 430 transactionsToRecover.add(tidLong); 431 } 432 } 433 } 434 if (request.isPersistentConnection()) 435 { 436 440 Long connIdLong = new Long (request.getPersistentConnectionId()); 441 if ((request.getPersistentConnectionId() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId) 442 if (!persistentConnectionsToRecover.contains(connIdLong)) 443 { 444 persistentConnectionsToRecover.add(connIdLong); 445 } 446 } 447 } 448 } 449 } 450 } 451 452 private List parsePersistentConnections(Map map) 453 { 454 LinkedList result = new LinkedList (); 455 synchronized (map) 456 { 457 for (Iterator iter = map.keySet().iterator(); iter.hasNext();) 458 { 459 Long persistentConnectionId = (Long ) iter.next(); 460 if ((persistentConnectionId.longValue() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId) 461 { result.add(persistentConnectionId); 463 } 464 } 465 return result; 466 } 467 } 468 469 477 private List parseTransactionMetadataListForControllerId(Collection list) 478 { 479 LinkedList result = new LinkedList (); 480 synchronized (list) 481 { 482 boolean retry = true; 483 while (retry) 484 { 485 try 486 { 487 for (Iterator iter = list.iterator(); iter.hasNext();) 488 { 489 TransactionMetaData tm = (TransactionMetaData) iter.next(); 490 if ((tm.getTransactionId() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId) 491 result.addLast(new Long (tm.getTransactionId())); 492 } 493 494 retry = false; 495 } 496 catch (ConcurrentModificationException e) 497 { 498 } 500 } 501 } 502 return result; 503 } 504 505 } 506 | Popular Tags |