KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > virtualdatabase > DistributedVirtualDatabase


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2005 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): Olivier Fambon.
23  */

24
25 package org.objectweb.cjdbc.controller.virtualdatabase;
26
27 import java.io.IOException JavaDoc;
28 import java.io.Serializable JavaDoc;
29 import java.net.URL JavaDoc;
30 import java.sql.SQLException JavaDoc;
31 import java.util.ArrayList JavaDoc;
32 import java.util.Date JavaDoc;
33 import java.util.HashMap JavaDoc;
34 import java.util.Hashtable JavaDoc;
35 import java.util.Iterator JavaDoc;
36 import java.util.Map.Entry;
37
38 import javax.management.NotCompliantMBeanException JavaDoc;
39
40 import org.objectweb.cjdbc.common.exceptions.BackupException;
41 import org.objectweb.cjdbc.common.exceptions.ControllerException;
42 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
43 import org.objectweb.cjdbc.common.exceptions.VirtualDatabaseException;
44 import org.objectweb.cjdbc.common.i18n.Translate;
45 import org.objectweb.cjdbc.common.jmx.JmxException;
46 import org.objectweb.cjdbc.common.jmx.notifications.CjdbcNotificationList;
47 import org.objectweb.cjdbc.common.log.Trace;
48 import org.objectweb.cjdbc.common.shared.BackendInfo;
49 import org.objectweb.cjdbc.common.shared.DumpInfo;
50 import org.objectweb.cjdbc.common.sql.AbstractRequest;
51 import org.objectweb.cjdbc.common.sql.filters.AbstractBlobFilter;
52 import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
53 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
54 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
55 import org.objectweb.cjdbc.controller.backup.Backuper;
56 import org.objectweb.cjdbc.controller.backup.DumpTransferInfo;
57 import org.objectweb.cjdbc.controller.core.Controller;
58 import org.objectweb.cjdbc.controller.jmx.MBeanServerManager;
59 import org.objectweb.cjdbc.controller.jmx.RmiConnector;
60 import org.objectweb.cjdbc.controller.recoverylog.RecoveryLog;
61 import org.objectweb.cjdbc.controller.recoverylog.events.LogEntry;
62 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
63 import org.objectweb.cjdbc.controller.requestmanager.RequestManager;
64 import org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager;
65 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.BackendStatus;
66 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.BackendTransfer;
67 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage;
68 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ControllerName;
69 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CopyLogEntry;
70 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.DisableBackend;
71 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.DistributedRequest;
72 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.DistributedTransactionMarker;
73 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.EnableBackend;
74 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.InitiateDumpCopy;
75 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ReplicateLogEntries;
76 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.SetCheckpoint;
77 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration;
78 import org.objectweb.tribe.adapters.MulticastRequestAdapter;
79 import org.objectweb.tribe.adapters.MulticastRequestListener;
80 import org.objectweb.tribe.adapters.MulticastResponse;
81 import org.objectweb.tribe.channel.JGroupsReliableChannelWithGms;
82 import org.objectweb.tribe.channel.ReliableGroupChannelWithGms;
83 import org.objectweb.tribe.common.Address;
84 import org.objectweb.tribe.common.Group;
85 import org.objectweb.tribe.common.GroupIdentifier;
86 import org.objectweb.tribe.common.Member;
87 import org.objectweb.tribe.exceptions.ChannelException;
88 import org.objectweb.tribe.exceptions.NotConnectedException;
89 import org.objectweb.tribe.exceptions.TimeoutException;
90 import org.objectweb.tribe.gms.GroupMembershipListener;
91 import org.objectweb.tribe.gms.JGroupsMembershipService;
92 import org.objectweb.tribe.messages.MessageListener;
93
94 /**
95  * A <code>DistributedVirtualDatabase</code> is a virtual database hosted by
96  * several controllers. Communication between the controllers is achieved with
97  * reliable multicast provided by Javagroups.
98  *
99  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
100  * @version 1.0
101  */

102 public class DistributedVirtualDatabase extends VirtualDatabase
103     implements
104       MessageListener,
105       MulticastRequestListener,
106       GroupMembershipListener
107 {
108   //
109
// How the code is organized ?
110
//
111
// 1. Member variables
112
// 2. Constructor(s)
113
// 3. Request handling
114
// 4. Transaction handling
115
// 5. Database backend management
116
// 6. Distribution management (multicast)
117
// 7. Getter/Setter (possibly in alphabetical order)
118
//
119

120   // Distribution
121

122   /** Group name */
123   private String JavaDoc groupName = null;
124   /** Controller name mapping: Member --> controller JMX name */
125   private Hashtable JavaDoc controllersMap;
126   /** Tribe Member --> <code>ArrayList[BackendInfo]</code> */
127   private Hashtable JavaDoc backendsPerController;
128
129   /** JGroups channel */
130   private ReliableGroupChannelWithGms channel = null;
131   /** MessageDispatcher to communicate with the group */
132   private MulticastRequestAdapter multicastRequestAdapter;
133   private Group currentGroup = null;
134   private ArrayList JavaDoc allMemberButUs = null;
135   private static final long INCOMPATIBLE_CONFIGURATION = -1;
136   private boolean isVirtualDatabaseStarted = false;
137
138   /**
139    * Our view of the request manager, same as super.requestManager, only just
140    * typed properly.
141    */

142   private DistributedRequestManager distributedRequestManager;
143
144   /** Logger for distributed request execution */
145   private static Trace distributedRequestLogger;
146
147   //
148
// Constructors
149
//
150
/**
151    * Creates a new <code>DistributedVirtualDatabase</code> instance.
152    *
153    * @param controller the controller we belong to
154    * @param name the virtual database name
155    * @param groupName the virtual database group name
156    * @param maxConnections maximum number of concurrent connections.
157    * @param pool should we use a pool of threads for handling connections?
158    * @param minThreads minimum number of threads in the pool
159    * @param maxThreads maximum number of threads in the pool
160    * @param maxThreadIdleTime maximum time a thread can remain idle before being
161    * removed from the pool.
162    * @param sqlShortFormLength maximum number of characters of an SQL statement
163    * to display in traces or exceptions
164    * @param blobFilter encoding method for blobs
165    * @exception NotCompliantMBeanException in case the bean does not comply with
166    * jmx
167    * @exception JmxException the bean could not be registered
168    */

169   public DistributedVirtualDatabase(Controller controller, String JavaDoc name,
170       String JavaDoc groupName, int maxConnections, boolean pool, int minThreads,
171       int maxThreads, long maxThreadIdleTime, int sqlShortFormLength,
172       AbstractBlobFilter blobFilter) throws NotCompliantMBeanException JavaDoc,
173       JmxException
174   {
175     super(controller, name, maxConnections, pool, minThreads, maxThreads,
176         maxThreadIdleTime, sqlShortFormLength, blobFilter);
177
178     this.groupName = groupName;
179     backendsPerController = new Hashtable JavaDoc();
180     controllersMap = new Hashtable JavaDoc();
181     isVirtualDatabaseStarted = false;
182     distributedRequestLogger = Trace
183         .getLogger("org.objectweb.cjdbc.controller.distributedvirtualdatabase.request."
184             + name);
185   }
186
187   /**
188    * Disconnect the channel and close it.
189    *
190    * @see java.lang.Object#finalize()
191    */

192   protected void finalize() throws Throwable JavaDoc
193   {
194     quitChannel();
195     super.finalize();
196   }
197
198   /**
199    * @see org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase#addBackend(org.objectweb.cjdbc.controller.backend.DatabaseBackend)
200    */

201   public void addBackend(DatabaseBackend db) throws VirtualDatabaseException
202   {
203     // Add the backend to the virtual database.
204
super.addBackend(db);
205
206     // Send a group message if already joined group
207
try
208     {
209       broadcastBackendInformation(getAllMemberButUs());
210     }
211     catch (Exception JavaDoc e)
212     {
213       String JavaDoc msg = "Error while broadcasting backend information when adding backend";
214       logger.error(msg, e);
215       throw new VirtualDatabaseException(msg, e);
216     }
217   }
218
219   /**
220    * Terminate the multicast request adapter and quit the jgroups channel.
221    *
222    * @throws NotConnectedException if the channel is not connected
223    * @throws ChannelException if an error occurred while closing the channel
224    */

225   public void quitChannel() throws ChannelException, NotConnectedException
226   {
227     if (multicastRequestAdapter != null)
228     {
229       multicastRequestAdapter.stop();
230     }
231     if (channel != null)
232     {
233       channel.close();
234     }
235   }
236
237   /**
238    * Returns the controllerName value.
239    *
240    * @return Returns the controllerName.
241    */

242   public String JavaDoc getControllerName()
243   {
244     return controller.getControllerName();
245   }
246
247   /**
248    * Returns the group name this virtual database belongs to.
249    *
250    * @return a <code>String</code> value. Returns <code>null</code> if this
251    * virtual database is standalone
252    */

253   public String JavaDoc getGroupName()
254   {
255     return groupName;
256   }
257
258   /**
259    * Sets the group name used by the controllers hosting this virtual database.
260    *
261    * @param groupName the group name to set
262    */

263   public void setGroupName(String JavaDoc groupName)
264   {
265     this.groupName = groupName;
266   }
267
268   /**
269    * Sets a new distributed request manager for this database.
270    *
271    * @param requestManager the new request manager.
272    */

273   public void setRequestManager(RequestManager requestManager)
274   {
275     if (!(requestManager instanceof DistributedRequestManager))
276       throw new RuntimeException JavaDoc(
277           "A distributed virtual database can only work with a distributed request manager.");
278
279     distributedRequestManager = (DistributedRequestManager) requestManager;
280     // really, this is super.requestManager
281
this.requestManager = distributedRequestManager;
282   }
283
284   //
285
// Distribution handling
286
//
287

288   /**
289    * Makes this virtual database join a virtual database group. Those groups are
290    * mapped to JavaGroups groups.
291    *
292    * @exception Exception if an error occurs
293    */

294   public void joinGroup() throws Exception JavaDoc
295   {
296     try
297     {
298       // Read the protocol stack configuration from jgroups.xml
299
URL JavaDoc jgroupConfigFile = DistributedVirtualDatabase.class
300           .getResource("/jgroups.xml");
301       if (jgroupConfigFile == null)
302         logger.warn(Translate
303             .get("virtualdatabase.distributed.jgroups.xml.not.found"));
304       else
305         logger.info(Translate.get("virtualdatabase.distributed.jgroups.using",
306             jgroupConfigFile.toString()));
307       JGroupsMembershipService gms = new JGroupsMembershipService(
308           jgroupConfigFile);
309       gms.registerGroupMembershipListener(this);
310       channel = new JGroupsReliableChannelWithGms(gms);
311       if (logger.isDebugEnabled())
312         logger.debug("Group communication channel is configured as follows: "
313             + ((JGroupsReliableChannelWithGms) channel).getProperties());
314
315       // Join the group
316
channel.join(new Group(new GroupIdentifier(groupName)));
317       multicastRequestAdapter = new MulticastRequestAdapter(channel // group
318
// channel
319
, this /* MessageListener */
320           , this /* MulticastRequestListener */
321       );
322
323       // Let the MulticastRequestAdapter thread pump the membership out of the
324
// JGroups channel.
325
Thread.sleep(2000);
326
327       logger.info("Group " + groupName + " connected to "
328           + channel.getLocalMembership());
329
330       // Add ourselves to the list of controllers
331
controllersMap.put(channel.getLocalMembership(), controller.getJmxName());
332
333       // Check if we are alone or not
334
currentGroup = channel.getCurrentGroup();
335       ArrayList JavaDoc currentGroupMembers = currentGroup.getMembers();
336       int groupSize = currentGroupMembers.size();
337       if (groupSize == 1)
338       {
339         logger.info(Translate.get(
340             "virtualdatabase.distributed.configuration.first.in.group",
341             groupName));
342         allMemberButUs = new ArrayList JavaDoc();
343         distributedRequestManager.setControllerId(0);
344         isVirtualDatabaseStarted = true;
345         return;
346       }
347
348       logger.info("Group now contains " + groupSize + " controllers.");
349       if (logger.isDebugEnabled())
350       {
351         logger.debug("Current list of controllers is as follows:");
352         for (Iterator JavaDoc iter = currentGroupMembers.iterator(); iter.hasNext();)
353           logger.debug("Controller " + iter.next());
354       }
355
356       refreshGroupMembership(); // also updates allMemberButUs
357

358       // Check with the other controller that our config is compatible
359
long controllerId = checkConfigurationCompatibilityAndReturnControllerId(getAllMemberButUs());
360       if (controllerId == INCOMPATIBLE_CONFIGURATION)
361       {
362         String JavaDoc msg = Translate
363             .get("virtualdatabase.distributed.configuration.not.compatible");
364         logger.error(msg);
365         throw new ControllerException(msg);
366       }
367       else
368       {
369         // In case several controllers join at the same time they would get the
370
// same highest controller id value and here we discriminate them by
371
// adding their position in the membership. This assumes that the
372
// membership is ordered the same way at all nodes.
373
controllerId += currentGroupMembers.indexOf(channel
374             .getLocalMembership());
375
376         if (logger.isInfoEnabled())
377         {
378           logger.info(Translate
379               .get("virtualdatabase.distributed.configuration.compatible"));
380           logger.info("Controller identifier is set to: " + controllerId);
381         }
382         // Set the controller Id
383
distributedRequestManager.setControllerId(controllerId);
384       }
385
386       // Distribute backends among controllers knowing that at this point
387
// there is no conflict on the backend distribution policies.
388
broadcastBackendInformation(getAllMemberButUs());
389     }
390     catch (Exception JavaDoc e)
391     {
392       String JavaDoc msg = Translate.get("virtualdatabase.distributed.joingroup.error",
393           groupName);
394       if (e instanceof RuntimeException JavaDoc)
395         logger.error(msg, e);
396       throw new Exception JavaDoc(msg + " (" + e + ")");
397     }
398     isVirtualDatabaseStarted = true;
399   }
400
401   /**
402    * Synchronized access to current group members.
403    *
404    * @return a clone of the list of all members (never null).
405    */

406   public ArrayList JavaDoc getAllMembers()
407   {
408     synchronized (controllersMap)
409     {
410       if (currentGroup == null) // this happens if we did not #joinGroup()
411
return new ArrayList JavaDoc();
412       return ((ArrayList JavaDoc) currentGroup.getMembers().clone());
413     }
414   }
415
416   /**
417    * Returns the list of all members in the group except us. Consider the value
418    * read-only (do not alter).
419    *
420    * @return the allMembersButUs field (never null).
421    */

422   public ArrayList JavaDoc getAllMemberButUs()
423   {
424     synchronized (controllersMap)
425     {
426       if (allMemberButUs == null) // this happens if we did not #joinGroup()
427
return new ArrayList JavaDoc();
428
429       /**
430        * This synchronized block might seem loussy, but actually it's enough, as
431        * long as no caller alters the returned value: field allMembersButUs is
432        * replaced (as opposed to updated) by refreshGroupMembership(). So
433        * someone who has called this lives with a (possibly) outdated list, but,
434        * still, with a safe list (never updated concurently by vdb threads). If
435        * clients/callers are not trusted to leave the returned value un-touched,
436        * use a clone
437        */

438       return allMemberButUs;
439     }
440   }
441
442   /**
443    * Get the group channel used for group communications
444    *
445    * @return a <code>JChannel</code>
446    */

447   public ReliableGroupChannelWithGms getChannel()
448   {
449     return channel;
450   }
451
452   /**
453    * Returns the currentGroup value.
454    *
455    * @return Returns the currentGroup.
456    */

457   public Group getCurrentGroup()
458   {
459     return currentGroup;
460   }
461
462   /**
463    * Return the group communication multicast request adapter.
464    *
465    * @return the group communication multicast request adapter
466    */

467   public MulticastRequestAdapter getMulticastRequestAdapter()
468   {
469     return multicastRequestAdapter;
470   }
471
472   /**
473    * Send the configuration of this controller to remote controller. All remote
474    * controllers must agree on the compatibility of the local controller
475    * configuration with their own configuration. Compatibility checking include
476    * Authentication Manager, Scheduler and Load Balancer settings.
477    *
478    * @param dest List of <code>Address</code> to send the message to
479    * @return INCOMPATIBLE_CONFIGURATION if the configuration is not compatible
480    * with other controllers or the controller id to use otherwise.
481    */

482   private long checkConfigurationCompatibilityAndReturnControllerId(
483       ArrayList JavaDoc dest)
484   {
485     if (logger.isInfoEnabled())
486       logger.info(Translate
487           .get("virtualdatabase.distributed.configuration.checking"));
488
489     // Send our configuration
490
MulticastResponse rspList;
491     try
492     {
493       rspList = multicastRequestAdapter.multicastMessage(dest,
494           new VirtualDatabaseConfiguration(this),
495           MulticastRequestAdapter.WAIT_ALL,
496           CJDBCGroupMessage.defaultCastTimeOut);
497     }
498     catch (TimeoutException e)
499     {
500       logger.error(
501           "Timeout occured while checking configuration compatibility", e);
502       return INCOMPATIBLE_CONFIGURATION;
503     }
504     catch (ChannelException e)
505     {
506       logger
507           .error(
508               "Communication error occured while checking configuration compatibility",
509               e);
510       return INCOMPATIBLE_CONFIGURATION;
511     }
512     catch (NotConnectedException e)
513     {
514       logger.error(
515           "Channel unavailable while checking configuration compatibility", e);
516       return INCOMPATIBLE_CONFIGURATION;
517     }
518
519     // Check that everybody agreed
520
HashMap JavaDoc results = rspList.getResults();
521     int size = results.size();
522     if (size == 0)
523       logger.warn(Translate
524           .get("virtualdatabase.distributed.configuration.checking.noanswer"));
525
526     long highestRemoteControllerId = 0;
527     for (Iterator JavaDoc iter = results.values().iterator(); iter.hasNext();)
528     {
529       Object JavaDoc response = iter.next();
530       if (response instanceof Long JavaDoc)
531       {
532         // These highestRemotecontrollerId and remoteControllerId are returned
533
// directly by the remote controller, and are 'thus' of 'shifted
534
// nature': effective bits = upper 16 bits. See
535
// DistributedRequestManager.CONTROLLER_ID_BITS
536
long remoteControllerId = ((Long JavaDoc) response).longValue();
537         if (remoteControllerId == INCOMPATIBLE_CONFIGURATION)
538           return INCOMPATIBLE_CONFIGURATION;
539         else if (highestRemoteControllerId < remoteControllerId)
540           highestRemoteControllerId = remoteControllerId;
541       }
542       else
543       {
544         logger
545             .error("Unexpected response while checking configuration compatibility: "
546                 + response);
547         return INCOMPATIBLE_CONFIGURATION;
548       }
549     }
550
551     // Ok, everybody agreed that our configuration is compatible.
552
// Take the highest controller id + 1 as our id. (non-shifted, this is used
553
// to pass in setControllerId which expects 16 bits)
554
return ((highestRemoteControllerId >> DistributedRequestManager.CONTROLLER_ID_SHIFT_BITS) & DistributedRequestManager.CONTROLLER_ID_BITS) + 1;
555   }
556
557   /**
558    * Broadcast backend information among controllers.
559    *
560    * @param dest List of <code>Address</code> to send the message to
561    * @throws NotConnectedException if the channel is not connected
562    * @throws ChannelException if the channel reported an error
563    * @throws TimeoutException if a timeout occured
564    */

565   private void broadcastBackendInformation(ArrayList JavaDoc dest)
566       throws TimeoutException, ChannelException, NotConnectedException
567   {
568     logger
569         .debug(Translate
570             .get("virtualdatabase.distributed.configuration.querying.remote.status"));
571
572     // Send our backend status
573
MulticastResponse rspList = multicastRequestAdapter.multicastMessage(dest,
574         new BackendStatus(getBackendsInfo(backends)),
575         MulticastRequestAdapter.WAIT_ALL, CJDBCGroupMessage.defaultCastTimeOut);
576
577     int size = dest.size();
578     for (int i = 0; i < size; i++)
579     {
580       // Add the backend configuration of every remote controller
581
Member m = (Member) dest.get(i);
582       if (rspList.getResult(m) != null)
583       {
584         if (logger.isDebugEnabled())
585           logger
586               .debug(Translate
587                   .get(
588                       "virtualdatabase.distributed.configuration.updating.backend.list",
589                       m.toString()));
590       }
591       else
592         logger.warn(Translate.get(
593             "virtualdatabase.distributed.unable.get.remote.status", m
594                 .toString()));
595     }
596   }
597
598   /**
599    * Check if the given backend definition is compatible with the backend
600    * definitions of this distributed virtual database. Not that if the given
601    * backend does not exist in the current configuration, it is considered as
602    * compatible. Incompatibility results from 2 backends with the same JDBC URL.
603    *
604    * @param backend the backend to check
605    * @return true if the backend is compatible with the local definition
606    * @throws VirtualDatabaseException if locking the local backend list fails
607    */

608   public boolean isCompatibleBackend(BackendInfo backend)
609       throws VirtualDatabaseException
610   {
611     try
612     {
613       acquireReadLockBackendLists();
614     }
615     catch (InterruptedException JavaDoc e)
616     {
617       String JavaDoc msg = "Unable to acquire read lock on backend list in isCompatibleBackend ("
618           + e + ")";
619       logger.error(msg);
620       throw new VirtualDatabaseException(msg);
621     }
622
623     try
624     {
625       // Find the backend
626
String JavaDoc backendURL = backend.getUrl();
627       int size = backends.size();
628       DatabaseBackend b = null;
629       for (int i = 0; i < size; i++)
630       {
631         b = (DatabaseBackend) backends.get(i);
632         if (b.getURL().equals(backendURL))
633           return false;
634       }
635     }
636     catch (RuntimeException JavaDoc re)
637     {
638       throw new VirtualDatabaseException(re);
639     }
640     finally
641     {
642       releaseReadLockBackendLists();
643     }
644     // This backend does not exist here
645
return true;
646   }
647
648   /**
649    * Return true if the provided schema is compatible with the existing schema
650    * of this distributed virtual database. Note that if the given schema is
651    * null, this function returns true.
652    *
653    * @param dbs the database schema to compare with
654    * @return true if dbs is compatible with the current schema (according to
655    * RAIDb level)
656    */

657   public boolean isCompatibleDatabaseSchema(DatabaseSchema dbs)
658   {
659     // Database schema checking (if any)
660
if (dbs == null)
661     {
662       logger.warn(Translate
663           .get("virtualdatabase.distributed.configuration.checking.noschema"));
664     }
665     else
666     {
667       // Check database schemas compatibility
668
switch (getRequestManager().getLoadBalancer().getRAIDbLevel())
669       {
670         case RAIDbLevels.RAIDb0 :
671           // There must be no overlap between schemas
672
if (dbs.equals(getRequestManager().getDatabaseSchema()))
673           {
674             logger
675                 .warn(Translate
676                     .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
677             return false;
678           }
679           break;
680         case RAIDbLevels.RAIDb1 :
681           // Schemas must be identical
682
if (!dbs.equals(getRequestManager().getDatabaseSchema()))
683           {
684             logger
685                 .warn(Translate
686                     .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
687             return false;
688           }
689           break;
690         case RAIDbLevels.RAIDb2 :
691           // Common parts of the schema must be identical
692
if (!dbs.isCompatibleWith(getRequestManager().getDatabaseSchema()))
693           {
694             logger
695                 .warn(Translate
696                     .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
697             return false;
698           }
699           break;
700         case RAIDbLevels.SingleDB :
701         default :
702           logger.error("Unsupported RAIDb level: "
703               + getRequestManager().getLoadBalancer().getRAIDbLevel());
704           return false;
705       }
706     }
707     return true;
708   }
709
710   //
711
// Message dispatcher request handling
712
//
713

714   /**
715    * @see org.objectweb.tribe.messages.MessageListener#receive(java.io.Serializable)
716    */

717   public void receive(Serializable JavaDoc msg)
718   {
719     logger.error("Distributed virtual database received unhandled message: "
720         + msg);
721   }
722
723   /**
724    * This method handle the scheduling part of the queries to be sure that the
725    * query is scheduled in total order before letting other queries to execute.
726    *
727    * @see org.objectweb.tribe.adapters.MulticastRequestListener#handleMessageSingleThreaded(java.io.Serializable,
728    * org.objectweb.tribe.common.Member)
729    */

730   public Object JavaDoc handleMessageSingleThreaded(Serializable JavaDoc msg, Member sender)
731   {
732     try
733     {
734       if (msg != null)
735       {
736         if (logger.isDebugEnabled())
737           logger.debug("handleMessageSingleThreaded (" + msg.getClass() + "): "
738               + msg);
739
740         // This has to be executed in total order
741
if (msg instanceof DistributedRequest)
742         { // Distributed request execution
743
if (!isVirtualDatabaseStarted)
744             return null; // Not ready yet to handle requests
745

746           if (logger.isDebugEnabled())
747             logger.debug(getControllerName()
748                 + ": Scheduling distributedRequest "
749                 + ((DistributedRequest) msg).getRequest().getId() + " from "
750                 + sender);
751
752           if (distributedRequestLogger.isInfoEnabled())
753           {
754             AbstractRequest request = ((DistributedRequest) msg).getRequest();
755             distributedRequestLogger.info((request.isSelect() ? "S " : "W ")
756                 + request.getTransactionId() + " " + request.getSQL());
757           }
758           ((DistributedRequest) msg).scheduleRequest(distributedRequestManager);
759           return null;
760         }
761         if (msg instanceof DistributedTransactionMarker)
762         { // Distributed commit/rollback/savepoint
763
if (!isVirtualDatabaseStarted)
764             return null; // Not ready yet to handle requests
765

766           if (logger.isDebugEnabled())
767             logger.debug(getControllerName() + ": Scheduling " + msg + " from "
768                 + sender);
769
770           if (distributedRequestLogger.isInfoEnabled())
771             distributedRequestLogger.info(msg);
772
773           ((DistributedTransactionMarker) msg)
774               .scheduleCommand(distributedRequestManager);
775           return null;
776         }
777         if (msg instanceof SetCheckpoint)
778           setCheckpoint(((SetCheckpoint) msg).getCheckpointName());
779         else if (msg instanceof ReplicateLogEntries)
780           handleReplicateLogEntries((ReplicateLogEntries) msg);
781         else if (msg instanceof CopyLogEntry)
782           handleCopyLogEntry((CopyLogEntry) msg);
783         // Other message types will be handled in multithreaded handler
784
}
785       else
786       {
787         String JavaDoc errorMsg = "Invalid null message";
788         logger.error(errorMsg);
789         return new ControllerException(errorMsg);
790       }
791
792       return null;
793     }
794     catch (Exception JavaDoc e)
795     {
796       if (e instanceof RuntimeException JavaDoc)
797         logger.warn("Error while handling group message:" + msg.getClass(), e);
798       return e;
799     }
800   }
801
802   /**
803    * @see org.objectweb.tribe.adapters.MulticastRequestListener#handleMessageMultiThreaded(Serializable,
804    * Member, Object)
805    */

806   public Serializable JavaDoc handleMessageMultiThreaded(Serializable JavaDoc msg,
807       Member sender, Object JavaDoc handleMessageSingleThreadedResult)
808   {
809     try
810     {
811       if (msg != null)
812       {
813         if (logger.isDebugEnabled())
814           logger.debug("handleMessageMultiThreaded (" + msg.getClass() + "): "
815               + msg);
816         if (msg instanceof SetCheckpoint)
817         {
818           return null; // ignore, handled in the single-threaded stage.
819
}
820         if (msg instanceof DistributedRequest)
821         { // Distributed request execution
822
if (!isVirtualDatabaseStarted)
823             return new NoMoreBackendException(
824                 "Controller is in intialization phase");
825
826           if (logger.isDebugEnabled())
827             logger.debug(getControllerName()
828                 + ": Executing distributedRequest "
829                 + ((DistributedRequest) msg).getRequest().getId() + " from "
830                 + sender);
831           Serializable JavaDoc result = ((Serializable JavaDoc) ((DistributedRequest) msg)
832               .executeScheduledRequest(distributedRequestManager));
833           return result;
834         }
835         if (msg instanceof DistributedTransactionMarker)
836         { // Distributed commit/rollback/savepoint execution
837
if (!isVirtualDatabaseStarted)
838             return new NoMoreBackendException(
839                 "Controller is in intialization phase");
840
841           if (logger.isDebugEnabled())
842             logger.debug(getControllerName() + ": " + msg + " from " + sender);
843           return ((Serializable JavaDoc) ((DistributedTransactionMarker) msg)
844               .executeCommand(distributedRequestManager));
845         }
846         if (msg instanceof ControllerName)
847         {
848           addRemoteController(sender, ((ControllerName) msg).getJmxName());
849           return Boolean.TRUE;
850         }
851         else if (msg instanceof VirtualDatabaseConfiguration)
852         { // Check if given configuration is compatible with the local one
853
return handleVirtualDatabaseConfiguration(
854               (VirtualDatabaseConfiguration) msg, sender);
855         }
856         else if (msg instanceof BackendTransfer)
857         { // Receive a backend and enable it
858
return handleBackendTransfer((BackendTransfer) msg);
859         }
860         else if (msg instanceof BackendStatus)
861           handleBackendStatus((BackendStatus) msg, sender);
862         else if (msg instanceof EnableBackend)
863           handleEnableBackend((EnableBackend) msg, sender);
864         else if (msg instanceof DisableBackend)
865           handleDisableBackend((DisableBackend) msg, sender);
866         else if (msg instanceof ReplicateLogEntries)
867           return null; // handled in single-threaded
868
else if (msg instanceof CopyLogEntry)
869           return null; // handled in single-threaded
870
else if (msg instanceof InitiateDumpCopy)
871           handleInitiateDumpCopy((InitiateDumpCopy) msg);
872         else
873           logger.warn("Unhandled message type received: " + msg.getClass()
874               + "(" + msg + ")");
875       }
876       else
877       {
878         String JavaDoc errorMsg = "Invalid null message";
879         logger.error(errorMsg);
880         return new ControllerException(errorMsg);
881       }
882       return null;
883     }
884     catch (Exception JavaDoc e)
885     {
886       if (e instanceof RuntimeException JavaDoc)
887         logger.warn("Error while handling group message:" + msg.getClass(), e);
888       return e;
889     }
890     finally
891     {
892       if (msg != null)
893       {
894         // Just in case something bad happen and the request was not properly
895
// removed from the queue.
896
if (msg instanceof DistributedRequest)
897         {
898           synchronized (totalOrderQueue)
899           {
900             if (totalOrderQueue.remove(((DistributedRequest) msg).getRequest()))
901             {
902               logger.warn("Distributed request "
903                   + ((DistributedRequest) msg).getRequest()
904                   + " did not remove itself from the total order queue");
905               totalOrderQueue.notifyAll();
906             }
907           }
908         }
909         else if (msg instanceof DistributedTransactionMarker)
910         {
911           synchronized (totalOrderQueue)
912           {
913             if (totalOrderQueue.remove(msg))
914             {
915               logger.warn("Distributed " + msg.toString() + " did not remove "
916                   + "itself from the total order queue");
917               totalOrderQueue.notifyAll();
918             }
919           }
920         }
921       }
922     }
923   }
924
925   private RecoveryLog getRecoveryLog() throws VirtualDatabaseException
926   {
927     if (!hasRecoveryLog())
928       throw new VirtualDatabaseException(Translate
929           .get("virtualdatabase.no.recovery.log"));
930
931     return getRequestManager().getRecoveryLog();
932   }
933
934   private void handleInitiateDumpCopy(InitiateDumpCopy msg)
935       throws VirtualDatabaseException, BackupException, IOException JavaDoc
936   {
937     // hand-off copy to backuper, if a copy is required
938
if (msg.getDumpTransferInfo() != null)
939     {
940       Backuper backuper = getRequestManager().getBackupManager()
941           .getBackuperByFormat(msg.getDumpInfo().getDumpFormat());
942
943       backuper.fetchDump(msg.getDumpTransferInfo(), msg.getDumpInfo()
944           .getDumpPath(), msg.getDumpInfo().getDumpName());
945     }
946
947     // update local recovery log dump tables
948
getRecoveryLog().setDumpInfo(msg.getDumpInfo());
949   }
950
951   /**
952    * Handle CopyLogEntry messages.
953    *
954    * @param msg the CopyLogEntry message.
955    */

956   private void handleCopyLogEntry(CopyLogEntry msg)
957   {
958     if (!hasRecoveryLog())
959     {
960       logger.warn("Tentative handleCopyLogEntry on vdb with no recovery log");
961       return;
962     }
963
964     getRequestManager().getRecoveryLog().logLogEntry(msg.getEntry());
965   }
966
967   /**
968    * Handle ReplicateLogEntries messages. There are two cases: initialization
969    * and termination of the replication process. Initialization asynchronously
970    * and ato,ically wipes-out the recovery log entries before the 'now'
971    * checkpointand. Termination sets the dump checkpoint name, thus making it
972    * available so that a recovery can be executed.
973    *
974    * @param msg the ReplicateLogEntries message
975    * @throws SQLException in case of error when resetting the recovery log.
976    */

977   private void handleReplicateLogEntries(ReplicateLogEntries msg)
978       throws SQLException JavaDoc
979   {
980     if (!hasRecoveryLog())
981     {
982       logger
983           .warn("Tentative handleReplicateLogEntries on vdb with no recovery log");
984       return;
985     }
986
987     if (msg.getDumpCheckpointName() != null)
988     { // Termination
989
getRequestManager().getRecoveryLog().storeDumpCheckpointName(
990           msg.getDumpCheckpointName(), msg.getCheckpointId());
991     }
992     else
993     { // Initialization
994
getRequestManager().getRecoveryLog().resetLogTableIdAndDeleteRecoveryLog(
995           msg.getCheckpointName(), msg.getCheckpointId());
996     }
997   }
998
999   /**
1000   * Handles BackendStatus messages.
1001   *
1002   * @param msg the BackendStatus message
1003   * @param sender the message sender
1004   */

1005  private void handleBackendStatus(BackendStatus msg, Member sender)
1006  {
1007    // Update backend list from sender
1008
ArrayList JavaDoc remoteBackendInfoList = msg.getBackends();
1009    // Convert BackendInfo arraylist to real DatabaseBackend objects
1010
ArrayList JavaDoc remoteBackendList = new ArrayList JavaDoc(remoteBackendInfoList.size());
1011    for (Iterator JavaDoc iter = remoteBackendInfoList.iterator(); iter.hasNext();)
1012    {
1013      BackendInfo info = (BackendInfo) iter.next();
1014      remoteBackendList.add(info.getDatabaseBackend());
1015    }
1016    backendsPerController.put(sender, remoteBackendList);
1017    if (logger.isInfoEnabled())
1018      logger.info(Translate.get(
1019          "virtualdatabase.distributed.configuration.updating.backend.list",
1020          sender));
1021  }
1022
1023  /**
1024   * Handles BackendTransfer messages.
1025   *
1026   * @param msg the BackendTransfer message
1027   * @return true on success
1028   * @throws NotCompliantMBeanException if an error occurs while creating the
1029   * DatabaseBackend
1030   * @throws VirtualDatabaseException if an error occurs while adding the new
1031   * backend
1032   */

1033  private Serializable JavaDoc handleBackendTransfer(BackendTransfer msg)
1034      throws NotCompliantMBeanException JavaDoc, VirtualDatabaseException
1035  {
1036    if (logger.isInfoEnabled())
1037      logger.info(getControllerName()
1038          + ": Received transfer command. Checkpoint: "
1039          + msg.getCheckpointName());
1040    BackendInfo info = msg.getInfo();
1041    DatabaseBackend backend = new DatabaseBackend(info);
1042    this.addBackend(backend);
1043    if (logger.isInfoEnabled())
1044      logger.info(getControllerName() + ": Enable backend");
1045    enableBackendFromCheckpoint(backend.getName(), msg.getCheckpointName());
1046    return Boolean.TRUE;
1047  }
1048
1049  /**
1050   * Handles DisableBackend messages.
1051   *
1052   * @param msg the DisableBackend message
1053   * @param sender the message sender
1054   */

1055  private void handleDisableBackend(DisableBackend msg, Member sender)
1056  {
1057    ArrayList JavaDoc remoteBackendList = (ArrayList JavaDoc) backendsPerController.get(sender);
1058    if (remoteBackendList == null)
1059    { // This case was reported by Alessandro Gamboz on April 1, 2005.
1060
// It looks like the EnableBackend message arrives before membership
1061
// has been properly updated.
1062
logger.warn("No information has been found for remote controller "
1063          + sender);
1064      remoteBackendList = new ArrayList JavaDoc();
1065      backendsPerController.put(sender, remoteBackendList);
1066    }
1067    DatabaseBackend disabledBackend = msg.getDatabaseBackend();
1068    int size = remoteBackendList.size();
1069    boolean backendFound = false;
1070    for (int i = 0; i < size; i++)
1071    {
1072      DatabaseBackend b = (DatabaseBackend) remoteBackendList.get(i);
1073      if (b.equals(disabledBackend))
1074      {
1075        logger.info("Backend " + b.getName() + " disabled on controller "
1076            + sender);
1077        remoteBackendList.set(i, disabledBackend);
1078        backendFound = true;
1079        break;
1080      }
1081    }
1082    if (!backendFound)
1083    {
1084      logger.warn("Updating backend list with unknown backend "
1085          + disabledBackend.getName() + " disabled on controller " + sender);
1086      remoteBackendList.add(disabledBackend);
1087    }
1088  }
1089
1090  /**
1091   * Handles EnableBackend messages.
1092   *
1093   * @param msg the EnableBackend message
1094   * @param sender the message sender
1095   */

1096  private void handleEnableBackend(EnableBackend msg, Member sender)
1097  {
1098    ArrayList JavaDoc remoteBackendList = (ArrayList JavaDoc) backendsPerController.get(sender);
1099    if (remoteBackendList == null)
1100    { // This case was reported by Alessandro Gamboz on April 1, 2005.
1101
// It looks like the EnableBackend message arrives before membership
1102
// has been properly updated.
1103
logger.warn("No information has been found for remote controller "
1104          + sender);
1105      remoteBackendList = new ArrayList JavaDoc();
1106      backendsPerController.put(sender, remoteBackendList);
1107    }
1108    DatabaseBackend enabledBackend = msg.getDatabaseBackend();
1109    int size = remoteBackendList.size();
1110    boolean backendFound = false;
1111    for (int i = 0; i < size; i++)
1112    {
1113      DatabaseBackend b = (DatabaseBackend) remoteBackendList.get(i);
1114      if (b.equals(enabledBackend))
1115      {
1116        logger.info("Backend " + b.getName() + " enabled on controller "
1117            + sender);
1118        remoteBackendList.set(i, enabledBackend);
1119        backendFound = true;
1120        break;
1121      }
1122    }
1123    if (!backendFound)
1124    {
1125      logger.warn("Updating backend list with unknown backend "
1126          + enabledBackend.getName() + " enabled on controller " + sender);
1127      remoteBackendList.add(enabledBackend);
1128    }
1129  }
1130
1131  /**
1132   * Handles VirtualDatabaseConfiguration messages.
1133   *
1134   * @param msg the VirtualDatabaseConfiguration message
1135   * @param sender the membership of the message sender
1136   * @return a long, controllerId on success, INCOMPATIBLE_CONFIGURATION on
1137   * error
1138   * @throws TimeoutException if a group communication timeout occurs
1139   * @throws ChannelException if the group communication fails
1140   * @throws NotConnectedException if the group communication channel is not
1141   * conected
1142   */

1143  private Serializable JavaDoc handleVirtualDatabaseConfiguration(
1144      VirtualDatabaseConfiguration msg, Member sender) throws TimeoutException,
1145      ChannelException, NotConnectedException
1146  {
1147
1148    if (logger.isInfoEnabled())
1149      logger.info("Checking virtual database configuration from "
1150          + msg.getControllerName());
1151
1152    if (!isLocalSender(sender) && !msg.isCompatible(this))
1153      // Failure
1154
return new Long JavaDoc(INCOMPATIBLE_CONFIGURATION);
1155
1156    // Send JMX notification
1157
if (MBeanServerManager.isJmxEnabled())
1158    {
1159      Hashtable JavaDoc data = new Hashtable JavaDoc();
1160      data.put("controllerName", msg.getControllerName());
1161      data.put("rmiconnector", new String JavaDoc[]{msg.getRmiHostname(),
1162          msg.getRmiPort()});
1163      RmiConnector.broadcastNotification(this,
1164          CjdbcNotificationList.DISTRIBUTED_CONTROLLER_ADDED,
1165          CjdbcNotificationList.NOTIFICATION_LEVEL_INFO, Translate.get(
1166              "notification.distributed.controller.added", this
1167                  .getVirtualDatabaseName()), data);
1168    }
1169
1170    addRemoteController(sender, msg.getControllerJmxName());
1171
1172    // Send controller name to new comer
1173
if (logger.isDebugEnabled())
1174      logger.debug("Sending local controller name to joining controller ("
1175          + sender + ")");
1176    ArrayList JavaDoc target = new ArrayList JavaDoc();
1177    target.add(sender);
1178    multicastRequestAdapter.multicastMessage(target, new ControllerName(
1179        controller.getControllerName(), controller.getJmxName()),
1180        MulticastRequestAdapter.WAIT_ALL, CJDBCGroupMessage.defaultCastTimeOut);
1181
1182    // Send backend status
1183
if (logger.isDebugEnabled())
1184      logger.debug("Sending backend status name to joining controller ("
1185          + sender + ")");
1186    multicastRequestAdapter.multicastMessage(target, new BackendStatus(
1187        getBackendsInfo(backends)), MulticastRequestAdapter.WAIT_ALL,
1188        CJDBCGroupMessage.defaultCastTimeOut);
1189
1190    if (logger.isInfoEnabled())
1191      logger.info("Controller " + msg.getControllerName()
1192          + " is compatible with the local configuration");
1193    return new Long JavaDoc(distributedRequestManager.getControllerId());
1194  }
1195
1196  /**
1197   * Returns true if the given member is ourselves.
1198   *
1199   * @param sender the sender
1200   * @return true if we are the sender, false otherwise
1201   */

1202  private boolean isLocalSender(Member sender)
1203  {
1204    return channel.getLocalMembership().equals(sender);
1205  }
1206
1207  /**
1208   * Get the status of all remote controllers
1209   *
1210   * @throws NotConnectedException if the channel is not connected
1211   * @throws ChannelException if the channel reported an error
1212   * @throws TimeoutException if a timeout occured
1213   */

1214  public void getBackendStatus() throws TimeoutException, ChannelException,
1215      NotConnectedException
1216  {
1217    if (logger.isDebugEnabled())
1218      logger.debug("Requesting remote controllers status");
1219    MulticastResponse rspList = multicastRequestAdapter.multicastMessage(null,
1220        new BackendStatus(getBackendsInfo(backends)),
1221        MulticastRequestAdapter.WAIT_ALL, CJDBCGroupMessage.defaultCastTimeOut);
1222
1223    HashMap JavaDoc results = rspList.getResults();
1224    for (Iterator JavaDoc iter = results.values().iterator(); iter.hasNext();)
1225    {
1226      ArrayList JavaDoc b = (ArrayList JavaDoc) iter.next();
1227      int bSize = b.size();
1228      if (bSize == 0)
1229        logger.debug("No Database backends");
1230      else
1231        for (int j = 0; j < bSize; j++)
1232          logger.debug(((DatabaseBackend) b.get(j)).getXml());
1233    }
1234  }
1235
1236  //
1237
// Group Membership Management
1238
//
1239

1240  /**
1241   * Add a new controller name to the controllerMap list and refresh the group
1242   * membership.
1243   *
1244   * @param remoteControllerMembership the membership identifying the remote
1245   * controller
1246   * @param remoteControllerJmxName the JMX name of the remote controller
1247   */

1248  public void addRemoteController(Member remoteControllerMembership,
1249      String JavaDoc remoteControllerJmxName)
1250  {
1251    controllersMap.put(remoteControllerMembership, remoteControllerJmxName);
1252    if (logger.isDebugEnabled())
1253      logger.debug("Adding new controller " + remoteControllerJmxName);
1254    refreshGroupMembership();
1255  }
1256
1257  /**
1258   * Remove a remote controller (usually because it has failed) from the
1259   * controllerMap list and refresh the group membership.
1260   *
1261   * @param remoteControllerMembership the membership identifying the remote
1262   * controller
1263   * @return the JMX name of the removed controller (or null if this controller
1264   * was not in the list)
1265   */

1266  public String JavaDoc removeRemoteController(Member remoteControllerMembership)
1267  {
1268    String JavaDoc remoteControllerJmxName = (String JavaDoc) controllersMap
1269        .remove(remoteControllerMembership);
1270    if (logger.isDebugEnabled())
1271      logger.debug("Removing controller " + remoteControllerJmxName);
1272
1273    // Remove the list of remote backends since they are no more reachable
1274
backendsPerController.remove(remoteControllerMembership);
1275    refreshGroupMembership();
1276    return remoteControllerJmxName;
1277  }
1278
1279  /*
1280   * Set a raw checkpoint in local recoveryLog (if any, or yell) to reflect
1281   * group-comm events. Local, raw operation: this is not a cluster-wide,
1282   * synchronous checkpoint (although group-comm event message is sent to every
1283   * member of the group). Intended to be used by external recovery schemes, or
1284   * as debugging tags.
1285   */

1286  private void setEventCheckpoint(String JavaDoc name, Member m, GroupIdentifier gid)
1287  {
1288    if (!hasRecoveryLog())
1289    {
1290      logger.warn("Trying to set checkpoint with no recoveryLog. Ignored.");
1291      return;
1292    }
1293
1294    try
1295    {
1296      getRequestManager().getRecoveryLog().storeCheckpoint(
1297          (name + " member:" + m + " gid:" + gid + " " + new Date JavaDoc())
1298              .replaceAll(" ", "_"));
1299    }
1300    catch (SQLException JavaDoc e)
1301    {
1302      logger.warn("setEventCheckpoint: " + name, e);
1303    }
1304  }
1305
1306  /**
1307   * @see org.objectweb.tribe.gms.GroupMembershipListener#joinMember(org.objectweb.tribe.common.Member,
1308   * org.objectweb.tribe.common.GroupIdentifier)
1309   */

1310  public void joinMember(Member m, GroupIdentifier gid)
1311  {
1312    if (hasRecoveryLog())
1313      setEventCheckpoint("joining", m, gid);
1314  }
1315
1316  /**
1317   * @see org.objectweb.tribe.gms.GroupMembershipListener#quitMember(org.objectweb.tribe.common.Member,
1318   * org.objectweb.tribe.common.GroupIdentifier)
1319   */

1320  public void quitMember(Member m, GroupIdentifier gid)
1321  {
1322    if (hasRecoveryLog())
1323      setEventCheckpoint("quitting", m, gid);
1324
1325    // Notify adapter that we do not expect responses anymore from this member
1326
int failures = multicastRequestAdapter.memberFailsOnAllReplies(m);
1327    logger.info(failures + " requests were waiting responses from " + m);
1328
1329    // Remove controller from list and notify JMX listeners
1330
String JavaDoc remoteControllerName = removeRemoteController(m);
1331    if (remoteControllerName != null)
1332    {
1333      logger.warn("Controller " + m + " has left the cluster.");
1334      // Send JMX notification
1335
if (MBeanServerManager.isJmxEnabled())
1336      {
1337        Hashtable JavaDoc data = new Hashtable JavaDoc();
1338        data.put("controllerName", remoteControllerName);
1339        RmiConnector.broadcastNotification(this,
1340            CjdbcNotificationList.DISTRIBUTED_CONTROLLER_REMOVED,
1341            CjdbcNotificationList.NOTIFICATION_LEVEL_INFO, Translate.get(
1342                "notification.distributed.controller.removed", this
1343                    .getVirtualDatabaseName()), data);
1344      }
1345    }
1346  }
1347
1348  /**
1349   * @see org.objectweb.tribe.gms.GroupMembershipListener#groupComposition(org.objectweb.tribe.common.Group,
1350   * org.objectweb.tribe.common.Address)
1351   */

1352  public void groupComposition(Group g, Address sender)
1353  {
1354    // Just ignore
1355
}
1356
1357  /**
1358   * @see org.objectweb.tribe.gms.GroupMembershipListener#failedMember(org.objectweb.tribe.common.Member,
1359   * org.objectweb.tribe.common.GroupIdentifier,
1360   * org.objectweb.tribe.common.Member)
1361   */

1362  public void failedMember(Member failed, GroupIdentifier gid, Member sender)
1363  {
1364    // Just ignore
1365
}
1366
1367  /**
1368   * Refresh the current group membership when someone has joined or left the
1369   * group.
1370   */

1371  private void refreshGroupMembership()
1372  {
1373    if (logger.isDebugEnabled())
1374      logger.debug("Refreshing members list:" + currentGroup.getMembers());
1375
1376    synchronized (controllersMap)
1377    {
1378      allMemberButUs = ((ArrayList JavaDoc) currentGroup.getMembers().clone());
1379      allMemberButUs.remove(channel.getLocalMembership());
1380    }
1381  }
1382
1383  //
1384
// Getter/Setter and tools (equals, ...)
1385
//
1386

1387  /**
1388   * Is this virtual database distributed ?
1389   *
1390   * @return true
1391   */

1392  public boolean isDistributed()
1393  {
1394    return true;
1395  }
1396
1397  /**
1398   * Two virtual databases are equal if they have the same name, login and
1399   * password.
1400   *
1401   * @param other an object
1402   * @return a <code>boolean</code> value
1403   */

1404  public boolean equals(Object JavaDoc other)
1405  {
1406    if ((other == null)
1407        || (!(other instanceof org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase)))
1408      return false;
1409    else
1410    {
1411      DistributedVirtualDatabase db = (org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase) other;
1412      return name.equals(db.getDatabaseName())
1413          && groupName.equals(db.getGroupName());
1414    }
1415  }
1416
1417  /**
1418   * Get the XML dump of the Distribution element if any.
1419   *
1420   * @return XML dump of the Distribution element
1421   */

1422  protected String JavaDoc getDistributionXml()
1423  {
1424    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
1425    info.append("<" + DatabasesXmlTags.ELT_Distribution + " "
1426        + DatabasesXmlTags.ATT_groupName + "=\"" + groupName + "\">");
1427
1428    info.append("</" + DatabasesXmlTags.ELT_Distribution + ">");
1429    return info.toString();
1430  }
1431
1432  /**
1433   * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#viewControllerList()
1434   */

1435  public String JavaDoc[] viewControllerList()
1436  {
1437    if (logger.isInfoEnabled())
1438    {
1439      logger.info(channel.getLocalMembership() + " see members:"
1440          + currentGroup.getMembers() + " and has mapping:" + controllersMap);
1441    }
1442    String JavaDoc[] members = new String JavaDoc[controllersMap.keySet().size()];
1443    Iterator JavaDoc iter = controllersMap.keySet().iterator();
1444    int i = 0;
1445    while (iter.hasNext())
1446    {
1447      members[i] = (String JavaDoc) controllersMap.get(iter.next());
1448      i++;
1449    }
1450    return members;
1451  }
1452
1453  /**
1454   * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#viewGroupBackends()
1455   */

1456  public Hashtable JavaDoc viewGroupBackends() throws VirtualDatabaseException
1457  {
1458    Hashtable JavaDoc map = new Hashtable JavaDoc(controllersMap.size());
1459    Iterator JavaDoc iter = backendsPerController.keySet().iterator();
1460    Member member;
1461    while (iter.hasNext())
1462    {
1463      member = (Member) iter.next();
1464      map.put(controllersMap.get(member), backendsPerController.get(member));
1465    }
1466    return map;
1467  }
1468
1469  /**
1470   * We have to convert the backends list from an array of
1471   * <code>DatabaseBackend</code> object to an ArrayList of
1472   * <code>BackendInfo</code> objects. The DatabaseBackend objects cannot be
1473   * serialized because they are used as MBean and notification emitters, so we
1474   * want to extract the info out of them.
1475   *
1476   * @param backendsObject the list of DatabaseBackend object
1477   * @see BackendInfo
1478   * @return a list of BackendInfo objects
1479   */

1480  public ArrayList JavaDoc getBackendsInfo(ArrayList JavaDoc backendsObject)
1481  {
1482    int size = backendsObject.size();
1483    ArrayList JavaDoc infos = new ArrayList JavaDoc(size);
1484    DatabaseBackend backend;
1485    for (int i = 0; i < size; i++)
1486    {
1487      backend = (DatabaseBackend) backendsObject.get(i);
1488      infos.add(createBackendInfo(backend, false));
1489    }
1490    return infos;
1491  }
1492
1493  /**
1494   * Create backend information object from a DatabaseBackend object. This will
1495   * get only static information.
1496   *
1497   * @param backend the <code>DatabaseBackend</code> object to get info from
1498   * @param useXml should we add xml for extensive backend description
1499   * @return <code>BackendInfo</code>
1500   */

1501  public BackendInfo createBackendInfo(DatabaseBackend backend, boolean useXml)
1502  {
1503    BackendInfo info = new BackendInfo(backend);
1504    if (!useXml)
1505      info.setXml(null);
1506    return info;
1507  }
1508
1509  /**
1510   * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#removeBackend(java.lang.String)
1511   */

1512  public void removeBackend(String JavaDoc backend) throws VirtualDatabaseException
1513  {
1514    super.removeBackend(backend);
1515
1516    try
1517    {
1518      // Send a group message to update backend list
1519
broadcastBackendInformation(getAllMemberButUs());
1520    }
1521    catch (Exception JavaDoc e)
1522    {
1523      String JavaDoc msg = "An error occured while multicasting new backedn information";
1524      logger.error(msg, e);
1525      throw new VirtualDatabaseException(msg, e);
1526    }
1527  }
1528
1529  /**
1530   * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#transferBackend(java.lang.String,
1531   * java.lang.String)
1532   */

1533  public void transferBackend(String JavaDoc backend, String JavaDoc controllerDestination)
1534      throws VirtualDatabaseException
1535  {
1536    Member targetMember = getControllerByName(controllerDestination);
1537
1538    // Get reference on backend
1539
DatabaseBackend db = getAndCheckBackend(backend, CHECK_BACKEND_DISABLE);
1540    String JavaDoc transfertCheckpointName = makeTransferCheckpointName(db,
1541        targetMember);
1542
1543    if (logger.isDebugEnabled())
1544      logger.debug("**** Disabling backend for transfer");
1545
1546    // Disable local backend (distributed operation)
1547
try
1548    {
1549      if (!hasRecoveryLog())
1550        throw new VirtualDatabaseException(
1551            "Transfer is not supported on virtual databases without a recovery log");
1552
1553      distributedRequestManager.disableBackendForCheckpoint(db,
1554          transfertCheckpointName);
1555    }
1556    catch (SQLException JavaDoc e)
1557    {
1558      throw new VirtualDatabaseException(e.getMessage());
1559    }
1560
1561    // Enable remote transfered backend.
1562
try
1563    {
1564      if (logger.isDebugEnabled())
1565        logger.debug("**** Sending transfer message to:" + targetMember);
1566
1567      ArrayList JavaDoc dest = new ArrayList JavaDoc(1);
1568      dest.add(targetMember);
1569
1570      multicastRequestAdapter.multicastMessage(dest, new BackendTransfer(
1571          controllerDestination, transfertCheckpointName, createBackendInfo(db,
1572              true)), MulticastRequestAdapter.WAIT_ALL,
1573          CJDBCGroupMessage.defaultCastTimeOut);
1574
1575      if (logger.isDebugEnabled())
1576        logger.debug("**** Removing local backend");
1577
1578      // Remove backend from this controller
1579
removeBackend(db);
1580
1581      // Broadcast updated backend list
1582
broadcastBackendInformation(getAllMemberButUs());
1583    }
1584    catch (Exception JavaDoc e)
1585    {
1586      String JavaDoc msg = "An error occured while transfering the backend";
1587      logger.error(msg, e);
1588      throw new VirtualDatabaseException(msg, e);
1589    }
1590  }
1591
1592  /**
1593   * Gets a Controller specified by its name as a Member object suitable for
1594   * group communication.
1595   *
1596   * @param controllerName the name of the target controller
1597   * @return a Member representing the target controller
1598   * @throws VirtualDatabaseException
1599   */

1600  private Member getControllerByName(String JavaDoc controllerName)
1601      throws VirtualDatabaseException
1602  {
1603    // Get the target controller
1604
Iterator JavaDoc iter = controllersMap.entrySet().iterator();
1605    Member targetMember = null;
1606    while (iter.hasNext())
1607    {
1608      Entry entry = (Entry) iter.next();
1609      if (entry.getValue().equals(controllerName))
1610      {
1611        targetMember = (Member) entry.getKey();
1612        break;
1613      }
1614    }
1615    if (targetMember == null)
1616      throw new VirtualDatabaseException("Cannot find controller:"
1617          + controllerName + " in group");
1618    return targetMember;
1619  }
1620
1621  private String JavaDoc makeTransferCheckpointName(DatabaseBackend db, Member dest)
1622  {
1623    return ("transfer-checkpoint: " + db.getName() + " from "
1624        + this.controller.getControllerName() + " to " + dest.getUid() + " " + new Date JavaDoc())
1625        .replaceAll(" ", "_");
1626  }
1627
1628  /**
1629   * Atomically checkpoint across specified group members. Group-wide atomicity
1630   * is (and can only be) achieved with a globally ordered message, possibly
1631   * also sent to ourselves. The local operation must block writes group-wide
1632   * (in scheduler), to allow a clean write of the checkpoint mark. TODO CHECK
1633   * this requirement (group-wide lock).
1634   *
1635   * @param checkpointName the name of the (transfer) checkpoint to create
1636   * @param groupMembers an ArrayList of target Members
1637   * @throws VirtualDatabaseException in case of scheduler or recoveryLog
1638   * exceptions
1639   * @see VirtualDatabase#setCheckpoint(String)
1640   * @see DistributedVirtualDatabase#handleMessageSingleThreaded(Serializable,
1641   * Member)
1642   */

1643  public void setGroupCheckpoint(String JavaDoc checkpointName, ArrayList JavaDoc groupMembers)
1644      throws VirtualDatabaseException
1645  {
1646    try
1647    {
1648      getMulticastRequestAdapter().multicastMessage(groupMembers,
1649          new SetCheckpoint(checkpointName), MulticastRequestAdapter.WAIT_ALL,
1650          CJDBCGroupMessage.defaultCastTimeOut);
1651    }
1652    catch (Exception JavaDoc e)
1653    {
1654      String JavaDoc msg = "Set group checkpoint failed: checkpointName="
1655          + checkpointName;
1656      logger.error(msg, e);
1657      throw new VirtualDatabaseException(msg);
1658    }
1659  }
1660
1661  /**
1662   * Sets an atomic (group-wide) checkpoint on local & target controllers.
1663   *
1664   * @param controllerName the target remote controller
1665   * @return the 'now' checkpoint name.
1666   * @throws VirtualDatabaseException in case of error (whatever error, wraps
1667   * the underlying error)
1668   */

1669  private String JavaDoc setLogReplicationCheckpoint(String JavaDoc controllerName)
1670      throws VirtualDatabaseException
1671  {
1672    String JavaDoc checkpointName = "now-" + new Date JavaDoc();
1673
1674    ArrayList JavaDoc dest = new ArrayList JavaDoc();
1675    dest.add(getControllerByName(controllerName));
1676    dest.add(channel.getLocalMembership());
1677    setGroupCheckpoint(checkpointName, dest);
1678    return checkpointName;
1679  }
1680
1681  /**
1682   * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#copyLogFromCheckpoint(java.lang.String,
1683   * java.lang.String)
1684   */

1685  public void copyLogFromCheckpoint(String JavaDoc dumpName, String JavaDoc controllerName)
1686      throws VirtualDatabaseException
1687  {
1688    // perform basic error checks (in particular, on sucksess, we have a
1689
// recovery log)
1690
super.copyLogFromCheckpoint(dumpName, controllerName);
1691
1692    // get the checkpoint name from the dump info, or die
1693
String JavaDoc dumpCheckpointName;
1694    DumpInfo dumpInfo;
1695
1696    try
1697    {
1698      dumpInfo = getRecoveryLog().getDumpInfo(dumpName);
1699    }
1700    catch (SQLException JavaDoc e)
1701    {
1702      throw new VirtualDatabaseException(
1703          "Recovery log error access occured while checking for dump"
1704              + dumpName, e);
1705    }
1706
1707    if (dumpInfo == null)
1708      throw new VirtualDatabaseException(
1709          "No information was found in the dump table for dump " + dumpName);
1710
1711    dumpCheckpointName = dumpInfo.getCheckpointName();
1712
1713    // set a global 'now' checkpoint (temporary)
1714
String JavaDoc nowCheckpointName = setLogReplicationCheckpoint(controllerName);
1715
1716    // get it's id (ewerk) so that we can replicate it on the other side
1717
// TODO: find a way to hide those ids in the recovery log interface...
1718
long nowCheckpointId;
1719    RecoveryLog recoveryLog = getRequestManager().getRecoveryLog();
1720    try
1721    {
1722      nowCheckpointId = recoveryLog.getCheckpointRequestId(nowCheckpointName);
1723    }
1724    catch (SQLException JavaDoc e)
1725    {
1726      String JavaDoc errorMessage = "Can not find 'now checkpoint' log entry";
1727      logger.error(errorMessage);
1728      throw new VirtualDatabaseException(errorMessage);
1729    }
1730
1731    // initiate the replication - clears the remote recovery log.
1732
sendMessageToController(controllerName, new ReplicateLogEntries(
1733        nowCheckpointName, null, nowCheckpointId));
1734
1735    // protect from concurrent log updates: fake a recovery (increments
1736
// semaphore)
1737
recoveryLog.beginRecovery();
1738
1739    // copy the entries over to the remote controller.
1740
// Send them one by one over to the remote controller, coz each LogEntry can
1741
// potentially be huge (e.g. if it contains a blob)
1742

1743    // Note: ugly iteration over Ids, but that's all we have.
1744
// TODO: provide itertator in terms of checkpoint names
1745
try
1746    {
1747      long dumpId = recoveryLog.getCheckpointRequestId(dumpCheckpointName);
1748      long nowId = recoveryLog.getCheckpointRequestId(nowCheckpointName);
1749
1750      for (long id = dumpId; id != nowId; id++)
1751      {
1752        LogEntry entry = recoveryLog.getNextLogEntry(id);
1753        if (entry == null)
1754        {
1755          String JavaDoc errorMessage = "Can not find expected log entry: " + id;
1756          logger.error(errorMessage);
1757          throw new VirtualDatabaseException(errorMessage);
1758        }
1759
1760        // Because 'getNextLogEntry()' will hunt for the next valid log entry,
1761
// we need to update the iterator with the new id value - 1
1762
id = entry.getId() - 1;
1763        sendMessageToController(controllerName, new CopyLogEntry(entry));
1764      }
1765
1766      // terminate the replication - sets the remote dump chakpoint name.
1767
sendMessageToController(controllerName, new ReplicateLogEntries(null,
1768          dumpCheckpointName, dumpId));
1769    }
1770    catch (SQLException JavaDoc e)
1771    {
1772      String JavaDoc errorMessage = "Failed to send log entries";
1773      logger.error(errorMessage, e);
1774      throw new VirtualDatabaseException(errorMessage);
1775    }
1776    finally
1777    {
1778      recoveryLog.endRecovery(); // release semaphore
1779
}
1780
1781  }
1782
1783  /**
1784   * Send a Message to a remote controller. This sends a point-to-point message,
1785   * fifo. No total order is specifically required.
1786   *
1787   * @param controllerName name of the remote controller
1788   * @param message the message to send (should be Serializable)
1789   * @throws VirtualDatabaseException (wrapping error) in case of communication
1790   * failure
1791   */

1792  private void sendMessageToController(String JavaDoc controllerName,
1793      Serializable JavaDoc message) throws VirtualDatabaseException
1794  {
1795    try
1796    {
1797      ArrayList JavaDoc dest = new ArrayList JavaDoc();
1798      dest.add(getControllerByName(controllerName));
1799      getMulticastRequestAdapter().multicastMessage(dest, message,
1800          MulticastRequestAdapter.WAIT_ALL,
1801          CJDBCGroupMessage.defaultCastTimeOut);
1802    }
1803    catch (Exception JavaDoc e)
1804    {
1805      String JavaDoc errorMessage = message.getClass().getName() + ": send failed";
1806      logger.error(errorMessage, e);
1807      throw new VirtualDatabaseException(errorMessage);
1808    }
1809  }
1810
1811  /**
1812   * What this method does is really initiating the copy. It is the remote
1813   * controller's vdb that performs the actual copy, fetching the dump from this
1814   * vdb's local backuper.
1815   *
1816   * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#copyDump(java.lang.String,
1817   * java.lang.String)
1818   * @param dumpName the name of the dump to copy. Should exist locally, and not
1819   * remotely.
1820   * @param remoteControllerName the remote controller to talk to.
1821   * @throws VirtualDatabaseException in case of error.
1822   */

1823  public void copyDump(String JavaDoc dumpName, String JavaDoc remoteControllerName)
1824      throws VirtualDatabaseException
1825  {
1826    transferDump(dumpName, remoteControllerName, false);
1827  }
1828
1829  /**
1830   * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#transferDump(java.lang.String,
1831   * java.lang.String, boolean)
1832   */

1833  public void transferDump(String JavaDoc dumpName, String JavaDoc remoteControllerName,
1834      boolean noCopy) throws VirtualDatabaseException
1835  {
1836    // get the info from the backuper
1837
DumpInfo dumpInfo = null;
1838    try
1839    {
1840      dumpInfo = getRecoveryLog().getDumpInfo(dumpName);
1841      /*
1842       * getDumpInfo() is the one that throws SQLException (should it be a
1843       * VirtualDatabaseException instead ???)
1844       */

1845    }
1846    catch (SQLException JavaDoc e)
1847    {
1848      String JavaDoc msg = "getting dump info from backup manager failed";
1849      throw new VirtualDatabaseException(msg, e);
1850    }
1851
1852    if (dumpInfo == null)
1853      throw new VirtualDatabaseException("no dump info for dump '" + dumpName
1854          + "'");
1855
1856    // if a copy is needed, hand-off copy to backuper: setup server side of the
1857
// copy
1858
DumpTransferInfo dumpTransferInfo = null;
1859    if (!noCopy)
1860    {
1861      try
1862      {
1863        dumpTransferInfo = getRequestManager().getBackupManager()
1864            .getBackuperByFormat(dumpInfo.getDumpFormat()).setupServer();
1865      }
1866      catch (IOException JavaDoc e)
1867      {
1868        throw new VirtualDatabaseException(e);
1869      }
1870    }
1871
1872    // send message to remote vdb instance, to act as a client
1873
// (see handleInitiateDumpCopy)
1874
sendMessageToController(remoteControllerName, new InitiateDumpCopy(
1875        dumpInfo, dumpTransferInfo));
1876
1877  }
1878
1879}
Popular Tags