KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > requestmanager > distributed > DistributedRequestManager


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Copyright (C) 2005-2006 Continuent, Inc.
7  * Contact: sequoia@continuent.org
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): Olivier Fambon, Jean-Bernard van Zuylen, Damian Arregui,
23  * Peter Royal.
24  */

25
26 package org.continuent.sequoia.controller.requestmanager.distributed;
27
28 import java.io.Serializable JavaDoc;
29 import java.sql.SQLException JavaDoc;
30 import java.util.ArrayList JavaDoc;
31 import java.util.HashMap JavaDoc;
32 import java.util.Iterator JavaDoc;
33 import java.util.LinkedList JavaDoc;
34 import java.util.List JavaDoc;
35
36 import javax.management.NotCompliantMBeanException JavaDoc;
37
38 import org.continuent.hedera.adapters.MulticastRequestAdapter;
39 import org.continuent.hedera.adapters.MulticastResponse;
40 import org.continuent.hedera.common.Member;
41 import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
42 import org.continuent.sequoia.common.exceptions.NoResultAvailableException;
43 import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
44 import org.continuent.sequoia.common.i18n.Translate;
45 import org.continuent.sequoia.common.jmx.management.BackendInfo;
46 import org.continuent.sequoia.common.log.Trace;
47 import org.continuent.sequoia.controller.backend.DatabaseBackend;
48 import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
49 import org.continuent.sequoia.controller.backend.result.ExecuteResult;
50 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
51 import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
52 import org.continuent.sequoia.controller.cache.result.AbstractResultCache;
53 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
54 import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
55 import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
56 import org.continuent.sequoia.controller.requestmanager.RequestManager;
57 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
58 import org.continuent.sequoia.controller.requests.AbstractRequest;
59 import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
60 import org.continuent.sequoia.controller.requests.SelectRequest;
61 import org.continuent.sequoia.controller.requests.StoredProcedure;
62 import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
63 import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
64 import org.continuent.sequoia.controller.semantic.SemanticBehavior;
65 import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
66 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
67 import org.continuent.sequoia.controller.virtualdatabase.protocol.DisableBackendsAndSetCheckpoint;
68 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection;
69 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
70 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
71 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
72 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
73 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
74 import org.continuent.sequoia.controller.virtualdatabase.protocol.FailoverForPersistentConnection;
75 import org.continuent.sequoia.controller.virtualdatabase.protocol.FailoverForTransaction;
76 import org.continuent.sequoia.controller.virtualdatabase.protocol.GetRequestResultFromFailoverCache;
77 import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyCompletion;
78 import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyDisableBackend;
79 import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyEnableBackend;
80 import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyInconsistency;
81 import org.continuent.sequoia.controller.virtualdatabase.protocol.ResumeActivity;
82 import org.continuent.sequoia.controller.virtualdatabase.protocol.SuspendActivity;
83
84 /**
85  * This class defines a Distributed Request Manager.
86  * <p>
87  * The DRM is composed of a Request Scheduler, an optional Query Cache, and a
88  * Load Balancer and an optional Recovery Log. Unlike a non-distributed Request
89  * Manager, this implementation is responsible for synchronizing the different
90  * controllers components (schedulers, ...). Functions that are RAIDb level
91  * dependent are implemented in sub-classes.
92  *
93  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
94  * @author <a HREF="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
95  * </a>
96  * @author <a HREF="mailto:Damian.Arregui@emicnetworks.com">Damian Arregui</a>
97  * @version 1.0
98  */

99 public abstract class DistributedRequestManager extends RequestManager
100 {
101   protected DistributedVirtualDatabase dvdb;
102   /**
103    * List of queries that failed on all backends. Value contained in the map is
104    * a boolean indicating whether the request has been scheduled or not before
105    * the failure so that we know if the scheduler must be notified or not.
106    */

107   private HashMap JavaDoc failedOnAllBackends;
108   /** Unique controller identifier */
109   private long controllerId;
110   /** List of transactions that have executed on multiple controllers */
111   protected LinkedList JavaDoc distributedTransactions;
112
113   /**
114    * Constant to acknowledge the successful completion of a distributed query
115    */

116   public static final Integer JavaDoc SUCCESSFUL_COMPLETION = new Integer JavaDoc(-1);
117
118   /**
119    * Builds a new <code>DistributedRequestManager</code> instance without
120    * cache.
121    *
122    * @param vdb the virtual database this request manager belongs to
123    * @param scheduler the Request Scheduler to use
124    * @param cache a Query Cache implementation
125    * @param loadBalancer the Request Load Balancer to use
126    * @param recoveryLog the Log Recovery to use
127    * @param beginTimeout timeout in seconds for begin
128    * @param commitTimeout timeout in seconds for commit
129    * @param rollbackTimeout timeout in seconds for rollback
130    * @throws SQLException if an error occurs
131    * @throws NotCompliantMBeanException if this class is not a compliant JMX
132    * MBean
133    */

134   public DistributedRequestManager(DistributedVirtualDatabase vdb,
135       AbstractScheduler scheduler, AbstractResultCache cache,
136       AbstractLoadBalancer loadBalancer, RecoveryLog recoveryLog,
137       long beginTimeout, long commitTimeout, long rollbackTimeout)
138       throws SQLException JavaDoc, NotCompliantMBeanException JavaDoc
139   {
140     super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
141         commitTimeout, rollbackTimeout);
142     dvdb = vdb;
143     failedOnAllBackends = new HashMap JavaDoc();
144     distributedTransactions = new LinkedList JavaDoc();
145   }
146
147   //
148
// Controller identifier related functions
149
//
150

151   /**
152    * Effective controllerIds are on the upper 16 bits of a long (64 bits).
153    * Distributed transaction ids (longs) are layed out as [ControllerId(16bits) |
154    * LocalTransactionId(64bits)]. <br/>This constant used in
155    * DistributedVirtualDatabase.
156    */

157   public static final long CONTROLLER_ID_BIT_MASK = 0xffff000000000000L;
158   /**
159    * TRANSACTION_ID_BIT_MASK is used to get the transaction id local to the
160    * originating controller
161    */

162   public static final long TRANSACTION_ID_BIT_MASK = ~CONTROLLER_ID_BIT_MASK;
163
164   /**
165    * @see #CONTROLLER_ID_BIT_MASK
166    */

167   public static final int CONTROLLER_ID_SHIFT_BITS = 48;
168
169   /**
170    * @see #CONTROLLER_ID_BIT_MASK
171    */

172   public static final long CONTROLLER_ID_BITS = 0x000000000000ffffL;
173
174   /**
175    * Returns the unique controller identifier.
176    *
177    * @return Returns the controllerId.
178    */

179   public long getControllerId()
180   {
181     return controllerId;
182   }
183
184   /**
185    * Sets the controller identifier value (this id must be unique). Parameter id
186    * must hold on 16 bits (&lt; 0xffff), otherwise an exception is thrown.
187    * Effective this.controllerId is <strong>not </strong> set to passed
188    * parameter id, but to id &lt;&lt; ControllerIdShiftBits. The reason for all
189    * this is that controllerIds are to be carried into ditributed transactions
190    * ids, in the upper 16 bits.
191    *
192    * @param id The controllerId to set.
193    */

194   public void setControllerId(long id)
195   {
196     if ((id & ~CONTROLLER_ID_BITS) != 0)
197     {
198       String JavaDoc msg = "Out of range controller id (" + id + ")";
199       logger.error(msg);
200       throw new RuntimeException JavaDoc(msg);
201     }
202     this.controllerId = (id << CONTROLLER_ID_SHIFT_BITS)
203         & CONTROLLER_ID_BIT_MASK;
204     if (logger.isDebugEnabled())
205       logger.debug("Setting controller identifier to " + id
206           + " (shifted value is " + controllerId + ")");
207
208     scheduler.setControllerId(controllerId);
209   }
210
211   /**
212    * Make the given persistent connection id unique cluster-wide
213    *
214    * @param id original id
215    * @return unique connection id
216    */

217   public long getNextConnectionId(long id)
218   {
219     // 2 first bytes are used for controller id
220
// 6 right-most bytes are used for transaction id
221
id = id & TRANSACTION_ID_BIT_MASK;
222     id = id | controllerId;
223     return id;
224   }
225
226   /**
227    * @see org.continuent.sequoia.controller.requestmanager.RequestManager#getNextRequestId()
228    */

229   public long getNextRequestId()
230   {
231     // We use the same bitmask as for transaction ids
232

233     long id = super.getNextRequestId();
234     // 2 first bytes are used for controller id
235
// 6 right-most bytes are used for transaction id
236
id = id & TRANSACTION_ID_BIT_MASK;
237     id = id | controllerId;
238     return id;
239   }
240
241   /**
242    * Get the trace logger of this DistributedRequestManager
243    *
244    * @return a <code>Trace</code> object
245    */

246   public Trace getLogger()
247   {
248     return logger;
249   }
250
251   /**
252    * Returns the vdb value.
253    *
254    * @return Returns the vdb.
255    */

256   public VirtualDatabase getVirtualDatabase()
257   {
258     return dvdb;
259   }
260
261   /**
262    * @see org.continuent.sequoia.controller.requestmanager.RequestManager#setScheduler(org.continuent.sequoia.controller.scheduler.AbstractScheduler)
263    */

264   public void setScheduler(AbstractScheduler scheduler)
265   {
266     super.setScheduler(scheduler);
267     // Note: don't try to use this.dvdb here: setScheduler is called by the
268
// c'tor, and dvdb is not set at this time.
269
if (vdb.getTotalOrderQueue() == null)
270       throw new RuntimeException JavaDoc(
271           "New scheduler does not support total ordering and is not compatible with distributed virtual databases.");
272   }
273
274   //
275
// Database Backends management
276
//
277

278   /**
279    * Enable a backend that has been previously added to this virtual database
280    * and that is in the disabled state. We check we the other controllers if
281    * this backend must be enabled in read-only or read-write. The current policy
282    * is that the first one to enable this backend will have read-write access to
283    * it and others will be in read-only.
284    *
285    * @param db The database backend to enable
286    * @throws SQLException if an error occurs
287    */

288   public void enableBackend(DatabaseBackend db) throws SQLException JavaDoc
289   {
290     int size = dvdb.getAllMemberButUs().size();
291     if (size > 0)
292     {
293       logger.debug(Translate
294           .get("virtualdatabase.distributed.enable.backend.check"));
295
296       try
297       {
298         // Notify other controllers that we enable this backend.
299
// No answer is expected.
300
dvdb.getMulticastRequestAdapter().multicastMessage(
301             dvdb.getAllMemberButUs(),
302             new NotifyEnableBackend(new BackendInfo(db)),
303             MulticastRequestAdapter.WAIT_NONE,
304             dvdb.getMessageTimeouts().getEnableBackendTimeout());
305       }
306       catch (Exception JavaDoc e)
307       {
308         String JavaDoc msg = "Error while enabling backend " + db.getName();
309         logger.error(msg, e);
310         throw new SQLException JavaDoc(msg + "(" + e + ")");
311       }
312     }
313
314     super.enableBackend(db);
315   }
316
317   /**
318    * @see org.continuent.sequoia.controller.requestmanager.RequestManager#disableBackend(org.continuent.sequoia.controller.backend.DatabaseBackend,
319    * boolean)
320    */

321   public void disableBackend(DatabaseBackend db, boolean forceDisable)
322       throws SQLException JavaDoc
323   {
324     int size = dvdb.getAllMemberButUs().size();
325     if (size > 0)
326     {
327       logger.debug(Translate.get("virtualdatabase.distributed.disable.backend",
328           db.getName()));
329
330       try
331       {
332         // Notify other controllers that we disable this backend.
333
// No answer is expected.
334
dvdb.getMulticastRequestAdapter().multicastMessage(
335             dvdb.getAllMemberButUs(),
336             new NotifyDisableBackend(new BackendInfo(db)),
337             MulticastRequestAdapter.WAIT_NONE,
338             dvdb.getMessageTimeouts().getDisableBackendTimeout());
339       }
340       catch (Exception JavaDoc e)
341       {
342         String JavaDoc msg = "Error while disabling backend " + db.getName();
343         logger.error(msg, e);
344         throw new SQLException JavaDoc(msg + "(" + e + ")");
345       }
346     }
347
348     super.disableBackend(db, forceDisable);
349   }
350
351   /**
352    * {@inheritDoc}
353    *
354    * @see org.continuent.sequoia.controller.requestmanager.RequestManager#disableBackendsWithCheckpoint(java.util.ArrayList,
355    * java.lang.String)
356    */

357   public void disableBackendsWithCheckpoint(ArrayList JavaDoc backendInfos,
358       String JavaDoc checkpointName) throws SQLException JavaDoc
359   {
360     // Perform the distributed call through the group-comm, in order to
361
// atomically disable the backend and store a cluster-wide checkpoint.
362
try
363     {
364       // Suspend transactions
365
suspendActivity();
366
367       dvdb.getMulticastRequestAdapter().multicastMessage(dvdb.getAllMembers(),
368           new DisableBackendsAndSetCheckpoint(backendInfos, checkpointName),
369           MulticastRequestAdapter.WAIT_ALL,
370           dvdb.getMessageTimeouts().getDisableBackendTimeout());
371     }
372     catch (Exception JavaDoc e)
373     {
374       String JavaDoc msg = "Error while disabling backends " + backendInfos;
375       logger.error(msg, e);
376       throw new SQLException JavaDoc(msg + "(" + e + ")");
377     }
378     finally
379     {
380       resumeActivity();
381     }
382   }
383
384   //
385
// Failover management
386
//
387

388   private class FailureInformation
389   {
390     private boolean needSchedulerNotification;
391     private long logId = -1;
392     private boolean success;
393     private boolean disableBackendOnSuccess;
394     private int updateCount;
395
396     /**
397      * Creates a new <code>FailureInformation</code> object storing the
398      * information about a failure that waits for a remote controller final
399      * status.
400      *
401      * @param needSchedulerNotification true if the scheduler must be notified
402      * @param logId the recovery log id of the query
403      */

404     public FailureInformation(boolean needSchedulerNotification, long logId)
405     {
406       this.needSchedulerNotification = needSchedulerNotification;
407       this.logId = logId;
408     }
409
410     /**
411      * Creates a new <code>FailureInformation</code> object storing the final
412      * status from a remote controller. This version of the constructor is used
413      * when the remote controller sends the final status before the local
414      * controller has completed the request. This can happen in the case of a
415      * timeout.
416      *
417      * @param success indicates the result of the operation on the remote
418      * controller
419      * @param disableBackendOnSuccess indicates whether or not the backend
420      * should be disabled.
421      * @param updateCount the update count for the request
422      */

423     public FailureInformation(boolean success, boolean disableBackendOnSuccess,
424         int updateCount)
425     {
426       this.success = success;
427       this.disableBackendOnSuccess = disableBackendOnSuccess;
428       this.updateCount = updateCount;
429     }
430
431     /**
432      * Returns the recovery log id value.
433      *
434      * @return the recovery log id.
435      */

436     public final long getLogId()
437     {
438       return logId;
439     }
440
441     /**
442      * Sets the local logId for the request
443      *
444      * @param logId the log id to set
445      */

446     public void setLogId(long logId)
447     {
448       this.logId = logId;
449     }
450
451     /**
452      * Returns true if scheduler notification is needed.
453      *
454      * @return true if scheduler notification is needed.
455      */

456     public final boolean needSchedulerNotification()
457     {
458       return needSchedulerNotification;
459     }
460
461     /**
462      * sets the local scheduler notification indicator
463      *
464      * @param needSchedulerNotification true if scheduler notification is
465      * needed.
466      */

467     public void setNeedSchedulerNotification(boolean needSchedulerNotification)
468     {
469       this.needSchedulerNotification = needSchedulerNotification;
470     }
471
472     /**
473      * Indicates whether or not the backend should be disabled.
474      *
475      * @return true if backend must be disabled
476      */

477     public boolean isDisableBackendOnSuccess()
478     {
479       return disableBackendOnSuccess;
480     }
481
482     /**
483      * Indicates whether or not the query was successful on the remote
484      * controller.
485      *
486      * @return true if the query was successful on one of the remote
487      * controllers.
488      */

489     public boolean isSuccess()
490     {
491       return success;
492     }
493
494     /**
495      * Returns the update count (only meaningful if this was a request returning
496      * an update count)
497      *
498      * @return update count
499      */

500     public int getUpdateCount()
501     {
502       return updateCount;
503     }
504
505   }
506
507   private void logRequestCompletionAndNotifyScheduler(AbstractRequest request,
508       boolean success, FailureInformation failureInfo, int updateCount)
509   {
510     // Update recovery log with completion information
511
if (recoveryLog != null)
512     {
513       boolean mustLog = !request.isReadOnly();
514       if (request instanceof StoredProcedure)
515       {
516         SemanticBehavior semantic = ((StoredProcedure) request).getSemantic();
517         mustLog = (semantic == null) || !semantic.isReadOnly();
518       }
519       if (mustLog && failureInfo.getLogId() != 0)
520         recoveryLog.logRequestCompletion(failureInfo.getLogId(), success,
521             request.getExecTimeInMs(), updateCount);
522     }
523
524     if (failureInfo.needSchedulerNotification())
525     {
526       try
527       {
528         // Notify scheduler now, the notification was postponed when
529
// addFailedOnAllBackends was called.
530
if (request instanceof StoredProcedure)
531           scheduler.storedProcedureCompleted((StoredProcedure) request);
532         else if (!request.isAutoCommit()
533             && (request instanceof UnknownWriteRequest))
534         {
535           String JavaDoc sql = request.getSqlOrTemplate();
536           TransactionMetaData tm = new TransactionMetaData(request
537               .getTransactionId(), 0, request.getLogin(), request
538               .isPersistentConnection(), request.getPersistentConnectionId());
539           if ("commit".equals(sql))
540             scheduler.commitCompleted(tm, success);
541           else if ("rollback".equals(sql) || "abort".equals(sql))
542             scheduler.rollbackCompleted(tm, success);
543           else if (sql.startsWith("rollback")) // rollback to savepoint
544
scheduler.savepointCompleted(tm.getTransactionId());
545           else if (sql.startsWith("release "))
546             scheduler.savepointCompleted(tm.getTransactionId());
547           else if (sql.startsWith("savepoint "))
548             scheduler.savepointCompleted(tm.getTransactionId());
549           else
550             // Real UnknownWriteRequest
551
scheduler.writeCompleted((AbstractWriteRequest) request);
552         }
553         else
554           // Just an AbstractWriteRequest
555
scheduler.writeCompleted((AbstractWriteRequest) request);
556       }
557       catch (SQLException JavaDoc e)
558       {
559         logger.warn("Failed to notify scheduler for request " + request, e);
560       }
561     }
562   }
563
564   /**
565    * Add a request that failed on all backends.
566    *
567    * @param request the request that failed
568    * @param needSchedulerNotification true if the request has been scheduled but
569    * the scheduler has not been notified yet of the request completion
570    * @see #completeFailedOnAllBackends(AbstractRequest, boolean, boolean, int)
571    */

572   public void addFailedOnAllBackends(AbstractRequest request,
573       boolean needSchedulerNotification)
574   {
575     synchronized (failedOnAllBackends)
576     {
577       /*
578        * Failure information may already exist if the request was timed out at
579        * the originating controller. In which case, we have all the information
580        * required to complete the request.
581        */

582       FailureInformation failureInfo = (FailureInformation) failedOnAllBackends
583           .get(request);
584       if (failureInfo == null)
585         failedOnAllBackends.put(request, new FailureInformation(
586             needSchedulerNotification, request.getLogId()));
587       else
588       {
589         failureInfo.setLogId(request.getLogId());
590         failureInfo.setNeedSchedulerNotification(needSchedulerNotification);
591         completeFailedOnAllBackends(request, failureInfo.isSuccess(),
592             failureInfo.isDisableBackendOnSuccess(), failureInfo.updateCount);
593       }
594     }
595   }
596
597   /**
598    * Cleanup all queries registered as failed issued from the given controller.
599    * This is used when a controller has failed and no status information will be
600    * returned for failed queries. Queries are systematically tagged as failed.
601    * <p>
602    * FIXME: This is only correct with 2 controllers, in a 3+ controller
603    * scenario, we need to check from other controllers if someone succeeded.
604    *
605    * @param failedControllerId id of the controller that has failed
606    */

607   public void cleanupAllFailedQueriesFromController(long failedControllerId)
608   {
609     synchronized (failedOnAllBackends)
610     {
611       for (Iterator JavaDoc iter = failedOnAllBackends.keySet().iterator(); iter
612           .hasNext();)
613       {
614         AbstractRequest request = (AbstractRequest) iter.next();
615         if (((request.getId() & CONTROLLER_ID_BIT_MASK) == failedControllerId)
616             || ((request.getTransactionId() & CONTROLLER_ID_BIT_MASK) == failedControllerId)
617             || (request.isPersistentConnection() && (request
618                 .getPersistentConnectionId() & CONTROLLER_ID_BIT_MASK) == failedControllerId))
619         { // Need to remove that entry
620
FailureInformation failureInfo = (FailureInformation) failedOnAllBackends
621               .get(request);
622           // failedOnAllBackends can contain completion status information for
623
// requests that failed before we started processing processing
624
// requests. These entries do not have a logId and should be ignored.
625
if (failureInfo.getLogId() > 0)
626           {
627             if (logger.isInfoEnabled())
628               logger.info("No status information received for request "
629                   + request + ", considering status as failed.");
630
631             // If the current request is a rollback / abort, we need to call
632
// logRequestCompletionAndNotifyScheduler with success set to true
633
// for the transaction to be correctly cleaned up
634
boolean isAbortOrRollback = (request instanceof UnknownWriteRequest)
635                 && ("rollback".equals(request.getSqlOrTemplate()) || "abort"
636                     .equals(request.getSqlOrTemplate()));
637
638             logRequestCompletionAndNotifyScheduler(request, isAbortOrRollback,
639                 failureInfo, -1);
640           }
641           iter.remove();
642         }
643       }
644     }
645   }
646
647   /**
648    * Notify completion of a request that either failed on all backends or was
649    * not executed at all (NoMoreBackendException). If completion was successful
650    * and query failed on all backends locally, all local backends are disabled
651    * if disabledBackendOnSuccess is true.
652    *
653    * @param request request that completed
654    * @param success true if completion is successful
655    * @param disableBackendOnSuccess disable all local backends if query was
656    * successful (but failed locally)
657    * @param updateCount request update count to be logged in recovery log
658    * @see #addFailedOnAllBackends(AbstractRequest, boolean)
659    */

660   public void completeFailedOnAllBackends(AbstractRequest request,
661       boolean success, boolean disableBackendOnSuccess, int updateCount)
662   {
663     FailureInformation failureInfo;
664     synchronized (failedOnAllBackends)
665     {
666       failureInfo = (FailureInformation) failedOnAllBackends.remove(request);
667       if (failureInfo == null)
668       {
669         /*
670          * If we can't find failureInformation, assume the remote controller
671          * failed the request before it completed locally. This is probably due
672          * to a timeout.
673          */

674         failureInfo = new FailureInformation(success, disableBackendOnSuccess,
675             updateCount);
676         failedOnAllBackends.put(request, failureInfo);
677
678         logger.info("Unable to find request "
679             + request.getSqlShortForm(dvdb.getSqlShortFormLength())
680             + " in list of requests that failed on all backends.");
681         return;
682       }
683     }
684     logRequestCompletionAndNotifyScheduler(request, success, failureInfo,
685         updateCount);
686
687     if (disableBackendOnSuccess && success)
688     {
689       // Now really disable the backends
690

691       logger
692           .error("Request "
693               + request.getSqlShortForm(dvdb.getSqlShortFormLength())
694               + " failed on all local backends but succeeded on other controllers. Disabling all local backends.");
695
696       try
697       {
698         dvdb.disableAllBackends(true);
699       }
700       catch (VirtualDatabaseException e)
701       {
702         logger.error("An error occured while disabling all backends", e);
703       }
704     }
705   }
706
707   /**
708    * @see org.continuent.sequoia.controller.requestmanager.RequestManager#closePersistentConnection(java.lang.String,
709    * long)
710    */

711   public void distributedClosePersistentConnection(String JavaDoc login,
712       long persistentConnectionId)
713   {
714     List JavaDoc groupMembers = dvdb.getCurrentGroup().getMembers();
715
716     if (logger.isDebugEnabled())
717       logger.debug("Broadcasting closing persistent connection "
718           + persistentConnectionId + " for user " + login
719           + " to all controllers (" + dvdb.getChannel().getLocalMembership()
720           + "->" + groupMembers.toString() + ")");
721
722     // Send the query to everybody including us
723
try
724     {
725       dvdb.getMulticastRequestAdapter().multicastMessage(
726           groupMembers,
727           new DistributedClosePersistentConnection(login,
728               persistentConnectionId), MulticastRequestAdapter.WAIT_ALL, 0);
729     }
730     catch (Exception JavaDoc e)
731     {
732       String JavaDoc msg = "An error occured while executing distributed persistent connection "
733           + persistentConnectionId + " closing";
734       logger.warn(msg, e);
735     }
736
737     if (logger.isDebugEnabled())
738       logger.debug("Persistent connection " + persistentConnectionId
739           + " closed.");
740   }
741
742   /**
743    * @see org.continuent.sequoia.controller.requestmanager.RequestManager#openPersistentConnection(String,
744    * long)
745    */

746   public void distributedOpenPersistentConnection(String JavaDoc login,
747       long persistentConnectionId) throws SQLException JavaDoc
748   {
749     List JavaDoc groupMembers = dvdb.getCurrentGroup().getMembers();
750
751     if (logger.isDebugEnabled())
752       logger.debug("Broadcasting opening persistent connection "
753           + persistentConnectionId + " for user " + login
754           + " to all controllers (" + dvdb.getChannel().getLocalMembership()
755           + "->" + groupMembers.toString() + ")");
756
757     boolean success = false;
758     Exception JavaDoc exception = null;
759     try
760     {
761       // Send the query to everybody including us
762
MulticastResponse responses = dvdb.getMulticastRequestAdapter()
763           .multicastMessage(
764               groupMembers,
765               new DistributedOpenPersistentConnection(login,
766                   persistentConnectionId), MulticastRequestAdapter.WAIT_ALL, 0);
767
768       // get a list that won't change while we go through it
769
groupMembers = dvdb.getAllMembers();
770       int size = groupMembers.size();
771       ArrayList JavaDoc failedControllers = null;
772       // Get the result of each controller
773
for (int i = 0; i < size; i++)
774       {
775         Member member = (Member) groupMembers.get(i);
776         if ((responses.getFailedMembers() != null)
777             && responses.getFailedMembers().contains(member))
778         {
779           logger.warn("Controller " + member + " is suspected of failure.");
780           continue;
781         }
782         Object JavaDoc r = responses.getResult(member);
783         if (r instanceof Boolean JavaDoc)
784         {
785           if (((Boolean JavaDoc) r).booleanValue())
786             success = true;
787           else
788             logger.error("Unexpected result for controller " + member);
789         }
790         else if (r instanceof Exception JavaDoc)
791         {
792           if (failedControllers == null)
793             failedControllers = new ArrayList JavaDoc();
794           failedControllers.add(member);
795           if (exception == null)
796             exception = (Exception JavaDoc) r;
797           if (logger.isDebugEnabled())
798             logger.debug("Controller " + member
799                 + " failed to open persistent connection "
800                 + persistentConnectionId + " (" + r + ")");
801         }
802       }
803
804       /*
805        * Notify all controllers where all backend failed (if any) that
806        * completion was 'success'.
807        */

808       if (failedControllers != null)
809       {
810         UnknownWriteRequest notifRequest = new UnknownWriteRequest("open "
811             + persistentConnectionId, false, 0, null);
812         notifRequest.setPersistentConnection(true);
813         notifRequest.setPersistentConnectionId(persistentConnectionId);
814         notifyRequestCompletion(notifRequest, success, false, failedControllers);
815       }
816     }
817     catch (Exception JavaDoc e)
818     {
819       String JavaDoc msg = "An error occured while executing distributed persistent connection "
820           + persistentConnectionId + " opening";
821       logger.warn(msg, e);
822     }
823
824     if (success)
825     {
826       if (logger.isDebugEnabled())
827         logger.debug("Persistent connection " + persistentConnectionId
828             + " opened.");
829       return; // This is a success if at least one controller has succeeded
830
}
831
832     // At this point, all controllers failed
833
String JavaDoc msg = "Failed to open persistent connection "
834         + persistentConnectionId + " on all controllers (" + exception + ")";
835     logger.warn(msg);
836     throw new SQLException JavaDoc(msg);
837   }
838
839   /**
840    * Notify to all members the failover for the specified persistent connection.
841    *
842    * @param persistentConnectionId persistent connection id
843    * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForPersistentConnection(long)
844    */

845   public void distributedFailoverForPersistentConnection(
846       long persistentConnectionId)
847   {
848     List JavaDoc groupMembers = dvdb.getCurrentGroup().getMembers();
849
850     if (logger.isDebugEnabled())
851       logger.debug("Broadcasting failover for persistent connection "
852           + persistentConnectionId + " to all controllers ("
853           + dvdb.getChannel().getLocalMembership() + "->"
854           + groupMembers.toString() + ")");
855
856     // Send the query to everybody including us
857
try
858     {
859       dvdb.getMulticastRequestAdapter().multicastMessage(groupMembers,
860           new FailoverForPersistentConnection(persistentConnectionId),
861           MulticastRequestAdapter.WAIT_ALL, 0);
862     }
863     catch (Exception JavaDoc e)
864     {
865       String JavaDoc msg = "An error occured while notifying distributed persistent connection "
866           + persistentConnectionId + " failover";
867       logger.warn(msg, e);
868     }
869   }
870
871   /**
872    * Notify to all members the failover for the specified transaction.
873    *
874    * @param currentTid transaction id
875    * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForTransaction(long)
876    */

877   public void distributedFailoverForTransaction(long currentTid)
878   {
879     List JavaDoc groupMembers = dvdb.getCurrentGroup().getMembers();
880
881     if (logger.isDebugEnabled())
882       logger.debug("Broadcasting failover for transaction " + currentTid
883           + " to all controllers (" + dvdb.getChannel().getLocalMembership()
884           + "->" + groupMembers.toString() + ")");
885
886     // Send the query to everybody including us
887
try
888     {
889       dvdb.getMulticastRequestAdapter().multicastMessage(groupMembers,
890           new FailoverForTransaction(currentTid),
891           MulticastRequestAdapter.WAIT_ALL, 0);
892     }
893     catch (Exception JavaDoc e)
894     {
895       String JavaDoc msg = "An error occured while notifying distributed persistent connection "
896           + currentTid + " failover";
897       logger.warn(msg, e);
898     }
899   }
900
901   //
902
// Transaction management
903
//
904

905   /**
906    * @see org.continuent.sequoia.controller.requestmanager.RequestManager#abort(long,
907    * boolean, boolean)
908    */

909   public void abort(long transactionId, boolean logAbort, boolean forceAbort)
910       throws SQLException JavaDoc
911   {
912     Long JavaDoc lTid = new Long JavaDoc(transactionId);
913     TransactionMetaData tm;
914     try
915     {
916       tm = getTransactionMetaData(lTid);
917       if (!forceAbort && tidSavepoints.get(lTid) != null)
918       {
919         if (logger.isDebugEnabled())
920           logger.debug("Transaction " + transactionId
921               + " has savepoints, transaction will not be aborted");
922         return;
923       }
924     }
925     catch (SQLException JavaDoc e1)
926     {
927       logger.warn("No transaction metadata found to abort transaction "
928           + transactionId + ". Creating a fake context for abort.");
929       // We ignore the persistent connection id here (retrieved by connection
930
// manager)
931
tm = new TransactionMetaData(transactionId, 0, RecoveryLog.UNKNOWN_USER,
932           false, 0);
933       if (tidSavepoints.get(lTid) != null)
934       {
935         if (logger.isDebugEnabled())
936           logger.debug("Transaction " + transactionId
937               + " has savepoints, transaction will not be aborted");
938         return;
939       }
940     }
941
942     boolean isAWriteTransaction;
943     synchronized (distributedTransactions)
944     {
945       isAWriteTransaction = distributedTransactions.contains(lTid);
946     }
947     if (isAWriteTransaction)
948     {
949       distributedAbort(tm.getLogin(), transactionId);
950     }
951     else
952     {
953       // read-only transaction, it is local but we still have to post the query
954
// in the total order queue. Note that we post a Rollback object because
955
// the load balancer will treat the abort as a rollback.
956
LinkedList JavaDoc totalOrderQueue = dvdb.getTotalOrderQueue();
957       synchronized (totalOrderQueue)
958       {
959         totalOrderQueue.addLast(new DistributedRollback(tm.getLogin(),
960             transactionId));
961       }
962       super.abort(transactionId, logAbort, forceAbort);
963     }
964   }
965
966   /**
967    * @see org.continuent.sequoia.controller.requestmanager.RequestManager#begin(String,
968    * boolean, long) overrides RequestManager.begin(String) to apply bit
969    * masks to the tid returned by the scheduler
970    */

971   public long begin(String JavaDoc login, boolean isPersistentConnection,
972       long persistentConnectionId) throws SQLException JavaDoc
973   {
974     long tid = scheduler.getNextTransactionId();
975     // 2 first bytes are used for controller id
976
// 6 right-most bytes are used for transaction id
977
tid = tid & TRANSACTION_ID_BIT_MASK;
978     tid = tid | controllerId;
979     doBegin(login, tid, isPersistentConnection, persistentConnectionId);
980     return tid;
981   }
982
983   /**
984    * @see org.continuent.sequoia.controller.requestmanager.RequestManager#commit(long,
985    * boolean, boolean)
986    */

987   public void commit(long transactionId, boolean logCommit,
988       boolean emptyTransaction) throws SQLException JavaDoc
989   {
990     Long JavaDoc lTid = new Long JavaDoc(transactionId);
991     TransactionMetaData tm = getTransactionMetaData(lTid);
992     boolean isAWriteTransaction;
993     synchronized (distributedTransactions)
994     {
995       isAWriteTransaction = distributedTransactions.contains(lTid);
996     }
997     if (isAWriteTransaction)
998     {
999       distributedCommit(tm.getLogin(), transactionId);
1000    }
1001    else
1002    {
1003      // read-only transaction, it is local
1004
DistributedCommit commit = new DistributedCommit(tm.getLogin(),
1005          transactionId);
1006      if (!emptyTransaction)
1007      {
1008        LinkedList JavaDoc totalOrderQueue = dvdb.getTotalOrderQueue();
1009        synchronized (totalOrderQueue)
1010        {
1011          totalOrderQueue.addLast(commit);
1012        }
1013      }
1014      try
1015      {
1016        super.commit(transactionId, logCommit, emptyTransaction);
1017      }
1018      catch (SQLException JavaDoc e)
1019      {
1020        if (logger.isWarnEnabled())
1021        {
1022          logger
1023              .warn("Ignoring failure of commit for read-only transaction, exception was: "
1024                  + e);
1025        }
1026
1027        // Force transaction completion on scheduler
1028
scheduler.commit(tm, emptyTransaction, commit);
1029        scheduler.commitCompleted(tm, true);
1030
1031        // Clean-up transactional context
1032
completeTransaction(lTid);
1033      }
1034    }
1035  }
1036
1037  /**
1038   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#completeTransaction(java.lang.Long)
1039   */

1040  public void completeTransaction(Long JavaDoc tid)
1041  {
1042    synchronized (distributedTransactions)
1043    {
1044      distributedTransactions.remove(tid);
1045    }
1046    super.completeTransaction(tid);
1047  }
1048
1049  /**
1050   * Check if the transaction corresponding to the given query has been started
1051   * remotely and start the transaction locally in a lazy manner if needed. This
1052   * also checks if a local request must trigger the logging of a 'begin' in the
1053   * recovery log.
1054   *
1055   * @param request query to execute
1056   * @throws SQLException if an error occurs
1057   */

1058  public void lazyTransactionStart(AbstractRequest request) throws SQLException JavaDoc
1059  {
1060    // Check if this is a remotely started transaction that we need to lazyly
1061
// start locally
1062
if (!request.isAutoCommit())
1063    {
1064      long tid = request.getTransactionId();
1065      Long JavaDoc lTid = new Long JavaDoc(tid);
1066      TransactionMetaData tm = (TransactionMetaData) transactionMetaDatas
1067          .get(lTid);
1068
1069      if ((tid & CONTROLLER_ID_BIT_MASK) == controllerId)
1070      { // Local transaction, check if we need to log lazy begin
1071
if (tm == null)
1072          logger.error("Unexpected non-started local transaction " + lTid);
1073        else
1074        {
1075          if (tm.isReadOnly())
1076          {
1077            request.setIsLazyTransactionStart(true);
1078            tm.setReadOnly(false);
1079          }
1080        }
1081      }
1082      else
1083      { // Remote transaction, check that it is started
1084
if (tm != null)
1085        {
1086          /*
1087           * It may have been started by a failover before any writes were
1088           * executed.
1089           */

1090          if (tm.isReadOnly())
1091          {
1092            request.setIsLazyTransactionStart(true);
1093            tm.setReadOnly(false);
1094          }
1095          return; // transaction already started
1096
}
1097        // Begin this transaction
1098
try
1099        {
1100          tm = new TransactionMetaData(tid, beginTimeout, request.getLogin(),
1101              request.isPersistentConnection(), request
1102                  .getPersistentConnectionId());
1103          tm.setReadOnly(false);
1104
1105          if (logger.isDebugEnabled())
1106            logger.debug(Translate.get("transaction.begin.lazy", String
1107                .valueOf(tid)));
1108
1109          scheduler.begin(tm, true, request);
1110
1111          try
1112          {
1113            // Send to load balancer
1114
loadBalancer.begin(tm);
1115
1116            // We need to update the tid table first so that
1117
// logLazyTransactionBegin can retrieve the metadata
1118
transactionMetaDatas.put(lTid, tm);
1119            request.setIsLazyTransactionStart(true);
1120
1121            synchronized (distributedTransactions)
1122            {
1123              if (!distributedTransactions.contains(lTid))
1124                distributedTransactions.add(lTid);
1125            }
1126          }
1127          catch (SQLException JavaDoc e)
1128          {
1129            if (recoveryLog != null)
1130              // In case logLazyTransactionBegin failed
1131
transactionMetaDatas.remove(lTid);
1132            throw e;
1133          }
1134          finally
1135          {
1136            // Notify scheduler for completion in any case
1137
scheduler.beginCompleted(tid);
1138          }
1139        }
1140        catch (RuntimeException JavaDoc e)
1141        {
1142          String JavaDoc msg = Translate
1143              .get("fatal.runtime.exception.requestmanager.begin");
1144          logger.fatal(msg, e);
1145          endUserLogger.fatal(msg);
1146          throw new SQLException JavaDoc(e.getMessage());
1147        }
1148      }
1149    }
1150  }
1151
1152  /**
1153   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#rollback(long,
1154   * boolean)
1155   */

1156  public void rollback(long transactionId, boolean logRollback)
1157      throws SQLException JavaDoc
1158  {
1159    Long JavaDoc lTid = new Long JavaDoc(transactionId);
1160    TransactionMetaData tm = getTransactionMetaData(lTid);
1161    boolean isAWriteTransaction;
1162    synchronized (distributedTransactions)
1163    {
1164      isAWriteTransaction = distributedTransactions.contains(lTid);
1165    }
1166    if (isAWriteTransaction)
1167    {
1168      distributedRollback(tm.getLogin(), transactionId);
1169    }
1170    else
1171    {
1172      // read-only transaction, it is local
1173
DistributedRollback rollback = new DistributedRollback(tm.getLogin(),
1174          transactionId);
1175      LinkedList JavaDoc totalOrderQueue = dvdb.getTotalOrderQueue();
1176      synchronized (totalOrderQueue)
1177      {
1178        totalOrderQueue.addLast(rollback);
1179      }
1180      try
1181      {
1182        super.rollback(transactionId, logRollback);
1183      }
1184      catch (SQLException JavaDoc e)
1185      {
1186        if (logger.isWarnEnabled())
1187        {
1188          logger
1189              .warn("Ignoring failure of rollback for read-only transaction, exception was: "
1190                  + e);
1191        }
1192
1193        // Force transaction completion on scheduler
1194
try
1195        {
1196          scheduler.rollback(tm, rollback);
1197        }
1198        catch (SQLException JavaDoc ignore)
1199        {
1200        }
1201        scheduler.rollbackCompleted(tm, true);
1202
1203        // Clean-up transactional context
1204
completeTransaction(lTid);
1205      }
1206    }
1207  }
1208
1209  /**
1210   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#rollback(long,
1211   * String)
1212   */

1213  public void rollback(long transactionId, String JavaDoc savepointName)
1214      throws SQLException JavaDoc
1215  {
1216    Long JavaDoc lTid = new Long JavaDoc(transactionId);
1217    boolean isAWriteTransaction;
1218    synchronized (distributedTransactions)
1219    {
1220      isAWriteTransaction = distributedTransactions.contains(lTid);
1221    }
1222    if (isAWriteTransaction)
1223    {
1224      TransactionMetaData tm = getTransactionMetaData(lTid);
1225      distributedRollback(tm.getLogin(), transactionId, savepointName);
1226    }
1227    else
1228    { // read-only transaction, it is local
1229
LinkedList JavaDoc totalOrderQueue = dvdb.getTotalOrderQueue();
1230      synchronized (totalOrderQueue)
1231      {
1232        totalOrderQueue.addLast(new DistributedRollbackToSavepoint(
1233            transactionId, savepointName));
1234      }
1235      super.rollback(transactionId, savepointName);
1236    }
1237  }
1238
1239  /**
1240   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#setSavepoint(long)
1241   */

1242  public int setSavepoint(long transactionId) throws SQLException JavaDoc
1243  {
1244    Long JavaDoc lTid = new Long JavaDoc(transactionId);
1245    int savepointId = scheduler.incrementSavepointId();
1246    TransactionMetaData tm = getTransactionMetaData(lTid);
1247    synchronized (distributedTransactions)
1248    {
1249      if (!distributedTransactions.contains(lTid))
1250        distributedTransactions.add(lTid);
1251    }
1252    distributedSetSavepoint(tm.getLogin(), transactionId, String
1253        .valueOf(savepointId));
1254    return savepointId;
1255  }
1256
1257  /**
1258   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#setSavepoint(long,
1259   * String)
1260   */

1261  public void setSavepoint(long transactionId, String JavaDoc name) throws SQLException JavaDoc
1262  {
1263    Long JavaDoc lTid = new Long JavaDoc(transactionId);
1264    TransactionMetaData tm = getTransactionMetaData(lTid);
1265    synchronized (distributedTransactions)
1266    {
1267      if (!distributedTransactions.contains(lTid))
1268        distributedTransactions.add(lTid);
1269    }
1270    distributedSetSavepoint(tm.getLogin(), transactionId, name);
1271  }
1272
1273  /**
1274   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#releaseSavepoint(long,
1275   * String)
1276   */

1277  public void releaseSavepoint(long transactionId, String JavaDoc name)
1278      throws SQLException JavaDoc
1279  {
1280    Long JavaDoc lTid = new Long JavaDoc(transactionId);
1281    boolean isAWriteTransaction;
1282    synchronized (distributedTransactions)
1283    {
1284      isAWriteTransaction = distributedTransactions.contains(lTid);
1285    }
1286    if (isAWriteTransaction)
1287    {
1288      TransactionMetaData tm = getTransactionMetaData(lTid);
1289      distributedReleaseSavepoint(tm.getLogin(), transactionId, name);
1290    }
1291    else
1292    {
1293      // read-only transaction, it is local
1294
LinkedList JavaDoc totalOrderQueue = dvdb.getTotalOrderQueue();
1295      synchronized (totalOrderQueue)
1296      {
1297        totalOrderQueue.addLast(new DistributedReleaseSavepoint(transactionId,
1298            name));
1299      }
1300      super.releaseSavepoint(transactionId, name);
1301    }
1302  }
1303
1304  /**
1305   * Add this transaction to the list of write transactions that needs to be
1306   * globally commited. This happens if the transaction has only be started
1307   * locally but not through a lazy start.
1308   */

1309  private void addToDistributedTransactionListIfNeeded(AbstractRequest request)
1310  {
1311    // Add to distributed transactions if needed
1312
if (!request.isAutoCommit())
1313    {
1314      Long JavaDoc lTid = new Long JavaDoc(request.getTransactionId());
1315      synchronized (distributedTransactions)
1316      {
1317        if (!distributedTransactions.contains(lTid))
1318          distributedTransactions.add(lTid);
1319      }
1320    }
1321  }
1322
1323  /**
1324   * Retrieve the vLogin corresponding to the persistent connection id provided
1325   * and close the connection if found. This is used by the
1326   * ControllerFailureCleanupThread to cleanup reamining persistent connections
1327   * from a failed controller whose clients never recovered.
1328   *
1329   * @param connectionId the persistent connection id
1330   */

1331  public void closePersistentConnection(Long JavaDoc connectionId)
1332  {
1333    String JavaDoc vLogin = scheduler.getPersistentConnectionLogin(connectionId);
1334    if (vLogin != null)
1335      super.closePersistentConnection(vLogin, connectionId.longValue());
1336  }
1337
1338  /**
1339   * Performs a local read operation, as opposed to execReadRequest() which
1340   * attempts to use distributed reads when there is NoMoreBackendException.
1341   *
1342   * @param request the read request to perform
1343   * @return a ControllerResultSet
1344   * @throws NoMoreBackendException when no more local backends are available to
1345   * execute the request
1346   * @throws SQLException in case of error
1347   */

1348  public ControllerResultSet execLocalStatementExecuteQuery(
1349      SelectRequest request) throws NoMoreBackendException, SQLException JavaDoc
1350  {
1351    return super.statementExecuteQuery(request);
1352  }
1353
1354  /**
1355   * Execute a read request on some remote controller - one in the group. Used
1356   * when the local controller has no backend available to execute the request.
1357   *
1358   * @param request the request to execute
1359   * @return the query ResultSet
1360   * @throws SQLException in case of bad request
1361   */

1362  public abstract ControllerResultSet execRemoteStatementExecuteQuery(
1363      SelectRequest request) throws SQLException JavaDoc;
1364
1365  /**
1366   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecuteQuery(org.continuent.sequoia.controller.requests.SelectRequest)
1367   */

1368  public ControllerResultSet statementExecuteQuery(SelectRequest request)
1369      throws SQLException JavaDoc
1370  {
1371    if (!request.isMustBroadcast())
1372    {
1373      try
1374      {
1375        return execLocalStatementExecuteQuery(request);
1376      }
1377      catch (SQLException JavaDoc e)
1378      {
1379        if (!(e instanceof NoMoreBackendException))
1380          throw e;
1381        // else this failed locally, try it remotely
1382
// Request failed locally, try on other controllers.
1383
addToDistributedTransactionListIfNeeded(request);
1384        return execRemoteStatementExecuteQuery(request);
1385      }
1386    }
1387    addToDistributedTransactionListIfNeeded(request);
1388    return distributedStatementExecuteQuery(request);
1389  }
1390
1391  /**
1392   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecuteUpdate(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
1393   */

1394  public ExecuteUpdateResult statementExecuteUpdate(AbstractWriteRequest request)
1395      throws SQLException JavaDoc
1396  {
1397    if (!request.isAutoCommit())
1398    { /*
1399       * Add this transaction to the list of write transactions that needs to be
1400       * globally commited. This happens if the transaction has only be started
1401       * locally but not through a lazy start.
1402       */

1403      Long JavaDoc lTid = new Long JavaDoc(request.getTransactionId());
1404      synchronized (distributedTransactions)
1405      {
1406        if (!distributedTransactions.contains(lTid))
1407          distributedTransactions.add(lTid);
1408      }
1409    }
1410    return distributedStatementExecuteUpdate(request);
1411  }
1412
1413  /**
1414   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecuteUpdateWithKeys(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
1415   */

1416  public GeneratedKeysResult statementExecuteUpdateWithKeys(
1417      AbstractWriteRequest request) throws SQLException JavaDoc
1418  {
1419    if (!request.isAutoCommit())
1420    { /*
1421       * Add this transaction to the list of write transactions that needs to be
1422       * globally commited. This happens if the transaction has only be started
1423       * locally but not through a lazy start.
1424       */

1425      Long JavaDoc lTid = new Long JavaDoc(request.getTransactionId());
1426      synchronized (distributedTransactions)
1427      {
1428        if (!distributedTransactions.contains(lTid))
1429          distributedTransactions.add(lTid);
1430      }
1431    }
1432    return distributedStatementExecuteUpdateWithKeys(request);
1433  }
1434
1435  /**
1436   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#statementExecute(AbstractRequest)
1437   */

1438  public ExecuteResult statementExecute(AbstractRequest request)
1439      throws SQLException JavaDoc
1440  {
1441    if (!request.isAutoCommit())
1442    { /*
1443       * Add this transaction to the list of write transactions that needs to be
1444       * globally commited. This happens if the transaction has only be started
1445       * locally but not through a lazy start.
1446       */

1447      Long JavaDoc lTid = new Long JavaDoc(request.getTransactionId());
1448      synchronized (distributedTransactions)
1449      {
1450        if (!distributedTransactions.contains(lTid))
1451          distributedTransactions.add(lTid);
1452      }
1453    }
1454    return distributedStatementExecute(request);
1455  }
1456
1457  /**
1458   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#scheduleExecWriteRequest(org.continuent.sequoia.controller.requests.AbstractWriteRequest)
1459   */

1460  public void scheduleExecWriteRequest(AbstractWriteRequest request)
1461      throws SQLException JavaDoc
1462  {
1463    lazyTransactionStart(request);
1464    super.scheduleExecWriteRequest(request);
1465  }
1466
1467  /**
1468   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#callableStatementExecuteQuery(StoredProcedure)
1469   */

1470  public ControllerResultSet callableStatementExecuteQuery(StoredProcedure proc)
1471      throws SQLException JavaDoc
1472  {
1473    // Parse the query first to update the semantic information
1474
getParsingFromCacheOrParse(proc);
1475
1476    // If procedure is read-only, we don't broadcast
1477
SemanticBehavior semantic = proc.getSemantic();
1478    if (proc.isReadOnly() || ((semantic != null) && (semantic.isReadOnly())))
1479    {
1480      try
1481      {
1482        proc.setIsReadOnly(true);
1483        return execLocallyCallableStatementExecuteQuery(proc);
1484      }
1485      catch (AllBackendsFailedException ignore)
1486      {
1487        // This failed locally, try it remotely
1488
}
1489      catch (SQLException JavaDoc e)
1490      {
1491        if (!(e instanceof NoMoreBackendException))
1492          throw e;
1493        // else this failed locally, try it remotely
1494
}
1495    }
1496
1497    addToDistributedTransactionListIfNeeded(proc);
1498    return distributedCallableStatementExecuteQuery(proc);
1499  }
1500
1501  /**
1502   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
1503   */

1504  public ExecuteUpdateResult callableStatementExecuteUpdate(StoredProcedure proc)
1505      throws SQLException JavaDoc
1506  {
1507    if (!proc.isAutoCommit())
1508    { /*
1509       * Add this transaction to the list of write transactions that needs to be
1510       * globally commited. This happens if the transaction has only be started
1511       * locally but not through a lazy start.
1512       */

1513      Long JavaDoc lTid = new Long JavaDoc(proc.getTransactionId());
1514      synchronized (distributedTransactions)
1515      {
1516        if (!distributedTransactions.contains(lTid))
1517          distributedTransactions.add(lTid);
1518      }
1519    }
1520    return distributedCallableStatementExecuteUpdate(proc);
1521  }
1522
1523  /**
1524   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#callableStatementExecute(StoredProcedure)
1525   */

1526  public ExecuteResult callableStatementExecute(StoredProcedure proc)
1527      throws SQLException JavaDoc
1528  {
1529    // Parse the query first to update the semantic information
1530
getParsingFromCacheOrParse(proc);
1531
1532    // If procedure is read-only, we don't broadcast
1533
SemanticBehavior semantic = proc.getSemantic();
1534    if (proc.isReadOnly() || ((semantic != null) && (semantic.isReadOnly())))
1535    {
1536      try
1537      {
1538        proc.setIsReadOnly(true);
1539        return execLocallyCallableStatementExecute(proc);
1540      }
1541      catch (AllBackendsFailedException ignore)
1542      {
1543        // This failed locally, try it remotely
1544
}
1545      catch (SQLException JavaDoc e)
1546      {
1547        if (!(e instanceof NoMoreBackendException))
1548          throw e;
1549        // else this failed locally, try it remotely
1550
}
1551    }
1552
1553    if (!proc.isAutoCommit())
1554    { /*
1555       * Add this transaction to the list of write transactions that needs to be
1556       * globally commited. This happens if the transaction has only be started
1557       * locally but not through a lazy start.
1558       */

1559      Long JavaDoc lTid = new Long JavaDoc(proc.getTransactionId());
1560      synchronized (distributedTransactions)
1561      {
1562        if (!distributedTransactions.contains(lTid))
1563          distributedTransactions.add(lTid);
1564      }
1565    }
1566    return distributedCallableStatementExecute(proc);
1567  }
1568
1569  /**
1570   * Fetch the result of a previously executed request from a remote controller
1571   * failover cache.
1572   *
1573   * @param successfulControllers controllers to fetch the result from
1574   * @param id unique identifier of the query to look the result for
1575   * @return the request result
1576   * @throws NoResultAvailableException if no result could be retrieved from the
1577   * failover cache
1578   */

1579  protected Serializable JavaDoc getRequestResultFromFailoverCache(
1580      List JavaDoc successfulControllers, long id) throws NoResultAvailableException
1581  {
1582    List JavaDoc groupMembers = new ArrayList JavaDoc(1);
1583
1584    // Try all members in turn and return as soon as one succeeds
1585
for (Iterator JavaDoc iter = successfulControllers.iterator(); iter.hasNext();)
1586    {
1587      Member remoteController = (Member) iter.next();
1588      groupMembers.clear();
1589      groupMembers.add(remoteController);
1590
1591      if (logger.isDebugEnabled())
1592        logger.debug("Getting result for request " + id + " from controllers "
1593            + remoteController);
1594
1595      try
1596      { // Send the request to that controller
1597
MulticastResponse response = dvdb.getMulticastRequestAdapter()
1598            .multicastMessage(groupMembers,
1599                new GetRequestResultFromFailoverCache(id),
1600                MulticastRequestAdapter.WAIT_ALL, 0);
1601        Serializable JavaDoc result = response.getResult(remoteController);
1602
1603        if ((result instanceof Exception JavaDoc)
1604            || (response.getFailedMembers() != null))
1605        { // Failure on the remote controller
1606
if (logger.isInfoEnabled())
1607            logger.info("Controller " + remoteController
1608                + " could not fetch result for request " + id,
1609                (Exception JavaDoc) result);
1610        }
1611        else
1612          return result;
1613      }
1614      catch (Exception JavaDoc e)
1615      {
1616        String JavaDoc msg = "An error occured while getching result for request " + id
1617            + " from controller " + remoteController;
1618        logger.warn(msg, e);
1619      }
1620    }
1621    throw new NoResultAvailableException(
1622        "All controllers failed when trying to fetch result for request " + id);
1623  }
1624
1625  /**
1626   * Stores a result associated with a request in the request result failover
1627   * cache.
1628   * <p>
1629   * Only results for requests initiated on a remote controller are stored.
1630   *
1631   * @param request the request executed
1632   * @param result the result of the request
1633   * @return true if the result was added to the cache, false if the request was
1634   * local to this controller
1635   * @see org.continuent.sequoia.controller.virtualdatabase.RequestResultFailoverCache#store(AbstractRequest,
1636   * Serializable)
1637   */

1638  public boolean storeRequestResult(AbstractRequest request, Serializable JavaDoc result)
1639  {
1640    // Cache only results for requests initiated by a remote controller.
1641
if ((request.getId() & CONTROLLER_ID_BIT_MASK) != dvdb.getControllerId())
1642    {
1643      dvdb.getRequestResultFailoverCache().store(request, result);
1644      return true;
1645    }
1646    return false;
1647  }
1648
1649  //
1650
// RAIDb level specific methods
1651
//
1652

1653  /**
1654   * Distributed implementation of an abort
1655   *
1656   * @param login login that abort the transaction
1657   * @param transactionId id of the commiting transaction
1658   * @throws SQLException if an error occurs
1659   */

1660  public abstract void distributedAbort(String JavaDoc login, long transactionId)
1661      throws SQLException JavaDoc;
1662
1663  /**
1664   * Distributed implementation of a commit
1665   *
1666   * @param login login that commit the transaction
1667   * @param transactionId id of the commiting transaction
1668   * @throws SQLException if an error occurs
1669   */

1670  public abstract void distributedCommit(String JavaDoc login, long transactionId)
1671      throws SQLException JavaDoc;
1672
1673  /**
1674   * Distributed implementation of a rollback
1675   *
1676   * @param login login that rollback the transaction
1677   * @param transactionId id of the rollbacking transaction
1678   * @throws SQLException if an error occurs
1679   */

1680  public abstract void distributedRollback(String JavaDoc login, long transactionId)
1681      throws SQLException JavaDoc;
1682
1683  /**
1684   * Distributed implementation of a rollback to a savepoint
1685   *
1686   * @param login login that rollback the transaction
1687   * @param transactionId id of the transaction
1688   * @param savepointName name of the savepoint
1689   * @throws SQLException if an error occurs
1690   */

1691  public abstract void distributedRollback(String JavaDoc login, long transactionId,
1692      String JavaDoc savepointName) throws SQLException JavaDoc;
1693
1694  /**
1695   * Distributed implementation of setting a savepoint to a transaction
1696   *
1697   * @param login login that releases the savepoint
1698   * @param transactionId id of the transaction
1699   * @param name name of the savepoint to set
1700   * @throws SQLException if an error occurs
1701   */

1702  public abstract void distributedSetSavepoint(String JavaDoc login,
1703      long transactionId, String JavaDoc name) throws SQLException JavaDoc;
1704
1705  /**
1706   * Distributed implementation of releasing a savepoint from a transaction
1707   *
1708   * @param login login that set the savepoint
1709   * @param transactionId id of the transaction
1710   * @param name name of the savepoint to release
1711   * @throws SQLException if an error occurs
1712   */

1713  public abstract void distributedReleaseSavepoint(String JavaDoc login,
1714      long transactionId, String JavaDoc name) throws SQLException JavaDoc;
1715
1716  /**
1717   * Distributed implementation of a select request execution that returns a
1718   * ResultSet.
1719   *
1720   * @param request request to execute
1721   * @return ResultSet containing the auto-generated keys.
1722   * @throws SQLException if an error occurs
1723   */

1724  public abstract ControllerResultSet distributedStatementExecuteQuery(
1725      SelectRequest request) throws SQLException JavaDoc;
1726
1727  /**
1728   * Distributed implementation of a write request execution.
1729   *
1730   * @param request request to execute
1731   * @return number of modified rows
1732   * @throws SQLException if an error occurs
1733   */

1734  public abstract ExecuteUpdateResult distributedStatementExecuteUpdate(
1735      AbstractWriteRequest request) throws SQLException JavaDoc;
1736
1737  /**
1738   * Distributed implementation of a write request execution that returns
1739   * auto-generated keys.
1740   *
1741   * @param request request to execute
1742   * @return update count and ResultSet containing the auto-generated keys.
1743   * @throws SQLException if an error occurs
1744   */

1745  public abstract GeneratedKeysResult distributedStatementExecuteUpdateWithKeys(
1746      AbstractWriteRequest request) throws SQLException JavaDoc;
1747
1748  /**
1749   * Distributed implementation of a Statement.execute() execution.
1750   *
1751   * @param request request to execute
1752   * @return an <code>ExecuteResult</code> object
1753   * @throws SQLException if an error occurs
1754   */

1755  public abstract ExecuteResult distributedStatementExecute(
1756      AbstractRequest request) throws SQLException JavaDoc;
1757
1758  /**
1759   * Distributed implementation of a stored procedure
1760   * CallableStatement.executeQuery() execution.
1761   *
1762   * @param proc stored procedure to execute
1763   * @return ResultSet corresponding to this stored procedure execution
1764   * @throws SQLException if an error occurs
1765   */

1766  public abstract ControllerResultSet distributedCallableStatementExecuteQuery(
1767      StoredProcedure proc) throws SQLException JavaDoc;
1768
1769  /**
1770   * Distributed implementation of a stored procedure
1771   * CallableStatement.executeUpdate() execution.
1772   *
1773   * @param proc stored procedure to execute
1774   * @return number of modified rows
1775   * @throws SQLException if an error occurs
1776   */

1777  public abstract ExecuteUpdateResult distributedCallableStatementExecuteUpdate(
1778      StoredProcedure proc) throws SQLException JavaDoc;
1779
1780  /**
1781   * Distributed implementation of a stored procedure
1782   * CallableStatement.execute() execution.
1783   *
1784   * @param proc stored procedure to execute
1785   * @return an <code>ExecuteResult</code> object
1786   * @throws SQLException if an error occurs
1787   */

1788  public abstract ExecuteResult distributedCallableStatementExecute(
1789      StoredProcedure proc) throws SQLException JavaDoc;
1790
1791  /**
1792   * Once the request has been dispatched, it can be executed using the code
1793   * from <code>RequestManager</code>
1794   *
1795   * @param proc stored procedure to execute
1796   * @return ResultSet corresponding to this stored procedure execution
1797   * @throws AllBackendsFailedException if all backends failed to execute the
1798   * stored procedure
1799   * @throws SQLException if an error occurs
1800   */

1801  public ControllerResultSet execLocallyCallableStatementExecuteQuery(
1802      StoredProcedure proc) throws AllBackendsFailedException, SQLException JavaDoc
1803  {
1804    return super.callableStatementExecuteQuery(proc);
1805  }
1806
1807  /**
1808   * Once the request has been dispatched, it can be executed using the code
1809   * from <code>RequestManager</code>
1810   *
1811   * @param proc stored procedure to execute
1812   * @return ExecuteResult corresponding to this stored procedure execution
1813   * @throws AllBackendsFailedException if all backends failed to execute the
1814   * stored procedure
1815   * @throws SQLException if an error occurs
1816   */

1817  public ExecuteResult execLocallyCallableStatementExecute(StoredProcedure proc)
1818      throws AllBackendsFailedException, SQLException JavaDoc
1819  {
1820    return super.callableStatementExecute(proc);
1821  }
1822
1823  /**
1824   * Test if a transaction has been started on this controller, but initialized
1825   * by a remote controller.
1826   *
1827   * @param currentTid Current transaction Id
1828   * @return True if this transaction of Id currentId has already been started
1829   * on the current controller
1830   */

1831  public boolean isDistributedTransaction(long currentTid)
1832  {
1833    synchronized (distributedTransactions)
1834    {
1835      return distributedTransactions.contains(new Long JavaDoc(currentTid));
1836    }
1837  }
1838
1839  /**
1840   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#resumeActivity()
1841   */

1842  public void resumeActivity()
1843  {
1844    // Perform the distributed call through the group-comm, in order to
1845
// resume all the activity suspended above.
1846
try
1847    {
1848      // Suspend transactions
1849
dvdb.getMulticastRequestAdapter().multicastMessage(dvdb.getAllMembers(),
1850          new ResumeActivity(), MulticastRequestAdapter.WAIT_ALL,
1851          dvdb.getMessageTimeouts().getDisableBackendTimeout());
1852    }
1853    catch (Exception JavaDoc e)
1854    {
1855      String JavaDoc msg = "Error while resuming activity";
1856      logger.error(msg, e);
1857    }
1858  }
1859
1860  /**
1861   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#suspendActivity()
1862   */

1863  public void suspendActivity() throws SQLException JavaDoc
1864  {
1865    // Perform the distributed call through the group-comm, in order to
1866
// atomically suspend all activity on the system.
1867
try
1868    {
1869      // Suspend transactions
1870
dvdb.getMulticastRequestAdapter().multicastMessage(dvdb.getAllMembers(),
1871          new SuspendActivity(), MulticastRequestAdapter.WAIT_ALL,
1872          dvdb.getMessageTimeouts().getDisableBackendTimeout());
1873      logger.info("All activity is suspended for " + dvdb.getDatabaseName());
1874    }
1875    catch (Exception JavaDoc e)
1876    {
1877      String JavaDoc msg = "Error while suspending activity";
1878      logger.error(msg, e);
1879      throw (SQLException JavaDoc) new SQLException JavaDoc(msg + "(" + e + ")").initCause(e);
1880    }
1881  }
1882
1883  /**
1884   * Notify controllers that they are now inconsistent with the cluster and that
1885   * they sould disable themselves.
1886   *
1887   * @param request request that generated the consistency
1888   * @param inconsistentControllers controllers that need to be notified
1889   * @throws SQLException if an error occurs
1890   */

1891  protected void notifyControllerInconsistency(AbstractRequest request,
1892      ArrayList JavaDoc inconsistentControllers) throws SQLException JavaDoc
1893  {
1894    try
1895    {
1896      dvdb.getMulticastRequestAdapter().multicastMessage(
1897          inconsistentControllers, new NotifyInconsistency(request),
1898          MulticastRequestAdapter.WAIT_ALL, 0);
1899    }
1900    catch (Exception JavaDoc e)
1901    {
1902      String JavaDoc msg = "An error occured while notifying controllers ("
1903          + inconsistentControllers
1904          + ") of inconsistency due to distributed request " + request.getId();
1905      logger.warn(msg, e);
1906      throw new SQLException JavaDoc(msg + " (" + e + ")");
1907    }
1908  }
1909
1910  /**
1911   * Notify a set of backends of the query completion.
1912   *
1913   * @param request the request that has completed
1914   * @param success true if the request has successfully completed
1915   * @param disableBackendOnSuccess disable all local backends if query was
1916   * successful (but failed locally). Usually set to true in case of
1917   * AllBackendsFailedException and false for NoMoreBackendException.
1918   * @param backendsToNotify list of backends to notify (returns right away if
1919   * the list is null)
1920   * @throws SQLException if an error occurs
1921   */

1922  protected void notifyRequestCompletion(AbstractRequest request,
1923      boolean success, boolean disableBackendOnSuccess,
1924      ArrayList JavaDoc backendsToNotify) throws SQLException JavaDoc
1925  {
1926    if (backendsToNotify == null)
1927      return;
1928    try
1929    {
1930      dvdb.getMulticastRequestAdapter().multicastMessage(backendsToNotify,
1931          new NotifyCompletion(request, success, disableBackendOnSuccess),
1932          MulticastRequestAdapter.WAIT_ALL,
1933          dvdb.getMessageTimeouts().getNotifyCompletionTimeout());
1934    }
1935    catch (Exception JavaDoc e)
1936    {
1937      String JavaDoc msg = "An error occured while notifying all controllers of failure of distributed request "
1938          + request.getId();
1939      logger.warn(msg, e);
1940      throw new SQLException JavaDoc(msg + " (" + e + ")");
1941    }
1942  }
1943
1944  /**
1945   * Notify a set of backends of the query completion.
1946   *
1947   * @param request the request that has completed
1948   * @param success true if the request has successfully completed
1949   * @param disableBackendOnSuccess disable all local backends if query was
1950   * successful (but failed locally). Usually set to true in case of
1951   * AllBackendsFailedException and false for NoMoreBackendException.
1952   * @param backendsToNotify list of backends to notify (returns right away if
1953   * the list is null)
1954   * @param requestUpdateCount the request update count to be logged if it
1955   * succeeded somewhere
1956   * @throws SQLException
1957   */

1958  protected void notifyRequestCompletion(AbstractRequest request,
1959      boolean success, boolean disableBackendOnSuccess,
1960      ArrayList JavaDoc backendsToNotify, int requestUpdateCount) throws SQLException JavaDoc
1961  {
1962    if (backendsToNotify == null)
1963      return;
1964    try
1965    {
1966      dvdb.getMulticastRequestAdapter().multicastMessage(
1967          backendsToNotify,
1968          new NotifyCompletion(request, success, disableBackendOnSuccess,
1969              requestUpdateCount), MulticastRequestAdapter.WAIT_ALL,
1970          dvdb.getMessageTimeouts().getNotifyCompletionTimeout());
1971    }
1972    catch (Exception JavaDoc e)
1973    {
1974      String JavaDoc msg = "An error occured while notifying all controllers of failure of distributed request "
1975          + request.getId();
1976      logger.warn(msg, e);
1977      throw new SQLException JavaDoc(msg + " (" + e + ")");
1978    }
1979  }
1980
1981  /**
1982   * Cleanup all queries from a given transaction that were registered as failed
1983   * and issued from the other controller. This is used when a controller has
1984   * failed and no status information will be returned for failed queries.
1985   * Queries are systematically tagged as failed. This method is called only for
1986   * failover during a rollback / abort to properly close the transaction on the
1987   * remaining controller.
1988   *
1989   * @param tId the transaction id that we are looking for
1990   */

1991  public void cleanupRollbackFromOtherController(long tId)
1992  {
1993    long cid = this.getControllerId();
1994    synchronized (failedOnAllBackends)
1995    {
1996      for (Iterator JavaDoc iter = failedOnAllBackends.keySet().iterator(); iter
1997          .hasNext();)
1998      {
1999        AbstractRequest request = (AbstractRequest) iter.next();
2000        if (((request.getId() & CONTROLLER_ID_BIT_MASK) != cid)
2001            || ((request.getTransactionId() & CONTROLLER_ID_BIT_MASK) != cid)
2002            && request.getTransactionId() == tId)
2003        { // Need to remove that entry
2004
if (logger.isInfoEnabled())
2005            logger.info("Failover while rollbacking the transaction " + tId
2006                + " detected. No status information received for request "
2007                + request + ", considering status as failed.");
2008          FailureInformation failureInfo = (FailureInformation) failedOnAllBackends
2009              .get(request);
2010
2011          // If the current request is a rollback / abort, we need to call
2012
// logRequestCompletionAndNotifyScheduler with success set to true
2013
// for the transaction to be correctly cleaned up
2014
boolean isAbortOrRollback = (request instanceof UnknownWriteRequest)
2015              && ("rollback".equals(request.getSqlOrTemplate()) || "abort"
2016                  .equals(request.getSqlOrTemplate()));
2017
2018          logRequestCompletionAndNotifyScheduler(request, isAbortOrRollback,
2019              failureInfo, -1);
2020          iter.remove();
2021        }
2022      }
2023    }
2024  }
2025}
Popular Tags