KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > virtualdatabase > DistributedVirtualDatabase


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Copyright (C) 2005-2006 Continuent, Inc.
7  * Contact: sequoia@continuent.org
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): Olivier Fambon, Damian Arregui, Karl Cassaigne.
23  */

24
25 package org.continuent.sequoia.controller.virtualdatabase;
26
27 import java.io.IOException JavaDoc;
28 import java.io.InputStream JavaDoc;
29 import java.io.Serializable JavaDoc;
30 import java.sql.SQLException JavaDoc;
31 import java.util.ArrayList JavaDoc;
32 import java.util.Collection JavaDoc;
33 import java.util.Collections JavaDoc;
34 import java.util.HashMap JavaDoc;
35 import java.util.Hashtable JavaDoc;
36 import java.util.Iterator JavaDoc;
37 import java.util.LinkedList JavaDoc;
38 import java.util.List JavaDoc;
39 import java.util.Map JavaDoc;
40 import java.util.Properties JavaDoc;
41 import java.util.Map.Entry;
42
43 import org.continuent.hedera.adapters.MessageListener;
44 import org.continuent.hedera.adapters.MulticastRequestAdapter;
45 import org.continuent.hedera.adapters.MulticastRequestListener;
46 import org.continuent.hedera.adapters.MulticastResponse;
47 import org.continuent.hedera.channel.AbstractReliableGroupChannel;
48 import org.continuent.hedera.channel.ChannelException;
49 import org.continuent.hedera.channel.NotConnectedException;
50 import org.continuent.hedera.common.Group;
51 import org.continuent.hedera.common.GroupIdentifier;
52 import org.continuent.hedera.common.IpAddress;
53 import org.continuent.hedera.common.Member;
54 import org.continuent.hedera.factory.AbstractGroupCommunicationFactory;
55 import org.continuent.hedera.gms.AbstractGroupMembershipService;
56 import org.continuent.hedera.gms.GroupMembershipListener;
57 import org.continuent.sequoia.common.exceptions.ControllerException;
58 import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
59 import org.continuent.sequoia.common.exceptions.SequoiaException;
60 import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
61 import org.continuent.sequoia.common.i18n.Translate;
62 import org.continuent.sequoia.common.jmx.management.BackendInfo;
63 import org.continuent.sequoia.common.jmx.management.DumpInfo;
64 import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList;
65 import org.continuent.sequoia.common.log.Trace;
66 import org.continuent.sequoia.common.sql.metadata.MetadataContainer;
67 import org.continuent.sequoia.common.users.VirtualDatabaseUser;
68 import org.continuent.sequoia.common.util.Constants;
69 import org.continuent.sequoia.common.xml.DatabasesXmlTags;
70 import org.continuent.sequoia.controller.backend.DatabaseBackend;
71 import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
72 import org.continuent.sequoia.controller.backup.DumpTransferInfo;
73 import org.continuent.sequoia.controller.core.Controller;
74 import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
75 import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
76 import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
77 import org.continuent.sequoia.controller.requestmanager.RequestManager;
78 import org.continuent.sequoia.controller.requestmanager.distributed.ControllerFailureCleanupThread;
79 import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
80 import org.continuent.sequoia.controller.requests.AbstractRequest;
81 import org.continuent.sequoia.controller.sql.schema.DatabaseSchema;
82 import org.continuent.sequoia.controller.virtualdatabase.management.RestoreLogOperation;
83 import org.continuent.sequoia.controller.virtualdatabase.management.TransferBackendOperation;
84 import org.continuent.sequoia.controller.virtualdatabase.management.TransferDumpOperation;
85 import org.continuent.sequoia.controller.virtualdatabase.protocol.AddVirtualDatabaseUser;
86 import org.continuent.sequoia.controller.virtualdatabase.protocol.BackendStatus;
87 import org.continuent.sequoia.controller.virtualdatabase.protocol.BackendTransfer;
88 import org.continuent.sequoia.controller.virtualdatabase.protocol.CompleteRecoveryLogResync;
89 import org.continuent.sequoia.controller.virtualdatabase.protocol.ControllerInformation;
90 import org.continuent.sequoia.controller.virtualdatabase.protocol.CopyLogEntry;
91 import org.continuent.sequoia.controller.virtualdatabase.protocol.DisableBackendsAndSetCheckpoint;
92 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRequest;
93 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedTransactionMarker;
94 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage;
95 import org.continuent.sequoia.controller.virtualdatabase.protocol.GetPreparedStatementMetadata;
96 import org.continuent.sequoia.controller.virtualdatabase.protocol.GetStaticMetadata;
97 import org.continuent.sequoia.controller.virtualdatabase.protocol.InitiateDumpCopy;
98 import org.continuent.sequoia.controller.virtualdatabase.protocol.IsValidUserForAllBackends;
99 import org.continuent.sequoia.controller.virtualdatabase.protocol.MessageTimeouts;
100 import org.continuent.sequoia.controller.virtualdatabase.protocol.RemoveVirtualDatabaseUser;
101 import org.continuent.sequoia.controller.virtualdatabase.protocol.ReplicateLogEntries;
102 import org.continuent.sequoia.controller.virtualdatabase.protocol.ResyncRecoveryLog;
103 import org.continuent.sequoia.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration;
104 import org.continuent.sequoia.controller.virtualdatabase.protocol.VirtualDatabaseConfigurationResponse;
105
106 /**
107  * A <code>DistributedVirtualDatabase</code> is a virtual database hosted by
108  * several controllers. Communication between the controllers is achieved with
109  * reliable multicast provided by Javagroups.
110  *
111  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
112  * @author <a HREF="mailto:Damian.Arregui@continuent.com">Damian Arregui</a>
113  * @version 1.0
114  */

115 public class DistributedVirtualDatabase extends VirtualDatabase
116     implements
117       MessageListener,
118       MulticastRequestListener,
119       GroupMembershipListener
120 {
121   //
122
// How is the code organized?
123
//
124
// 1. Member variables
125
// 2. Constructor(s)
126
// 3. Request handling
127
// 4. Transaction handling
128
// 5. Database backend management
129
// 6. Distribution management (multicast)
130
// 7. Getter/Setter (possibly in alphabetical order)
131
//
132

133   // Distribution
134

135   /** Name of the group this virtual database joins (used to build the GroupIdentifier in joinGroup) */
136   private String JavaDoc groupName = null;
137   /**
138    * Hashtable&lt;Member,String&gt;, the String being the controller JMX name
139    * corresponding to the Member
140    */

141   private Hashtable JavaDoc controllerJmxAddress;
142   /** Hashtable&lt;Member, Long&gt; */
143   private Hashtable JavaDoc controllerIds;
144   /** Hashtable&lt;Member, List&lt;DatabaseBackend&gt;&gt; */
145   private Hashtable JavaDoc backendsPerController;
146
147   /** Hedera channel, created in joinGroup and set back to null by quitChannel */
148   private AbstractReliableGroupChannel channel = null;
149   /** MessageDispatcher to communicate with the group */
150   private MulticastRequestAdapter multicastRequestAdapter = null;
      /** Timeouts for group messages; NOTE(review): not assigned in the constructor, presumably set by a setter elsewhere */
151   private MessageTimeouts messageTimeouts;
      /** Group returned by the channel once joinGroup has joined it */
152   private Group currentGroup = null;
      /** Group members other than this controller; empty list when we are first in the group */
153   private ArrayList JavaDoc allMemberButUs = null;
154   /**
155    * Used by VirtualDatabaseConfiguration if a remote controller config is not
156    * compatible
157    */

158   public static final long INCOMPATIBLE_CONFIGURATION = -1;
      /** Set to true by joinGroup once the configuration checks are done and group messages may flow in */
159   private boolean isVirtualDatabaseStarted = false;
160
161   /**
162    * Our view of the request manager, same as super.requestManager, only just
163    * typed properly.
164    */

165   private DistributedRequestManager distributedRequestManager;
      /** Initialized to true by the constructor; NOTE(review): exact semantics not visible in this part of the file */
166   private boolean processMacroBeforeBroadcast;
167   /**
168    * List of threads that are cleaning up resources allocated by a dead remote
169    * controller
170    */

171   private Hashtable JavaDoc cleanupThreads;
172   /**
173    * Stores the "flushed writes" status for each failed controller: true if
174    * writes have been flushed by the clean-up thread, false otherwise.
175    */

176   private HashMap JavaDoc writesFlushed;
177   /**
178    * Maximum time in ms allowed for clients to failover in case of a controller
179    * failure
180    */

181   private long failoverTimeoutInMs;
182   /**
183    * Cache of request results to allow transparent failover if a failure occurs
184    * during a write request.
185    */

186   private RequestResultFailoverCache requestResultFailoverCache;
187
188   /** Logger for distributed request execution */
189   private Trace distributedRequestLogger;
      /** Resource name of the Hedera properties file loaded by joinGroup */
190   private String JavaDoc hederaPropertiesFile;
191
      /**
       * Monitor guarding messagesInHandlers and channelShuttingDown. Also returned
       * by handleMessageSingleThreaded as a marker when the channel is shutting down.
       */
192   private static final Object JavaDoc MESSAGES_IN_HANDLER_SYNC = new Object JavaDoc();
      /** Incremented in handleMessageSingleThreaded, decremented in handleMessageMultiThreaded */
193   private int messagesInHandlers = 0;
      /** Set to true by quitChannel; handlers and networkPartition check it to ignore late events */
194   private boolean channelShuttingDown = false;
      /** True while resyncRecoveryLog is resynchronizing from a checkpoint (see isResyncing) */
195   private boolean isResynchingFlag;
      /** NOTE(review): usage not visible in this part of the file; presumably keyed per controller */
196   private Hashtable JavaDoc controllerPersistentConnectionsRecovered = new Hashtable JavaDoc();
      /** NOTE(review): usage not visible in this part of the file; presumably keyed per controller */
197   private Hashtable JavaDoc controllerTransactionsRecovered = new Hashtable JavaDoc();
198
199   /** JVM-wide group communication factory */
200   private static AbstractGroupCommunicationFactory groupCommunicationFactory = null;
201
202   /**
203    * Creates a new <code>DistributedVirtualDatabase</code> instance.
204    *
205    * @param controller the controller we belong to
206    * @param name the virtual database name
207    * @param groupName the virtual database group name
208    * @param maxConnections maximum number of concurrent connections.
209    * @param pool should we use a pool of threads for handling connections?
210    * @param minThreads minimum number of threads in the pool
211    * @param maxThreads maximum number of threads in the pool
212    * @param maxThreadIdleTime maximum time a thread can remain idle before being
213    * removed from the pool.
214    * @param clientFailoverTimeoutInMs maximum time for clients to failover in
215    * case of a controller failure
216    * @param sqlShortFormLength maximum number of characters of an SQL statement
217    * to display in traces or exceptions
218    * @param useStaticResultSetMetaData true if DatabaseResultSetMetaData should
219    * use static fields or try to fetch the metadata from the underlying
220    * database
221    * @param hederaPropertiesFile Hedera properties file defines the group
222    * communication factory and its parameters
223    */

224   public DistributedVirtualDatabase(Controller controller, String JavaDoc name,
225       String JavaDoc groupName, int maxConnections, boolean pool, int minThreads,
226       int maxThreads, long maxThreadIdleTime, int sqlShortFormLength,
227       long clientFailoverTimeoutInMs, boolean useStaticResultSetMetaData,
228       String JavaDoc hederaPropertiesFile)
229   {
230     super(controller, name, maxConnections, pool, minThreads, maxThreads,
231         maxThreadIdleTime, sqlShortFormLength, useStaticResultSetMetaData);
232
233     this.groupName = groupName;
234     this.processMacroBeforeBroadcast = true;
235     this.failoverTimeoutInMs = clientFailoverTimeoutInMs;
236     requestResultFailoverCache = new RequestResultFailoverCache(logger,
237         failoverTimeoutInMs);
238     backendsPerController = new Hashtable JavaDoc();
239     controllerJmxAddress = new Hashtable JavaDoc();
240     controllerIds = new Hashtable JavaDoc();
241     cleanupThreads = new Hashtable JavaDoc();
242     writesFlushed = new HashMap JavaDoc();
243     isVirtualDatabaseStarted = false;
244     distributedRequestLogger = Trace
245         .getLogger("org.continuent.sequoia.controller.distributedvirtualdatabase.request."
246             + name);
247     this.hederaPropertiesFile = hederaPropertiesFile;
248     this.totalOrderQueue = new LinkedList JavaDoc();
249   }
250
251   /**
252    * Disconnect the channel and close it.
253    *
254    * @see java.lang.Object#finalize()
255    */

256   protected void finalize() throws Throwable JavaDoc
257   {
258     quitChannel();
259     super.finalize();
260   }
261
262   /**
263    * This method handle the scheduling part of the queries to be sure that the
264    * query is scheduled in total order before letting other queries to execute.
265    *
266    * @see org.continuent.hedera.adapters.MulticastRequestListener#handleMessageSingleThreaded(java.io.Serializable,
267    * org.continuent.hedera.common.Member)
268    */

269   public Object JavaDoc handleMessageSingleThreaded(Serializable JavaDoc msg, Member sender)
270   {
271     synchronized (MESSAGES_IN_HANDLER_SYNC)
272     {
273       if (channelShuttingDown)
274         return MESSAGES_IN_HANDLER_SYNC;
275       messagesInHandlers++;
276     }
277
278     try
279     {
280       if (msg != null)
281       {
282         if (logger.isDebugEnabled())
283           logger.debug("handleMessageSingleThreaded (" + msg.getClass() + "): "
284               + msg);
285
286         if (msg instanceof DistributedVirtualDatabaseMessage)
287         {
288           return ((DistributedVirtualDatabaseMessage) msg)
289               .handleMessageSingleThreaded(this, sender);
290         }
291         // Other message types will be handled in multithreaded handler
292
}
293       else
294       {
295         String JavaDoc errorMsg = "Invalid null message";
296         logger.error(errorMsg);
297         return new ControllerException(errorMsg);
298       }
299
300       return null;
301     }
302     catch (Exception JavaDoc e)
303     {
304       if (e instanceof RuntimeException JavaDoc)
305         logger.warn("Error while handling group message:" + msg.getClass(), e);
306       return e;
307     }
308   }
309
310   /**
311    * @see org.continuent.hedera.adapters.MulticastRequestListener#handleMessageMultiThreaded(java.io.Serializable,
312    * org.continuent.hedera.common.Member, java.lang.Object)
313    */

314   public Serializable JavaDoc handleMessageMultiThreaded(Serializable JavaDoc msg,
315       Member sender, Object JavaDoc handleMessageSingleThreadedResult)
316   {
317     // Check if we are shutting down first
318
if (msg == MESSAGES_IN_HANDLER_SYNC)
319       return null;
320
321     try
322     {
323       if (msg == null)
324       {
325         String JavaDoc errorMsg = "Invalid null message";
326         logger.error(errorMsg);
327         return new ControllerException(errorMsg);
328       }
329
330       if (logger.isDebugEnabled())
331         logger.debug("handleMessageMultiThreaded (" + msg.getClass() + "): "
332             + msg);
333       if (msg instanceof DistributedVirtualDatabaseMessage)
334       {
335         return ((DistributedVirtualDatabaseMessage) msg)
336             .handleMessageMultiThreaded(this, sender,
337                 handleMessageSingleThreadedResult);
338       }
339       else
340         logger.warn("Unhandled message type received: " + msg.getClass() + "("
341             + msg + ")");
342
343       return null;
344     }
345     catch (Exception JavaDoc e)
346     {
347       if (e instanceof RuntimeException JavaDoc)
348         logger.warn("Error while handling group message: " + msg.getClass(), e);
349       return e;
350     }
351     finally
352     {
353       synchronized (MESSAGES_IN_HANDLER_SYNC)
354       {
355         messagesInHandlers--;
356         if (messagesInHandlers <= 0)
357           MESSAGES_IN_HANDLER_SYNC.notifyAll();
358       }
359
360       if (msg != null)
361       {
362         // Just in case something bad happen and the request was not properly
363
// removed from the queue.
364
if (msg instanceof DistributedRequest)
365         {
366           synchronized (totalOrderQueue)
367           {
368             if (totalOrderQueue.remove(((DistributedRequest) msg).getRequest()))
369             {
370               if (logger.isWarnEnabled())
371                 logger.warn("Distributed request "
372                     + ((DistributedRequest) msg).getRequest().getSqlShortForm(
373                         getSqlShortFormLength())
374                     + " did not remove itself from the total order queue");
375               totalOrderQueue.notifyAll();
376             }
377           }
378         }
379         else if (msg instanceof DistributedTransactionMarker)
380         {
381           synchronized (totalOrderQueue)
382           {
383             if (totalOrderQueue.remove(msg))
384             {
385               logger.warn("Distributed " + msg.toString() + " did not remove "
386                   + "itself from the total order queue");
387               totalOrderQueue.notifyAll();
388             }
389           }
390         }
391       }
392     }
393   }
394
395   public void cancelMessage(Serializable JavaDoc msg)
396   {
397     if (msg instanceof DistributedVirtualDatabaseMessage)
398     {
399       ((DistributedVirtualDatabaseMessage) msg).cancel(this);
400     }
401     else
402       logger.warn("Unhandled message type received: " + msg.getClass() + "("
403           + msg + ")");
404   }
405
406   /**
407    * @see org.continuent.hedera.gms.GroupMembershipListener#failedMember(org.continuent.hedera.common.Member,
408    * org.continuent.hedera.common.GroupIdentifier,
409    * org.continuent.hedera.common.Member)
410    */

411   public void failedMember(Member failed, GroupIdentifier gid, Member sender)
412   {
413     quitMember(failed, gid);
414   }
415
416   /**
417    * @see org.continuent.hedera.gms.GroupMembershipListener#groupComposition(org.continuent.hedera.common.Group,
418    * org.continuent.hedera.common.IpAddress, int)
419    */

420   public void groupComposition(Group g, IpAddress sender, int gmsStatus)
421   {
422     // Just ignore
423
}
424
425   /**
426    * @see org.continuent.hedera.gms.GroupMembershipListener#networkPartition(org.continuent.hedera.common.GroupIdentifier,
427    * java.util.List)
428    */

429   public void networkPartition(GroupIdentifier gid, List JavaDoc mergedGroupCompositions)
430   {
431     // We already left the group. Late notification. Ignore.
432
if ((channel == null) || channelShuttingDown)
433       return;
434
435     if (gid.equals(currentGroup.getGroupIdentifier()))
436     {
437       if (logger.isFatalEnabled())
438         logger.fatal("Network partition detected in group " + gid + ".");
439
440       // Decide which controller will survive
441
Collections.sort(mergedGroupCompositions);
442       if (logger.isFatalEnabled())
443         logger
444             .fatal(mergedGroupCompositions.get(0) + " will remain as master.");
445       if (!(mergedGroupCompositions.get(0).equals(channel.getLocalMembership())))
446       {
447         if (logger.isFatalEnabled())
448           logger.fatal("Forcing virtual database shutdown here at "
449               + channel.getLocalMembership() + ".");
450         shutdown(Constants.SHUTDOWN_FORCE);
451       }
452       else
453       {
454         if (logger.isFatalEnabled())
455           logger.fatal("Virtual database here at "
456               + channel.getLocalMembership() + " remaining as master.");
457       }
458     }
459   }
460
461   /**
462    * Makes this virtual database join a virtual database group. Those groups are
463    * mapped to JavaGroups groups.
464    *
465    * @exception Exception if an error occurs
466    */

467   public void joinGroup() throws Exception JavaDoc
468   {
469     RecoveryLog recoveryLog = distributedRequestManager.getRecoveryLog();
470     if (recoveryLog == null)
471     {
472       String JavaDoc msg = "Distributed virtual database cannot be used without a recovery log defined.";
473       if (logger.isFatalEnabled())
474         logger.fatal(msg);
475       throw new SequoiaException(msg);
476     }
477
478     try
479     {
480       Properties JavaDoc p = new Properties JavaDoc();
481       InputStream JavaDoc is = this.getClass()
482           .getResourceAsStream(hederaPropertiesFile);
483       if (is == null)
484       {
485         if (logger.isFatalEnabled())
486           logger.fatal(Translate.get(
487               "fatal.distributed.no.group.communication.properties",
488               hederaPropertiesFile));
489         endUserLogger.fatal(Translate.get(
490             "fatal.distributed.no.group.communication.properties",
491             hederaPropertiesFile));
492         throw new SequoiaException(
493             "Join group failed because Hedera properties file was not found.");
494       }
495       if (logger.isInfoEnabled())
496         logger.info("Using Hedera properties file: " + hederaPropertiesFile);
497       p.load(is);
498       is.close();
499
500       if (groupCommunicationFactory == null)
501       {
502         groupCommunicationFactory = (AbstractGroupCommunicationFactory) Class
503             .forName(p.getProperty("hedera.factory")).newInstance();
504       }
505       Object JavaDoc[] ret = groupCommunicationFactory
506           .createChannelAndGroupMembershipService(p, new GroupIdentifier(
507               groupName));
508       AbstractGroupMembershipService gms = (AbstractGroupMembershipService) ret[1];
509       gms.registerGroupMembershipListener(this);
510       channel = (AbstractReliableGroupChannel) ret[0];
511
512       if (logger.isDebugEnabled())
513         logger.debug("Group communication channel is configured as follows: "
514             + channel);
515
516       // Join the group
517
channel.join();
518       currentGroup = channel.getGroup();
519       multicastRequestAdapter = new MulticastRequestAdapter(channel // group
520
// channel
521
, this /* MessageListener */
522           , this /* MulticastRequestListener */
523       );
524       multicastRequestAdapter.start();
525
526       // Let the MulticastRequestAdapter thread pump the membership out of the
527
// JGroups channel.
528
Thread.sleep(2000);
529
530       logger.info("Group " + groupName + " connected to "
531           + channel.getLocalMembership());
532
533       // Add ourselves to the list of controllers
534
controllerJmxAddress.put(channel.getLocalMembership(), controller
535           .getJmxName());
536
537       long controllerId;
538
539       // Check if we are alone or not
540
List JavaDoc currentGroupMembers = currentGroup.getMembers();
541       int groupSize = currentGroupMembers.size();
542       if (groupSize == 1)
543       {
544         logger.info(Translate.get(
545             "virtualdatabase.distributed.configuration.first.in.group",
546             groupName));
547         allMemberButUs = new ArrayList JavaDoc();
548         controllerId = 0;
549         distributedRequestManager.setControllerId(controllerId);
550         // Init backends states from persistence base
551
distributedRequestManager
552             .initBackendsLastKnownCheckpointFromRecoveryLog();
553         recoveryLog.checkRecoveryLogConsistency();
554       }
555       else
556       {
557         logger.info("Group now contains " + groupSize + " controllers.");
558         if (logger.isDebugEnabled())
559         {
560           logger.debug("Current list of controllers is as follows:");
561           for (Iterator JavaDoc iter = currentGroupMembers.iterator(); iter.hasNext();)
562             logger.debug("Controller " + iter.next());
563         }
564
565         refreshGroupMembership(); // also updates allMemberButUs
566

567         // Check with the other controller that our config is compatible
568
controllerId = checkConfigurationCompatibilityAndReturnControllerId(getAllMemberButUs());
569         if (controllerId == INCOMPATIBLE_CONFIGURATION)
570         {
571           String JavaDoc msg = Translate
572               .get("virtualdatabase.distributed.configuration.not.compatible");
573           logger.error(msg);
574           throw new ControllerException(msg);
575         }
576         else
577         {
578           // In case several controllers join at the same time they would get
579
// the
580
// same highest controller id value and here we discriminate them by
581
// adding their position in the membership. This assumes that the
582
// membership is ordered the same way at all nodes.
583
controllerId += currentGroupMembers.indexOf(channel
584               .getLocalMembership());
585
586           if (logger.isInfoEnabled())
587           {
588             logger.info(Translate
589                 .get("virtualdatabase.distributed.configuration.compatible"));
590             logger.info("Controller identifier is set to: " + controllerId);
591           }
592           // Set the controller Id
593
distributedRequestManager.setControllerId(controllerId);
594         }
595
596         // Init backends states from persistence base
597
distributedRequestManager
598             .initBackendsLastKnownCheckpointFromRecoveryLog();
599
600         // Distribute backends among controllers knowing that at this point
601
// there is no conflict on the backend distribution policies.
602
broadcastBackendInformation(getAllMemberButUs());
603       }
604
605       // Now let group comm messages flow in
606
isVirtualDatabaseStarted = true;
607
608       // Resync the recovery log, if any
609
if ((groupSize > 1) && hasRecoveryLog())
610       {
611         logger.info("Resyncing recovery log ...");
612         resyncRecoveryLog();
613         logger.info("Resyncing recovery log done");
614       }
615
616       initGlobalCounters(controllerId);
617
618     }
619     catch (Exception JavaDoc e)
620     {
621       if (channel != null)
622       {
623         quitChannel();
624       }
625       String JavaDoc msg = Translate.get("virtualdatabase.distributed.joingroup.error",
626           groupName);
627       if (e instanceof RuntimeException JavaDoc)
628         logger.error(msg, e);
629       throw new Exception JavaDoc(msg + " (" + e + ")", e);
630     }
631
632   }
633
634   /**
635    * Checks if re-synchronizing the recovery log is necessary, and if so,
636    * initiates the recovery log recovery process and waits until it is complete.
637    * This is called as part of the vdb startup process, before backends are
638    * ready to be enabled.
639    *
640    * @throws VirtualDatabaseException in case of error
641    * @throws SQLException rethrown from recoverylog operations
642    */

643   private void resyncRecoveryLog() throws VirtualDatabaseException,
644       SQLException JavaDoc
645   {
646     if (getAllMembers().size() == 1)
647     {
648       logger.info("First controller in vdb, no recovery log resync.");
649       return;
650     }
651
652     String JavaDoc lastShutdownCheckpointName = getLastShutdownCheckpointName();
653     if (lastShutdownCheckpointName == null)
654     {
655       logger
656           .info("No shutdown checkpoint found in recovery log. Clearing recovery log (dirty).");
657       logger.info("Please resync manually using 'recover log'.");
658       getRecoveryLog().resetRecoveryLog();
659       return;
660     }
661
662     // try to resync from the last shutdown checkpoint
663
isResynchingFlag = true;
664
665     try
666     {
667       logger.info("Resyncing from " + lastShutdownCheckpointName);
668       resyncFromCheckpoint(lastShutdownCheckpointName);
669     }
670     catch (VirtualDatabaseException e)
671     {
672       logger
673           .error("Failed to resync recovery log from last clean shutdown checkpoint. Clearing recovery log (dirty).");
674       logger.info("Please resync manually using 'recover log'.");
675       getRecoveryLog().resetRecoveryLog();
676       isResynchingFlag = false;
677     }
678   }
679
680   /**
681    * Initializes global counters based on recovery log.
682    *
683    * @param controllerId this controller id, as allocated by hte group
684    * communication. Base of all global counters numbering: counters are
685    * layed out as [ controllerId | <local count> ]
686    * @throws SQLException if an error occurs accessing the recovery log
687    */

688   public void initGlobalCounters(long controllerId) throws SQLException JavaDoc
689   {
690     if (!hasRecoveryLog())
691       return; // no recovery log: no init. Stick to default values (zero)
692

693     RecoveryLog recoveryLog = requestManager.getRecoveryLog();
694
695     requestManager.initializeRequestId(recoveryLog
696         .getLastRequestId(controllerId) + 1);
697
698     requestManager.getScheduler().initializeTransactionId(
699         recoveryLog.getLastTransactionId(controllerId) + 1);
700
701     // This is not synchronized, since this is not supposed to happen
702
// concurrently with application opening connections on the vdb (the vdb is
703
// not up yet). The connectionId member is used for both normal and
704
// persistent connection ids.
705
this.connectionId = recoveryLog.getLastConnectionId(controllerId) + 1;
706   }
707
708   private void resyncFromCheckpoint(String JavaDoc checkpointName)
709       throws VirtualDatabaseException
710   {
711     // talk to first other member in group
712
Member remoteControllerMember = (Member) getAllMemberButUs().get(0);
713
714     // send message, no return expected, just block until it's done.
715
sendMessageToController(remoteControllerMember, new ResyncRecoveryLog(
716         checkpointName), messageTimeouts.getDefaultTimeout());
717
718     // If a remote error occured, or if the resync failed for any reason
719
// whatsoever, a VirtualDatabaseException has been thrown.
720

721     isResynchingFlag = false;
722   }
723
724   protected boolean isResyncing()
725   {
726     return isResynchingFlag;
727   }
728
729   /**
730    * Return the last ShutdownCheckpointName, or null if none was found.
731    *
732    * @return the last ShutdownCheckpointName, or null if none was found.
733    * @throws VirtualDatabaseException in case getting the cp names list from the
734    * recovery log failed
735    */

736   private String JavaDoc getLastShutdownCheckpointName()
737       throws VirtualDatabaseException
738   {
739     // get checkpoint names and see which is the last shutdown checkpoint. This
740
// list is expected to be ordered, newest first.
741
ArrayList JavaDoc checkpointNames;
742     try
743     {
744       checkpointNames = getRecoveryLog().getCheckpointNames();
745     }
746     catch (SQLException JavaDoc e)
747     {
748       logger.error(e.getMessage());
749       throw new VirtualDatabaseException(e);
750     }
751
752     Iterator JavaDoc iter = checkpointNames.iterator();
753     while (iter.hasNext())
754     {
755       String JavaDoc cpName = (String JavaDoc) iter.next();
756       if (cpName.startsWith("shutdown-" + getControllerName()))
757         return cpName;
758     }
759     return null;
760   }
761
762   /**
763    * @see org.continuent.hedera.gms.GroupMembershipListener#joinMember(org.continuent.hedera.common.Member,
764    * org.continuent.hedera.common.GroupIdentifier)
765    */

766   public void joinMember(Member m, GroupIdentifier gid)
767   {
768     if (hasRecoveryLog())
769     {
770       try
771       {
772         requestManager.getRecoveryLog().storeCheckpoint(
773             buildCheckpointName(m + " joined group " + gid));
774       }
775       catch (SQLException JavaDoc ignore)
776       {
777         logger.warn("Failed to log checkpoint for joining member " + m);
778       }
779     }
780   }
781
782   //
783
// Message dispatcher request handling
784
//
785

786 /**
787    * Terminate the multicast request adapter and quit the Hedera channel.
788    */

789   public void quitChannel()
790   {
791     quitChannel(Constants.SHUTDOWN_SAFE);
792   }
793
794   /**
795    * Terminate the multicast request adapter and quit the Hedera channel.
796    *
797    * @param level of the vdb shutdown operation being executed
798    */

799   public void quitChannel(int level)
800   {
801     if (level == Constants.SHUTDOWN_FORCE)
802     {
803       multicastRequestAdapter.cancelRequests();
804     }
805     synchronized (MESSAGES_IN_HANDLER_SYNC)
806     {
807       channelShuttingDown = true;
808       if (messagesInHandlers > 0)
809         try
810         {
811           MESSAGES_IN_HANDLER_SYNC.wait();
812         }
813         catch (InterruptedException JavaDoc ignore)
814         {
815         }
816     }
817
818     if (multicastRequestAdapter != null)
819     {
820       multicastRequestAdapter.stop();
821       multicastRequestAdapter = null;
822     }
823     if (channel != null)
824     {
825       channel.close();
826       try
827       {
828         channel.quit();
829       }
830       catch (ChannelException e)
831       {
832         if (logger.isWarnEnabled())
833         {
834           logger.warn("Problem when quitting channel " + channel, e);
835         }
836       }
837       catch (NotConnectedException e)
838       {
839         if (logger.isWarnEnabled())
840         {
841           logger.warn("Problem when quitting channel " + channel, e);
842         }
843       }
844       channel = null;
845     }
846     if (groupCommunicationFactory != null)
847     {
848       groupCommunicationFactory.dispose();
849       groupCommunicationFactory = null;
850     }
851   }
852
853   /**
854    * @see org.continuent.hedera.gms.GroupMembershipListener#quitMember(org.continuent.hedera.common.Member,
855    * org.continuent.hedera.common.GroupIdentifier)
856    */

857   public void quitMember(Member m, GroupIdentifier gid)
858   {
859     synchronized (MESSAGES_IN_HANDLER_SYNC)
860     {
861       // Ignore if channel has been closed (i.e. vdb shutdown)
862
if ((channel == null) || (channelShuttingDown))
863         return;
864       messagesInHandlers++;
865     }
866
867     try
868     {
869       // Ignore our own quit message
870
if (isLocalSender(m))
871         return;
872
873       if (hasRecoveryLog())
874       {
875         try
876         {
877           requestManager.getRecoveryLog().storeCheckpoint(
878               buildCheckpointName(m + " quit group " + gid));
879         }
880         catch (SQLException JavaDoc ignore)
881         {
882           logger.warn("Failed to log checkpoint for quitting member " + m);
883         }
884       }
885
886       // Remove controller from list and notify JMX listeners
887
String JavaDoc remoteControllerName = removeRemoteControllerAndStartCleanupThread(m);
888       if (remoteControllerName != null)
889       {
890         endUserLogger.warn(Translate.get(
891             "notification.distributed.controller.removed", new String JavaDoc[]{
892                 m.toString(), name}));
893         logger.warn("Controller " + m + " has left the cluster.");
894         sendJmxNotification(
895             SequoiaNotificationList.DISTRIBUTED_CONTROLLER_REMOVED, Translate
896                 .get("notification.distributed.controller.removed"));
897       }
898
899       // Notify adapter that we do not expect responses anymore from this member
900
synchronized (MESSAGES_IN_HANDLER_SYNC)
901       {
902         // Ignore if channel is being closed, i.e. vdb shutdown.
903
if (!channelShuttingDown)
904         {
905           int failures = multicastRequestAdapter.memberFailsOnAllReplies(m);
906           logger.info(failures + " requests were waiting responses from " + m);
907         }
908       }
909     }
910     finally
911     {
912       synchronized (MESSAGES_IN_HANDLER_SYNC)
913       {
914         messagesInHandlers--;
915       }
916     }
917   }
918
919   /**
920    * @see org.continuent.hedera.adapters.MessageListener#receive(java.io.Serializable)
921    */

922   public void receive(Serializable JavaDoc msg)
923   {
924     logger.error("Distributed virtual database received unhandled message: "
925         + msg);
926   }
927
928   /**
929    * Refresh the current group membership when someone has joined or left the
930    * group.
931    */

932   private void refreshGroupMembership()
933   {
934     if (logger.isDebugEnabled())
935       logger.debug("Refreshing members list:" + currentGroup.getMembers());
936
937     synchronized (controllerJmxAddress)
938     {
939       allMemberButUs = (ArrayList JavaDoc) (((ArrayList JavaDoc) currentGroup.getMembers())
940           .clone());
941       allMemberButUs.remove(channel.getLocalMembership());
942     }
943   }
944
945   //
946
// Getter/Setter and tools (equals, ...)
947
//
948

949   /**
950    * Two virtual databases are equal if they have the same name, login and
951    * password.
952    *
953    * @param other an object
954    * @return a <code>boolean</code> value
955    */

956   public boolean equals(Object JavaDoc other)
957   {
958     if ((other == null)
959         || (!(other instanceof org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase)))
960       return false;
961     else
962     {
963       DistributedVirtualDatabase db = (org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase) other;
964       return name.equals(db.getDatabaseName())
965           && groupName.equals(db.getGroupName());
966     }
967   }
968
969   /**
970    * Synchronized access to current group members.
971    *
972    * @return a clone of the list of all members (never null).
973    */

974   public ArrayList JavaDoc getAllMembers()
975   {
976     synchronized (controllerJmxAddress)
977     {
978       if (currentGroup == null) // this happens if we did not #joinGroup()
979
return new ArrayList JavaDoc();
980       ArrayList JavaDoc members = (ArrayList JavaDoc) currentGroup.getMembers();
981       if (members == null) // SEQUOIA-745 fix
982
return new ArrayList JavaDoc();
983       return (ArrayList JavaDoc) members.clone();
984     }
985   }
986
987   /**
988    * Returns the list of all members in the group except us. Consider the value
989    * read-only (do not alter).
990    *
991    * @return the allMembersButUs field (never null).
992    */

993   public ArrayList JavaDoc getAllMemberButUs()
994   {
995     synchronized (controllerJmxAddress)
996     {
997       if (allMemberButUs == null) // this happens if we did not #joinGroup()
998
return new ArrayList JavaDoc();
999
1000      /**
1001       * This synchronized block might seem loussy, but actually it's enough, as
1002       * long as no caller alters the returned value: field allMembersButUs is
1003       * replaced (as opposed to updated) by refreshGroupMembership(). So
1004       * someone who has called this lives with a (possibly) outdated list, but,
1005       * still, with a safe list (never updated concurently by vdb threads). If
1006       * clients/callers are not trusted to leave the returned value un-touched,
1007       * use a clone
1008       */

1009      return allMemberButUs;
1010    }
1011  }
1012
1013  /**
1014   * Get the group channel used for group communications
1015   *
1016   * @return a <code>JChannel</code>
1017   */

1018  public AbstractReliableGroupChannel getChannel()
1019  {
1020    return channel;
1021  }
1022
1023  /**
1024   * Returns the cleanupThreads value.
1025   *
1026   * @return Returns the cleanupThreads.
1027   */

1028  public Hashtable JavaDoc getCleanupThreads()
1029  {
1030    return cleanupThreads;
1031  }
1032
1033  /**
1034   * Used by the ControllerFailureCleanupThread to cleanup following a
1035   * controller failure. This returned the list of recovered transactions and
1036   * removes the list.
1037   *
1038   * @param controllerId the id of the failed controller
1039   * @return List of recovered transactions for the given controller id (null if
1040   * none)
1041   */

1042  public List JavaDoc getTransactionsRecovered(Long JavaDoc controllerId)
1043  {
1044    return (List JavaDoc) controllerTransactionsRecovered.remove(controllerId);
1045  }
1046
1047  /**
1048   * Used by the ControllerFailureCleanupThread to cleanup following a
1049   * controller failure. This returned the list of recovered persistent
1050   * connections and removes the list.
1051   *
1052   * @param controllerId the id of the failed controller
1053   * @return List of recovered persistent connections for the given controller
1054   * id (null if none)
1055   */

1056  public List JavaDoc getControllerPersistentConnectionsRecovered(Long JavaDoc controllerId)
1057  {
1058    return (List JavaDoc) controllerPersistentConnectionsRecovered.remove(controllerId);
1059  }
1060
1061  /**
1062   * Called by FailoverForPersistentConnection when a client reconnects
1063   * following a controller failure.
1064   *
1065   * @param controllerId the id of the failed controller
1066   * @param connectionId the id of the persistent connection
1067   */

1068  public void notifyPersistentConnectionFailover(Long JavaDoc controllerId,
1069      Long JavaDoc connectionId)
1070  {
1071    synchronized (controllerPersistentConnectionsRecovered)
1072    {
1073      LinkedList JavaDoc persistentConnectionsRecovered = (LinkedList JavaDoc) controllerPersistentConnectionsRecovered
1074          .get(controllerId);
1075      if (persistentConnectionsRecovered == null)
1076      {
1077        persistentConnectionsRecovered = new LinkedList JavaDoc();
1078        controllerPersistentConnectionsRecovered.put(controllerId,
1079            persistentConnectionsRecovered);
1080      }
1081
1082      persistentConnectionsRecovered.add(connectionId);
1083      if (logger.isInfoEnabled())
1084        logger.info("Failover detected for persistent connection "
1085            + connectionId);
1086    }
1087  }
1088
1089  /**
1090   * Called by FailoverForTransaction when a client reconnects following a
1091   * controller failure.
1092   *
1093   * @param controllerId the id of the failed controller
1094   * @param transactionId the id of the transaction
1095   */

1096  public void notifyTransactionFailover(Long JavaDoc controllerId, Long JavaDoc transactionId)
1097  {
1098    synchronized (controllerTransactionsRecovered)
1099    {
1100      LinkedList JavaDoc transactionsRecovered = (LinkedList JavaDoc) controllerTransactionsRecovered
1101          .get(controllerId);
1102      if (transactionsRecovered == null)
1103      {
1104        transactionsRecovered = new LinkedList JavaDoc();
1105        controllerTransactionsRecovered
1106            .put(controllerId, transactionsRecovered);
1107      }
1108
1109      transactionsRecovered.add(transactionId);
1110      if (logger.isInfoEnabled())
1111        logger.info("Failover detected for transaction " + transactionId);
1112    }
1113  }
1114
1115  /**
1116   * Gets a Controller specified by its name as a Member object suitable for
1117   * group communication.
1118   *
1119   * @param controllerName the name of the target controller
1120   * @return a Member representing the target controller
1121   * @throws VirtualDatabaseException
1122   */

1123  private Member getControllerByName(String JavaDoc controllerName)
1124      throws VirtualDatabaseException
1125  {
1126    // Get the target controller
1127
Iterator JavaDoc iter = controllerJmxAddress.entrySet().iterator();
1128    Member targetMember = null;
1129    while (iter.hasNext())
1130    {
1131      Entry entry = (Entry) iter.next();
1132      if (entry.getValue().equals(controllerName))
1133      {
1134        targetMember = (Member) entry.getKey();
1135        break;
1136      }
1137    }
1138    if (targetMember == null)
1139      throw new VirtualDatabaseException("Cannot find controller "
1140          + controllerName + " in group");
1141    return targetMember;
1142  }
1143
1144  /**
1145   * Returns the controllerName value.
1146   *
1147   * @return Returns the controllerName.
1148   */

1149  public String JavaDoc getControllerName()
1150  {
1151    return controller.getControllerName();
1152  }
1153
1154  /**
1155   * Returns the controller ID.
1156   *
1157   * @return Returns the controller ID.
1158   */

1159  public long getControllerId()
1160  {
1161    return ((DistributedRequestManager) requestManager).getControllerId();
1162  }
1163
1164  /**
1165   * Returns the currentGroup value.
1166   *
1167   * @return Returns the currentGroup.
1168   */

1169  public Group getCurrentGroup()
1170  {
1171    return currentGroup;
1172  }
1173
1174  /**
1175   * Returns the distributedRequestLogger value.
1176   *
1177   * @return Returns the distributedRequestLogger.
1178   */

1179  public final Trace getDistributedRequestLogger()
1180  {
1181    return distributedRequestLogger;
1182  }
1183
1184  /**
1185   * Get the XML dump of the Distribution element.
1186   *
1187   * @return XML dump of the Distribution element
1188   */

1189  protected String JavaDoc getDistributionXml()
1190  {
1191    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
1192    info.append("<" + DatabasesXmlTags.ELT_Distribution + " "
1193        + DatabasesXmlTags.ATT_groupName + "=\"" + groupName + "\" "
1194        + DatabasesXmlTags.ATT_hederaPropertiesFile + "=\""
1195        + hederaPropertiesFile + "\" "
1196        + DatabasesXmlTags.ATT_clientFailoverTimeout + "=\""
1197        + failoverTimeoutInMs + "\">");
1198
1199    getMessageTimeouts().generateXml(info);
1200
1201    info.append("</" + DatabasesXmlTags.ELT_Distribution + ">");
1202    return info.toString();
1203  }
1204
1205  /**
1206   * Returns the group name this virtual database belongs to.
1207   *
1208   * @return a <code>String</code> value. Returns <code>null</code> if this
1209   * virtual database is standalone
1210   */

1211  public String JavaDoc getGroupName()
1212  {
1213    return groupName;
1214  }
1215
1216  /**
1217   * Sets the group name used by the controllers hosting this virtual database.
1218   *
1219   * @param groupName the group name to set
1220   */

1221  public void setGroupName(String JavaDoc groupName)
1222  {
1223    this.groupName = groupName;
1224  }
1225
1226  /**
1227   * Returns the messageTimeouts value.
1228   *
1229   * @return Returns the messageTimeouts.
1230   */

1231  public MessageTimeouts getMessageTimeouts()
1232  {
1233    return messageTimeouts;
1234  }
1235
1236  /**
1237   * Sets the messageTimeouts value.
1238   *
1239   * @param messageTimeouts The messageTimeouts to set.
1240   */

1241  public void setMessageTimeouts(MessageTimeouts messageTimeouts)
1242  {
1243    this.messageTimeouts = messageTimeouts;
1244  }
1245
1246  /**
1247   * Return the group communication multicast request adapter.
1248   *
1249   * @return the group communication multicast request adapter
1250   */

1251  public MulticastRequestAdapter getMulticastRequestAdapter()
1252  {
1253    return multicastRequestAdapter;
1254  }
1255
1256  //
1257
// Getter/Setter and tools (equals, ...)
1258
//
1259

1260  /**
1261   * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#getNextConnectionId()
1262   */

1263  public long getNextConnectionId()
1264  {
1265    long id = super.getNextConnectionId();
1266    return distributedRequestManager.getNextConnectionId(id);
1267  }
1268
  //
  // Getter/Setter and tools (equals, ...)
  //

1277  protected int getNumberOfEnabledBackends() throws VirtualDatabaseException
1278  {
1279    // 1/ get number of local active backends
1280
int nbActive = super.getNumberOfEnabledBackends();
1281
1282    // 2/ add remote active backends
1283

1284    // TODO: synchronize this access to backendsPerController (and others)
1285
DatabaseBackend b;
1286    Iterator JavaDoc iter = backendsPerController.keySet().iterator();
1287    while (iter.hasNext())
1288    {
1289      Member member = (Member) iter.next();
1290
1291      List JavaDoc remoteBackends = (List JavaDoc) backendsPerController.get(member);
1292      int size = remoteBackends.size();
1293      b = null;
1294      for (int i = 0; i < size; i++)
1295      {
1296        b = (DatabaseBackend) remoteBackends.get(i);
1297        if (b.isReadEnabled() || b.isWriteEnabled())
1298          // test symetrical to RequestManager.backupBackend()
1299
nbActive++;
1300      }
1301    }
1302    /*
1303     * temporary, until backendsPerController is really updated (not yet done),
1304     * make as is force=true in backupBackend().
1305     */

1306    nbActive = -1;
1307    return nbActive;
1308  }
1309
1310  /**
1311   * Return a ControllerResultSet containing the PreparedStatement metaData of
1312   * the given sql template
1313   *
1314   * @param request the request containing the sql template
1315   * @return an empty ControllerResultSet with the metadata
1316   * @throws SQLException if a database error occurs
1317   */

1318  public ControllerResultSet getPreparedStatementGetMetaData(
1319      AbstractRequest request) throws SQLException JavaDoc
1320  {
1321    try
1322    {
1323      return requestManager.getPreparedStatementGetMetaData(request);
1324    }
1325    catch (NoMoreBackendException e)
1326    {
1327      // Try remote controllers
1328
try
1329      {
1330        MulticastResponse rspList = getMulticastRequestAdapter()
1331            .multicastMessage(getAllMemberButUs(),
1332                new GetPreparedStatementMetadata(request),
1333                MulticastRequestAdapter.WAIT_ALL,
1334                getMessageTimeouts().getVirtualDatabaseConfigurationTimeout());
1335
1336        Map JavaDoc results = rspList.getResults();
1337        if (results.size() == 0)
1338          if (logger.isWarnEnabled())
1339            logger
1340                .warn("No response while getting prepared statement metadata from remote controller");
1341        for (Iterator JavaDoc iter = results.values().iterator(); iter.hasNext();)
1342        {
1343          Object JavaDoc response = iter.next();
1344          if (response instanceof ControllerException)
1345          {
1346            if (logger.isErrorEnabled())
1347            {
1348              logger
1349                  .error("Error while getting prepared statement metadata from remote controller");
1350            }
1351          }
1352          else
1353          {
1354            // Here we succeded in getting prepared statement metadata from a
1355
// remote controller
1356
return (ControllerResultSet) response;
1357          }
1358        }
1359      }
1360      catch (NotConnectedException e2)
1361      {
1362        if (logger.isErrorEnabled())
1363          logger
1364              .error(
1365                  "Channel unavailable while getting prepared statement metadata from remote controller",
1366                  e2);
1367      }
1368
1369      // Here we didn't succeded in getting prepared statement metadata from
1370
// another controller
1371
throw e;
1372    }
1373  }
1374
1375  /**
1376   * Returns the processMacroBeforeBroadcast value.
1377   *
1378   * @return Returns the processMacroBeforeBroadcast.
1379   */

1380  public boolean isProcessMacroBeforeBroadcast()
1381  {
1382    return processMacroBeforeBroadcast;
1383  }
1384
1385  /**
1386   * Sets the processMacroBeforeBroadcast value.
1387   *
1388   * @param processMacros true if macros must be processed before broadcast.
1389   */

1390  public void setProcessMacroBeforeBroadcast(boolean processMacros)
1391  {
1392    this.processMacroBeforeBroadcast = processMacros;
1393  }
1394
1395  /**
1396   * Returns the request result failover cache associated to this distributed
1397   * virtual database.
1398   *
1399   * @return a <code>RequestResultFailoverCache</code> object.
1400   */

1401  public RequestResultFailoverCache getRequestResultFailoverCache()
1402  {
1403    return requestResultFailoverCache;
1404  }
1405
1406  /**
1407   * Sets a new distributed request manager for this database.
1408   *
1409   * @param requestManager the new request manager.
1410   */

1411  public void setRequestManager(RequestManager requestManager)
1412  {
1413    if (!(requestManager instanceof DistributedRequestManager))
1414      throw new RuntimeException JavaDoc(
1415          "A distributed virtual database can only work with a distributed request manager.");
1416
1417    distributedRequestManager = (DistributedRequestManager) requestManager;
1418    // really, this is super.requestManager
1419
this.requestManager = distributedRequestManager;
1420  }
1421
1422  /**
1423   * Get the whole static metadata for this virtual database. A new empty
1424   * metadata object is created if there was none yet. It will be filled later
1425   * by gatherStaticMetadata() when the backend is enabled.
1426   *
1427   * @return Virtual database static metadata
1428   */

1429  public VirtualDatabaseStaticMetaData getStaticMetaData()
1430  {
1431    staticMetadata = doGetStaticMetaData();
1432
1433    // If no backends enabled and vdb is distributed try remote controllers
1434
if ((staticMetadata == null)
1435        || (staticMetadata.getMetadataContainer() == null))
1436    {
1437      try
1438      {
1439        MulticastResponse rspList = getMulticastRequestAdapter()
1440            .multicastMessage(getAllMemberButUs(), new GetStaticMetadata(),
1441                MulticastRequestAdapter.WAIT_ALL,
1442                getMessageTimeouts().getVirtualDatabaseConfigurationTimeout());
1443
1444        Map JavaDoc results = rspList.getResults();
1445        if (results.size() == 0)
1446          if (logger.isWarnEnabled())
1447            logger
1448                .warn("No response while getting static metadata from remote controller");
1449        for (Iterator JavaDoc iter = results.values().iterator(); iter.hasNext();)
1450        {
1451          Object JavaDoc response = iter.next();
1452          if (response instanceof ControllerException)
1453          {
1454            if (logger.isErrorEnabled())
1455            {
1456              logger
1457                  .error("Error while getting static metadata from remote controller");
1458            }
1459          }
1460          else
1461          {
1462            // Here we succeded in getting static metadata from a remote
1463
// controller
1464
staticMetadata.setMetadataContainer((MetadataContainer) response);
1465          }
1466        }
1467      }
1468      catch (NotConnectedException e2)
1469      {
1470        if (logger.isErrorEnabled())
1471          logger
1472              .error(
1473                  "Channel unavailable while getting static metadata from remote controller",
1474                  e2);
1475      }
1476    }
1477
1478    return staticMetadata;
1479  }
1480
1481  /**
1482   * Returns the "writes flushed" status associated to this distributed virtual
1483   * database.
1484   *
1485   * @return a <code>HashMap</code> object.
1486   */

1487  public HashMap JavaDoc getWritesFlushed()
1488  {
1489    return writesFlushed;
1490  }
1491
1492  /**
1493   * Check if the given backend definition is compatible with the backend
1494   * definitions of this distributed virtual database. Not that if the given
1495   * backend does not exist in the current configuration, it is considered as
1496   * compatible. Incompatibility results from 2 backends with the same JDBC URL
1497   * or same logical name.
1498   *
1499   * @param backend the backend to check
1500   * @return true if the backend is compatible with the local definition
1501   * @throws VirtualDatabaseException if locking the local backend list fails
1502   */

1503  public boolean isCompatibleBackend(BackendInfo backend)
1504      throws VirtualDatabaseException
1505  {
1506    try
1507    {
1508      acquireReadLockBackendLists();
1509    }
1510    catch (InterruptedException JavaDoc e)
1511    {
1512      String JavaDoc msg = "Unable to acquire read lock on backend list in isCompatibleBackend ("
1513          + e + ")";
1514      logger.error(msg);
1515      throw new VirtualDatabaseException(msg);
1516    }
1517
1518    try
1519    {
1520      // Find the backend
1521
String JavaDoc backendURL = backend.getUrl();
1522      String JavaDoc backendName = backend.getName();
1523      int size = backends.size();
1524      DatabaseBackend b = null;
1525      for (int i = 0; i < size; i++)
1526      {
1527        b = (DatabaseBackend) backends.get(i);
1528        if (b.getURL().equals(backendURL) || b.getName().equals(backendName))
1529          return false;
1530      }
1531    }
1532    catch (RuntimeException JavaDoc re)
1533    {
1534      throw new VirtualDatabaseException(re);
1535    }
1536    finally
1537    {
1538      releaseReadLockBackendLists();
1539    }
1540    // This backend does not exist here
1541
return true;
1542  }
1543
1544  /**
1545   * Return true if the provided schema is compatible with the existing schema
1546   * of this distributed virtual database. Note that if the given schema is
1547   * null, this function returns true.
1548   *
1549   * @param dbs the database schema to compare with
1550   * @return true if dbs is compatible with the current schema (according to
1551   * RAIDb level)
1552   */

1553  public boolean isCompatibleDatabaseSchema(DatabaseSchema dbs)
1554  {
1555    // Database schema checking (if any)
1556
if (dbs == null)
1557    {
1558      logger.warn(Translate
1559          .get("virtualdatabase.distributed.configuration.checking.noschema"));
1560    }
1561    else
1562    {
1563      // Check database schemas compatibility
1564
switch (getRequestManager().getLoadBalancer().getRAIDbLevel())
1565      {
1566        case RAIDbLevels.RAIDb0 :
1567          // There must be no overlap between schemas
1568
if (dbs.equals(getRequestManager().getDatabaseSchema()))
1569          {
1570            logger
1571                .warn(Translate
1572                    .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
1573            return false;
1574          }
1575          break;
1576        case RAIDbLevels.RAIDb1 :
1577          // Schemas must be identical
1578
if (!dbs.equals(getRequestManager().getDatabaseSchema()))
1579          {
1580            logger
1581                .warn(Translate
1582                    .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
1583            return false;
1584          }
1585          break;
1586        case RAIDbLevels.RAIDb2 :
1587          // Common parts of the schema must be identical
1588
if (!dbs.isCompatibleWith(getRequestManager().getDatabaseSchema()))
1589          {
1590            logger
1591                .warn(Translate
1592                    .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
1593            return false;
1594          }
1595          break;
1596        case RAIDbLevels.SingleDB :
1597        default :
1598          logger.error("Unsupported RAIDb level: "
1599              + getRequestManager().getLoadBalancer().getRAIDbLevel());
1600          return false;
1601      }
1602    }
1603    return true;
1604  }
1605
1606  /**
1607   * Is this virtual database distributed ?
1608   *
1609   * @return true
1610   */

1611  public boolean isDistributed()
1612  {
1613    return true;
1614  }
1615
1616  /**
1617   * Returns the isVirtualDatabaseStarted value.
1618   *
1619   * @return Returns the isVirtualDatabaseStarted.
1620   */

1621  public final boolean isVirtualDatabaseStarted()
1622  {
1623    return isVirtualDatabaseStarted;
1624  }
1625
1626  /**
1627   * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#viewControllerList()
1628   */

1629  public String JavaDoc[] viewControllerList()
1630  {
1631    if (logger.isInfoEnabled())
1632    {
1633      logger.info(channel.getLocalMembership() + " see members:"
1634          + currentGroup.getMembers() + " and has mapping:"
1635          + controllerJmxAddress);
1636    }
1637    Collection JavaDoc controllerJmxNames = controllerJmxAddress.values();
1638    return (String JavaDoc[]) controllerJmxNames.toArray(new String JavaDoc[controllerJmxNames
1639        .size()]);
1640  }
1641
1642  /**
1643   * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#addBackend(org.continuent.sequoia.controller.backend.DatabaseBackend)
1644   */

1645  public void addBackend(DatabaseBackend db) throws VirtualDatabaseException
1646  {
1647    // Add the backend to the virtual database.
1648
super.addBackend(db);
1649
1650    // Send a group message if already joined group
1651
try
1652    {
1653      broadcastBackendInformation(getAllMemberButUs());
1654    }
1655    catch (Exception JavaDoc e)
1656    {
1657      String JavaDoc msg = "Error while broadcasting backend information when adding backend";
1658      logger.error(msg, e);
1659      throw new VirtualDatabaseException(msg, e);
1660    }
1661  }
1662
1663  /**
1664   * Add a controller id to the controllerIds list.
1665   *
1666   * @param remoteControllerMembership the membership identifying the remote
1667   * controller
1668   * @param remoteControllerId remote controller identifier
1669   */

1670  public void addRemoteControllerId(Member remoteControllerMembership,
1671      long remoteControllerId)
1672  {
1673    controllerIds.put(remoteControllerMembership, new Long JavaDoc(remoteControllerId));
1674
1675    if (logger.isDebugEnabled())
1676      logger.debug("Adding new controller id:" + remoteControllerId
1677          + " for member " + remoteControllerMembership);
1678  }
1679
1680  /**
1681   * Add a list of remote backends to the backendsPerController map.
1682   *
1683   * @param sender the membership identifying the remote controller
1684   * @param remoteBackends remote controller backends
1685   */

1686  public void addBackendPerController(Member sender, List JavaDoc remoteBackends)
1687  {
1688    backendsPerController.put(sender, remoteBackends);
1689
1690    if (logger.isInfoEnabled())
1691      logger.info(Translate.get(
1692          "virtualdatabase.distributed.configuration.updating.backend.list",
1693          sender));
1694  }
1695
1696  /**
1697   * Returns the local view of the backends in this virtual database across all
1698   * <em>remote</em> controllers.
1699   *
1700   * @return a Hashtable&lt;Member, List&lt;DatabaseBackend&gt;&gt;
1701   */

1702  public Hashtable JavaDoc getBackendsPerController()
1703  {
1704    return backendsPerController;
1705  }
1706
1707  /**
1708   * Add a new controller name to the controllerJmxAddress list and refresh the
1709   * group membership.
1710   *
1711   * @param remoteControllerMembership the membership identifying the remote
1712   * controller
1713   * @param remoteControllerJmxName the JMX name of the remote controller
1714   */

1715  public void addRemoteControllerJmxName(Member remoteControllerMembership,
1716      String JavaDoc remoteControllerJmxName)
1717  {
1718    controllerJmxAddress.put(remoteControllerMembership,
1719        remoteControllerJmxName);
1720    if (logger.isDebugEnabled())
1721      logger.debug("Adding new controller " + remoteControllerJmxName
1722          + " for member " + remoteControllerMembership);
1723
1724    sendJmxNotification(SequoiaNotificationList.DISTRIBUTED_CONTROLLER_ADDED,
1725        Translate.get("notification.distributed.controller.added",
1726            new String JavaDoc[]{remoteControllerJmxName, name}));
1727
1728    refreshGroupMembership();
1729  }
1730
1731  /**
1732   * Broadcast backend information among controllers.
1733   *
1734   * @param dest List of <code>Address</code> to send the message to
1735   * @throws NotConnectedException if the channel is not connected
1736   */

1737  private void broadcastBackendInformation(ArrayList JavaDoc dest)
1738      throws NotConnectedException
1739  {
1740    logger
1741        .debug(Translate
1742            .get("virtualdatabase.distributed.configuration.querying.remote.status"));
1743
1744    // Send our backend status using serializable BackendInfo
1745
List JavaDoc backendInfos = DatabaseBackend.toBackendInfos(backends);
1746    MulticastResponse rspList = multicastRequestAdapter.multicastMessage(dest,
1747        new BackendStatus(backendInfos, distributedRequestManager
1748            .getControllerId()), MulticastRequestAdapter.WAIT_ALL,
1749        messageTimeouts.getBackendStatusTimeout());
1750
1751    int size = dest.size();
1752    for (int i = 0; i < size; i++)
1753    {
1754      // Add the backend configuration of every remote controller
1755
Member m = (Member) dest.get(i);
1756      if (rspList.getResult(m) != null)
1757      {
1758        BackendStatus bs = (BackendStatus) rspList.getResult(m);
1759        // Update backend list from sender
1760
List JavaDoc remoteBackendInfos = bs.getBackendInfos();
1761        // convert the BackendInfos to DatabaseBackends
1762
List JavaDoc remoteBackends = BackendInfo.toDatabaseBackends(this,
1763            remoteBackendInfos);
1764        backendsPerController.put(m, remoteBackends);
1765        if (logger.isDebugEnabled())
1766          logger
1767              .debug(Translate
1768                  .get(
1769                      "virtualdatabase.distributed.configuration.updating.backend.list",
1770                      m.toString()));
1771      }
1772      else
1773        logger.warn(Translate.get(
1774            "virtualdatabase.distributed.unable.get.remote.status", m
1775                .toString()));
1776    }
1777  }
1778
1779  /**
1780   * Send the configuration of this controller to remote controller. All remote
1781   * controllers must agree on the compatibility of the local controller
1782   * configuration with their own configuration. Compatibility checking include
1783   * Authentication Manager, Scheduler and Load Balancer settings.
1784   *
1785   * @param dest List of <code>Address</code> to send the message to
1786   * @return INCOMPATIBLE_CONFIGURATION if the configuration is not compatible
1787   * with other controllers or the controller id to use otherwise.
1788   */

1789  private long checkConfigurationCompatibilityAndReturnControllerId(
1790      ArrayList JavaDoc dest)
1791  {
1792    if (logger.isInfoEnabled())
1793      logger.info(Translate
1794          .get("virtualdatabase.distributed.configuration.checking"));
1795
1796    // Send our configuration
1797
MulticastResponse rspList;
1798    try
1799    {
1800      rspList = multicastRequestAdapter.multicastMessage(dest,
1801          new VirtualDatabaseConfiguration(this),
1802          MulticastRequestAdapter.WAIT_ALL, messageTimeouts
1803              .getVirtualDatabaseConfigurationTimeout());
1804    }
1805    catch (NotConnectedException e)
1806    {
1807      logger.error(
1808          "Channel unavailable while checking configuration compatibility", e);
1809      return INCOMPATIBLE_CONFIGURATION;
1810    }
1811
1812    // Check that everybody agreed
1813
Map JavaDoc results = rspList.getResults();
1814    int size = results.size();
1815    if (size == 0)
1816      logger.warn(Translate
1817          .get("virtualdatabase.distributed.configuration.checking.noanswer"));
1818
1819    long highestRemoteControllerId = 0;
1820    for (Iterator JavaDoc iter = results.values().iterator(); iter.hasNext();)
1821    {
1822      Object JavaDoc response = iter.next();
1823      if (response instanceof VirtualDatabaseConfigurationResponse)
1824      {
1825        // These highestRemotecontrollerId and remoteControllerId are returned
1826
// directly by the remote controller, and are 'thus' of 'shifted
1827
// nature': effective bits = upper 16 bits. See
1828
// DistributedRequestManager.CONTROLLER_ID_BITS
1829
VirtualDatabaseConfigurationResponse vdbcr = (VirtualDatabaseConfigurationResponse) response;
1830        long remoteControllerId = vdbcr.getControllerId();
1831        if (remoteControllerId == INCOMPATIBLE_CONFIGURATION)
1832        {
1833          return INCOMPATIBLE_CONFIGURATION;
1834        }
1835        // Check if there still is a problem of missing vdb users.
1836
// If it is the case try to add them dynamically.
1837
if (logger.isWarnEnabled())
1838        {
1839          logger
1840              .warn("Some virtual database users are missing from this configuration, trying to create them transparently...");
1841        }
1842        if (vdbcr.getAdditionalVdbUsers() != null)
1843        {
1844          for (Iterator JavaDoc iter2 = vdbcr.getAdditionalVdbUsers().iterator(); iter2
1845              .hasNext();)
1846          {
1847            VirtualDatabaseUser vdbUser = (VirtualDatabaseUser) iter2.next();
1848
1849            // Using the "super" trick here probably means bad design.
1850
// The intent is to create the vdb users just locally, hence we use
1851
// the method in
1852
// VirtualDatabase rather than the overridden method in
1853
// DistributedVirtual Database.
1854
super.checkAndAddVirtualDatabaseUser(vdbUser);
1855
1856            if (!getAuthenticationManager().isValidVirtualUser(vdbUser))
1857            {
1858              return INCOMPATIBLE_CONFIGURATION;
1859            }
1860          }
1861        }
1862
1863        if (highestRemoteControllerId < remoteControllerId)
1864          highestRemoteControllerId = remoteControllerId;
1865      }
1866      else
1867      {
1868        logger
1869            .error("Unexpected response while checking configuration compatibility: "
1870                + response);
1871        return INCOMPATIBLE_CONFIGURATION;
1872      }
1873    }
1874
1875    // Ok, everybody agreed that our configuration is compatible.
1876
// Take the highest controller id + 1 as our id. (non-shifted, this is used
1877
// to pass in setControllerId which expects 16 bits)
1878
return ((highestRemoteControllerId >> DistributedRequestManager.CONTROLLER_ID_SHIFT_BITS) & DistributedRequestManager.CONTROLLER_ID_BITS) + 1;
1879  }
1880
1881  /**
1882   * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#checkAndAddVirtualDatabaseUser(org.continuent.sequoia.common.users.VirtualDatabaseUser)
1883   */

1884  public void checkAndAddVirtualDatabaseUser(VirtualDatabaseUser vdbUser)
1885  {
1886    // Is vdb user valid?
1887
MulticastResponse rspList;
1888    try
1889    {
1890      rspList = multicastRequestAdapter.multicastMessage(getAllMembers(),
1891          new IsValidUserForAllBackends(vdbUser),
1892          MulticastRequestAdapter.WAIT_ALL, messageTimeouts
1893              .getVirtualDatabaseConfigurationTimeout());
1894    }
1895    catch (NotConnectedException e)
1896    {
1897      logger.error("Channel unavailable while checking validity of vdb user "
1898          + vdbUser.getLogin(), e);
1899      return;
1900    }
1901
1902    // Check that everybody agreed
1903
Map JavaDoc results = rspList.getResults();
1904    int size = results.size();
1905    if (size == 0)
1906      logger.warn("No response while checking validity of vdb user "
1907          + vdbUser.getLogin());
1908    for (Iterator JavaDoc iter = results.values().iterator(); iter.hasNext();)
1909    {
1910      Object JavaDoc response = iter.next();
1911      if (response instanceof Boolean JavaDoc)
1912      {
1913        if (!((Boolean JavaDoc) response).booleanValue())
1914        {
1915          if (logger.isWarnEnabled())
1916          {
1917            logger.warn("Could not create new vdb user " + vdbUser.getLogin()
1918                + " because it does not exist on all backends");
1919          }
1920          return;
1921        }
1922      }
1923      else
1924      {
1925        logger.error("Unexpected response while checking validity of vdb user "
1926            + vdbUser.getLogin() + " : " + response);
1927        return;
1928      }
1929    }
1930
1931    // Add user
1932
try
1933    {
1934      rspList = multicastRequestAdapter.multicastMessage(getAllMembers(),
1935          new AddVirtualDatabaseUser(vdbUser),
1936          MulticastRequestAdapter.WAIT_ALL, messageTimeouts
1937              .getVirtualDatabaseConfigurationTimeout());
1938    }
1939    catch (NotConnectedException e)
1940    {
1941      logger.error("Channel unavailable while adding vdb user "
1942          + vdbUser.getLogin() + ", trying to clean-up...", e);
1943      removeVirtualDatabaseUser(vdbUser);
1944    }
1945
1946    // Check for exceptions
1947
results = rspList.getResults();
1948    size = results.size();
1949    if (size == 0)
1950      logger.warn("No response while adding vdb user " + vdbUser.getLogin());
1951    for (Iterator JavaDoc iter = results.values().iterator(); iter.hasNext();)
1952    {
1953      Object JavaDoc response = iter.next();
1954      if (response instanceof ControllerException)
1955      {
1956        if (logger.isErrorEnabled())
1957        {
1958          logger.error("Error while adding vdb user " + vdbUser.getLogin()
1959              + ", trying to clean-up...");
1960        }
1961        removeVirtualDatabaseUser(vdbUser);
1962        return;
1963      }
1964    }
1965  }
1966
1967  /**
1968   * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#closePersistentConnection(java.lang.String,
1969   * long)
1970   */

1971  public void closePersistentConnection(String JavaDoc login,
1972      long persistentConnectionId)
1973  {
1974    distributedRequestManager.distributedClosePersistentConnection(login,
1975        persistentConnectionId);
1976  }
1977
1978  /**
1979   * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#openPersistentConnection(java.lang.String,
1980   * long)
1981   */

1982  public void openPersistentConnection(String JavaDoc login, long persistentConnectionId)
1983      throws SQLException JavaDoc
1984  {
1985    distributedRequestManager.distributedOpenPersistentConnection(login,
1986        persistentConnectionId);
1987  }
1988
1989  /**
1990   * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#copyLogFromCheckpoint(java.lang.String,
1991   * java.lang.String)
1992   */

1993  public void copyLogFromCheckpoint(String JavaDoc dumpName, String JavaDoc controllerName)
1994      throws VirtualDatabaseException
1995  {
1996    // perform basic error checks (in particular, on success, we have a
1997
// recovery log)
1998
super.copyLogFromCheckpoint(dumpName, controllerName);
1999
2000    Member controllerByName = getControllerByName(controllerName);
2001    if (isLocalSender(controllerByName))
2002      throw new VirtualDatabaseException(
2003          "A restore log command must be applied to a remote controller");
2004
2005    // get the checkpoint name from the dump info, or die
2006
String JavaDoc dumpCheckpointName;
2007    DumpInfo dumpInfo;
2008
2009    try
2010    {
2011      dumpInfo = getRecoveryLog().getDumpInfo(dumpName);
2012    }
2013    catch (SQLException JavaDoc e)
2014    {
2015      throw new VirtualDatabaseException(
2016          "Recovery log error access occured while checking for dump"
2017              + dumpName, e);
2018    }
2019
2020    if (dumpInfo == null)
2021      throw new VirtualDatabaseException(
2022          "No information was found in the dump table for dump " + dumpName);
2023
2024    RestoreLogOperation restoreLogOperation = new RestoreLogOperation(dumpName,
2025        controllerName);
2026    addAdminOperation(restoreLogOperation);
2027    try
2028    {
2029      dumpCheckpointName = dumpInfo.getCheckpointName();
2030
2031      // set a global 'now' checkpoint (temporary) and suspend all activities
2032
String JavaDoc nowCheckpointName = setLogReplicationCheckpoint(controllerName);
2033
2034      // AT THIS POINT, ALL ACTIVITIES ARE SUSPENDED
2035

2036      // get its id (ewerk) so that we can replicate it on the other side
2037
long nowCheckpointId;
2038      RecoveryLog recoveryLog = getRequestManager().getRecoveryLog();
2039      try
2040      {
2041        try
2042        {
2043          nowCheckpointId = recoveryLog.getCheckpointLogId(nowCheckpointName);
2044        }
2045        catch (SQLException JavaDoc e)
2046        {
2047          String JavaDoc errorMessage = "Cannot find 'now checkpoint' log entry";
2048          logger.error(errorMessage);
2049          throw new VirtualDatabaseException(errorMessage);
2050        }
2051
2052        // initiate the replication - clears the remote recovery log.
2053
sendMessageToController(controllerByName, new ReplicateLogEntries(
2054            nowCheckpointName, null, dumpName, nowCheckpointId),
2055            messageTimeouts.getReplicateLogEntriesTimeout());
2056      }
2057      finally
2058      {
2059        getRequestManager().resumeActivity();
2060      }
2061
2062      // SCHEDULER ACTIVITIES ARE RESUMES AT THIS POINT
2063

2064      // protect from concurrent log updates: fake a recovery (increments
2065
// semaphore)
2066
recoveryLog.beginRecovery();
2067
2068      // copy the entries over to the remote controller.
2069
// Send them one by one over to the remote controller, coz each LogEntry
2070
// can
2071
// potentially be huge (e.g. if it contains a blob)
2072
try
2073      {
2074        ArrayList JavaDoc dest = new ArrayList JavaDoc();
2075        dest.add(controllerByName);
2076        long copyLogEntryTimeout = getMessageTimeouts()
2077            .getCopyLogEntryTimeout();
2078        long dumpId = recoveryLog.getCheckpointLogId(dumpCheckpointName);
2079
2080        if (logger.isDebugEnabled())
2081        {
2082          logger.debug("Resynchronizing from checkpoint " + dumpCheckpointName
2083              + " (" + dumpId + ") to checkpoint " + nowCheckpointName + " ("
2084              + nowCheckpointId + ")");
2085        }
2086
2087        for (long id = dumpId; id != nowCheckpointId; id++)
2088        {
2089          LogEntry entry = recoveryLog.getNextLogEntry(id);
2090          if (entry == null)
2091          {
2092            // No more entries available, stop here
2093
break;
2094          }
2095
2096          // Because 'getNextLogEntry()' will hunt for the next valid log entry,
2097
// we need to update the iterator with the new id value - 1
2098
id = entry.getLogId() - 1;
2099
2100          MulticastResponse resp = getMulticastRequestAdapter()
2101              .multicastMessage(dest, new CopyLogEntry(entry),
2102                  MulticastRequestAdapter.WAIT_NONE, copyLogEntryTimeout);
2103          if (resp.getFailedMembers() != null)
2104            throw new IOException JavaDoc("Failed to deliver log entry " + id
2105                + " to remote controller " + controllerName);
2106        }
2107
2108        // Now check that no entry was missed by the other controller since we
2109
// shipped all entries asynchronously without getting any individual ack
2110
// (much faster to address SEQUOIA-504)
2111
long localNbOfLogEntries = recoveryLog.getNumberOfLogEntries(dumpId,
2112            nowCheckpointId);
2113
2114        if (logger.isDebugEnabled())
2115        {
2116          logger.debug("Checking that " + localNbOfLogEntries
2117              + " entries were resynchronized in remote log");
2118        }
2119
2120        Serializable JavaDoc replyValue = sendMessageToController(controllerByName,
2121            new CompleteRecoveryLogResync(dumpId, nowCheckpointName,
2122                localNbOfLogEntries), getMessageTimeouts()
2123                .getReplicateLogEntriesTimeout());
2124        if (replyValue instanceof Long JavaDoc)
2125        {
2126          long diff = ((Long JavaDoc) replyValue).longValue();
2127          if (diff != 0)
2128            throw new VirtualDatabaseException(
2129                "Recovery log resynchronization reports a difference of "
2130                    + diff + " entries");
2131        }
2132        else
2133          throw new RuntimeException JavaDoc(
2134              "Invalid answer from remote controller on CompleteRecoveryLogResync ("
2135                  + replyValue + ")");
2136
2137        // terminate the replication - sets the remote dump checkpoint name.
2138
sendMessageToController(controllerName, new ReplicateLogEntries(null,
2139            dumpCheckpointName, dumpName, dumpId), messageTimeouts
2140            .getReplicateLogEntriesTimeout());
2141      }
2142      catch (Exception JavaDoc e)
2143      {
2144        String JavaDoc errorMessage = "Failed to send log entries";
2145        logger.error(errorMessage, e);
2146        throw new VirtualDatabaseException(errorMessage);
2147      }
2148      finally
2149      {
2150        recoveryLog.endRecovery(); // release semaphore
2151
}
2152    }
2153    finally
2154    {
2155      removeAdminOperation(restoreLogOperation);
2156    }
2157  }
2158
2159  /**
2160   * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForPersistentConnection(long)
2161   */

2162  public void failoverForPersistentConnection(long persistentConnectionId)
2163  {
2164    distributedRequestManager
2165        .distributedFailoverForPersistentConnection(persistentConnectionId);
2166  }
2167
2168  /**
2169   * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForTransaction(long)
2170   */

2171  public void failoverForTransaction(long currentTid)
2172  {
2173    distributedRequestManager.distributedFailoverForTransaction(currentTid);
2174  }
2175
2176  /**
2177   * Returns the recovery log associated with this controller.
2178   *
2179   * @return the recovery log associated with this controller.
2180   * @throws VirtualDatabaseException if the database has not recovery log
2181   */

2182  public RecoveryLog getRecoveryLog() throws VirtualDatabaseException
2183  {
2184    if (!hasRecoveryLog())
2185      throw new VirtualDatabaseException(Translate
2186          .get("virtualdatabase.no.recovery.log"));
2187
2188    return getRequestManager().getRecoveryLog();
2189  }
2190
2191  /**
2192   * Update remote backends list after a backend disable notification has been
2193   * received.
2194   *
2195   * @param disabledBackend backend that is disabled
2196   * @param sender the message sender
2197   */

2198  public void handleRemoteDisableBackendNotification(
2199      DatabaseBackend disabledBackend, Member sender)
2200  {
2201    synchronized (backendsPerController)
2202    {
2203      List JavaDoc remoteBackends = (List JavaDoc) backendsPerController.get(sender);
2204      if (remoteBackends == null)
2205      { // This case was reported by Alessandro Gamboz on April 1, 2005.
2206
// It looks like the EnableBackend message arrives before membership
2207
// has been properly updated.
2208
logger.warn("No information has been found for remote controller "
2209            + sender);
2210        remoteBackends = new ArrayList JavaDoc();
2211        backendsPerController.put(sender, remoteBackends);
2212      }
2213      int size = remoteBackends.size();
2214      boolean backendFound = false;
2215      for (int i = 0; i < size; i++)
2216      {
2217        DatabaseBackend remoteBackend = (DatabaseBackend) remoteBackends.get(i);
2218        if (remoteBackend.equals(disabledBackend))
2219        {
2220          logger.info("Backend " + remoteBackend.getName()
2221              + " disabled on controller " + sender);
2222          remoteBackends.set(i, disabledBackend);
2223          backendFound = true;
2224          break;
2225        }
2226      }
2227      if (!backendFound)
2228      {
2229        logger.warn("Updating backend list with unknown backend "
2230            + disabledBackend.getName() + " disabled on controller " + sender);
2231        remoteBackends.add(disabledBackend);
2232      }
2233    }
2234  }
2235
2236  /**
2237   * Update remote backends list after a backend disable notification has been
2238   * received.
2239   *
2240   * @param disabledBackendInfos List of BackendInfo objects that are disabled
2241   * @param sender the message sender
2242   */

2243  public void handleRemoteDisableBackendsNotification(
2244      ArrayList JavaDoc disabledBackendInfos, Member sender)
2245  {
2246    synchronized (backendsPerController)
2247    {
2248      List JavaDoc remoteBackends = (List JavaDoc) backendsPerController.get(sender);
2249      if (remoteBackends == null)
2250      { // This case was reported by Alessandro Gamboz on April 1, 2005.
2251
// It looks like the EnableBackend message arrives before membership
2252
// has been properly updated.
2253
logger.warn("No information has been found for remote controller "
2254            + sender);
2255        remoteBackends = new ArrayList JavaDoc();
2256        backendsPerController.put(sender, remoteBackends);
2257      }
2258      Iterator JavaDoc iter = disabledBackendInfos.iterator();
2259      while (iter.hasNext())
2260      {
2261        BackendInfo backendInfo = (BackendInfo) iter.next();
2262        DatabaseBackend backend = backendInfo.getDatabaseBackend(this);
2263
2264        if (remoteBackends.contains(backend))
2265        {
2266          logger.info("Backend " + backend.getName()
2267              + " disabled on controller " + sender);
2268          remoteBackends.set(remoteBackends.indexOf(backend), backend);
2269        }
2270        else
2271        {
2272          remoteBackends.add(backend);
2273          logger.warn("Updating backend list with unknown backend "
2274              + backendInfo.getName() + " disabled on controller " + sender);
2275        }
2276      }
2277    }
2278  }
2279
2280  /**
2281   * Sent the local controller configuration to a remote controller
2282   *
2283   * @param dest the membership of the controller so send the information to
2284   * @throws NotConnectedException if the group communication channel is not
2285   * conected
2286   */

2287  public void sendLocalConfiguration(Member dest) throws NotConnectedException
2288  {
2289    // Send controller name to new comer
2290
if (logger.isDebugEnabled())
2291      logger.debug("Sending local controller name to joining controller ("
2292          + dest + ")");
2293
2294    List JavaDoc target = new ArrayList JavaDoc();
2295    target.add(dest);
2296    multicastRequestAdapter.multicastMessage(target, new ControllerInformation(
2297        controller.getControllerName(), controller.getJmxName(),
2298        distributedRequestManager.getControllerId()),
2299        MulticastRequestAdapter.WAIT_ALL, messageTimeouts
2300            .getControllerNameTimeout());
2301
2302    // Send backend status
2303
if (logger.isDebugEnabled())
2304    {
2305      logger.debug("Sending backend status name to joining controller (" + dest
2306          + ")");
2307    }
2308    List JavaDoc backendInfos = DatabaseBackend.toBackendInfos(backends);
2309    multicastRequestAdapter.multicastMessage(target, new BackendStatus(
2310        backendInfos, distributedRequestManager.getControllerId()),
2311        MulticastRequestAdapter.WAIT_ALL, messageTimeouts
2312            .getBackendStatusTimeout());
2313  }
2314
2315  /**
2316   * Returns true if the corresponding controller is alive
2317   *
2318   * @param controllerId Id of the controller to check
2319   * @return true if controller is in controllerIds list
2320   */

2321  public boolean isAliveController(Long JavaDoc controllerId)
2322  {
2323    return controllerIds.containsValue(controllerId);
2324  }
2325
2326  /**
2327   * Returns true if the given member is ourselves.
2328   *
2329   * @param sender the sender
2330   * @return true if we are the sender, false otherwise
2331   */

2332  public boolean isLocalSender(Member sender)
2333  {
2334    return channel.getLocalMembership().equals(sender);
2335  }
2336
2337  /**
2338   * {@inheritDoc}
2339   *
2340   * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#removeBackend(java.lang.String)
2341   */

2342  public void removeBackend(String JavaDoc backend) throws VirtualDatabaseException
2343  {
2344    super.removeBackend(backend);
2345
2346    try
2347    {
2348      // Send a group message to update backend list
2349
broadcastBackendInformation(getAllMemberButUs());
2350    }
2351    catch (Exception JavaDoc e)
2352    {
2353      String JavaDoc msg = "An error occured while multicasting new backedn information";
2354      logger.error(msg, e);
2355      throw new VirtualDatabaseException(msg, e);
2356    }
2357  }
2358
2359  /**
2360   * Remove a remote controller (usually because it has failed) from the
2361   * controllerMap list and refresh the group membership. This also start a
2362   * ControllerFailureCleanupThread.
2363   *
2364   * @param remoteControllerMembership the membership identifying the remote
2365   * controller
2366   * @return the JMX name of the removed controller (or null if this controller
2367   * was not in the list)
2368   */

2369  private String JavaDoc removeRemoteControllerAndStartCleanupThread(
2370      Member remoteControllerMembership)
2371  {
2372    String JavaDoc remoteControllerJmxName = (String JavaDoc) controllerJmxAddress
2373        .remove(remoteControllerMembership);
2374    if (remoteControllerJmxName == null)
2375    {
2376      logger.warn("Unable to find remote controller name for member "
2377          + remoteControllerMembership + " in list "
2378          + controllerJmxAddress.toString());
2379    }
2380    else if (logger.isDebugEnabled())
2381      logger.debug("Removing controller " + remoteControllerJmxName);
2382
2383    // Remove the list of remote backends since they are no more reachable
2384
backendsPerController.remove(remoteControllerMembership);
2385    refreshGroupMembership();
2386
2387    // Retrieve id of controller that failed and start a cleanup thread to
2388
// eliminate any remaining transactions/remaining persistent connections if
2389
// no client failover occurs in the defined timeframe
2390
Long JavaDoc failedControllerId = (Long JavaDoc) controllerIds
2391        .remove(remoteControllerMembership);
2392
2393    if (failedControllerId != null)
2394    {
2395      ControllerFailureCleanupThread controllerFailureCleanupThread = new ControllerFailureCleanupThread(
2396          this, failedControllerId.longValue(), failoverTimeoutInMs,
2397          cleanupThreads, writesFlushed);
2398      cleanupThreads.put(failedControllerId, controllerFailureCleanupThread);
2399      controllerFailureCleanupThread.start();
2400    }
2401
2402    return remoteControllerJmxName;
2403  }
2404
2405  /**
2406   * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#removeVirtualDatabaseUser(org.continuent.sequoia.common.users.VirtualDatabaseUser)
2407   */

2408  private void removeVirtualDatabaseUser(VirtualDatabaseUser vdbUser)
2409  {
2410    try
2411    {
2412      multicastRequestAdapter.multicastMessage(getAllMembers(),
2413          new RemoveVirtualDatabaseUser(vdbUser),
2414          MulticastRequestAdapter.WAIT_NONE, messageTimeouts
2415              .getVirtualDatabaseConfigurationTimeout());
2416    }
2417    catch (NotConnectedException e)
2418    {
2419      logger.error("Channel unavailable while removing vdb user "
2420          + vdbUser.getLogin(), e);
2421      return;
2422    }
2423  }
2424
2425  /**
2426   * Set a cluster-wide checkpoint relying on the implementation of
2427   * SetCheckpoint to atomically set the checkpoint on all controllers. This
2428   * method leaves writes, transactions, and persistent connections suspended.
2429   * The caller must call RequestManager.resumeActivity() when the processing
2430   * associated with this checkpoint is complete.
2431   *
2432   * @param checkpointName the name of the (transfer) checkpoint to create
2433   * @param groupMembers an ArrayList of target Members
2434   * @throws VirtualDatabaseException in case of scheduler or recoveryLog
2435   * exceptions
2436   * @see DisableBackendsAndSetCheckpoint
2437   */

2438  public void setGroupCheckpoint(String JavaDoc checkpointName, ArrayList JavaDoc groupMembers)
2439      throws VirtualDatabaseException
2440  {
2441    try
2442    {
2443      // First suspend transactions
2444
distributedRequestManager.suspendActivity();
2445      getMulticastRequestAdapter().multicastMessage(groupMembers,
2446          new DisableBackendsAndSetCheckpoint(new ArrayList JavaDoc(), checkpointName),
2447          MulticastRequestAdapter.WAIT_ALL,
2448          messageTimeouts.getSetCheckpointTimeout());
2449    }
2450    catch (Exception JavaDoc e)
2451    {
2452      String JavaDoc msg = "Set group checkpoint failed: checkpointName="
2453          + checkpointName;
2454      logger.error(msg, e);
2455      throw new VirtualDatabaseException(msg);
2456    }
2457  }
2458
2459  /**
2460   * Sets an atomic (group-wide) checkpoint on local & target controllers
2461   * (referenced by Name).
2462   *
2463   * @param controllerName the target remote controller
2464   * @return the 'now' checkpoint name.
2465   * @throws VirtualDatabaseException in case of error (whatever error, wraps
2466   * the underlying error)
2467   */

2468  private String JavaDoc setLogReplicationCheckpoint(String JavaDoc controllerName)
2469      throws VirtualDatabaseException
2470  {
2471    return setLogReplicationCheckpoint(getControllerByName(controllerName));
2472  }
2473
2474  /**
2475   * Sets an atomic (group-wide) checkpoint on local & target controllers
2476   * (refrenced by Name). When this call completes sucessfully, all activity on
2477   * the system is suspend. RequestManager.resumeActivity() must be called to
2478   * resume processing.
2479   *
2480   * @param controller the target remote controller
2481   * @return the 'now' checkpoint name.
2482   * @throws VirtualDatabaseException in case of error (whatever error, wraps
2483   * the underlying error)
2484   */

2485  public String JavaDoc setLogReplicationCheckpoint(Member controller)
2486      throws VirtualDatabaseException
2487  {
2488    String JavaDoc checkpointName = buildCheckpointName("now");
2489
2490    // Apply checkpoint to remote controllers
2491
ArrayList JavaDoc dest = new ArrayList JavaDoc();
2492    dest.add(controller);
2493    dest.add(channel.getLocalMembership());
2494    setGroupCheckpoint(checkpointName, dest);
2495    return checkpointName;
2496  }
2497
2498  /**
2499   * Sets an atomic (group-wide) checkpoint on all controllers, indicating that
2500   * this vdb has shutdown.
2501   */

2502  public void setShutdownCheckpoint()
2503  {
2504    // Set a cluster-wide checkpoint
2505
try
2506    {
2507      setGroupCheckpoint(buildCheckpointName("shutdown"), getAllMembers());
2508    }
2509    catch (VirtualDatabaseException e)
2510    {
2511      logger.warn("Error while setting shutdown checkpoint", e);
2512    }
2513    finally
2514    {
2515      if(isShuttingDown())
2516        setRejectingNewTransaction(true);
2517      distributedRequestManager.resumeActivity();
2518    }
2519  }
2520
2521  /**
2522   * Send a Message to a remote controller, referenced by name. This sends a
2523   * point-to-point message, fifo. No total order is specifically required.
2524   *
2525   * @param controllerName name of the remote controller
2526   * @param message the message to send (should be Serializable)
2527   * @param timeout message timeout in ms
2528   * @throws VirtualDatabaseException (wrapping error) in case of communication
2529   * failure
2530   */

2531  private void sendMessageToController(String JavaDoc controllerName,
2532      Serializable JavaDoc message, long timeout) throws VirtualDatabaseException
2533  {
2534    sendMessageToController(getControllerByName(controllerName), message,
2535        timeout);
2536  }
2537
2538  /**
2539   * Send a Message to a remote controller, referenced by its Member. This sends
2540   * a point-to-point message, fifo. No total order is specifically required
2541   * (but enforced anyway).
2542   *
2543   * @param controllerMember Member object refering to the remote controller
2544   * @param message the message to send (should be Serializable)
2545   * @param timeout message timeout in ms
2546   * @return the result returned by the remote controller (except if this an
2547   * exception in which case it is automatically thrown)
2548   * @throws VirtualDatabaseException (wrapping error) in case of communication
2549   * failure
2550   */

2551  public Serializable JavaDoc sendMessageToController(Member controllerMember,
2552      Serializable JavaDoc message, long timeout) throws VirtualDatabaseException
2553  {
2554    try
2555    {
2556      ArrayList JavaDoc dest = new ArrayList JavaDoc();
2557      dest.add(controllerMember);
2558      MulticastResponse resp = getMulticastRequestAdapter().multicastMessage(
2559          dest, message, MulticastRequestAdapter.WAIT_ALL, timeout);
2560      Object JavaDoc o = resp.getResult(controllerMember);
2561      if (o instanceof Exception JavaDoc)
2562        throw (Exception JavaDoc) o;
2563      return (Serializable JavaDoc) o;
2564    }
2565    catch (Exception JavaDoc e)
2566    {
2567      logger.error(e);
2568      throw new VirtualDatabaseException(e);
2569    }
2570  }
2571
2572  /**
2573   * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#shutdown(int)
2574   */

2575  public void shutdown(int level)
2576  {
2577    // Shutdown cleanup threads
2578
for (Iterator JavaDoc iter = cleanupThreads.values().iterator(); iter.hasNext();)
2579    {
2580      ControllerFailureCleanupThread thread = (ControllerFailureCleanupThread) iter
2581          .next();
2582      thread.shutdown();
2583    }
2584
2585    // Shutdown request result failover cache clean-up thread
2586
requestResultFailoverCache.shutdown();
2587
2588    super.shutdown(level);
2589  }
2590
2591  /**
2592   * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#transferBackend(java.lang.String,
2593   * java.lang.String)
2594   */

2595  public void transferBackend(String JavaDoc backend, String JavaDoc controllerDestination)
2596      throws VirtualDatabaseException
2597  {
2598    TransferBackendOperation transferOperation = new TransferBackendOperation(
2599        backend, controllerDestination);
2600    addAdminOperation(transferOperation);
2601    try
2602    {
2603      Member targetMember = getControllerByName(controllerDestination);
2604
2605      // Get reference on backend
2606
DatabaseBackend db = getAndCheckBackend(backend, CHECK_BACKEND_DISABLE);
2607      String JavaDoc transfertCheckpointName = buildCheckpointName("transfer backend: "
2608          + db.getName() + " from " + controller.getControllerName() + " to "
2609          + targetMember.getUid());
2610
2611      if (logger.isDebugEnabled())
2612        logger.debug("**** Disabling backend for transfer");
2613
2614      // Disable local backend
2615
try
2616      {
2617        if (!hasRecoveryLog())
2618          throw new VirtualDatabaseException(
2619              "Transfer is not supported on virtual databases without a recovery log");
2620
2621        distributedRequestManager.disableBackendWithCheckpoint(db,
2622            transfertCheckpointName);
2623      }
2624      catch (SQLException JavaDoc e)
2625      {
2626        throw new VirtualDatabaseException(e.getMessage());
2627      }
2628
2629      // Enable remote transfered backend.
2630
try
2631      {
2632        if (logger.isDebugEnabled())
2633          logger.debug("**** Sending transfer message to:" + targetMember);
2634
2635        ArrayList JavaDoc dest = new ArrayList JavaDoc(1);
2636        dest.add(targetMember);
2637
2638        sendMessageToController(targetMember,
2639            new BackendTransfer(controllerDestination, transfertCheckpointName,
2640                new BackendInfo(db)), messageTimeouts
2641                .getBackendTransferTimeout());
2642
2643        if (logger.isDebugEnabled())
2644          logger.debug("**** Removing local backend");
2645
2646        // Remove backend from this controller
2647
removeBackend(db);
2648
2649        // Broadcast updated backend list
2650
broadcastBackendInformation(getAllMemberButUs());
2651      }
2652      catch (Exception JavaDoc e)
2653      {
2654        String JavaDoc msg = "An error occured while transfering the backend";
2655        logger.error(msg, e);
2656        throw new VirtualDatabaseException(msg, e);
2657      }
2658    }
2659    finally
2660    {
2661      removeAdminOperation(transferOperation);
2662    }
2663  }
2664
2665  /**
2666   * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#transferDump(java.lang.String,
2667   * java.lang.String, boolean)
2668   */

2669  public void transferDump(String JavaDoc dumpName, String JavaDoc remoteControllerName,
2670      boolean noCopy) throws VirtualDatabaseException
2671  {
2672    TransferDumpOperation transferOperation = new TransferDumpOperation(
2673        dumpName, remoteControllerName);
2674    addAdminOperation(transferOperation);
2675    try
2676    {
2677      // get the info from the backuper
2678
DumpInfo dumpInfo = null;
2679      try
2680      {
2681        dumpInfo = getRecoveryLog().getDumpInfo(dumpName);
2682        /*
2683         * getDumpInfo() is the one that throws SQLException (should it be a
2684         * VirtualDatabaseException instead ???)
2685         */

2686      }
2687      catch (SQLException JavaDoc e)
2688      {
2689        String JavaDoc msg = "getting dump info from backup manager failed";
2690        throw new VirtualDatabaseException(msg, e);
2691      }
2692
2693      if (dumpInfo == null)
2694        throw new VirtualDatabaseException("no dump info for dump '" + dumpName
2695            + "'");
2696
2697      if (remoteControllerName.equals(controller.getJmxName()))
2698        throw new VirtualDatabaseException("Not transfering dump to myself");
2699
2700      // if a copy is needed, hand-off copy to backuper: setup server side of
2701
// the
2702
// copy
2703
DumpTransferInfo dumpTransferInfo = null;
2704      if (!noCopy)
2705      {
2706        try
2707        {
2708          dumpTransferInfo = getRequestManager().getBackupManager()
2709              .getBackuperByFormat(dumpInfo.getDumpFormat()).setupDumpServer();
2710        }
2711        catch (IOException JavaDoc e)
2712        {
2713          throw new VirtualDatabaseException(e);
2714        }
2715      }
2716
2717      // send message to remote vdb instance, to act as a client
2718
// (see handleInitiateDumpCopy)
2719
sendMessageToController(remoteControllerName, new InitiateDumpCopy(
2720          dumpInfo, dumpTransferInfo), messageTimeouts
2721          .getInitiateDumpCopyTimeout());
2722    }
2723    finally
2724    {
2725      removeAdminOperation(transferOperation);
2726    }
2727  }
2728
2729  /**
2730   * If we are executing in a distributed virtual database, we have to make sure
2731   * that we post the query in the queue following the total order. This method
2732   * does not remove the request from the total order queue. You have to call
2733   * removeHeadFromAndNotifyTotalOrderQueue() to do so.
2734   *
2735   * @param request the request to wait for (can be any object but usually a
2736   * DistributedRequest, Commit or Rollback)
2737   * @param errorIfNotFound true if an error message should be logged if the
2738   * request is not found in the total order queue
2739   * @return true if the element was found and wait has succeeded, false
2740   * otherwise
2741   */

2742  public boolean waitForTotalOrder(Object JavaDoc request, boolean errorIfNotFound)
2743  {
2744    synchronized (totalOrderQueue)
2745    {
2746      int index = totalOrderQueue.indexOf(request);
2747      while (index > 0)
2748      {
2749        if (logger.isDebugEnabled())
2750          logger.debug("Waiting for " + index
2751              + " queries to execute (current is " + totalOrderQueue.get(0)
2752              + ")");
2753        try
2754        {
2755          totalOrderQueue.wait();
2756        }
2757        catch (InterruptedException JavaDoc ignore)
2758        {
2759        }
2760        index = totalOrderQueue.indexOf(request);
2761      }
2762      if (index == -1)
2763      {
2764        if (errorIfNotFound)
2765          logger
2766              .error("Request was not found in total order queue, posting out of order ("
2767                  + request + ")");
2768        return false;
2769      }
2770      else
2771        return true;
2772    }
2773  }
2774}
Popular Tags