1 24 25 package org.objectweb.cjdbc.controller.virtualdatabase; 26 27 import java.io.IOException ; 28 import java.io.Serializable ; 29 import java.net.URL ; 30 import java.sql.SQLException ; 31 import java.util.ArrayList ; 32 import java.util.Date ; 33 import java.util.HashMap ; 34 import java.util.Hashtable ; 35 import java.util.Iterator ; 36 import java.util.Map.Entry; 37 38 import javax.management.NotCompliantMBeanException ; 39 40 import org.objectweb.cjdbc.common.exceptions.BackupException; 41 import org.objectweb.cjdbc.common.exceptions.ControllerException; 42 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException; 43 import org.objectweb.cjdbc.common.exceptions.VirtualDatabaseException; 44 import org.objectweb.cjdbc.common.i18n.Translate; 45 import org.objectweb.cjdbc.common.jmx.JmxException; 46 import org.objectweb.cjdbc.common.jmx.notifications.CjdbcNotificationList; 47 import org.objectweb.cjdbc.common.log.Trace; 48 import org.objectweb.cjdbc.common.shared.BackendInfo; 49 import org.objectweb.cjdbc.common.shared.DumpInfo; 50 import org.objectweb.cjdbc.common.sql.AbstractRequest; 51 import org.objectweb.cjdbc.common.sql.filters.AbstractBlobFilter; 52 import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema; 53 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 54 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 55 import org.objectweb.cjdbc.controller.backup.Backuper; 56 import org.objectweb.cjdbc.controller.backup.DumpTransferInfo; 57 import org.objectweb.cjdbc.controller.core.Controller; 58 import org.objectweb.cjdbc.controller.jmx.MBeanServerManager; 59 import org.objectweb.cjdbc.controller.jmx.RmiConnector; 60 import org.objectweb.cjdbc.controller.recoverylog.RecoveryLog; 61 import org.objectweb.cjdbc.controller.recoverylog.events.LogEntry; 62 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels; 63 import org.objectweb.cjdbc.controller.requestmanager.RequestManager; 64 import 
org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager; 65 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.BackendStatus; 66 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.BackendTransfer; 67 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage; 68 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ControllerName; 69 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CopyLogEntry; 70 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.DisableBackend; 71 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.DistributedRequest; 72 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.DistributedTransactionMarker; 73 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.EnableBackend; 74 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.InitiateDumpCopy; 75 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ReplicateLogEntries; 76 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.SetCheckpoint; 77 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration; 78 import org.objectweb.tribe.adapters.MulticastRequestAdapter; 79 import org.objectweb.tribe.adapters.MulticastRequestListener; 80 import org.objectweb.tribe.adapters.MulticastResponse; 81 import org.objectweb.tribe.channel.JGroupsReliableChannelWithGms; 82 import org.objectweb.tribe.channel.ReliableGroupChannelWithGms; 83 import org.objectweb.tribe.common.Address; 84 import org.objectweb.tribe.common.Group; 85 import org.objectweb.tribe.common.GroupIdentifier; 86 import org.objectweb.tribe.common.Member; 87 import org.objectweb.tribe.exceptions.ChannelException; 88 import org.objectweb.tribe.exceptions.NotConnectedException; 89 import org.objectweb.tribe.exceptions.TimeoutException; 90 import org.objectweb.tribe.gms.GroupMembershipListener; 91 import 
org.objectweb.tribe.gms.JGroupsMembershipService; 92 import org.objectweb.tribe.messages.MessageListener; 93 94 102 public class DistributedVirtualDatabase extends VirtualDatabase 103 implements 104 MessageListener, 105 MulticastRequestListener, 106 GroupMembershipListener 107 { 108 120 122 123 private String groupName = null; 124 125 private Hashtable controllersMap; 126 127 private Hashtable backendsPerController; 128 129 130 private ReliableGroupChannelWithGms channel = null; 131 132 private MulticastRequestAdapter multicastRequestAdapter; 133 private Group currentGroup = null; 134 private ArrayList allMemberButUs = null; 135 private static final long INCOMPATIBLE_CONFIGURATION = -1; 136 private boolean isVirtualDatabaseStarted = false; 137 138 142 private DistributedRequestManager distributedRequestManager; 143 144 145 private static Trace distributedRequestLogger; 146 147 169 public DistributedVirtualDatabase(Controller controller, String name, 170 String groupName, int maxConnections, boolean pool, int minThreads, 171 int maxThreads, long maxThreadIdleTime, int sqlShortFormLength, 172 AbstractBlobFilter blobFilter) throws NotCompliantMBeanException , 173 JmxException 174 { 175 super(controller, name, maxConnections, pool, minThreads, maxThreads, 176 maxThreadIdleTime, sqlShortFormLength, blobFilter); 177 178 this.groupName = groupName; 179 backendsPerController = new Hashtable (); 180 controllersMap = new Hashtable (); 181 isVirtualDatabaseStarted = false; 182 distributedRequestLogger = Trace 183 .getLogger("org.objectweb.cjdbc.controller.distributedvirtualdatabase.request." 
184 + name); 185 } 186 187 192 protected void finalize() throws Throwable 193 { 194 quitChannel(); 195 super.finalize(); 196 } 197 198 201 public void addBackend(DatabaseBackend db) throws VirtualDatabaseException 202 { 203 super.addBackend(db); 205 206 try 208 { 209 broadcastBackendInformation(getAllMemberButUs()); 210 } 211 catch (Exception e) 212 { 213 String msg = "Error while broadcasting backend information when adding backend"; 214 logger.error(msg, e); 215 throw new VirtualDatabaseException(msg, e); 216 } 217 } 218 219 225 public void quitChannel() throws ChannelException, NotConnectedException 226 { 227 if (multicastRequestAdapter != null) 228 { 229 multicastRequestAdapter.stop(); 230 } 231 if (channel != null) 232 { 233 channel.close(); 234 } 235 } 236 237 242 public String getControllerName() 243 { 244 return controller.getControllerName(); 245 } 246 247 253 public String getGroupName() 254 { 255 return groupName; 256 } 257 258 263 public void setGroupName(String groupName) 264 { 265 this.groupName = groupName; 266 } 267 268 273 public void setRequestManager(RequestManager requestManager) 274 { 275 if (!(requestManager instanceof DistributedRequestManager)) 276 throw new RuntimeException ( 277 "A distributed virtual database can only work with a distributed request manager."); 278 279 distributedRequestManager = (DistributedRequestManager) requestManager; 280 this.requestManager = distributedRequestManager; 282 } 283 284 288 294 public void joinGroup() throws Exception 295 { 296 try 297 { 298 URL jgroupConfigFile = DistributedVirtualDatabase.class 300 .getResource("/jgroups.xml"); 301 if (jgroupConfigFile == null) 302 logger.warn(Translate 303 .get("virtualdatabase.distributed.jgroups.xml.not.found")); 304 else 305 logger.info(Translate.get("virtualdatabase.distributed.jgroups.using", 306 jgroupConfigFile.toString())); 307 JGroupsMembershipService gms = new JGroupsMembershipService( 308 jgroupConfigFile); 309 gms.registerGroupMembershipListener(this); 
310 channel = new JGroupsReliableChannelWithGms(gms); 311 if (logger.isDebugEnabled()) 312 logger.debug("Group communication channel is configured as follows: " 313 + ((JGroupsReliableChannelWithGms) channel).getProperties()); 314 315 channel.join(new Group(new GroupIdentifier(groupName))); 317 multicastRequestAdapter = new MulticastRequestAdapter(channel , this 320 , this 321 ); 322 323 Thread.sleep(2000); 326 327 logger.info("Group " + groupName + " connected to " 328 + channel.getLocalMembership()); 329 330 controllersMap.put(channel.getLocalMembership(), controller.getJmxName()); 332 333 currentGroup = channel.getCurrentGroup(); 335 ArrayList currentGroupMembers = currentGroup.getMembers(); 336 int groupSize = currentGroupMembers.size(); 337 if (groupSize == 1) 338 { 339 logger.info(Translate.get( 340 "virtualdatabase.distributed.configuration.first.in.group", 341 groupName)); 342 allMemberButUs = new ArrayList (); 343 distributedRequestManager.setControllerId(0); 344 isVirtualDatabaseStarted = true; 345 return; 346 } 347 348 logger.info("Group now contains " + groupSize + " controllers."); 349 if (logger.isDebugEnabled()) 350 { 351 logger.debug("Current list of controllers is as follows:"); 352 for (Iterator iter = currentGroupMembers.iterator(); iter.hasNext();) 353 logger.debug("Controller " + iter.next()); 354 } 355 356 refreshGroupMembership(); 358 long controllerId = checkConfigurationCompatibilityAndReturnControllerId(getAllMemberButUs()); 360 if (controllerId == INCOMPATIBLE_CONFIGURATION) 361 { 362 String msg = Translate 363 .get("virtualdatabase.distributed.configuration.not.compatible"); 364 logger.error(msg); 365 throw new ControllerException(msg); 366 } 367 else 368 { 369 controllerId += currentGroupMembers.indexOf(channel 374 .getLocalMembership()); 375 376 if (logger.isInfoEnabled()) 377 { 378 logger.info(Translate 379 .get("virtualdatabase.distributed.configuration.compatible")); 380 logger.info("Controller identifier is set to: " + 
controllerId); 381 } 382 distributedRequestManager.setControllerId(controllerId); 384 } 385 386 broadcastBackendInformation(getAllMemberButUs()); 389 } 390 catch (Exception e) 391 { 392 String msg = Translate.get("virtualdatabase.distributed.joingroup.error", 393 groupName); 394 if (e instanceof RuntimeException ) 395 logger.error(msg, e); 396 throw new Exception (msg + " (" + e + ")"); 397 } 398 isVirtualDatabaseStarted = true; 399 } 400 401 406 public ArrayList getAllMembers() 407 { 408 synchronized (controllersMap) 409 { 410 if (currentGroup == null) return new ArrayList (); 412 return ((ArrayList ) currentGroup.getMembers().clone()); 413 } 414 } 415 416 422 public ArrayList getAllMemberButUs() 423 { 424 synchronized (controllersMap) 425 { 426 if (allMemberButUs == null) return new ArrayList (); 428 429 438 return allMemberButUs; 439 } 440 } 441 442 447 public ReliableGroupChannelWithGms getChannel() 448 { 449 return channel; 450 } 451 452 457 public Group getCurrentGroup() 458 { 459 return currentGroup; 460 } 461 462 467 public MulticastRequestAdapter getMulticastRequestAdapter() 468 { 469 return multicastRequestAdapter; 470 } 471 472 482 private long checkConfigurationCompatibilityAndReturnControllerId( 483 ArrayList dest) 484 { 485 if (logger.isInfoEnabled()) 486 logger.info(Translate 487 .get("virtualdatabase.distributed.configuration.checking")); 488 489 MulticastResponse rspList; 491 try 492 { 493 rspList = multicastRequestAdapter.multicastMessage(dest, 494 new VirtualDatabaseConfiguration(this), 495 MulticastRequestAdapter.WAIT_ALL, 496 CJDBCGroupMessage.defaultCastTimeOut); 497 } 498 catch (TimeoutException e) 499 { 500 logger.error( 501 "Timeout occured while checking configuration compatibility", e); 502 return INCOMPATIBLE_CONFIGURATION; 503 } 504 catch (ChannelException e) 505 { 506 logger 507 .error( 508 "Communication error occured while checking configuration compatibility", 509 e); 510 return INCOMPATIBLE_CONFIGURATION; 511 } 512 catch 
(NotConnectedException e) 513 { 514 logger.error( 515 "Channel unavailable while checking configuration compatibility", e); 516 return INCOMPATIBLE_CONFIGURATION; 517 } 518 519 HashMap results = rspList.getResults(); 521 int size = results.size(); 522 if (size == 0) 523 logger.warn(Translate 524 .get("virtualdatabase.distributed.configuration.checking.noanswer")); 525 526 long highestRemoteControllerId = 0; 527 for (Iterator iter = results.values().iterator(); iter.hasNext();) 528 { 529 Object response = iter.next(); 530 if (response instanceof Long ) 531 { 532 long remoteControllerId = ((Long ) response).longValue(); 537 if (remoteControllerId == INCOMPATIBLE_CONFIGURATION) 538 return INCOMPATIBLE_CONFIGURATION; 539 else if (highestRemoteControllerId < remoteControllerId) 540 highestRemoteControllerId = remoteControllerId; 541 } 542 else 543 { 544 logger 545 .error("Unexpected response while checking configuration compatibility: " 546 + response); 547 return INCOMPATIBLE_CONFIGURATION; 548 } 549 } 550 551 return ((highestRemoteControllerId >> DistributedRequestManager.CONTROLLER_ID_SHIFT_BITS) & DistributedRequestManager.CONTROLLER_ID_BITS) + 1; 555 } 556 557 565 private void broadcastBackendInformation(ArrayList dest) 566 throws TimeoutException, ChannelException, NotConnectedException 567 { 568 logger 569 .debug(Translate 570 .get("virtualdatabase.distributed.configuration.querying.remote.status")); 571 572 MulticastResponse rspList = multicastRequestAdapter.multicastMessage(dest, 574 new BackendStatus(getBackendsInfo(backends)), 575 MulticastRequestAdapter.WAIT_ALL, CJDBCGroupMessage.defaultCastTimeOut); 576 577 int size = dest.size(); 578 for (int i = 0; i < size; i++) 579 { 580 Member m = (Member) dest.get(i); 582 if (rspList.getResult(m) != null) 583 { 584 if (logger.isDebugEnabled()) 585 logger 586 .debug(Translate 587 .get( 588 "virtualdatabase.distributed.configuration.updating.backend.list", 589 m.toString())); 590 } 591 else 592 
logger.warn(Translate.get( 593 "virtualdatabase.distributed.unable.get.remote.status", m 594 .toString())); 595 } 596 } 597 598 608 public boolean isCompatibleBackend(BackendInfo backend) 609 throws VirtualDatabaseException 610 { 611 try 612 { 613 acquireReadLockBackendLists(); 614 } 615 catch (InterruptedException e) 616 { 617 String msg = "Unable to acquire read lock on backend list in isCompatibleBackend (" 618 + e + ")"; 619 logger.error(msg); 620 throw new VirtualDatabaseException(msg); 621 } 622 623 try 624 { 625 String backendURL = backend.getUrl(); 627 int size = backends.size(); 628 DatabaseBackend b = null; 629 for (int i = 0; i < size; i++) 630 { 631 b = (DatabaseBackend) backends.get(i); 632 if (b.getURL().equals(backendURL)) 633 return false; 634 } 635 } 636 catch (RuntimeException re) 637 { 638 throw new VirtualDatabaseException(re); 639 } 640 finally 641 { 642 releaseReadLockBackendLists(); 643 } 644 return true; 646 } 647 648 657 public boolean isCompatibleDatabaseSchema(DatabaseSchema dbs) 658 { 659 if (dbs == null) 661 { 662 logger.warn(Translate 663 .get("virtualdatabase.distributed.configuration.checking.noschema")); 664 } 665 else 666 { 667 switch (getRequestManager().getLoadBalancer().getRAIDbLevel()) 669 { 670 case RAIDbLevels.RAIDb0 : 671 if (dbs.equals(getRequestManager().getDatabaseSchema())) 673 { 674 logger 675 .warn(Translate 676 .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema")); 677 return false; 678 } 679 break; 680 case RAIDbLevels.RAIDb1 : 681 if (!dbs.equals(getRequestManager().getDatabaseSchema())) 683 { 684 logger 685 .warn(Translate 686 .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema")); 687 return false; 688 } 689 break; 690 case RAIDbLevels.RAIDb2 : 691 if (!dbs.isCompatibleWith(getRequestManager().getDatabaseSchema())) 693 { 694 logger 695 .warn(Translate 696 .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema")); 697 return false; 
/**
 * First stage of group message handling.
 * <p>
 * Dispatches on the runtime type of the message:
 * <ul>
 * <li><code>DistributedRequest</code>: scheduled on the distributed request
 * manager (ignored while the virtual database is not started),</li>
 * <li><code>DistributedTransactionMarker</code>: its command is scheduled on
 * the distributed request manager (ignored while the virtual database is
 * not started),</li>
 * <li><code>SetCheckpoint</code>, <code>ReplicateLogEntries</code> and
 * <code>CopyLogEntry</code>: handled directly,</li>
 * <li>any other type: nothing is done here.</li>
 * </ul>
 * NOTE(review): the method name suggests the group communication layer calls
 * it from a single thread, in message order -- confirm against the
 * MulticastRequestListener contract.
 * 
 * @param msg the message received from the group
 * @param sender the member that sent the message
 * @return <code>null</code> when the message was processed or ignored, a
 *         <code>ControllerException</code> when <code>msg</code> is null, or
 *         the exception caught while processing the message
 */
public Object handleMessageSingleThreaded(Serializable msg, Member sender)
{
  try
  {
    if (msg != null)
    {
      if (logger.isDebugEnabled())
        logger.debug("handleMessageSingleThreaded (" + msg.getClass() + "): "
            + msg);

      if (msg instanceof DistributedRequest)
      {
        // Requests received before the end of joinGroup() are dropped
        if (!isVirtualDatabaseStarted)
          return null;
        if (logger.isDebugEnabled())
          logger.debug(getControllerName()
              + ": Scheduling distributedRequest "
              + ((DistributedRequest) msg).getRequest().getId() + " from "
              + sender);

        // Request trace: "S" for a select, "W" otherwise, followed by the
        // transaction id and the SQL text
        if (distributedRequestLogger.isInfoEnabled())
        {
          AbstractRequest request = ((DistributedRequest) msg).getRequest();
          distributedRequestLogger.info((request.isSelect() ? "S " : "W ")
              + request.getTransactionId() + " " + request.getSQL());
        }
        ((DistributedRequest) msg).scheduleRequest(distributedRequestManager);
        return null;
      }
      if (msg instanceof DistributedTransactionMarker)
      {
        // Same start-up guard as for distributed requests
        if (!isVirtualDatabaseStarted)
          return null;
        if (logger.isDebugEnabled())
          logger.debug(getControllerName() + ": Scheduling " + msg + " from "
              + sender);

        if (distributedRequestLogger.isInfoEnabled())
          distributedRequestLogger.info(msg);

        ((DistributedTransactionMarker) msg)
            .scheduleCommand(distributedRequestManager);
        return null;
      }
      // Recovery log related messages are fully processed at this stage
      if (msg instanceof SetCheckpoint)
        setCheckpoint(((SetCheckpoint) msg).getCheckpointName());
      else if (msg instanceof ReplicateLogEntries)
        handleReplicateLogEntries((ReplicateLogEntries) msg);
      else if (msg instanceof CopyLogEntry)
        handleCopyLogEntry((CopyLogEntry) msg);
    }
    else
    {
      String errorMsg = "Invalid null message";
      logger.error(errorMsg);
      return new ControllerException(errorMsg);
    }

    return null;
  }
  catch (Exception e)
  {
    // Only runtime exceptions are logged; every exception is handed back to
    // the caller as the return value instead of being thrown
    if (e instanceof RuntimeException)
      logger.warn("Error while handling group message:" + msg.getClass(), e);
    return e;
  }
}
result = ((Serializable ) ((DistributedRequest) msg) 832 .executeScheduledRequest(distributedRequestManager)); 833 return result; 834 } 835 if (msg instanceof DistributedTransactionMarker) 836 { if (!isVirtualDatabaseStarted) 838 return new NoMoreBackendException( 839 "Controller is in intialization phase"); 840 841 if (logger.isDebugEnabled()) 842 logger.debug(getControllerName() + ": " + msg + " from " + sender); 843 return ((Serializable ) ((DistributedTransactionMarker) msg) 844 .executeCommand(distributedRequestManager)); 845 } 846 if (msg instanceof ControllerName) 847 { 848 addRemoteController(sender, ((ControllerName) msg).getJmxName()); 849 return Boolean.TRUE; 850 } 851 else if (msg instanceof VirtualDatabaseConfiguration) 852 { return handleVirtualDatabaseConfiguration( 854 (VirtualDatabaseConfiguration) msg, sender); 855 } 856 else if (msg instanceof BackendTransfer) 857 { return handleBackendTransfer((BackendTransfer) msg); 859 } 860 else if (msg instanceof BackendStatus) 861 handleBackendStatus((BackendStatus) msg, sender); 862 else if (msg instanceof EnableBackend) 863 handleEnableBackend((EnableBackend) msg, sender); 864 else if (msg instanceof DisableBackend) 865 handleDisableBackend((DisableBackend) msg, sender); 866 else if (msg instanceof ReplicateLogEntries) 867 return null; else if (msg instanceof CopyLogEntry) 869 return null; else if (msg instanceof InitiateDumpCopy) 871 handleInitiateDumpCopy((InitiateDumpCopy) msg); 872 else 873 logger.warn("Unhandled message type received: " + msg.getClass() 874 + "(" + msg + ")"); 875 } 876 else 877 { 878 String errorMsg = "Invalid null message"; 879 logger.error(errorMsg); 880 return new ControllerException(errorMsg); 881 } 882 return null; 883 } 884 catch (Exception e) 885 { 886 if (e instanceof RuntimeException ) 887 logger.warn("Error while handling group message:" + msg.getClass(), e); 888 return e; 889 } 890 finally 891 { 892 if (msg != null) 893 { 894 if (msg instanceof DistributedRequest) 897 { 
898 synchronized (totalOrderQueue) 899 { 900 if (totalOrderQueue.remove(((DistributedRequest) msg).getRequest())) 901 { 902 logger.warn("Distributed request " 903 + ((DistributedRequest) msg).getRequest() 904 + " did not remove itself from the total order queue"); 905 totalOrderQueue.notifyAll(); 906 } 907 } 908 } 909 else if (msg instanceof DistributedTransactionMarker) 910 { 911 synchronized (totalOrderQueue) 912 { 913 if (totalOrderQueue.remove(msg)) 914 { 915 logger.warn("Distributed " + msg.toString() + " did not remove " 916 + "itself from the total order queue"); 917 totalOrderQueue.notifyAll(); 918 } 919 } 920 } 921 } 922 } 923 } 924 925 private RecoveryLog getRecoveryLog() throws VirtualDatabaseException 926 { 927 if (!hasRecoveryLog()) 928 throw new VirtualDatabaseException(Translate 929 .get("virtualdatabase.no.recovery.log")); 930 931 return getRequestManager().getRecoveryLog(); 932 } 933 934 private void handleInitiateDumpCopy(InitiateDumpCopy msg) 935 throws VirtualDatabaseException, BackupException, IOException 936 { 937 if (msg.getDumpTransferInfo() != null) 939 { 940 Backuper backuper = getRequestManager().getBackupManager() 941 .getBackuperByFormat(msg.getDumpInfo().getDumpFormat()); 942 943 backuper.fetchDump(msg.getDumpTransferInfo(), msg.getDumpInfo() 944 .getDumpPath(), msg.getDumpInfo().getDumpName()); 945 } 946 947 getRecoveryLog().setDumpInfo(msg.getDumpInfo()); 949 } 950 951 956 private void handleCopyLogEntry(CopyLogEntry msg) 957 { 958 if (!hasRecoveryLog()) 959 { 960 logger.warn("Tentative handleCopyLogEntry on vdb with no recovery log"); 961 return; 962 } 963 964 getRequestManager().getRecoveryLog().logLogEntry(msg.getEntry()); 965 } 966 967 977 private void handleReplicateLogEntries(ReplicateLogEntries msg) 978 throws SQLException 979 { 980 if (!hasRecoveryLog()) 981 { 982 logger 983 .warn("Tentative handleReplicateLogEntries on vdb with no recovery log"); 984 return; 985 } 986 987 if (msg.getDumpCheckpointName() != null) 988 { 
getRequestManager().getRecoveryLog().storeDumpCheckpointName( 990 msg.getDumpCheckpointName(), msg.getCheckpointId()); 991 } 992 else 993 { getRequestManager().getRecoveryLog().resetLogTableIdAndDeleteRecoveryLog( 995 msg.getCheckpointName(), msg.getCheckpointId()); 996 } 997 } 998 999 1005 private void handleBackendStatus(BackendStatus msg, Member sender) 1006 { 1007 ArrayList remoteBackendInfoList = msg.getBackends(); 1009 ArrayList remoteBackendList = new ArrayList (remoteBackendInfoList.size()); 1011 for (Iterator iter = remoteBackendInfoList.iterator(); iter.hasNext();) 1012 { 1013 BackendInfo info = (BackendInfo) iter.next(); 1014 remoteBackendList.add(info.getDatabaseBackend()); 1015 } 1016 backendsPerController.put(sender, remoteBackendList); 1017 if (logger.isInfoEnabled()) 1018 logger.info(Translate.get( 1019 "virtualdatabase.distributed.configuration.updating.backend.list", 1020 sender)); 1021 } 1022 1023 1033 private Serializable handleBackendTransfer(BackendTransfer msg) 1034 throws NotCompliantMBeanException , VirtualDatabaseException 1035 { 1036 if (logger.isInfoEnabled()) 1037 logger.info(getControllerName() 1038 + ": Received transfer command. 
Checkpoint: " 1039 + msg.getCheckpointName()); 1040 BackendInfo info = msg.getInfo(); 1041 DatabaseBackend backend = new DatabaseBackend(info); 1042 this.addBackend(backend); 1043 if (logger.isInfoEnabled()) 1044 logger.info(getControllerName() + ": Enable backend"); 1045 enableBackendFromCheckpoint(backend.getName(), msg.getCheckpointName()); 1046 return Boolean.TRUE; 1047 } 1048 1049 1055 private void handleDisableBackend(DisableBackend msg, Member sender) 1056 { 1057 ArrayList remoteBackendList = (ArrayList ) backendsPerController.get(sender); 1058 if (remoteBackendList == null) 1059 { logger.warn("No information has been found for remote controller " 1063 + sender); 1064 remoteBackendList = new ArrayList (); 1065 backendsPerController.put(sender, remoteBackendList); 1066 } 1067 DatabaseBackend disabledBackend = msg.getDatabaseBackend(); 1068 int size = remoteBackendList.size(); 1069 boolean backendFound = false; 1070 for (int i = 0; i < size; i++) 1071 { 1072 DatabaseBackend b = (DatabaseBackend) remoteBackendList.get(i); 1073 if (b.equals(disabledBackend)) 1074 { 1075 logger.info("Backend " + b.getName() + " disabled on controller " 1076 + sender); 1077 remoteBackendList.set(i, disabledBackend); 1078 backendFound = true; 1079 break; 1080 } 1081 } 1082 if (!backendFound) 1083 { 1084 logger.warn("Updating backend list with unknown backend " 1085 + disabledBackend.getName() + " disabled on controller " + sender); 1086 remoteBackendList.add(disabledBackend); 1087 } 1088 } 1089 1090 1096 private void handleEnableBackend(EnableBackend msg, Member sender) 1097 { 1098 ArrayList remoteBackendList = (ArrayList ) backendsPerController.get(sender); 1099 if (remoteBackendList == null) 1100 { logger.warn("No information has been found for remote controller " 1104 + sender); 1105 remoteBackendList = new ArrayList (); 1106 backendsPerController.put(sender, remoteBackendList); 1107 } 1108 DatabaseBackend enabledBackend = msg.getDatabaseBackend(); 1109 int size = 
remoteBackendList.size(); 1110 boolean backendFound = false; 1111 for (int i = 0; i < size; i++) 1112 { 1113 DatabaseBackend b = (DatabaseBackend) remoteBackendList.get(i); 1114 if (b.equals(enabledBackend)) 1115 { 1116 logger.info("Backend " + b.getName() + " enabled on controller " 1117 + sender); 1118 remoteBackendList.set(i, enabledBackend); 1119 backendFound = true; 1120 break; 1121 } 1122 } 1123 if (!backendFound) 1124 { 1125 logger.warn("Updating backend list with unknown backend " 1126 + enabledBackend.getName() + " enabled on controller " + sender); 1127 remoteBackendList.add(enabledBackend); 1128 } 1129 } 1130 1131 1143 private Serializable handleVirtualDatabaseConfiguration( 1144 VirtualDatabaseConfiguration msg, Member sender) throws TimeoutException, 1145 ChannelException, NotConnectedException 1146 { 1147 1148 if (logger.isInfoEnabled()) 1149 logger.info("Checking virtual database configuration from " 1150 + msg.getControllerName()); 1151 1152 if (!isLocalSender(sender) && !msg.isCompatible(this)) 1153 return new Long (INCOMPATIBLE_CONFIGURATION); 1155 1156 if (MBeanServerManager.isJmxEnabled()) 1158 { 1159 Hashtable data = new Hashtable (); 1160 data.put("controllerName", msg.getControllerName()); 1161 data.put("rmiconnector", new String []{msg.getRmiHostname(), 1162 msg.getRmiPort()}); 1163 RmiConnector.broadcastNotification(this, 1164 CjdbcNotificationList.DISTRIBUTED_CONTROLLER_ADDED, 1165 CjdbcNotificationList.NOTIFICATION_LEVEL_INFO, Translate.get( 1166 "notification.distributed.controller.added", this 1167 .getVirtualDatabaseName()), data); 1168 } 1169 1170 addRemoteController(sender, msg.getControllerJmxName()); 1171 1172 if (logger.isDebugEnabled()) 1174 logger.debug("Sending local controller name to joining controller (" 1175 + sender + ")"); 1176 ArrayList target = new ArrayList (); 1177 target.add(sender); 1178 multicastRequestAdapter.multicastMessage(target, new ControllerName( 1179 controller.getControllerName(), 
controller.getJmxName()), 1180 MulticastRequestAdapter.WAIT_ALL, CJDBCGroupMessage.defaultCastTimeOut); 1181 1182 if (logger.isDebugEnabled()) 1184 logger.debug("Sending backend status name to joining controller (" 1185 + sender + ")"); 1186 multicastRequestAdapter.multicastMessage(target, new BackendStatus( 1187 getBackendsInfo(backends)), MulticastRequestAdapter.WAIT_ALL, 1188 CJDBCGroupMessage.defaultCastTimeOut); 1189 1190 if (logger.isInfoEnabled()) 1191 logger.info("Controller " + msg.getControllerName() 1192 + " is compatible with the local configuration"); 1193 return new Long (distributedRequestManager.getControllerId()); 1194 } 1195 1196 1202 private boolean isLocalSender(Member sender) 1203 { 1204 return channel.getLocalMembership().equals(sender); 1205 } 1206 1207 1214 public void getBackendStatus() throws TimeoutException, ChannelException, 1215 NotConnectedException 1216 { 1217 if (logger.isDebugEnabled()) 1218 logger.debug("Requesting remote controllers status"); 1219 MulticastResponse rspList = multicastRequestAdapter.multicastMessage(null, 1220 new BackendStatus(getBackendsInfo(backends)), 1221 MulticastRequestAdapter.WAIT_ALL, CJDBCGroupMessage.defaultCastTimeOut); 1222 1223 HashMap results = rspList.getResults(); 1224 for (Iterator iter = results.values().iterator(); iter.hasNext();) 1225 { 1226 ArrayList b = (ArrayList ) iter.next(); 1227 int bSize = b.size(); 1228 if (bSize == 0) 1229 logger.debug("No Database backends"); 1230 else 1231 for (int j = 0; j < bSize; j++) 1232 logger.debug(((DatabaseBackend) b.get(j)).getXml()); 1233 } 1234 } 1235 1236 1240 1248 public void addRemoteController(Member remoteControllerMembership, 1249 String remoteControllerJmxName) 1250 { 1251 controllersMap.put(remoteControllerMembership, remoteControllerJmxName); 1252 if (logger.isDebugEnabled()) 1253 logger.debug("Adding new controller " + remoteControllerJmxName); 1254 refreshGroupMembership(); 1255 } 1256 1257 1266 public String removeRemoteController(Member 
remoteControllerMembership) 1267 { 1268 String remoteControllerJmxName = (String ) controllersMap 1269 .remove(remoteControllerMembership); 1270 if (logger.isDebugEnabled()) 1271 logger.debug("Removing controller " + remoteControllerJmxName); 1272 1273 backendsPerController.remove(remoteControllerMembership); 1275 refreshGroupMembership(); 1276 return remoteControllerJmxName; 1277 } 1278 1279 1286 private void setEventCheckpoint(String name, Member m, GroupIdentifier gid) 1287 { 1288 if (!hasRecoveryLog()) 1289 { 1290 logger.warn("Trying to set checkpoint with no recoveryLog. Ignored."); 1291 return; 1292 } 1293 1294 try 1295 { 1296 getRequestManager().getRecoveryLog().storeCheckpoint( 1297 (name + " member:" + m + " gid:" + gid + " " + new Date ()) 1298 .replaceAll(" ", "_")); 1299 } 1300 catch (SQLException e) 1301 { 1302 logger.warn("setEventCheckpoint: " + name, e); 1303 } 1304 } 1305 1306 1310 public void joinMember(Member m, GroupIdentifier gid) 1311 { 1312 if (hasRecoveryLog()) 1313 setEventCheckpoint("joining", m, gid); 1314 } 1315 1316 1320 public void quitMember(Member m, GroupIdentifier gid) 1321 { 1322 if (hasRecoveryLog()) 1323 setEventCheckpoint("quitting", m, gid); 1324 1325 int failures = multicastRequestAdapter.memberFailsOnAllReplies(m); 1327 logger.info(failures + " requests were waiting responses from " + m); 1328 1329 String remoteControllerName = removeRemoteController(m); 1331 if (remoteControllerName != null) 1332 { 1333 logger.warn("Controller " + m + " has left the cluster."); 1334 if (MBeanServerManager.isJmxEnabled()) 1336 { 1337 Hashtable data = new Hashtable (); 1338 data.put("controllerName", remoteControllerName); 1339 RmiConnector.broadcastNotification(this, 1340 CjdbcNotificationList.DISTRIBUTED_CONTROLLER_REMOVED, 1341 CjdbcNotificationList.NOTIFICATION_LEVEL_INFO, Translate.get( 1342 "notification.distributed.controller.removed", this 1343 .getVirtualDatabaseName()), data); 1344 } 1345 } 1346 } 1347 1348 1352 public void 
groupComposition(Group g, Address sender) 1353 { 1354 } 1356 1357 1362 public void failedMember(Member failed, GroupIdentifier gid, Member sender) 1363 { 1364 } 1366 1367 1371 private void refreshGroupMembership() 1372 { 1373 if (logger.isDebugEnabled()) 1374 logger.debug("Refreshing members list:" + currentGroup.getMembers()); 1375 1376 synchronized (controllersMap) 1377 { 1378 allMemberButUs = ((ArrayList ) currentGroup.getMembers().clone()); 1379 allMemberButUs.remove(channel.getLocalMembership()); 1380 } 1381 } 1382 1383 1387 1392 public boolean isDistributed() 1393 { 1394 return true; 1395 } 1396 1397 1404 public boolean equals(Object other) 1405 { 1406 if ((other == null) 1407 || (!(other instanceof org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase))) 1408 return false; 1409 else 1410 { 1411 DistributedVirtualDatabase db = (org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase) other; 1412 return name.equals(db.getDatabaseName()) 1413 && groupName.equals(db.getGroupName()); 1414 } 1415 } 1416 1417 1422 protected String getDistributionXml() 1423 { 1424 StringBuffer info = new StringBuffer (); 1425 info.append("<" + DatabasesXmlTags.ELT_Distribution + " " 1426 + DatabasesXmlTags.ATT_groupName + "=\"" + groupName + "\">"); 1427 1428 info.append("</" + DatabasesXmlTags.ELT_Distribution + ">"); 1429 return info.toString(); 1430 } 1431 1432 1435 public String [] viewControllerList() 1436 { 1437 if (logger.isInfoEnabled()) 1438 { 1439 logger.info(channel.getLocalMembership() + " see members:" 1440 + currentGroup.getMembers() + " and has mapping:" + controllersMap); 1441 } 1442 String [] members = new String [controllersMap.keySet().size()]; 1443 Iterator iter = controllersMap.keySet().iterator(); 1444 int i = 0; 1445 while (iter.hasNext()) 1446 { 1447 members[i] = (String ) controllersMap.get(iter.next()); 1448 i++; 1449 } 1450 return members; 1451 } 1452 1453 1456 public Hashtable viewGroupBackends() throws 
VirtualDatabaseException 1457 { 1458 Hashtable map = new Hashtable (controllersMap.size()); 1459 Iterator iter = backendsPerController.keySet().iterator(); 1460 Member member; 1461 while (iter.hasNext()) 1462 { 1463 member = (Member) iter.next(); 1464 map.put(controllersMap.get(member), backendsPerController.get(member)); 1465 } 1466 return map; 1467 } 1468 1469 1480 public ArrayList getBackendsInfo(ArrayList backendsObject) 1481 { 1482 int size = backendsObject.size(); 1483 ArrayList infos = new ArrayList (size); 1484 DatabaseBackend backend; 1485 for (int i = 0; i < size; i++) 1486 { 1487 backend = (DatabaseBackend) backendsObject.get(i); 1488 infos.add(createBackendInfo(backend, false)); 1489 } 1490 return infos; 1491 } 1492 1493 1501 public BackendInfo createBackendInfo(DatabaseBackend backend, boolean useXml) 1502 { 1503 BackendInfo info = new BackendInfo(backend); 1504 if (!useXml) 1505 info.setXml(null); 1506 return info; 1507 } 1508 1509 1512 public void removeBackend(String backend) throws VirtualDatabaseException 1513 { 1514 super.removeBackend(backend); 1515 1516 try 1517 { 1518 broadcastBackendInformation(getAllMemberButUs()); 1520 } 1521 catch (Exception e) 1522 { 1523 String msg = "An error occured while multicasting new backedn information"; 1524 logger.error(msg, e); 1525 throw new VirtualDatabaseException(msg, e); 1526 } 1527 } 1528 1529 1533 public void transferBackend(String backend, String controllerDestination) 1534 throws VirtualDatabaseException 1535 { 1536 Member targetMember = getControllerByName(controllerDestination); 1537 1538 DatabaseBackend db = getAndCheckBackend(backend, CHECK_BACKEND_DISABLE); 1540 String transfertCheckpointName = makeTransferCheckpointName(db, 1541 targetMember); 1542 1543 if (logger.isDebugEnabled()) 1544 logger.debug("**** Disabling backend for transfer"); 1545 1546 try 1548 { 1549 if (!hasRecoveryLog()) 1550 throw new VirtualDatabaseException( 1551 "Transfer is not supported on virtual databases without a 
recovery log"); 1552 1553 distributedRequestManager.disableBackendForCheckpoint(db, 1554 transfertCheckpointName); 1555 } 1556 catch (SQLException e) 1557 { 1558 throw new VirtualDatabaseException(e.getMessage()); 1559 } 1560 1561 try 1563 { 1564 if (logger.isDebugEnabled()) 1565 logger.debug("**** Sending transfer message to:" + targetMember); 1566 1567 ArrayList dest = new ArrayList (1); 1568 dest.add(targetMember); 1569 1570 multicastRequestAdapter.multicastMessage(dest, new BackendTransfer( 1571 controllerDestination, transfertCheckpointName, createBackendInfo(db, 1572 true)), MulticastRequestAdapter.WAIT_ALL, 1573 CJDBCGroupMessage.defaultCastTimeOut); 1574 1575 if (logger.isDebugEnabled()) 1576 logger.debug("**** Removing local backend"); 1577 1578 removeBackend(db); 1580 1581 broadcastBackendInformation(getAllMemberButUs()); 1583 } 1584 catch (Exception e) 1585 { 1586 String msg = "An error occured while transfering the backend"; 1587 logger.error(msg, e); 1588 throw new VirtualDatabaseException(msg, e); 1589 } 1590 } 1591 1592 1600 private Member getControllerByName(String controllerName) 1601 throws VirtualDatabaseException 1602 { 1603 Iterator iter = controllersMap.entrySet().iterator(); 1605 Member targetMember = null; 1606 while (iter.hasNext()) 1607 { 1608 Entry entry = (Entry) iter.next(); 1609 if (entry.getValue().equals(controllerName)) 1610 { 1611 targetMember = (Member) entry.getKey(); 1612 break; 1613 } 1614 } 1615 if (targetMember == null) 1616 throw new VirtualDatabaseException("Cannot find controller:" 1617 + controllerName + " in group"); 1618 return targetMember; 1619 } 1620 1621 private String makeTransferCheckpointName(DatabaseBackend db, Member dest) 1622 { 1623 return ("transfer-checkpoint: " + db.getName() + " from " 1624 + this.controller.getControllerName() + " to " + dest.getUid() + " " + new Date ()) 1625 .replaceAll(" ", "_"); 1626 } 1627 1628 1643 public void setGroupCheckpoint(String checkpointName, ArrayList groupMembers) 1644 
throws VirtualDatabaseException 1645 { 1646 try 1647 { 1648 getMulticastRequestAdapter().multicastMessage(groupMembers, 1649 new SetCheckpoint(checkpointName), MulticastRequestAdapter.WAIT_ALL, 1650 CJDBCGroupMessage.defaultCastTimeOut); 1651 } 1652 catch (Exception e) 1653 { 1654 String msg = "Set group checkpoint failed: checkpointName=" 1655 + checkpointName; 1656 logger.error(msg, e); 1657 throw new VirtualDatabaseException(msg); 1658 } 1659 } 1660 1661 1669 private String setLogReplicationCheckpoint(String controllerName) 1670 throws VirtualDatabaseException 1671 { 1672 String checkpointName = "now-" + new Date (); 1673 1674 ArrayList dest = new ArrayList (); 1675 dest.add(getControllerByName(controllerName)); 1676 dest.add(channel.getLocalMembership()); 1677 setGroupCheckpoint(checkpointName, dest); 1678 return checkpointName; 1679 } 1680 1681 1685 public void copyLogFromCheckpoint(String dumpName, String controllerName) 1686 throws VirtualDatabaseException 1687 { 1688 super.copyLogFromCheckpoint(dumpName, controllerName); 1691 1692 String dumpCheckpointName; 1694 DumpInfo dumpInfo; 1695 1696 try 1697 { 1698 dumpInfo = getRecoveryLog().getDumpInfo(dumpName); 1699 } 1700 catch (SQLException e) 1701 { 1702 throw new VirtualDatabaseException( 1703 "Recovery log error access occured while checking for dump" 1704 + dumpName, e); 1705 } 1706 1707 if (dumpInfo == null) 1708 throw new VirtualDatabaseException( 1709 "No information was found in the dump table for dump " + dumpName); 1710 1711 dumpCheckpointName = dumpInfo.getCheckpointName(); 1712 1713 String nowCheckpointName = setLogReplicationCheckpoint(controllerName); 1715 1716 long nowCheckpointId; 1719 RecoveryLog recoveryLog = getRequestManager().getRecoveryLog(); 1720 try 1721 { 1722 nowCheckpointId = recoveryLog.getCheckpointRequestId(nowCheckpointName); 1723 } 1724 catch (SQLException e) 1725 { 1726 String errorMessage = "Can not find 'now checkpoint' log entry"; 1727 logger.error(errorMessage); 1728 throw 
new VirtualDatabaseException(errorMessage); 1729 } 1730 1731 sendMessageToController(controllerName, new ReplicateLogEntries( 1733 nowCheckpointName, null, nowCheckpointId)); 1734 1735 recoveryLog.beginRecovery(); 1738 1739 1743 try 1746 { 1747 long dumpId = recoveryLog.getCheckpointRequestId(dumpCheckpointName); 1748 long nowId = recoveryLog.getCheckpointRequestId(nowCheckpointName); 1749 1750 for (long id = dumpId; id != nowId; id++) 1751 { 1752 LogEntry entry = recoveryLog.getNextLogEntry(id); 1753 if (entry == null) 1754 { 1755 String errorMessage = "Can not find expected log entry: " + id; 1756 logger.error(errorMessage); 1757 throw new VirtualDatabaseException(errorMessage); 1758 } 1759 1760 id = entry.getId() - 1; 1763 sendMessageToController(controllerName, new CopyLogEntry(entry)); 1764 } 1765 1766 sendMessageToController(controllerName, new ReplicateLogEntries(null, 1768 dumpCheckpointName, dumpId)); 1769 } 1770 catch (SQLException e) 1771 { 1772 String errorMessage = "Failed to send log entries"; 1773 logger.error(errorMessage, e); 1774 throw new VirtualDatabaseException(errorMessage); 1775 } 1776 finally 1777 { 1778 recoveryLog.endRecovery(); } 1780 1781 } 1782 1783 1792 private void sendMessageToController(String controllerName, 1793 Serializable message) throws VirtualDatabaseException 1794 { 1795 try 1796 { 1797 ArrayList dest = new ArrayList (); 1798 dest.add(getControllerByName(controllerName)); 1799 getMulticastRequestAdapter().multicastMessage(dest, message, 1800 MulticastRequestAdapter.WAIT_ALL, 1801 CJDBCGroupMessage.defaultCastTimeOut); 1802 } 1803 catch (Exception e) 1804 { 1805 String errorMessage = message.getClass().getName() + ": send failed"; 1806 logger.error(errorMessage, e); 1807 throw new VirtualDatabaseException(errorMessage); 1808 } 1809 } 1810 1811 1823 public void copyDump(String dumpName, String remoteControllerName) 1824 throws VirtualDatabaseException 1825 { 1826 transferDump(dumpName, remoteControllerName, false); 1827 } 1828 
/**
 * Sends an <code>InitiateDumpCopy</code> message for the named dump to a
 * remote controller. The dump information is read from the recovery log.
 * Unless <code>noCopy</code> is set, a server is first set up through the
 * backuper matching the dump format and the resulting transfer information
 * is sent along; otherwise the transfer information is <code>null</code>.
 *
 * @param dumpName name of the dump to transfer
 * @param remoteControllerName name of the controller receiving the message
 * @param noCopy when <code>true</code>, no server is set up
 * @throws VirtualDatabaseException if the dump information cannot be read
 *           or does not exist, if the server setup fails, or if the message
 *           cannot be sent
 */
public void transferDump(String dumpName, String remoteControllerName,
    boolean noCopy) throws VirtualDatabaseException
{
  // Step 1: fetch the description of the dump.
  DumpInfo info;
  try
  {
    info = getRecoveryLog().getDumpInfo(dumpName);
  }
  catch (SQLException sqle)
  {
    throw new VirtualDatabaseException(
        "getting dump info from backup manager failed", sqle);
  }
  if (info == null)
  {
    throw new VirtualDatabaseException("no dump info for dump '" + dumpName
        + "'");
  }

  // Step 2: prepare the transfer information, only when a copy is wanted.
  DumpTransferInfo transferInfo;
  if (noCopy)
  {
    transferInfo = null;
  }
  else
  {
    try
    {
      transferInfo = getRequestManager().getBackupManager()
          .getBackuperByFormat(info.getDumpFormat()).setupServer();
    }
    catch (IOException ioe)
    {
      throw new VirtualDatabaseException(ioe);
    }
  }

  // Step 3: notify the remote controller.
  InitiateDumpCopy request = new InitiateDumpCopy(info, transferInfo);
  sendMessageToController(remoteControllerName, request);
}

}