1 24 25 package org.continuent.sequoia.controller.virtualdatabase; 26 27 import java.io.IOException ; 28 import java.io.InputStream ; 29 import java.io.Serializable ; 30 import java.sql.SQLException ; 31 import java.util.ArrayList ; 32 import java.util.Collection ; 33 import java.util.Collections ; 34 import java.util.HashMap ; 35 import java.util.Hashtable ; 36 import java.util.Iterator ; 37 import java.util.LinkedList ; 38 import java.util.List ; 39 import java.util.Map ; 40 import java.util.Properties ; 41 import java.util.Map.Entry; 42 43 import org.continuent.hedera.adapters.MessageListener; 44 import org.continuent.hedera.adapters.MulticastRequestAdapter; 45 import org.continuent.hedera.adapters.MulticastRequestListener; 46 import org.continuent.hedera.adapters.MulticastResponse; 47 import org.continuent.hedera.channel.AbstractReliableGroupChannel; 48 import org.continuent.hedera.channel.ChannelException; 49 import org.continuent.hedera.channel.NotConnectedException; 50 import org.continuent.hedera.common.Group; 51 import org.continuent.hedera.common.GroupIdentifier; 52 import org.continuent.hedera.common.IpAddress; 53 import org.continuent.hedera.common.Member; 54 import org.continuent.hedera.factory.AbstractGroupCommunicationFactory; 55 import org.continuent.hedera.gms.AbstractGroupMembershipService; 56 import org.continuent.hedera.gms.GroupMembershipListener; 57 import org.continuent.sequoia.common.exceptions.ControllerException; 58 import org.continuent.sequoia.common.exceptions.NoMoreBackendException; 59 import org.continuent.sequoia.common.exceptions.SequoiaException; 60 import org.continuent.sequoia.common.exceptions.VirtualDatabaseException; 61 import org.continuent.sequoia.common.i18n.Translate; 62 import org.continuent.sequoia.common.jmx.management.BackendInfo; 63 import org.continuent.sequoia.common.jmx.management.DumpInfo; 64 import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList; 65 import org.continuent.sequoia.common.log.Trace; 66 import org.continuent.sequoia.common.sql.metadata.MetadataContainer; 67 import org.continuent.sequoia.common.users.VirtualDatabaseUser; 68 import org.continuent.sequoia.common.util.Constants; 69 import org.continuent.sequoia.common.xml.DatabasesXmlTags; 70 import org.continuent.sequoia.controller.backend.DatabaseBackend; 71 import org.continuent.sequoia.controller.backend.result.ControllerResultSet; 72 import org.continuent.sequoia.controller.backup.DumpTransferInfo; 73 import org.continuent.sequoia.controller.core.Controller; 74 import org.continuent.sequoia.controller.recoverylog.RecoveryLog; 75 import org.continuent.sequoia.controller.recoverylog.events.LogEntry; 76 import org.continuent.sequoia.controller.requestmanager.RAIDbLevels; 77 import org.continuent.sequoia.controller.requestmanager.RequestManager; 78 import org.continuent.sequoia.controller.requestmanager.distributed.ControllerFailureCleanupThread; 79 import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager; 80 import org.continuent.sequoia.controller.requests.AbstractRequest; 81 import org.continuent.sequoia.controller.sql.schema.DatabaseSchema; 82 import org.continuent.sequoia.controller.virtualdatabase.management.RestoreLogOperation; 83 import org.continuent.sequoia.controller.virtualdatabase.management.TransferBackendOperation; 84 import org.continuent.sequoia.controller.virtualdatabase.management.TransferDumpOperation; 85 import org.continuent.sequoia.controller.virtualdatabase.protocol.AddVirtualDatabaseUser; 86 import org.continuent.sequoia.controller.virtualdatabase.protocol.BackendStatus; 87 import org.continuent.sequoia.controller.virtualdatabase.protocol.BackendTransfer; 88 import org.continuent.sequoia.controller.virtualdatabase.protocol.CompleteRecoveryLogResync; 89 import org.continuent.sequoia.controller.virtualdatabase.protocol.ControllerInformation; 90 import org.continuent.sequoia.controller.virtualdatabase.protocol.CopyLogEntry; 91 import org.continuent.sequoia.controller.virtualdatabase.protocol.DisableBackendsAndSetCheckpoint; 92 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRequest; 93 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedTransactionMarker; 94 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage; 95 import org.continuent.sequoia.controller.virtualdatabase.protocol.GetPreparedStatementMetadata; 96 import org.continuent.sequoia.controller.virtualdatabase.protocol.GetStaticMetadata; 97 import org.continuent.sequoia.controller.virtualdatabase.protocol.InitiateDumpCopy; 98 import org.continuent.sequoia.controller.virtualdatabase.protocol.IsValidUserForAllBackends; 99 import org.continuent.sequoia.controller.virtualdatabase.protocol.MessageTimeouts; 100 import org.continuent.sequoia.controller.virtualdatabase.protocol.RemoveVirtualDatabaseUser; 101 import org.continuent.sequoia.controller.virtualdatabase.protocol.ReplicateLogEntries; 102 import org.continuent.sequoia.controller.virtualdatabase.protocol.ResyncRecoveryLog; 103 import org.continuent.sequoia.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration; 104 import org.continuent.sequoia.controller.virtualdatabase.protocol.VirtualDatabaseConfigurationResponse; 105 106 115 public class DistributedVirtualDatabase extends VirtualDatabase 116 implements 117 MessageListener, 118 MulticastRequestListener, 119 GroupMembershipListener 120 { 121 133 135 136 private String groupName = null; 137 141 private Hashtable controllerJmxAddress; 142 143 private Hashtable controllerIds; 144 145 private Hashtable backendsPerController; 146 147 148 private AbstractReliableGroupChannel channel = null; 149 150 private MulticastRequestAdapter multicastRequestAdapter = null; 151 private MessageTimeouts messageTimeouts; 152 private Group currentGroup = null; 153 private ArrayList allMemberButUs = null; 154 158 public static final long INCOMPATIBLE_CONFIGURATION = -1; 159 private boolean isVirtualDatabaseStarted = false; 160 161 165 private DistributedRequestManager distributedRequestManager; 166 private boolean processMacroBeforeBroadcast; 167 171 private Hashtable cleanupThreads; 172 176 private HashMap writesFlushed; 177 181 private long failoverTimeoutInMs; 182 186 private RequestResultFailoverCache requestResultFailoverCache; 187 188 189 private Trace distributedRequestLogger; 190 private String hederaPropertiesFile; 191 192 private static final Object MESSAGES_IN_HANDLER_SYNC = new Object (); 193 private int messagesInHandlers = 0; 194 private boolean channelShuttingDown = false; 195 private boolean isResynchingFlag; 196 private Hashtable controllerPersistentConnectionsRecovered = new Hashtable (); 197 private Hashtable controllerTransactionsRecovered = new Hashtable (); 198 199 200 private static AbstractGroupCommunicationFactory groupCommunicationFactory = null; 201 202 224 public DistributedVirtualDatabase(Controller controller, String name, 225 String groupName, int maxConnections, boolean pool, int minThreads, 226 int maxThreads, long maxThreadIdleTime, int sqlShortFormLength, 227 long clientFailoverTimeoutInMs, boolean useStaticResultSetMetaData, 228 String hederaPropertiesFile) 229 { 230 super(controller, name, maxConnections, pool, minThreads, maxThreads, 231 maxThreadIdleTime, sqlShortFormLength, useStaticResultSetMetaData); 232 233 this.groupName = groupName; 234 this.processMacroBeforeBroadcast = true; 235 this.failoverTimeoutInMs = clientFailoverTimeoutInMs; 236 requestResultFailoverCache = new RequestResultFailoverCache(logger, 237 failoverTimeoutInMs); 238 backendsPerController = new Hashtable (); 239 controllerJmxAddress = new Hashtable (); 240 controllerIds = new Hashtable (); 241 cleanupThreads = new Hashtable (); 242 writesFlushed = new HashMap (); 243 isVirtualDatabaseStarted = false; 244 distributedRequestLogger = Trace 245 .getLogger("org.continuent.sequoia.controller.distributedvirtualdatabase.request." 246 + name); 247 this.hederaPropertiesFile = hederaPropertiesFile; 248 this.totalOrderQueue = new LinkedList (); 249 } 250 251 256 protected void finalize() throws Throwable 257 { 258 quitChannel(); 259 super.finalize(); 260 } 261 262 269 public Object handleMessageSingleThreaded(Serializable msg, Member sender) 270 { 271 synchronized (MESSAGES_IN_HANDLER_SYNC) 272 { 273 if (channelShuttingDown) 274 return MESSAGES_IN_HANDLER_SYNC; 275 messagesInHandlers++; 276 } 277 278 try 279 { 280 if (msg != null) 281 { 282 if (logger.isDebugEnabled()) 283 logger.debug("handleMessageSingleThreaded (" + msg.getClass() + "): " 284 + msg); 285 286 if (msg instanceof DistributedVirtualDatabaseMessage) 287 { 288 return ((DistributedVirtualDatabaseMessage) msg) 289 .handleMessageSingleThreaded(this, sender); 290 } 291 } 293 else 294 { 295 String errorMsg = "Invalid null message"; 296 logger.error(errorMsg); 297 return new ControllerException(errorMsg); 298 } 299 300 return null; 301 } 302 catch (Exception e) 303 { 304 if (e instanceof RuntimeException ) 305 logger.warn("Error while handling group message:" + msg.getClass(), e); 306 return e; 307 } 308 } 309 310 314 public Serializable handleMessageMultiThreaded(Serializable msg, 315 Member sender, Object handleMessageSingleThreadedResult) 316 { 317 if (msg == MESSAGES_IN_HANDLER_SYNC) 319 return null; 320 321 try 322 { 323 if (msg == null) 324 { 325 String errorMsg = "Invalid null message"; 326 logger.error(errorMsg); 327 return new ControllerException(errorMsg); 328 } 329 330 if (logger.isDebugEnabled()) 331 logger.debug("handleMessageMultiThreaded (" + msg.getClass() + "): " 332 + msg); 333 if (msg instanceof DistributedVirtualDatabaseMessage) 334 { 335 return ((DistributedVirtualDatabaseMessage) msg) 336 .handleMessageMultiThreaded(this, sender, 337 handleMessageSingleThreadedResult); 338 } 339 else 340 logger.warn("Unhandled message type received: " + msg.getClass() + "(" 341 + msg + ")"); 342 343 return null; 344 } 345 catch (Exception e) 346 { 347 if (e instanceof RuntimeException ) 348 logger.warn("Error while handling group message: " + msg.getClass(), e); 349 return e; 350 } 351 finally 352 { 353 synchronized (MESSAGES_IN_HANDLER_SYNC) 354 { 355 messagesInHandlers--; 356 if (messagesInHandlers <= 0) 357 MESSAGES_IN_HANDLER_SYNC.notifyAll(); 358 } 359 360 if (msg != null) 361 { 362 if (msg instanceof DistributedRequest) 365 { 366 synchronized (totalOrderQueue) 367 { 368 if (totalOrderQueue.remove(((DistributedRequest) msg).getRequest())) 369 { 370 if (logger.isWarnEnabled()) 371 logger.warn("Distributed request " 372 + ((DistributedRequest) msg).getRequest().getSqlShortForm( 373 getSqlShortFormLength()) 374 + " did not remove itself from the total order queue"); 375 totalOrderQueue.notifyAll(); 376 } 377 } 378 } 379 else if (msg instanceof DistributedTransactionMarker) 380 { 381 synchronized (totalOrderQueue) 382 { 383 if (totalOrderQueue.remove(msg)) 384 { 385 logger.warn("Distributed " + msg.toString() + " did not remove " 386 + "itself from the total order queue"); 387 totalOrderQueue.notifyAll(); 388 } 389 } 390 } 391 } 392 } 393 } 394 395 public void cancelMessage(Serializable msg) 396 { 397 if (msg instanceof DistributedVirtualDatabaseMessage) 398 { 399 ((DistributedVirtualDatabaseMessage) msg).cancel(this); 400 } 401 else 402 logger.warn("Unhandled message type received: " + msg.getClass() + "(" 403 + msg + ")"); 404 } 405 406 411 public void failedMember(Member failed, GroupIdentifier gid, Member sender) 412 { 413 quitMember(failed, gid); 414 } 415 416 420 public void groupComposition(Group g, IpAddress sender, int gmsStatus) 421 { 422 } 424 425 429 public void networkPartition(GroupIdentifier gid, List mergedGroupCompositions) 430 { 431 if ((channel == null) || channelShuttingDown) 433 return; 434 435 if (gid.equals(currentGroup.getGroupIdentifier())) 436 { 437 if (logger.isFatalEnabled()) 438 logger.fatal("Network partition detected in group " + gid + "."); 439 440 Collections.sort(mergedGroupCompositions); 442 if (logger.isFatalEnabled()) 443 logger 444 .fatal(mergedGroupCompositions.get(0) + " will remain as master."); 445 if (!(mergedGroupCompositions.get(0).equals(channel.getLocalMembership()))) 446 { 447 if (logger.isFatalEnabled()) 448 logger.fatal("Forcing virtual database shutdown here at " 449 + channel.getLocalMembership() + "."); 450 shutdown(Constants.SHUTDOWN_FORCE); 451 } 452 else 453 { 454 if (logger.isFatalEnabled()) 455 logger.fatal("Virtual database here at " 456 + channel.getLocalMembership() + " remaining as master."); 457 } 458 } 459 } 460 461 467 public void joinGroup() throws Exception 468 { 469 RecoveryLog recoveryLog = distributedRequestManager.getRecoveryLog(); 470 if (recoveryLog == null) 471 { 472 String msg = "Distributed virtual database cannot be used without a recovery log defined."; 473 if (logger.isFatalEnabled()) 474 logger.fatal(msg); 475 throw new SequoiaException(msg); 476 } 477 478 try 479 { 480 Properties p = new Properties (); 481 InputStream is = this.getClass() 482 .getResourceAsStream(hederaPropertiesFile); 483 if (is == null) 484 { 485 if (logger.isFatalEnabled()) 486 logger.fatal(Translate.get( 487 "fatal.distributed.no.group.communication.properties", 488 hederaPropertiesFile)); 489 endUserLogger.fatal(Translate.get( 490 "fatal.distributed.no.group.communication.properties", 491 hederaPropertiesFile)); 492 throw new SequoiaException( 493 "Join group failed because Hedera properties file was not found."); 494 } 495 if (logger.isInfoEnabled()) 496 logger.info("Using Hedera properties file: " + hederaPropertiesFile); 497 p.load(is); 498 is.close(); 499 500 if (groupCommunicationFactory == null) 501 { 502 groupCommunicationFactory = (AbstractGroupCommunicationFactory) Class 503 .forName(p.getProperty("hedera.factory")).newInstance(); 504 } 505 Object [] ret = groupCommunicationFactory 506 .createChannelAndGroupMembershipService(p, new GroupIdentifier( 507 groupName)); 508 AbstractGroupMembershipService gms = (AbstractGroupMembershipService) ret[1]; 509 gms.registerGroupMembershipListener(this); 510 channel = (AbstractReliableGroupChannel) ret[0]; 511 512 if (logger.isDebugEnabled()) 513 logger.debug("Group communication channel is configured as follows: " 514 + channel); 515 516 channel.join(); 518 currentGroup = channel.getGroup(); 519 multicastRequestAdapter = new MulticastRequestAdapter(channel , this 522 , this 523 ); 524 multicastRequestAdapter.start(); 525 526 Thread.sleep(2000); 529 530 logger.info("Group " + groupName + " connected to " 531 + channel.getLocalMembership()); 532 533 controllerJmxAddress.put(channel.getLocalMembership(), controller 535 .getJmxName()); 536 537 long controllerId; 538 539 List currentGroupMembers = currentGroup.getMembers(); 541 int groupSize = currentGroupMembers.size(); 542 if (groupSize == 1) 543 { 544 logger.info(Translate.get( 545 "virtualdatabase.distributed.configuration.first.in.group", 546 groupName)); 547 allMemberButUs = new ArrayList (); 548 controllerId = 0; 549 distributedRequestManager.setControllerId(controllerId); 550 distributedRequestManager 552 .initBackendsLastKnownCheckpointFromRecoveryLog(); 553 recoveryLog.checkRecoveryLogConsistency(); 554 } 555 else 556 { 557 logger.info("Group now contains " + groupSize + " controllers."); 558 if (logger.isDebugEnabled()) 559 { 560 logger.debug("Current list of controllers is as follows:"); 561 for (Iterator iter = currentGroupMembers.iterator(); iter.hasNext();) 562 logger.debug("Controller " + iter.next()); 563 } 564 565 refreshGroupMembership(); 567 controllerId = checkConfigurationCompatibilityAndReturnControllerId(getAllMemberButUs()); 569 if (controllerId == INCOMPATIBLE_CONFIGURATION) 570 { 571 String msg = Translate 572 .get("virtualdatabase.distributed.configuration.not.compatible"); 573 logger.error(msg); 574 throw new ControllerException(msg); 575 } 576 else 577 { 578 controllerId += currentGroupMembers.indexOf(channel 584 .getLocalMembership()); 585 586 if (logger.isInfoEnabled()) 587 { 588 logger.info(Translate 589 .get("virtualdatabase.distributed.configuration.compatible")); 590 logger.info("Controller identifier is set to: " + controllerId); 591 } 592 distributedRequestManager.setControllerId(controllerId); 594 } 595 596 distributedRequestManager 598 .initBackendsLastKnownCheckpointFromRecoveryLog(); 599 600 broadcastBackendInformation(getAllMemberButUs()); 603 } 604 605 isVirtualDatabaseStarted = true; 607 608 if ((groupSize > 1) && hasRecoveryLog()) 610 { 611 logger.info("Resyncing recovery log ..."); 612 resyncRecoveryLog(); 613 logger.info("Resyncing recovery log done"); 614 } 615 616 initGlobalCounters(controllerId); 617 618 } 619 catch (Exception e) 620 { 621 if (channel != null) 622 { 623 quitChannel(); 624 } 625 String msg = Translate.get("virtualdatabase.distributed.joingroup.error", 626 groupName); 627 if (e instanceof RuntimeException ) 628 logger.error(msg, e); 629 throw new Exception (msg + " (" + e + ")", e); 630 } 631 632 } 633 634 643 private void resyncRecoveryLog() throws VirtualDatabaseException, 644 SQLException 645 { 646 if (getAllMembers().size() == 1) 647 { 648 logger.info("First controller in vdb, no recovery log resync."); 649 return; 650 } 651 652 String lastShutdownCheckpointName = getLastShutdownCheckpointName(); 653 if (lastShutdownCheckpointName == null) 654 { 655 logger 656 .info("No shutdown checkpoint found in recovery log. Clearing recovery log (dirty)."); 657 logger.info("Please resync manually using 'recover log'."); 658 getRecoveryLog().resetRecoveryLog(); 659 return; 660 } 661 662 isResynchingFlag = true; 664 665 try 666 { 667 logger.info("Resyncing from " + lastShutdownCheckpointName); 668 resyncFromCheckpoint(lastShutdownCheckpointName); 669 } 670 catch (VirtualDatabaseException e) 671 { 672 logger 673 .error("Failed to resync recovery log from last clean shutdown checkpoint. Clearing recovery log (dirty)."); 674 logger.info("Please resync manually using 'recover log'."); 675 getRecoveryLog().resetRecoveryLog(); 676 isResynchingFlag = false; 677 } 678 } 679 680 688 public void initGlobalCounters(long controllerId) throws SQLException 689 { 690 if (!hasRecoveryLog()) 691 return; 693 RecoveryLog recoveryLog = requestManager.getRecoveryLog(); 694 695 requestManager.initializeRequestId(recoveryLog 696 .getLastRequestId(controllerId) + 1); 697 698 requestManager.getScheduler().initializeTransactionId( 699 recoveryLog.getLastTransactionId(controllerId) + 1); 700 701 this.connectionId = recoveryLog.getLastConnectionId(controllerId) + 1; 706 } 707 708 private void resyncFromCheckpoint(String checkpointName) 709 throws VirtualDatabaseException 710 { 711 Member remoteControllerMember = (Member) getAllMemberButUs().get(0); 713 714 sendMessageToController(remoteControllerMember, new ResyncRecoveryLog( 716 checkpointName), messageTimeouts.getDefaultTimeout()); 717 718 721 isResynchingFlag = false; 722 } 723 724 protected boolean isResyncing() 725 { 726 return isResynchingFlag; 727 } 728 729 736 private String getLastShutdownCheckpointName() 737 throws VirtualDatabaseException 738 { 739 ArrayList checkpointNames; 742 try 743 { 744 checkpointNames = getRecoveryLog().getCheckpointNames(); 745 } 746 catch (SQLException e) 747 { 748 logger.error(e.getMessage()); 749 throw new VirtualDatabaseException(e); 750 } 751 752 Iterator iter = checkpointNames.iterator(); 753 while (iter.hasNext()) 754 { 755 String cpName = (String ) iter.next(); 756 if (cpName.startsWith("shutdown-" + getControllerName())) 757 return cpName; 758 } 759 return null; 760 } 761 762 766 public void joinMember(Member m, GroupIdentifier gid) 767 { 768 if (hasRecoveryLog()) 769 { 770 try 771 { 772 requestManager.getRecoveryLog().storeCheckpoint( 773 buildCheckpointName(m + " joined group " + gid)); 774 } 775 catch (SQLException ignore) 776 { 777 logger.warn("Failed to log checkpoint for joining member " + m); 778 } 779 } 780 } 781 782 786 789 public void quitChannel() 790 { 791 quitChannel(Constants.SHUTDOWN_SAFE); 792 } 793 794 799 public void quitChannel(int level) 800 { 801 if (level == Constants.SHUTDOWN_FORCE) 802 { 803 multicastRequestAdapter.cancelRequests(); 804 } 805 synchronized (MESSAGES_IN_HANDLER_SYNC) 806 { 807 channelShuttingDown = true; 808 if (messagesInHandlers > 0) 809 try 810 { 811 MESSAGES_IN_HANDLER_SYNC.wait(); 812 } 813 catch (InterruptedException ignore) 814 { 815 } 816 } 817 818 if (multicastRequestAdapter != null) 819 { 820 multicastRequestAdapter.stop(); 821 multicastRequestAdapter = null; 822 } 823 if (channel != null) 824 { 825 channel.close(); 826 try 827 { 828 channel.quit(); 829 } 830 catch (ChannelException e) 831 { 832 if (logger.isWarnEnabled()) 833 { 834 logger.warn("Problem when quitting channel " + channel, e); 835 } 836 } 837 catch (NotConnectedException e) 838 { 839 if (logger.isWarnEnabled()) 840 { 841 logger.warn("Problem when quitting channel " + channel, e); 842 } 843 } 844 channel = null; 845 } 846 if (groupCommunicationFactory != null) 847 { 848 groupCommunicationFactory.dispose(); 849 groupCommunicationFactory = null; 850 } 851 } 852 853 857 public void quitMember(Member m, GroupIdentifier gid) 858 { 859 synchronized (MESSAGES_IN_HANDLER_SYNC) 860 { 861 if ((channel == null) || (channelShuttingDown)) 863 return; 864 messagesInHandlers++; 865 } 866 867 try 868 { 869 if (isLocalSender(m)) 871 return; 872 873 if (hasRecoveryLog()) 874 { 875 try 876 { 877 requestManager.getRecoveryLog().storeCheckpoint( 878 buildCheckpointName(m + " quit group " + gid)); 879 } 880 catch (SQLException ignore) 881 { 882 logger.warn("Failed to log checkpoint for quitting member " + m); 883 } 884 } 885 886 String remoteControllerName = removeRemoteControllerAndStartCleanupThread(m); 888 if (remoteControllerName != null) 889 { 890 endUserLogger.warn(Translate.get( 891 "notification.distributed.controller.removed", new String []{ 892 m.toString(), name})); 893 logger.warn("Controller " + m + " has left the cluster."); 894 sendJmxNotification( 895 SequoiaNotificationList.DISTRIBUTED_CONTROLLER_REMOVED, Translate 896 .get("notification.distributed.controller.removed")); 897 } 898 899 synchronized (MESSAGES_IN_HANDLER_SYNC) 901 { 902 if (!channelShuttingDown) 904 { 905 int failures = multicastRequestAdapter.memberFailsOnAllReplies(m); 906 logger.info(failures + " requests were waiting responses from " + m); 907 } 908 } 909 } 910 finally 911 { 912 synchronized (MESSAGES_IN_HANDLER_SYNC) 913 { 914 messagesInHandlers--; 915 } 916 } 917 } 918 919 922 public void receive(Serializable msg) 923 { 924 logger.error("Distributed virtual database received unhandled message: " 925 + msg); 926 } 927 928 932 private void refreshGroupMembership() 933 { 934 if (logger.isDebugEnabled()) 935 logger.debug("Refreshing members list:" + currentGroup.getMembers()); 936 937 synchronized (controllerJmxAddress) 938 { 939 allMemberButUs = (ArrayList ) (((ArrayList ) currentGroup.getMembers()) 940 .clone()); 941 allMemberButUs.remove(channel.getLocalMembership()); 942 } 943 } 944 945 949 956 public boolean equals(Object other) 957 { 958 if ((other == null) 959 || (!(other instanceof org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase))) 960 return false; 961 else 962 { 963 DistributedVirtualDatabase db = (org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase) other; 964 return name.equals(db.getDatabaseName()) 965 && groupName.equals(db.getGroupName()); 966 } 967 } 968 969 974 public ArrayList getAllMembers() 975 { 976 synchronized (controllerJmxAddress) 977 { 978 if (currentGroup == null) return new ArrayList (); 980 ArrayList members = (ArrayList ) currentGroup.getMembers(); 981 if (members == null) return new ArrayList (); 983 return (ArrayList ) members.clone(); 984 } 985 } 986 987 993 public ArrayList getAllMemberButUs() 994 { 995 synchronized (controllerJmxAddress) 996 { 997 if (allMemberButUs == null) return new ArrayList (); 999 1000 1009 return allMemberButUs; 1010 } 1011 } 1012 1013 1018 public AbstractReliableGroupChannel getChannel() 1019 { 1020 return channel; 1021 } 1022 1023 1028 public Hashtable getCleanupThreads() 1029 { 1030 return cleanupThreads; 1031 } 1032 1033 1042 public List getTransactionsRecovered(Long controllerId) 1043 { 1044 return (List ) controllerTransactionsRecovered.remove(controllerId); 1045 } 1046 1047 1056 public List getControllerPersistentConnectionsRecovered(Long controllerId) 1057 { 1058 return (List ) controllerPersistentConnectionsRecovered.remove(controllerId); 1059 } 1060 1061 1068 public void notifyPersistentConnectionFailover(Long controllerId, 1069 Long connectionId) 1070 { 1071 synchronized (controllerPersistentConnectionsRecovered) 1072 { 1073 LinkedList persistentConnectionsRecovered = (LinkedList ) controllerPersistentConnectionsRecovered 1074 .get(controllerId); 1075 if (persistentConnectionsRecovered == null) 1076 { 1077 persistentConnectionsRecovered = new LinkedList (); 1078 controllerPersistentConnectionsRecovered.put(controllerId, 1079 persistentConnectionsRecovered); 1080 } 1081 1082 persistentConnectionsRecovered.add(connectionId); 1083 if (logger.isInfoEnabled()) 1084 logger.info("Failover detected for persistent connection " 1085 + connectionId); 1086 } 1087 } 1088 1089 1096 public void notifyTransactionFailover(Long controllerId, Long transactionId) 1097 { 1098 synchronized (controllerTransactionsRecovered) 1099 { 1100 LinkedList transactionsRecovered = (LinkedList ) controllerTransactionsRecovered 1101 .get(controllerId); 1102 if (transactionsRecovered == null) 1103 { 1104 transactionsRecovered = new LinkedList (); 1105 controllerTransactionsRecovered 1106 .put(controllerId, transactionsRecovered); 1107 } 1108 1109 transactionsRecovered.add(transactionId); 1110 if (logger.isInfoEnabled()) 1111 logger.info("Failover detected for transaction " + transactionId); 1112 } 1113 } 1114 1115 1123 private Member getControllerByName(String controllerName) 1124 throws VirtualDatabaseException 1125 { 1126 Iterator iter = controllerJmxAddress.entrySet().iterator(); 1128 Member targetMember = null; 1129 while (iter.hasNext()) 1130 { 1131 Entry entry = (Entry) iter.next(); 1132 if (entry.getValue().equals(controllerName)) 1133 { 1134 targetMember = (Member) entry.getKey(); 1135 break; 1136 } 1137 } 1138 if (targetMember == null) 1139 throw new VirtualDatabaseException("Cannot find controller " 1140 + controllerName + " in group"); 1141 return targetMember; 1142 } 1143 1144 1149 public String getControllerName() 1150 { 1151 return controller.getControllerName(); 1152 } 1153 1154 1159 public long getControllerId() 1160 { 1161 return ((DistributedRequestManager) requestManager).getControllerId(); 1162 } 1163 1164 1169 public Group getCurrentGroup() 1170 { 1171 return currentGroup; 1172 } 1173 1174 1179 public final Trace getDistributedRequestLogger() 1180 { 1181 return distributedRequestLogger; 1182 } 1183 1184 1189 protected String getDistributionXml() 1190 { 1191 StringBuffer info = new StringBuffer (); 1192 info.append("<" + DatabasesXmlTags.ELT_Distribution + " " 1193 + DatabasesXmlTags.ATT_groupName + "=\"" + groupName + "\" " 1194 + DatabasesXmlTags.ATT_hederaPropertiesFile + "=\"" 1195 + hederaPropertiesFile + "\" " 1196 + DatabasesXmlTags.ATT_clientFailoverTimeout + "=\"" 1197 + failoverTimeoutInMs + "\">"); 1198 1199 getMessageTimeouts().generateXml(info); 1200 1201 info.append("</" + DatabasesXmlTags.ELT_Distribution + ">"); 1202 return info.toString(); 1203 } 1204 1205 1211 public String getGroupName() 1212 { 1213 return groupName; 1214 } 1215 1216 1221 public void setGroupName(String groupName) 1222 { 1223 this.groupName = groupName; 1224 } 1225 1226 1231 public MessageTimeouts getMessageTimeouts() 1232 { 1233 return messageTimeouts; 1234 } 1235 1236 1241 public void setMessageTimeouts(MessageTimeouts messageTimeouts) 1242 { 1243 this.messageTimeouts = messageTimeouts; 1244 } 1245 1246 1251 public MulticastRequestAdapter getMulticastRequestAdapter() 1252 { 1253 return multicastRequestAdapter; 1254 } 1255 1256 1260 1263 public long getNextConnectionId() 1264 { 1265 long id = super.getNextConnectionId(); 1266 return distributedRequestManager.getNextConnectionId(id); 1267 } 1268 1269 1273 1277 protected int getNumberOfEnabledBackends() throws VirtualDatabaseException 1278 { 1279 int nbActive = super.getNumberOfEnabledBackends(); 1281 1282 1284 DatabaseBackend b; 1286 Iterator iter = backendsPerController.keySet().iterator(); 1287 while (iter.hasNext()) 1288 { 1289 Member member = (Member) iter.next(); 1290 1291 List remoteBackends = (List ) backendsPerController.get(member); 1292 int size = remoteBackends.size(); 1293 b = null; 1294 for (int i = 0; i < size; i++) 1295 { 1296 b = (DatabaseBackend) remoteBackends.get(i); 1297 if (b.isReadEnabled() || b.isWriteEnabled()) 1298 nbActive++; 1300 } 1301 } 1302 1306 nbActive = -1; 1307 return nbActive; 1308 } 1309 1310 1318 public ControllerResultSet getPreparedStatementGetMetaData( 1319 AbstractRequest request) throws SQLException 1320 { 1321 try 1322 { 1323 return requestManager.getPreparedStatementGetMetaData(request); 1324 } 1325 catch (NoMoreBackendException e) 1326 { 1327 try 1329 { 1330 MulticastResponse rspList = getMulticastRequestAdapter() 1331 .multicastMessage(getAllMemberButUs(), 1332 new GetPreparedStatementMetadata(request), 1333 MulticastRequestAdapter.WAIT_ALL, 1334 getMessageTimeouts().getVirtualDatabaseConfigurationTimeout()); 1335 1336 Map results = rspList.getResults(); 1337 if (results.size() == 0) 1338 if (logger.isWarnEnabled()) 1339 logger 1340 .warn("No response while getting prepared statement metadata from remote controller"); 1341 for (Iterator iter = results.values().iterator(); iter.hasNext();) 1342 { 1343 Object response = iter.next(); 1344 if (response instanceof ControllerException) 1345 { 1346 if (logger.isErrorEnabled()) 1347 { 1348 logger 1349 .error("Error while getting prepared statement metadata from remote controller"); 1350 } 1351 } 1352 else 1353 { 1354 return (ControllerResultSet) response; 1357 } 1358 } 1359 } 1360 catch (NotConnectedException e2) 1361 { 1362 if (logger.isErrorEnabled()) 1363 logger 1364 .error( 1365 "Channel unavailable while getting prepared statement metadata from remote controller", 1366 e2); 1367 } 1368 1369 throw e; 1372 } 1373 } 1374 1375 1380 public boolean isProcessMacroBeforeBroadcast() 1381 { 1382 return processMacroBeforeBroadcast; 1383 } 1384 1385 1390 public void setProcessMacroBeforeBroadcast(boolean processMacros) 1391 { 1392 this.processMacroBeforeBroadcast = processMacros; 1393 } 1394 1395 1401 public RequestResultFailoverCache getRequestResultFailoverCache() 1402 { 1403 return requestResultFailoverCache; 1404 } 1405 1406 1411 public void setRequestManager(RequestManager requestManager) 1412 { 1413 if (!(requestManager instanceof DistributedRequestManager)) 1414 throw new RuntimeException ( 1415 "A distributed virtual database can only work with a distributed request manager."); 1416 1417 distributedRequestManager = (DistributedRequestManager) requestManager; 1418 this.requestManager = distributedRequestManager; 1420 } 1421 1422 1429 public VirtualDatabaseStaticMetaData getStaticMetaData() 1430 { 1431 staticMetadata = doGetStaticMetaData(); 1432 1433 if ((staticMetadata == null) 1435 || (staticMetadata.getMetadataContainer() == null)) 1436 { 1437 try 1438 { 1439 MulticastResponse rspList = getMulticastRequestAdapter() 1440 .multicastMessage(getAllMemberButUs(), new GetStaticMetadata(), 1441 MulticastRequestAdapter.WAIT_ALL, 1442 getMessageTimeouts().getVirtualDatabaseConfigurationTimeout()); 1443 1444 Map results = rspList.getResults(); 1445 if (results.size() == 0) 1446 if (logger.isWarnEnabled()) 1447 logger 1448 .warn("No response while getting static metadata from remote controller"); 1449 for (Iterator iter = results.values().iterator(); iter.hasNext();) 1450 { 1451 Object response = iter.next(); 1452 if (response instanceof ControllerException) 1453 { 1454 if (logger.isErrorEnabled()) 1455 { 1456 logger 1457 .error("Error while getting static metadata from remote controller"); 1458 } 1459 } 1460 else 1461 { 1462 staticMetadata.setMetadataContainer((MetadataContainer) response); 1465 } 1466 } 1467 } 1468 catch (NotConnectedException e2) 1469 { 1470 if (logger.isErrorEnabled()) 1471 logger 1472 .error( 1473 "Channel unavailable while getting static metadata from remote controller", 1474 e2); 1475 } 1476 } 1477 1478 return staticMetadata; 1479 } 1480 1481 1487 public HashMap getWritesFlushed() 1488 { 1489 return writesFlushed; 1490 } 1491 1492 1503 public boolean isCompatibleBackend(BackendInfo backend) 1504 throws VirtualDatabaseException 1505 { 1506 try 1507 { 1508 acquireReadLockBackendLists(); 1509 } 1510 catch (InterruptedException e) 1511 { 1512 String msg = "Unable to acquire read lock on backend list in isCompatibleBackend (" 1513 + e + ")"; 1514 logger.error(msg); 1515 throw new VirtualDatabaseException(msg); 1516 } 1517 1518 try 1519 { 1520 String backendURL = backend.getUrl(); 1522 String backendName = backend.getName(); 1523 int size = backends.size(); 1524 DatabaseBackend b = null; 1525 for (int i = 0; i < size; i++) 1526 { 1527 b = (DatabaseBackend) backends.get(i); 1528 if (b.getURL().equals(backendURL) || b.getName().equals(backendName)) 1529 return false; 1530 } 1531 } 1532 catch (RuntimeException re) 1533 { 1534 throw new VirtualDatabaseException(re); 1535 } 1536 finally 1537 { 1538 releaseReadLockBackendLists(); 1539 } 1540 return true; 1542 } 1543 1544 1553 public boolean isCompatibleDatabaseSchema(DatabaseSchema dbs) 1554 { 1555 if (dbs == null) 1557 { 1558 logger.warn(Translate 1559 .get("virtualdatabase.distributed.configuration.checking.noschema")); 1560 } 1561 else 1562 { 1563 switch (getRequestManager().getLoadBalancer().getRAIDbLevel()) 1565 { 1566 case RAIDbLevels.RAIDb0 : 1567 if (dbs.equals(getRequestManager().getDatabaseSchema())) 1569 { 1570 logger 1571 .warn(Translate 1572 .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema")); 1573 return false; 1574 } 1575 break; 1576 case RAIDbLevels.RAIDb1 : 1577 if (!dbs.equals(getRequestManager().getDatabaseSchema())) 1579 { 1580 logger 1581 .warn(Translate 1582 .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema")); 1583 return false; 1584 } 1585 break; 1586 case RAIDbLevels.RAIDb2 : 1587 if (!dbs.isCompatibleWith(getRequestManager().getDatabaseSchema())) 1589 { 1590 logger 1591 .warn(Translate 1592 .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema")); 1593 return false; 1594 } 1595 break; 1596 case RAIDbLevels.SingleDB : 1597 default : 1598 logger.error("Unsupported RAIDb level: " 1599 + getRequestManager().getLoadBalancer().getRAIDbLevel()); 1600 return false; 1601 } 1602 } 1603 return true; 1604 } 1605 1606 1611 public boolean isDistributed() 1612 { 1613 return true; 1614 } 1615 1616 1621 public final boolean isVirtualDatabaseStarted() 1622 { 1623 return isVirtualDatabaseStarted; 1624 } 1625 1626 1629 public String [] viewControllerList() 1630 { 1631 if (logger.isInfoEnabled()) 1632 { 1633 logger.info(channel.getLocalMembership() + " see members:" 1634 + currentGroup.getMembers() + " and has mapping:" 1635 + controllerJmxAddress); 1636 } 1637 Collection controllerJmxNames = controllerJmxAddress.values(); 1638 return (String []) controllerJmxNames.toArray(new String [controllerJmxNames 1639 .size()]); 1640 } 1641 1642 1645 public void addBackend(DatabaseBackend db) throws VirtualDatabaseException 1646 { 1647 super.addBackend(db); 1649 1650 try 1652 { 1653 broadcastBackendInformation(getAllMemberButUs()); 1654 } 1655 catch (Exception e) 1656 { 1657 String msg = "Error while broadcasting backend information when adding backend"; 1658 logger.error(msg, e); 1659 throw new VirtualDatabaseException(msg, e); 1660 } 1661 } 1662 1663 1670 public void addRemoteControllerId(Member remoteControllerMembership, 1671 long remoteControllerId) 1672 { 1673 controllerIds.put(remoteControllerMembership, new Long (remoteControllerId)); 1674 1675 if (logger.isDebugEnabled()) 1676 logger.debug("Adding new controller id:" + remoteControllerId 1677 + " for member " + remoteControllerMembership); 1678 } 1679 1680 1686 public void addBackendPerController(Member sender, List remoteBackends) 1687 { 1688 backendsPerController.put(sender, remoteBackends); 1689 1690 if (logger.isInfoEnabled()) 1691 logger.info(Translate.get( 1692 "virtualdatabase.distributed.configuration.updating.backend.list", 1693 sender)); 1694 } 1695 1696 1702 public Hashtable getBackendsPerController() 1703 { 1704 return backendsPerController; 1705 } 1706 1707 1715 public void addRemoteControllerJmxName(Member remoteControllerMembership, 1716 String remoteControllerJmxName) 1717 { 1718 controllerJmxAddress.put(remoteControllerMembership, 1719 remoteControllerJmxName); 1720 if (logger.isDebugEnabled()) 1721 logger.debug("Adding new controller " + remoteControllerJmxName 1722 + " for member " + remoteControllerMembership); 1723 1724 sendJmxNotification(SequoiaNotificationList.DISTRIBUTED_CONTROLLER_ADDED, 1725 Translate.get("notification.distributed.controller.added", 1726 new String []{remoteControllerJmxName, name})); 1727 1728 refreshGroupMembership(); 1729 } 1730 1731 1737 private void broadcastBackendInformation(ArrayList dest) 1738 throws NotConnectedException 1739 { 1740 logger 1741 .debug(Translate 1742 .get("virtualdatabase.distributed.configuration.querying.remote.status")); 1743 1744 List backendInfos = DatabaseBackend.toBackendInfos(backends); 1746 MulticastResponse rspList = multicastRequestAdapter.multicastMessage(dest, 1747 new BackendStatus(backendInfos, distributedRequestManager 1748 .getControllerId()), MulticastRequestAdapter.WAIT_ALL, 1749 messageTimeouts.getBackendStatusTimeout()); 1750 1751 int size = dest.size(); 1752 for (int i = 0; i < size; i++) 1753 { 1754 Member m = (Member) dest.get(i); 1756 if (rspList.getResult(m) != null) 1757 { 1758 BackendStatus bs = (BackendStatus) rspList.getResult(m); 1759 List remoteBackendInfos = bs.getBackendInfos(); 1761 List remoteBackends = BackendInfo.toDatabaseBackends(this, 1763 remoteBackendInfos); 1764 backendsPerController.put(m, remoteBackends); 1765 if (logger.isDebugEnabled()) 1766 logger 1767 .debug(Translate 1768 .get( 1769 "virtualdatabase.distributed.configuration.updating.backend.list", 1770 m.toString())); 1771 } 1772 else 1773 logger.warn(Translate.get( 1774 "virtualdatabase.distributed.unable.get.remote.status", m 1775 .toString())); 1776 } 1777 } 1778 1779 1789 private long checkConfigurationCompatibilityAndReturnControllerId( 1790 ArrayList dest) 1791 { 1792 if (logger.isInfoEnabled()) 1793 logger.info(Translate 1794 .get("virtualdatabase.distributed.configuration.checking")); 1795 1796 MulticastResponse rspList; 1798 try 1799 { 1800 rspList = multicastRequestAdapter.multicastMessage(dest, 1801 new VirtualDatabaseConfiguration(this), 1802 MulticastRequestAdapter.WAIT_ALL, messageTimeouts 1803 .getVirtualDatabaseConfigurationTimeout()); 1804 } 1805 catch (NotConnectedException e) 1806 { 1807 logger.error( 1808 "Channel unavailable while checking configuration compatibility", e); 1809 return INCOMPATIBLE_CONFIGURATION; 1810 } 1811 1812 Map results = rspList.getResults(); 1814 int size = results.size(); 1815 if (size == 0) 1816 logger.warn(Translate 1817 .get("virtualdatabase.distributed.configuration.checking.noanswer")); 1818 1819 long highestRemoteControllerId = 0; 1820 for (Iterator iter = results.values().iterator(); iter.hasNext();) 1821 { 1822 Object response = iter.next(); 1823 if (response instanceof VirtualDatabaseConfigurationResponse) 1824 { 1825 VirtualDatabaseConfigurationResponse vdbcr = (VirtualDatabaseConfigurationResponse) response; 1830 long remoteControllerId = vdbcr.getControllerId(); 1831 if (remoteControllerId == INCOMPATIBLE_CONFIGURATION) 1832 { 1833 return INCOMPATIBLE_CONFIGURATION; 1834 } 1835 if (logger.isWarnEnabled()) 1838 { 1839 logger 1840 .warn("Some virtual database users are missing from this configuration, trying to create them transparently..."); 1841 } 1842 if (vdbcr.getAdditionalVdbUsers() != null) 1843 { 1844 for (Iterator iter2 = vdbcr.getAdditionalVdbUsers().iterator(); iter2 1845 .hasNext();) 1846 { 1847 VirtualDatabaseUser vdbUser = (VirtualDatabaseUser) iter2.next(); 1848 1849 super.checkAndAddVirtualDatabaseUser(vdbUser); 1855 1856 if (!getAuthenticationManager().isValidVirtualUser(vdbUser)) 1857 { 1858 return INCOMPATIBLE_CONFIGURATION; 1859 } 1860 } 1861 } 1862 1863 if (highestRemoteControllerId < remoteControllerId) 1864 highestRemoteControllerId = remoteControllerId; 1865 } 1866 else 1867 { 1868 logger 1869 .error("Unexpected response while checking configuration compatibility: " 1870 + response); 1871 return INCOMPATIBLE_CONFIGURATION; 1872 } 1873 } 1874 1875 return ((highestRemoteControllerId >> DistributedRequestManager.CONTROLLER_ID_SHIFT_BITS) & DistributedRequestManager.CONTROLLER_ID_BITS) + 1; 1879 } 1880 1881 1884 public void checkAndAddVirtualDatabaseUser(VirtualDatabaseUser vdbUser) 1885 { 1886 MulticastResponse rspList; 1888 try 1889 { 1890 rspList = multicastRequestAdapter.multicastMessage(getAllMembers(), 1891 new IsValidUserForAllBackends(vdbUser), 1892 MulticastRequestAdapter.WAIT_ALL, messageTimeouts 1893 .getVirtualDatabaseConfigurationTimeout()); 1894 } 1895 catch (NotConnectedException e) 1896 { 1897 logger.error("Channel unavailable while checking validity of vdb user " 1898 + vdbUser.getLogin(), e); 1899 return; 1900 } 1901 1902 Map results = rspList.getResults(); 1904 int size = results.size(); 1905 if (size == 0) 1906 logger.warn("No response while checking validity of vdb user " 1907 + vdbUser.getLogin()); 1908 for (Iterator iter = results.values().iterator(); iter.hasNext();) 1909 { 1910 Object response = iter.next(); 1911 if (response instanceof Boolean ) 1912 { 1913 if (!((Boolean ) response).booleanValue()) 1914 { 1915 if (logger.isWarnEnabled()) 1916 { 1917 logger.warn("Could not create new vdb user " + vdbUser.getLogin() 1918 + " because it does not exist on all backends"); 1919 } 1920 return; 1921 } 1922 } 1923 else 1924 { 1925 logger.error("Unexpected response while checking validity of vdb user " 1926 + vdbUser.getLogin() + " : " + response); 1927 return; 1928 } 1929 } 1930 1931 try 1933 { 1934 rspList = multicastRequestAdapter.multicastMessage(getAllMembers(), 1935 new AddVirtualDatabaseUser(vdbUser), 1936 MulticastRequestAdapter.WAIT_ALL, messageTimeouts 1937 .getVirtualDatabaseConfigurationTimeout()); 1938 } 1939 catch (NotConnectedException e) 1940 { 1941 logger.error("Channel unavailable while adding vdb user " 1942 + vdbUser.getLogin() + ", trying to clean-up...", e); 1943 removeVirtualDatabaseUser(vdbUser); 1944 } 1945 1946 results = rspList.getResults(); 1948 size = results.size(); 1949 if (size == 0) 1950 logger.warn("No response while adding vdb user " + vdbUser.getLogin()); 1951 for (Iterator iter = results.values().iterator(); iter.hasNext();) 1952 { 1953 Object response = iter.next(); 1954 if (response instanceof ControllerException) 1955 { 1956 if (logger.isErrorEnabled()) 1957 { 1958 logger.error("Error while adding vdb user " + vdbUser.getLogin() 1959 + ", trying to clean-up..."); 1960 } 1961 removeVirtualDatabaseUser(vdbUser); 1962 return; 1963 } 1964 } 1965 } 1966 1967 1971 public void closePersistentConnection(String login, 1972 long persistentConnectionId) 1973 { 1974 distributedRequestManager.distributedClosePersistentConnection(login, 1975 persistentConnectionId); 1976 } 1977 1978 1982 public void openPersistentConnection(String login, long persistentConnectionId) 1983 throws SQLException 1984 { 1985 distributedRequestManager.distributedOpenPersistentConnection(login, 1986 persistentConnectionId); 1987 } 1988 1989 1993 public void copyLogFromCheckpoint(String dumpName, String controllerName) 1994 throws VirtualDatabaseException 1995 { 1996 super.copyLogFromCheckpoint(dumpName, controllerName); 1999 2000 Member controllerByName = getControllerByName(controllerName); 2001 if (isLocalSender(controllerByName)) 2002 throw new VirtualDatabaseException( 2003 "A restore log command must be applied to a remote controller"); 2004 2005 String dumpCheckpointName; 2007 DumpInfo dumpInfo; 2008 2009 try 2010 { 2011 dumpInfo = getRecoveryLog().getDumpInfo(dumpName); 2012 } 2013 catch (SQLException e) 2014 { 2015 throw new VirtualDatabaseException( 2016 "Recovery log error access occured while checking for dump" 2017 + dumpName, e); 2018 } 2019 2020 if (dumpInfo == null) 2021 throw new VirtualDatabaseException( 2022 "No information was found in the dump table for dump " + dumpName); 2023 2024 RestoreLogOperation restoreLogOperation = new RestoreLogOperation(dumpName, 2025 controllerName); 2026 addAdminOperation(restoreLogOperation); 2027 try 2028 { 2029 dumpCheckpointName = dumpInfo.getCheckpointName(); 2030 2031 String nowCheckpointName = setLogReplicationCheckpoint(controllerName); 2033 2034 2036 long nowCheckpointId; 2038 RecoveryLog recoveryLog = getRequestManager().getRecoveryLog(); 2039 try 2040 { 2041 try 2042 { 2043 nowCheckpointId = recoveryLog.getCheckpointLogId(nowCheckpointName); 2044 } 2045 catch (SQLException e) 2046 { 2047 String errorMessage = "Cannot find 'now checkpoint' log entry"; 2048 logger.error(errorMessage); 2049 throw new VirtualDatabaseException(errorMessage); 2050 } 2051 2052 sendMessageToController(controllerByName, new ReplicateLogEntries( 2054 nowCheckpointName, null, dumpName, nowCheckpointId), 2055 messageTimeouts.getReplicateLogEntriesTimeout()); 2056 } 2057 finally 2058 { 2059 getRequestManager().resumeActivity(); 2060 } 2061 2062 2064 recoveryLog.beginRecovery(); 2067 2068 try 2073 { 2074 ArrayList dest = new ArrayList (); 2075 dest.add(controllerByName); 2076 long copyLogEntryTimeout = getMessageTimeouts() 2077 .getCopyLogEntryTimeout(); 2078 long dumpId = recoveryLog.getCheckpointLogId(dumpCheckpointName); 2079 2080 if (logger.isDebugEnabled()) 2081 { 2082 logger.debug("Resynchronizing from checkpoint " + dumpCheckpointName 2083 + " (" + dumpId + ") to checkpoint " + nowCheckpointName + " (" 2084 + nowCheckpointId + ")"); 2085 } 2086 2087 for (long id = dumpId; id != nowCheckpointId; id++) 2088 { 2089 LogEntry entry = recoveryLog.getNextLogEntry(id); 2090 if (entry == null) 2091 { 2092 break; 2094 } 2095 2096 id = entry.getLogId() - 1; 2099 2100 MulticastResponse resp = getMulticastRequestAdapter() 2101 .multicastMessage(dest, new CopyLogEntry(entry), 2102 MulticastRequestAdapter.WAIT_NONE, copyLogEntryTimeout); 2103 if (resp.getFailedMembers() != null) 2104 throw new IOException ("Failed to deliver log entry " + id 2105 + " to remote controller " + controllerName); 2106 } 2107 2108 long localNbOfLogEntries = recoveryLog.getNumberOfLogEntries(dumpId, 2112 nowCheckpointId); 2113 2114 if (logger.isDebugEnabled()) 2115 { 2116 logger.debug("Checking that " + localNbOfLogEntries 2117 + " entries were resynchronized in remote log"); 2118 } 2119 2120 Serializable replyValue = sendMessageToController(controllerByName, 2121 new CompleteRecoveryLogResync(dumpId, nowCheckpointName, 2122 localNbOfLogEntries), getMessageTimeouts() 2123 .getReplicateLogEntriesTimeout()); 2124 if (replyValue instanceof Long ) 2125 { 2126 long diff = ((Long ) replyValue).longValue(); 2127 if (diff != 0) 2128 throw new VirtualDatabaseException( 2129 "Recovery log resynchronization reports a difference of " 2130 + diff + " entries"); 2131 } 2132 else 2133 throw new RuntimeException ( 2134 "Invalid answer from remote controller on CompleteRecoveryLogResync (" 2135 + replyValue + ")"); 2136 2137 sendMessageToController(controllerName, new ReplicateLogEntries(null, 2139 dumpCheckpointName, dumpName, dumpId), messageTimeouts 2140 .getReplicateLogEntriesTimeout()); 2141 } 2142 catch (Exception e) 2143 { 2144 String errorMessage = "Failed to send log entries"; 2145 logger.error(errorMessage, e); 2146 throw new VirtualDatabaseException(errorMessage); 2147 } 2148 finally 2149 { 2150 recoveryLog.endRecovery(); } 2152 } 2153 finally 2154 { 2155 removeAdminOperation(restoreLogOperation); 2156 } 2157 } 2158 2159 2162 public void failoverForPersistentConnection(long persistentConnectionId) 2163 { 2164 distributedRequestManager 2165 .distributedFailoverForPersistentConnection(persistentConnectionId); 2166 } 2167 2168 2171 public void failoverForTransaction(long currentTid) 2172 { 2173 distributedRequestManager.distributedFailoverForTransaction(currentTid); 2174 } 2175 2176 2182 public RecoveryLog getRecoveryLog() throws VirtualDatabaseException 2183 { 2184 if (!hasRecoveryLog()) 2185 throw new VirtualDatabaseException(Translate 2186 .get("virtualdatabase.no.recovery.log")); 2187 2188 return getRequestManager().getRecoveryLog(); 2189 } 2190 2191 2198 public void handleRemoteDisableBackendNotification( 2199 DatabaseBackend disabledBackend, Member sender) 2200 { 2201 synchronized (backendsPerController) 2202 { 2203 List remoteBackends = (List ) backendsPerController.get(sender); 2204 if (remoteBackends == null) 2205 { logger.warn("No information has been found for remote controller " 2209 + sender); 2210 remoteBackends = new ArrayList (); 2211 backendsPerController.put(sender, remoteBackends); 2212 } 2213 int size = remoteBackends.size(); 2214 boolean backendFound = false; 2215 for (int i = 0; i < size; i++) 2216 { 2217 DatabaseBackend remoteBackend = (DatabaseBackend) remoteBackends.get(i); 2218 if (remoteBackend.equals(disabledBackend)) 2219 { 2220 logger.info("Backend " + remoteBackend.getName() 2221 + " disabled on controller " + sender); 2222 remoteBackends.set(i, disabledBackend); 2223 backendFound = true; 2224 break; 2225 } 2226 } 2227 if (!backendFound) 2228 { 2229 logger.warn("Updating backend list with unknown backend " 2230 + disabledBackend.getName() + " disabled on controller " + sender); 2231 remoteBackends.add(disabledBackend); 2232 } 2233 } 2234 } 2235 2236 2243 public void handleRemoteDisableBackendsNotification( 2244 ArrayList disabledBackendInfos, Member sender) 2245 { 2246 synchronized (backendsPerController) 2247 { 2248 List remoteBackends = (List ) backendsPerController.get(sender); 2249 if (remoteBackends == null) 2250 { logger.warn("No information has been found for remote controller " 2254 + sender); 2255 remoteBackends = new ArrayList (); 2256 backendsPerController.put(sender, remoteBackends); 2257 } 2258 Iterator iter = disabledBackendInfos.iterator(); 2259 while (iter.hasNext()) 2260 { 2261 BackendInfo backendInfo = (BackendInfo) iter.next(); 2262 DatabaseBackend backend = backendInfo.getDatabaseBackend(this); 2263 2264 if (remoteBackends.contains(backend)) 2265 { 2266 logger.info("Backend " + backend.getName() 2267 + " disabled on controller " + sender); 2268 remoteBackends.set(remoteBackends.indexOf(backend), backend); 2269 } 2270 else 2271 { 2272 remoteBackends.add(backend); 2273 logger.warn("Updating backend list with unknown backend " 2274 + backendInfo.getName() + " disabled on controller " + sender); 2275 } 2276 } 2277 } 2278 } 2279 2280 2287 public void sendLocalConfiguration(Member dest) throws NotConnectedException 2288 { 2289 if (logger.isDebugEnabled()) 2291 logger.debug("Sending local controller name to joining controller (" 2292 + dest + ")"); 2293 2294 List target = new ArrayList (); 2295 target.add(dest); 2296 multicastRequestAdapter.multicastMessage(target, new ControllerInformation( 2297 controller.getControllerName(), controller.getJmxName(), 2298 distributedRequestManager.getControllerId()), 2299 MulticastRequestAdapter.WAIT_ALL, messageTimeouts 2300 .getControllerNameTimeout()); 2301 2302 if (logger.isDebugEnabled()) 2304 { 2305 logger.debug("Sending backend status name to joining controller (" + dest 2306 + ")"); 2307 } 2308 List backendInfos = DatabaseBackend.toBackendInfos(backends); 2309 multicastRequestAdapter.multicastMessage(target, new BackendStatus( 2310 backendInfos, distributedRequestManager.getControllerId()), 2311 MulticastRequestAdapter.WAIT_ALL, messageTimeouts 2312 .getBackendStatusTimeout()); 2313 } 2314 2315 2321 public boolean isAliveController(Long controllerId) 2322 { 2323 return controllerIds.containsValue(controllerId); 2324 } 2325 2326 2332 public boolean isLocalSender(Member sender) 2333 { 2334 return channel.getLocalMembership().equals(sender); 2335 } 2336 2337 2342 public void removeBackend(String backend) throws VirtualDatabaseException 2343 { 2344 super.removeBackend(backend); 2345 2346 try 2347 { 2348 broadcastBackendInformation(getAllMemberButUs()); 2350 } 2351 catch (Exception e) 2352 { 2353 String msg = "An error occured while multicasting new backedn information"; 2354 logger.error(msg, e); 2355 throw new VirtualDatabaseException(msg, e); 2356 } 2357 } 2358 2359 2369 private String removeRemoteControllerAndStartCleanupThread( 2370 Member remoteControllerMembership) 2371 { 2372 String remoteControllerJmxName = (String ) controllerJmxAddress 2373 .remove(remoteControllerMembership); 2374 if (remoteControllerJmxName == null) 2375 { 2376 logger.warn("Unable to find remote controller name for member " 2377 + remoteControllerMembership + " in list " 2378 + controllerJmxAddress.toString()); 2379 } 2380 else if (logger.isDebugEnabled()) 2381 logger.debug("Removing controller " + remoteControllerJmxName); 2382 2383 backendsPerController.remove(remoteControllerMembership); 2385 refreshGroupMembership(); 2386 2387 Long failedControllerId = (Long ) controllerIds 2391 .remove(remoteControllerMembership); 2392 2393 if (failedControllerId != null) 2394 { 2395 ControllerFailureCleanupThread controllerFailureCleanupThread = new ControllerFailureCleanupThread( 2396 this, failedControllerId.longValue(), failoverTimeoutInMs, 2397 cleanupThreads, writesFlushed); 2398 cleanupThreads.put(failedControllerId, controllerFailureCleanupThread); 2399 controllerFailureCleanupThread.start(); 2400 } 2401 2402 return remoteControllerJmxName; 2403 } 2404 2405 2408 private void removeVirtualDatabaseUser(VirtualDatabaseUser vdbUser) 2409 { 2410 try 2411 { 2412 multicastRequestAdapter.multicastMessage(getAllMembers(), 2413 new RemoveVirtualDatabaseUser(vdbUser), 2414 MulticastRequestAdapter.WAIT_NONE, messageTimeouts 2415 .getVirtualDatabaseConfigurationTimeout()); 2416 } 2417 catch (NotConnectedException e) 2418 { 2419 logger.error("Channel unavailable while removing vdb user " 2420 + vdbUser.getLogin(), e); 2421 return; 2422 } 2423 } 2424 2425 2438 public void setGroupCheckpoint(String checkpointName, ArrayList groupMembers) 2439 throws VirtualDatabaseException 2440 { 2441 try 2442 { 2443 distributedRequestManager.suspendActivity(); 2445 getMulticastRequestAdapter().multicastMessage(groupMembers, 2446 new DisableBackendsAndSetCheckpoint(new ArrayList (), checkpointName), 2447 MulticastRequestAdapter.WAIT_ALL, 2448 messageTimeouts.getSetCheckpointTimeout()); 2449 } 2450 catch (Exception e) 2451 { 2452 String msg = "Set group checkpoint failed: checkpointName=" 2453 + checkpointName; 2454 logger.error(msg, e); 2455 throw new VirtualDatabaseException(msg); 2456 } 2457 } 2458 2459 2468 private String setLogReplicationCheckpoint(String controllerName) 2469 throws VirtualDatabaseException 2470 { 2471 return setLogReplicationCheckpoint(getControllerByName(controllerName)); 2472 } 2473 2474 2485 public String setLogReplicationCheckpoint(Member controller) 2486 throws VirtualDatabaseException 2487 { 2488 String checkpointName = buildCheckpointName("now"); 2489 2490 ArrayList dest = new ArrayList (); 2492 dest.add(controller); 2493 dest.add(channel.getLocalMembership()); 2494 setGroupCheckpoint(checkpointName, dest); 2495 return checkpointName; 2496 } 2497 2498 2502 public void setShutdownCheckpoint() 2503 { 2504 try 2506 { 2507 setGroupCheckpoint(buildCheckpointName("shutdown"), getAllMembers()); 2508 } 2509 catch (VirtualDatabaseException e) 2510 { 2511 logger.warn("Error while setting shutdown checkpoint", e); 2512 } 2513 finally 2514 { 2515 if(isShuttingDown()) 2516 setRejectingNewTransaction(true); 2517 distributedRequestManager.resumeActivity(); 2518 } 2519 } 2520 2521 2531 private void sendMessageToController(String controllerName, 2532 Serializable message, long timeout) throws VirtualDatabaseException 2533 { 2534 sendMessageToController(getControllerByName(controllerName), message, 2535 timeout); 2536 } 2537 2538 2551 public Serializable sendMessageToController(Member controllerMember, 2552 Serializable message, long timeout) throws VirtualDatabaseException 2553 { 2554 try 2555 { 2556 ArrayList dest = new ArrayList (); 2557 dest.add(controllerMember); 2558 MulticastResponse resp = getMulticastRequestAdapter().multicastMessage( 2559 dest, message, MulticastRequestAdapter.WAIT_ALL, timeout); 2560 Object o = resp.getResult(controllerMember); 2561 if (o instanceof Exception ) 2562 throw (Exception ) o; 2563 return (Serializable ) o; 2564 } 2565 catch (Exception e) 2566 { 2567 logger.error(e); 2568 throw new VirtualDatabaseException(e); 2569 } 2570 } 2571 2572 2575 public void shutdown(int level) 2576 { 2577 for (Iterator iter = cleanupThreads.values().iterator(); iter.hasNext();) 2579 { 2580 ControllerFailureCleanupThread thread = (ControllerFailureCleanupThread) iter 2581 .next(); 2582 thread.shutdown(); 2583 } 2584 2585 requestResultFailoverCache.shutdown(); 2587 2588 super.shutdown(level); 2589 } 2590 2591 2595 public void transferBackend(String backend, String controllerDestination) 2596 throws VirtualDatabaseException 2597 { 2598 TransferBackendOperation transferOperation = new TransferBackendOperation( 2599 backend, controllerDestination); 2600 addAdminOperation(transferOperation); 2601 try 2602 { 2603 Member targetMember = getControllerByName(controllerDestination); 2604 2605 DatabaseBackend db = getAndCheckBackend(backend, CHECK_BACKEND_DISABLE); 2607 String transfertCheckpointName = buildCheckpointName("transfer backend: " 2608 + db.getName() + " from " + controller.getControllerName() + " to " 2609 + targetMember.getUid()); 2610 2611 if (logger.isDebugEnabled()) 2612 logger.debug("**** Disabling backend for transfer"); 2613 2614 try 2616 { 2617 if (!hasRecoveryLog()) 2618 throw new VirtualDatabaseException( 2619 "Transfer is not supported on virtual databases without a recovery log"); 2620 2621 distributedRequestManager.disableBackendWithCheckpoint(db, 2622 transfertCheckpointName); 2623 } 2624 catch (SQLException e) 2625 { 2626 throw new VirtualDatabaseException(e.getMessage()); 2627 } 2628 2629 try 2631 { 2632 if (logger.isDebugEnabled()) 2633 logger.debug("**** Sending transfer message to:" + targetMember); 2634 2635 ArrayList dest = new ArrayList (1); 2636 dest.add(targetMember); 2637 2638 sendMessageToController(targetMember, 2639 new BackendTransfer(controllerDestination, transfertCheckpointName, 2640 new BackendInfo(db)), messageTimeouts 2641 .getBackendTransferTimeout()); 2642 2643 if (logger.isDebugEnabled()) 2644 logger.debug("**** Removing local backend"); 2645 2646 removeBackend(db); 2648 2649 broadcastBackendInformation(getAllMemberButUs()); 2651 } 2652 catch (Exception e) 2653 { 2654 String msg = "An error occured while transfering the backend"; 2655 logger.error(msg, e); 2656 throw new VirtualDatabaseException(msg, e); 2657 } 2658 } 2659 finally 2660 { 2661 removeAdminOperation(transferOperation); 2662 } 2663 } 2664 2665 2669 public void transferDump(String dumpName, String remoteControllerName, 2670 boolean noCopy) throws VirtualDatabaseException 2671 { 2672 TransferDumpOperation transferOperation = new TransferDumpOperation( 2673 dumpName, remoteControllerName); 2674 addAdminOperation(transferOperation); 2675 try 2676 { 2677 DumpInfo dumpInfo = null; 2679 try 2680 { 2681 dumpInfo = getRecoveryLog().getDumpInfo(dumpName); 2682 2686 } 2687 catch (SQLException e) 2688 { 2689 String msg = "getting dump info from backup manager failed"; 2690 throw new VirtualDatabaseException(msg, e); 2691 } 2692 2693 if (dumpInfo == null) 2694 throw new VirtualDatabaseException("no dump info for dump '" + dumpName 2695 + "'"); 2696 2697 if (remoteControllerName.equals(controller.getJmxName())) 2698 throw new VirtualDatabaseException("Not transfering dump to myself"); 2699 2700 DumpTransferInfo dumpTransferInfo = null; 2704 if (!noCopy) 2705 { 2706 try 2707 { 2708 dumpTransferInfo = getRequestManager().getBackupManager() 2709 .getBackuperByFormat(dumpInfo.getDumpFormat()).setupDumpServer(); 2710 } 2711 catch (IOException e) 2712 { 2713 throw new VirtualDatabaseException(e); 2714 } 2715 } 2716 2717 sendMessageToController(remoteControllerName, new InitiateDumpCopy( 2720 dumpInfo, dumpTransferInfo), messageTimeouts 2721 .getInitiateDumpCopyTimeout()); 2722 } 2723 finally 2724 { 2725 removeAdminOperation(transferOperation); 2726 } 2727 } 2728 2729 2742 public boolean waitForTotalOrder(Object request, boolean errorIfNotFound) 2743 { 2744 synchronized (totalOrderQueue) 2745 { 2746 int index = totalOrderQueue.indexOf(request); 2747 while (index > 0) 2748 { 2749 if (logger.isDebugEnabled()) 2750 logger.debug("Waiting for " + index 2751 + " queries to execute (current is " + totalOrderQueue.get(0) 2752 + ")"); 2753 try 2754 { 2755 totalOrderQueue.wait(); 2756 } 2757 catch (InterruptedException ignore) 2758 { 2759 } 2760 index = totalOrderQueue.indexOf(request); 2761 } 2762 if (index == -1) 2763 { 2764 if (errorIfNotFound) 2765 logger 2766 .error("Request was not found in total order queue, posting out of order (" 2767 + request + ")"); 2768 return false; 2769 } 2770 else 2771 return true; 2772 } 2773 } 2774} | Popular Tags |