package org.continuent.sequoia.controller.virtualdatabase.protocol;

import java.io.IOException;
import java.io.Serializable;
import java.sql.SQLException;
import java.util.ArrayList;

import org.continuent.hedera.adapters.MulticastRequestAdapter;
import org.continuent.hedera.adapters.MulticastResponse;
import org.continuent.hedera.common.Member;
import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
import org.continuent.sequoia.common.i18n.Translate;
import org.continuent.sequoia.common.log.Trace;
import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;

/**
 * Message asking the receiving controller to copy to the sender the recovery
 * log entries located between a checkpoint known to both controllers and a
 * new checkpoint set at the time the message is handled.
 * <p>
 * The handler works in three steps: it announces the resynchronization to
 * the sender with an <code>InitiateRecoveryLogResync</code> message, ships
 * the entries one by one with <code>CopyLogEntry</code> messages, and finally
 * asks the sender to validate the number of entries with a
 * <code>CompleteRecoveryLogResync</code> message.
 */
public class ResyncRecoveryLog extends DistributedVirtualDatabaseMessage
{
  private static final long serialVersionUID = 3246850782028970719L;

  private String            checkpointName;

  /**
   * Creates a new <code>ResyncRecoveryLog</code> message.
   * 
   * @param checkpointName name of the checkpoint from which the entries must
   *          be resent
   */
  public ResyncRecoveryLog(String checkpointName)
  {
    this.checkpointName = checkpointName;
  }

  /**
   * Returns the name of the checkpoint from which to resynchronize.
   * 
   * @return the checkpoint name given at construction time
   */
  public String getCheckpointName()
  {
    return checkpointName;
  }

  /**
   * Nothing is done in the single threaded part of the processing.
   * 
   * @param dvdb the distributed virtual database receiving the message
   * @param sender the controller that sent the message
   * @return always <code>null</code>
   */
  public Object handleMessageSingleThreaded(DistributedVirtualDatabase dvdb,
      Member sender)
  {
    return null;
  }

  /**
   * Sends to <code>sender</code> the recovery log entries found after the
   * common checkpoint, up to a checkpoint created while handling this
   * message.
   * <p>
   * Failures are reported by <em>returning</em> (not throwing) a
   * <code>VirtualDatabaseException</code>, so that it is delivered to the
   * sender as the reply value.
   * 
   * @param dvdb the distributed virtual database receiving the message
   * @param sender the controller that requested the resynchronization
   * @param handleMessageSingleThreadedResult unused
   * @return <code>null</code> on success, or a
   *         <code>VirtualDatabaseException</code> describing the failure
   */
  public Serializable handleMessageMultiThreaded(
      DistributedVirtualDatabase dvdb, Member sender,
      Object handleMessageSingleThreadedResult)
  {
    if (!dvdb.hasRecoveryLog())
      return new VirtualDatabaseException(Translate
          .get("virtualdatabase.no.recovery.log"));

    RecoveryLog recoveryLog = dvdb.getRequestManager().getRecoveryLog();

    long commonCheckpointId;
    try
    {
      commonCheckpointId = recoveryLog.getCheckpointLogId(checkpointName);
    }
    catch (SQLException e)
    {
      return new VirtualDatabaseException("Unable to retrieve checkpoint "
          + checkpointName, e);
    }

    // NOTE(review): setLogReplicationCheckpoint() presumably suspends the
    // request manager activity, since every path below calls
    // resumeActivity() - confirm against DistributedVirtualDatabase.
    String nowCheckpointName;
    try
    {
      nowCheckpointName = dvdb.setLogReplicationCheckpoint(sender);
    }
    catch (VirtualDatabaseException e)
    {
      return e;
    }

    long nowCheckpointId;
    Trace logger = dvdb.getLogger();
    try
    {
      nowCheckpointId = recoveryLog.getCheckpointLogId(nowCheckpointName);
    }
    catch (SQLException e)
    {
      dvdb.getRequestManager().resumeActivity();
      String errorMessage = "Cannot find 'now checkpoint' log entry";
      // Keep the SQLException as the cause, like the other failure paths do
      logger.error(errorMessage, e);
      return new VirtualDatabaseException(errorMessage, e);
    }

    // Step 1: announce the resynchronization. The remote controller answers
    // with the offset to apply to local log ids.
    long nbOfEntriesToResync = nowCheckpointId - commonCheckpointId;
    long diff;
    try
    {
      Serializable replyValue = dvdb.sendMessageToController(sender,
          new InitiateRecoveryLogResync(checkpointName, commonCheckpointId,
              nowCheckpointName, nbOfEntriesToResync), dvdb
              .getMessageTimeouts().getReplicateLogEntriesTimeout());
      if (replyValue instanceof Long)
        diff = ((Long) replyValue).longValue();
      else
        throw new RuntimeException(
            "Invalid answer from remote controller on InitiateRecoveryLogResync ("
                + replyValue + ")");
    }
    catch (Exception e)
    {
      String errorMessage = "Failed to initialize recovery log resynchronization";
      logger.error(errorMessage, e);
      return new VirtualDatabaseException(errorMessage, e);
    }
    finally
    {
      dvdb.getRequestManager().resumeActivity();
    }

    if (logger.isDebugEnabled())
    {
      logger.debug("Resynchronizing from checkpoint " + checkpointName + " ("
          + commonCheckpointId + ") to checkpoint " + nowCheckpointName + " ("
          + nowCheckpointId + ")");
    }

    // Step 2: copy the log entries to the remote controller
    recoveryLog.beginRecovery();

    try
    {
      ArrayList dest = new ArrayList();
      dest.add(sender);
      long copyLogEntryTimeout = dvdb.getMessageTimeouts()
          .getCopyLogEntryTimeout();
      // 'id' is realigned on the id of each fetched entry, so it may jump by
      // more than one per iteration. Using '<' instead of '!=' guarantees
      // termination even if no entry carries exactly nowCheckpointId.
      for (long id = commonCheckpointId; id < nowCheckpointId; id++)
      {
        LogEntry entry = recoveryLog.getNextLogEntry(id);
        if (entry == null)
        {
          String errorMessage = "Cannot find expected log entry: " + id;
          logger.error(errorMessage);
          return new VirtualDatabaseException(errorMessage);
        }

        // Local id of the entry, captured before it is shifted below
        long localLogId = entry.getLogId();

        // The fetched entry is not necessarily id + 1: realign the iterator
        // so that the loop increment lands on the id of this entry.
        id = localLogId - 1;

        // Shift the ids by the offset returned by the remote controller
        entry.setLogId(localLogId + diff);
        if (entry.getCompletionLogId() > 0)
          entry.setCompletionLogId(entry.getCompletionLogId() + diff);

        MulticastResponse resp = dvdb.getMulticastRequestAdapter()
            .multicastMessage(dest, new CopyLogEntry(entry),
                MulticastRequestAdapter.WAIT_NONE, copyLogEntryTimeout);
        if (resp.getFailedMembers() != null)
          throw new IOException("Failed to deliver log entry " + localLogId
              + " to remote controller " + sender);
      }
    }
    catch (Exception e)
    {
      String errorMessage = "Failed to complete recovery log resynchronization";
      logger.error(errorMessage, e);
      return new VirtualDatabaseException(errorMessage, e);
    }
    finally
    {
      recoveryLog.endRecovery();
    }

    // Step 3: ask the remote controller to compare the number of entries it
    // holds with the local count. A Long reply of 0 means both sides agree.
    try
    {
      long localNbOfLogEntries = recoveryLog.getNumberOfLogEntries(
          commonCheckpointId, nowCheckpointId);

      if (logger.isDebugEnabled())
        logger.debug("Checking that " + localNbOfLogEntries
            + " entries were replicated on remote controller");

      Serializable replyValue = dvdb.sendMessageToController(sender,
          new CompleteRecoveryLogResync(checkpointName, nowCheckpointName,
              localNbOfLogEntries), dvdb.getMessageTimeouts()
              .getReplicateLogEntriesTimeout());
      if (replyValue instanceof Long)
      {
        diff = ((Long) replyValue).longValue();
        if (diff != 0)
          return new VirtualDatabaseException(
              "Recovery log resynchronization reports a difference of " + diff
                  + " entries");
      }
      else
        throw new RuntimeException(
            "Invalid answer from remote controller on CompleteRecoveryLogResync ("
                + replyValue + ")");
    }
    catch (Exception e)
    {
      // Distinct from the step 1 and step 2 messages so that the failing
      // phase can be identified from the logs
      String errorMessage = "Failed to verify completion of recovery log resynchronization";
      logger.error(errorMessage, e);
      return new VirtualDatabaseException(errorMessage, e);
    }

    return null;
  }

}