KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > virtualdatabase > protocol > ResyncRecoveryLog


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2005 Emic Networks.
4  * Contact: sequoia@continuent.org
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * Initial developer(s): Olivier Fambon.
20  * Contributor(s): ______________________.
21  */

22
23 package org.continuent.sequoia.controller.virtualdatabase.protocol;
24
25 import java.io.IOException JavaDoc;
26 import java.io.Serializable JavaDoc;
27 import java.sql.SQLException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29
30 import org.continuent.hedera.adapters.MulticastRequestAdapter;
31 import org.continuent.hedera.adapters.MulticastResponse;
32 import org.continuent.hedera.common.Member;
33 import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
34 import org.continuent.sequoia.common.i18n.Translate;
35 import org.continuent.sequoia.common.log.Trace;
36 import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
37 import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
38 import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
39
40 /**
41  * This message is used attempt an automatic recovery log resynchronization when
42  * a vdb is loaded on a controller that is not the first in it's group. Upon
43  * reception of this message, the 'mother' vdb checks to see if the specified
44  * checkpoint exists. If not, the call fails, and the sender should try another
45  * chekpoint name. If the specified checkpoint exists, a 'now' checkpoint is set
46  * (cluster-wide) and the call returns with this object containing the name of
47  * this checkpoint and the size of the chunck or LogEntries that it will send.
48  *
49  * @see org.continuent.sequoia.controller.virtualdatabase.protocol.ReplicateLogEntries
50  * @see org.continuent.sequoia.controller.virtualdatabase.protocol.CopyLogEntry
51  * @author <a HREF="mailto:Olivier.Fambon@continuent.com>Olivier Fambon </a>
52  * @version 1.0
53  */

54 public class ResyncRecoveryLog extends DistributedVirtualDatabaseMessage
55 {
56   private static final long serialVersionUID = 3246850782028970719L;
57
58   private String JavaDoc checkpointName;
59
60   /**
61    * Creates a new <code>ResyncRecoveryLog</code> message
62    *
63    * @param checkpointName The checkpoint from which to resync the log.
64    */

65   public ResyncRecoveryLog(String JavaDoc checkpointName)
66   {
67     this.checkpointName = checkpointName;
68   }
69
70   /**
71    * Returns the checkpointName value.
72    *
73    * @return Returns the 'now' checkpointName.
74    */

75   public String JavaDoc getCheckpointName()
76   {
77     return checkpointName;
78   }
79
80   /**
81    * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageSingleThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
82    * org.continuent.hedera.common.Member)
83    */

84   public Object JavaDoc handleMessageSingleThreaded(DistributedVirtualDatabase dvdb,
85       Member sender)
86   {
87     return null;
88   }
89
90   /**
91    * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageMultiThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
92    * org.continuent.hedera.common.Member, java.lang.Object)
93    */

94   public Serializable JavaDoc handleMessageMultiThreaded(
95       DistributedVirtualDatabase dvdb, Member sender,
96       Object JavaDoc handleMessageSingleThreadedResult)
97   {
98     if (!dvdb.hasRecoveryLog())
99       return new VirtualDatabaseException(Translate
100           .get("virtualdatabase.no.recovery.log"));
101
102     RecoveryLog recoveryLog = dvdb.getRequestManager().getRecoveryLog();
103
104     long commonCheckpointId;
105     try
106     {
107       commonCheckpointId = recoveryLog.getCheckpointLogId(checkpointName);
108     }
109     catch (SQLException JavaDoc e)
110     {
111       return new VirtualDatabaseException("Unable to retrieve checkpoint "
112           + checkpointName, e);
113     }
114
115     /* set a global 'now' checkpoint (temporary)
116      * When this request completes all activity on the system is
117      * suspended and must be resumed by calling RequestManager.resumeActivity()
118      */

119     String JavaDoc nowCheckpointName;
120     try
121     {
122       nowCheckpointName = dvdb.setLogReplicationCheckpoint(sender);
123     }
124     catch (VirtualDatabaseException e)
125     {
126       return e;
127     }
128
129     // get it's id (ewerk) so that we can replicate it on the other side
130
long nowCheckpointId;
131     Trace logger = dvdb.getLogger();
132     try
133     {
134       nowCheckpointId = recoveryLog.getCheckpointLogId(nowCheckpointName);
135     }
136     catch (SQLException JavaDoc e)
137     {
138       dvdb.getRequestManager().resumeActivity();
139       String JavaDoc errorMessage = "Cannot find 'now checkpoint' log entry";
140       logger.error(errorMessage);
141       return new VirtualDatabaseException(errorMessage);
142     }
143
144     // Compute the number of entries to be replayed and send it to the remote
145
// controller so that it can allocate the proper number of entries in its
146
// recovery log
147
long nbOfEntriesToResync = nowCheckpointId - commonCheckpointId;
148     long diff;
149     try
150     {
151       Serializable JavaDoc replyValue = dvdb.sendMessageToController(sender,
152           new InitiateRecoveryLogResync(checkpointName, commonCheckpointId,
153               nowCheckpointName, nbOfEntriesToResync), dvdb
154               .getMessageTimeouts().getReplicateLogEntriesTimeout());
155       if (replyValue instanceof Long JavaDoc)
156         diff = ((Long JavaDoc) replyValue).longValue();
157       else
158         throw new RuntimeException JavaDoc(
159             "Invalid answer from remote controller on InitiateRecoveryLogResync ("
160                 + replyValue + ")");
161     }
162     catch (Exception JavaDoc e)
163     {
164       String JavaDoc errorMessage = "Failed to initialize recovery log resynchronization";
165       logger.error(errorMessage, e);
166       return new VirtualDatabaseException(errorMessage, e);
167     }
168     finally
169     {
170       dvdb.getRequestManager().resumeActivity();
171     }
172     
173     if (logger.isDebugEnabled())
174     {
175       logger.debug("Resynchronizing from checkpoint " + checkpointName + " ("
176           + commonCheckpointId + " to checkpoint " + nowCheckpointName + " ("
177           + nowCheckpointId + ")");
178     }
179
180     // protect from concurrent log updates: fake a recovery (increments
181
// semaphore)
182
recoveryLog.beginRecovery();
183
184     // copy the entries over to the remote controller.
185
// Send them one by one over to the remote controller, coz each LogEntry can
186
// potentially be huge (e.g. if it contains a blob)
187
try
188     {
189       ArrayList JavaDoc dest = new ArrayList JavaDoc();
190       dest.add(sender);
191       long copyLogEntryTimeout = dvdb.getMessageTimeouts()
192           .getCopyLogEntryTimeout();
193       for (long id = commonCheckpointId; id != nowCheckpointId; id++)
194       {
195         LogEntry entry = recoveryLog.getNextLogEntry(id);
196         if (entry == null)
197         {
198           String JavaDoc errorMessage = "Cannot find expected log entry: " + id;
199           logger.error(errorMessage);
200           return new VirtualDatabaseException(errorMessage);
201         }
202
203         // Because 'getNextLogEntry()' will hunt for the next valid log entry,
204
// we need to update the iterator with the new id value - 1
205
id = entry.getLogId() - 1;
206         entry.setLogId(entry.getLogId() + diff);
207         if (entry.getCompletionLogId() > 0)
208           entry.setCompletionLogId(entry.getCompletionLogId() + diff);
209         
210         MulticastResponse resp = dvdb.getMulticastRequestAdapter()
211             .multicastMessage(dest, new CopyLogEntry(entry),
212                 MulticastRequestAdapter.WAIT_NONE, copyLogEntryTimeout);
213         if (resp.getFailedMembers() != null)
214           throw new IOException JavaDoc("Failed to deliver log entry " + id
215               + " to remote controller " + sender);
216       }
217     }
218     catch (Exception JavaDoc e)
219     {
220       String JavaDoc errorMessage = "Failed to complete recovery log resynchronization";
221       logger.error(errorMessage, e);
222       return new VirtualDatabaseException(errorMessage, e);
223     }
224     finally
225     {
226       recoveryLog.endRecovery(); // release semaphore
227
}
228
229     // Now check that no entry was missed by the other controller since we
230
// shipped all entries asynchronously without getting any individual ack
231
// (much faster to address SEQUOIA-504)
232
try
233     {
234       long localNbOfLogEntries = recoveryLog.getNumberOfLogEntries(
235           commonCheckpointId, nowCheckpointId);
236
237       if (logger.isDebugEnabled())
238         logger.debug("Checking that " + localNbOfLogEntries
239             + " entries were replicated on remote controller");
240
241       Serializable JavaDoc replyValue = dvdb.sendMessageToController(sender,
242           new CompleteRecoveryLogResync(checkpointName, nowCheckpointName,
243               localNbOfLogEntries), dvdb.getMessageTimeouts()
244               .getReplicateLogEntriesTimeout());
245       if (replyValue instanceof Long JavaDoc)
246       {
247         diff = ((Long JavaDoc) replyValue).longValue();
248         if (diff != 0)
249           return new VirtualDatabaseException(
250               "Recovery log resynchronization reports a difference of " + diff
251                   + " entries");
252       }
253       else
254         throw new RuntimeException JavaDoc(
255             "Invalid answer from remote controller on CompleteRecoveryLogResync ("
256                 + replyValue + ")");
257     }
258     catch (Exception JavaDoc e)
259     {
260       String JavaDoc errorMessage = "Failed to initialize recovery log resynchronization";
261       logger.error(errorMessage, e);
262       return new VirtualDatabaseException(errorMessage, e);
263     }
264
265     return null;
266   }
267
268 }
269
Popular Tags