KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > scheduler > AbstractScheduler


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Copyright (C) 2006 Continuent, Inc.
7  * Contact: sequoia@continuent.org
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): Jean-Bernard van Zuylen, Peter Royal, Damian Arregui, Stephane Giron.
23  */

24
25 package org.continuent.sequoia.controller.scheduler;
26
27 import java.sql.SQLException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.HashMap JavaDoc;
30 import java.util.HashSet JavaDoc;
31 import java.util.Hashtable JavaDoc;
32 import java.util.LinkedList JavaDoc;
33 import java.util.List JavaDoc;
34 import java.util.Set JavaDoc;
35
36 import org.continuent.sequoia.common.exceptions.RollbackException;
37 import org.continuent.sequoia.common.exceptions.VDBisShuttingDownException;
38 import org.continuent.sequoia.common.i18n.Translate;
39 import org.continuent.sequoia.common.log.Trace;
40 import org.continuent.sequoia.common.xml.DatabasesXmlTags;
41 import org.continuent.sequoia.common.xml.XmlComponent;
42 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
43 import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
44 import org.continuent.sequoia.controller.requests.AbstractRequest;
45 import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
46 import org.continuent.sequoia.controller.requests.SelectRequest;
47 import org.continuent.sequoia.controller.requests.StoredProcedure;
48 import org.continuent.sequoia.controller.sql.schema.DatabaseSchema;
49 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
50 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
51 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
52 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
53 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
54 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
55 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
56
57 /**
58  * The Request Scheduler should schedule the request according to a given
59  * policy.
60  * <p>
61  * The requests comes from the Request Controller and are sent later to the next
62  * controller components (cache and load balancer).
63  *
64  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
65  * @author <a HREF="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
66  * </a>
67  * @author <a HREF="mailto:damian.arregui@continuent.com">Damian Arregui</a>
68  * @version 1.0
69  */

70 public abstract class AbstractScheduler implements XmlComponent
71 {
72
73   //
74
// How the code is organized ?
75
//
76
// 1. Member variables
77
// 2. Constructor
78
// 3. Getter/Setter (possibly in alphabetical order)
79
// 4. Request handling
80
// 5. Transaction management
81
// 6. Checkpoint management
82
// 7. Debug/Monitoring
83
//
84

85   protected int raidbLevel;
86   protected int parsingGranularity;
87
88   // Transaction management
89
private long controllerId = 0;
90   private long tid;
91   private int sid;
92   private int suspendedTransactions = 0;
93   private int pendingTransactions = 0;
94   private final Object JavaDoc transactionSync = new Object JavaDoc();
95   private final Object JavaDoc endOfCurrentTransactions = new Object JavaDoc();
96   private List JavaDoc activeTransactions = new ArrayList JavaDoc();
97   /**
98    * Read requests only account for SelectRequest objects (stored procedures
99    * even with a read-only semantic will be in the write requests list).
100    * Synchronize access on the HashMap itself.
101    */

102   private HashMap JavaDoc activeReadRequests = new HashMap JavaDoc();
103   /**
104    * Write requests also include stored procedures (this list is synchronized
105    * using writesSync().
106    */

107   private HashMap JavaDoc activeWriteRequests = new HashMap JavaDoc();
108
109   // Persistent connection management
110
/**
111    * List of persistent connections that have been created <br>
112    * persistentConnectionId (Long) -> vLogin (String)
113    */

114   protected Hashtable JavaDoc persistentConnections = new Hashtable JavaDoc();
115   private final Object JavaDoc endOfCurrentPersistentConnections = new Object JavaDoc();
116   private Object JavaDoc persistentConnectionsSync = new Object JavaDoc();
117   private int suspendedNewPersistentConnections = 0;
118   private int suspendedOpenClosePersistentConnections = 0;
119   private int pendingOpenClosePersistentConnections = 0;
120   private Object JavaDoc suspendOpenClosePersistentConnectionSync = new Object JavaDoc();
121
122   // Writes management
123
private int suspendedWrites = 0;
124   private int pendingWrites = 0;
125   private final Object JavaDoc writesSync = new Object JavaDoc();
126   private final Object JavaDoc endOfCurrentWrites = new Object JavaDoc();
127
128   private Set JavaDoc suspendedRequests = new HashSet JavaDoc();
129
130   protected static Trace logger = Trace
131                                                                       .getLogger("org.continuent.sequoia.controller.scheduler");
132
133   /** Reference to distributed virtual database total order queue */
134   protected LinkedList JavaDoc totalOrderQueue;
135
136   // Monitoring values
137
private int numberRead = 0;
138   private int numberWrite = 0;
139
140   private VirtualDatabase vdb;
141
142   //
143
// Constructor
144
//
145

146   /**
147    * Default scheduler to assign scheduler RAIDb level, needed granularity and
148    * SQL macro handling (on the fly instanciation of NOW(), RAND(), ...).
149    *
150    * @param raidbLevel RAIDb level of this scheduler
151    * @param parsingGranularity Parsing granularity needed by the scheduler
152    * @param vdb virtual database using this scheduler (needed to access its
153    * total order queue)
154    */

155   public AbstractScheduler(int raidbLevel, int parsingGranularity,
156       VirtualDatabase vdb)
157   {
158     this.raidbLevel = raidbLevel;
159     this.parsingGranularity = parsingGranularity;
160     this.tid = 0;
161     this.sid = 0;
162     this.pendingTransactions = 0;
163     this.pendingWrites = 0;
164     this.totalOrderQueue = vdb.getTotalOrderQueue();
165     this.vdb = vdb;
166   }
167
168   /**
169    * Default scheduler to assign scheduler RAIDb level, needed granularity and
170    * SQL macro handling (on the fly instanciation of NOW(), RAND(), ...).
171    *
172    * @param raidbLevel RAIDb level of this scheduler
173    * @param parsingGranularity Parsing granularity needed by the scheduler
174    * @deprecated This constructor is used only by unsupported scheduler
175    * sub-classes.
176    */

177   public AbstractScheduler(int raidbLevel, int parsingGranularity)
178   {
179     this.raidbLevel = raidbLevel;
180     this.parsingGranularity = parsingGranularity;
181     this.tid = 0;
182     this.sid = 0;
183     this.pendingTransactions = 0;
184     this.pendingWrites = 0;
185     this.totalOrderQueue = null;
186     this.vdb = null;
187   }
188
189   //
190
// Getter/Setter methods
191
//
192

193   /**
194    * Returns the list of active read requests <request id, SelectRequest>.
195    *
196    * @return Returns the active read requests.
197    */

198   public final HashMap JavaDoc getActiveReadRequests()
199   {
200     return activeReadRequests;
201   }
202
203   /**
204    * Returns the list of active transactions (list contains transaction ids).
205    *
206    * @return Returns the active transaction ids.
207    */

208   public final List JavaDoc getActiveTransactions()
209   {
210     return activeTransactions;
211   }
212
213   /**
214    * Returns the list of active write requests <request id, AbstractRequest>.
215    * Write requests can be either StoredProcedure or AbstractWriteRequest
216    * objects.
217    *
218    * @return Returns the active write requests.
219    */

220   public final HashMap JavaDoc getActiveWriteRequests()
221   {
222     return activeWriteRequests;
223   }
224
225   /**
226    * Increments the savepoint id for un-named savepoints
227    *
228    * @return the next savepoint Id
229    */

230   public synchronized int incrementSavepointId()
231   {
232     sid++;
233     return sid;
234   }
235
236   /**
237    * Initialize the transaction id with the given value (usually retrieved from
238    * the recovery log).
239    *
240    * @param transactionId new current transaction identifier
241    */

242   public final void initializeTransactionId(long transactionId)
243   {
244     synchronized (transactionSync)
245     {
246       this.tid = transactionId;
247     }
248   }
249
250   /**
251    * Returns true if the given request id is in the active request list.
252    *
253    * @param requestId the request unique id
254    * @return true if the request is active, false otherwise
255    */

256   public boolean isActiveRequest(long requestId)
257   {
258     Long JavaDoc lId = new Long JavaDoc(requestId);
259     synchronized (activeReadRequests)
260     {
261       if (activeReadRequests.containsKey(lId))
262         return true;
263     }
264     synchronized (activeWriteRequests)
265     {
266       return activeWriteRequests.containsKey(lId);
267     }
268   }
269
270   /**
271    * Retrieve the next transaction identifier
272    *
273    * @return next transaction identifier
274    */

275   public long getNextTransactionId()
276   {
277     synchronized (transactionSync)
278     {
279       return tid++;
280     }
281   }
282
283   /**
284    * Get the needed query parsing granularity.
285    *
286    * @return needed query parsing granularity
287    */

288   public final int getParsingGranularity()
289   {
290     return parsingGranularity;
291   }
292
293   /**
294    * Set the needed query parsing granularity.
295    *
296    * @param parsingGranularity Parsing granularity needed by the scheduler
297    */

298   public final void setParsingGranularity(int parsingGranularity)
299   {
300     this.parsingGranularity = parsingGranularity;
301   }
302
303   /**
304    * Assigns the local controllerId. It is used for generating transactionIds
305    * for autocommit requests.
306    *
307    * @param controllerId for this controller
308    */

309   public void setControllerId(long controllerId)
310   {
311     this.controllerId = controllerId;
312   }
313
314   /**
315    * Returns the number of pending writes.
316    *
317    * @return int
318    */

319   public final int getPendingWrites()
320   {
321     return pendingWrites;
322   }
323
324   /**
325    * Returns the RAIDbLevel.
326    *
327    * @return int
328    */

329   public final int getRAIDbLevel()
330   {
331     return raidbLevel;
332   }
333
334   /**
335    * Sets the RAIDb level.
336    *
337    * @param raidbLevel The RAIDbLevel to set
338    */

339   public final void setRAIDbLevel(int raidbLevel)
340   {
341     this.raidbLevel = raidbLevel;
342   }
343
344   /**
345    * Merge the given <code>DatabaseSchema</code> with the current one.
346    *
347    * @param dbs a <code>DatabaseSchema</code> value
348    * @see org.continuent.sequoia.controller.scheduler.schema.SchedulerDatabaseSchema
349    */

350   public void mergeDatabaseSchema(DatabaseSchema dbs)
351   {
352     logger.info(Translate.get("scheduler.doesnt.support.schemas"));
353   }
354
355   /**
356    * Sets the <code>DatabaseSchema</code> of the current virtual database.
357    * This is only needed by some schedulers that will have to define their own
358    * scheduler schema
359    *
360    * @param dbs a <code>DatabaseSchema</code> value
361    * @see org.continuent.sequoia.controller.scheduler.schema.SchedulerDatabaseSchema
362    */

363   public void setDatabaseSchema(DatabaseSchema dbs)
364   {
365     if (logger.isInfoEnabled())
366       logger.info(Translate.get("scheduler.doesnt.support.schemas"));
367   }
368
369   /**
370    * Wait for the completion of the given request id. The method returns as soon
371    * as the request completion has been notified to the scheduler.
372    *
373    * @param requestId the unique request identifier
374    */

375   public void waitForRequestCompletion(long requestId)
376   {
377     Long JavaDoc lId = new Long JavaDoc(requestId);
378     synchronized (activeReadRequests)
379     {
380       while (activeReadRequests.containsKey(lId))
381       {
382         try
383         {
384           activeReadRequests.wait();
385         }
386         catch (InterruptedException JavaDoc ignore)
387         {
388         }
389       }
390     }
391     synchronized (activeWriteRequests)
392     {
393       while (activeWriteRequests.containsKey(lId))
394       {
395         try
396         {
397           activeWriteRequests.wait();
398         }
399         catch (InterruptedException JavaDoc ignore)
400         {
401         }
402       }
403     }
404   }
405
406   /**
407    * Schedule a read request
408    *
409    * @param request select request to schedule
410    * @exception SQLException if a timeout occurs or a query with the same id has
411    * already been scheduled.
412    */

413   public void scheduleReadRequest(SelectRequest request) throws SQLException JavaDoc
414   {
415     Long JavaDoc id = new Long JavaDoc(request.getId());
416     synchronized (activeReadRequests)
417     {
418       if (activeReadRequests.containsKey(id))
419         throw new SQLException JavaDoc("A query with id " + id
420             + " has already been scheduled");
421       activeReadRequests.put(id, request);
422     }
423
424     // Assign a unique transaction id to requests in autocommit mode as well
425
if (request.isAutoCommit() && request.isMustBroadcast())
426     {
427       long fakeTid = getNextTransactionId();
428       fakeTid = fakeTid & DistributedRequestManager.TRANSACTION_ID_BIT_MASK;
429       fakeTid = fakeTid | controllerId;
430       request.setTransactionId(fakeTid);
431     }
432
433     try
434     {
435       scheduleNonSuspendedReadRequest(request);
436     }
437     catch (SQLException JavaDoc e)
438     {
439       // Remove query for the active queue if we failed to schedule
440
synchronized (activeReadRequests)
441       {
442         activeReadRequests.remove(id);
443       }
444       throw e;
445     }
446   }
447
448   /**
449    * Schedule a read request (implementation specific)
450    *
451    * @param request Select request to schedule (SQL macros are already handled
452    * if needed)
453    * @exception SQLException if a timeout occurs
454    */

455   protected abstract void scheduleNonSuspendedReadRequest(SelectRequest request)
456       throws SQLException JavaDoc;
457
458   /**
459    * Notify the completion of a read statement.
460    *
461    * @param request the completed request
462    * @throws SQLException if the query was not in the list of active read
463    * requests (not scheduled)
464    */

465   public final void readCompleted(SelectRequest request) throws SQLException JavaDoc
466   {
467     Long JavaDoc id = new Long JavaDoc(request.getId());
468     synchronized (activeReadRequests)
469     {
470       if (activeReadRequests.remove(id) == null)
471         throw new SQLException JavaDoc("Query " + id
472             + " is not in the list of currently scheduled queries");
473       activeReadRequests.notifyAll();
474     }
475     numberRead++;
476     this.readCompletedNotify(request);
477   }
478
479   /**
480    * Notify the completion of a read statement.
481    *
482    * @param request the completed request
483    */

484   protected abstract void readCompletedNotify(SelectRequest request);
485
486   /**
487    * Schedule a write request. This method blocks if the writes are suspended.
488    * Then the number of pending writes is updated and the implementation
489    * specific scheduleNonSuspendedWriteRequest function is called. SQL macros
490    * are replaced in the request if the scheduler has needSQLMacroHandling set
491    * to true.
492    *
493    * @param request Write request to schedule
494    * @exception SQLException if a timeout occurs or a query with the same id has
495    * already been scheduled.
496    * @exception RollbackException if an error occurs
497    * @see #scheduleNonSuspendedWriteRequest(AbstractWriteRequest)
498    */

499   public final void scheduleWriteRequest(AbstractWriteRequest request)
500       throws SQLException JavaDoc, RollbackException
501   {
502     suspendWriteIfNeededAndAddQueryToActiveRequests(request);
503     scheduleNonSuspendedWriteRequest(request);
504
505     // Assign a unique transaction id to requests in autocommit mode as well
506
if (request.isAutoCommit())
507     {
508       long fakeTid = getNextTransactionId();
509       fakeTid = fakeTid & DistributedRequestManager.TRANSACTION_ID_BIT_MASK;
510       fakeTid = fakeTid | controllerId;
511       request.setTransactionId(fakeTid);
512     }
513   }
514
515   /**
516    * Schedule a write request (implementation specific). This method blocks
517    * until the request can be executed.
518    *
519    * @param request Write request to schedule (SQL macros are already handled if
520    * needed)
521    * @exception SQLException if a timeout occurs
522    * @exception RollbackException if the transaction must be rollbacked
523    */

524   protected abstract void scheduleNonSuspendedWriteRequest(
525       AbstractWriteRequest request) throws SQLException JavaDoc, RollbackException;
526
527   /**
528    * Notify the completion of a write statement.
529    * <p>
530    * This method updates the number of pending writes and calls the
531    * implementation specific notifyWriteCompleted function.
532    * <p>
533    * Finally, the suspendWrites() function is notified if needed.
534    *
535    * @param request the completed request
536    * @throws SQLException if the query is not in the list of scheduled queries
537    * @see #notifyWriteCompleted(AbstractWriteRequest)
538    * @see #checkPendingWrites()
539    */

540   public final void writeCompleted(AbstractWriteRequest request)
541       throws SQLException JavaDoc
542   {
543     Long JavaDoc id = new Long JavaDoc(request.getId());
544
545     synchronized (writesSync)
546     {
547       synchronized (activeWriteRequests)
548       {
549         if (activeWriteRequests.remove(id) == null)
550           throw new SQLException JavaDoc("Query " + id
551               + " is not in the list of currently scheduled queries");
552
553         activeWriteRequests.notifyAll();
554       }
555       pendingWrites--;
556
557       if (pendingWrites < 0)
558       {
559         logger
560             .error("Negative pending writes detected on write request completion ("
561                 + request + ")");
562         pendingWrites = 0;
563       }
564
565       if (logger.isDebugEnabled())
566         logger.debug("Write completed, remaining pending writes: "
567             + pendingWrites);
568
569       notifyWriteCompleted(request);
570
571       checkPendingWrites();
572     }
573     numberWrite++;
574   }
575
576   /**
577    * Notify the completion of a write statement. This method does not need to be
578    * synchronized, it is enforced by the caller.
579    *
580    * @param request the completed request
581    * @see #writeCompleted(AbstractWriteRequest)
582    */

583   protected abstract void notifyWriteCompleted(AbstractWriteRequest request);
584
585   /**
586    * Schedule a write request. This method blocks if the writes are suspended.
587    * Then the number of pending writes is updated and the implementation
588    * specific scheduleNonSuspendedWriteRequest function is called. SQL macros
589    * are replaced in the request if the scheduler has needSQLMacroHandling set
590    * to true.
591    *
592    * @param proc Stored procedure to schedule
593    * @exception SQLException if a timeout occurs
594    * @exception RollbackException if an error occurs
595    * @see #scheduleNonSuspendedStoredProcedure(StoredProcedure)
596    */

597   public final void scheduleStoredProcedure(StoredProcedure proc)
598       throws SQLException JavaDoc, RollbackException
599   {
600     suspendWriteIfNeededAndAddQueryToActiveRequests(proc);
601     scheduleNonSuspendedStoredProcedure(proc);
602
603     // Assign a unique transaction id to requests in autocommit mode as well
604
if (proc.isAutoCommit())
605     {
606       long fakeTid = getNextTransactionId();
607       fakeTid = fakeTid & DistributedRequestManager.TRANSACTION_ID_BIT_MASK;
608       fakeTid = fakeTid | controllerId;
609       proc.setTransactionId(fakeTid);
610     }
611   }
612
613   /**
614    * Schedule a write request (implementation specific). This method blocks
615    * until the request can be executed.
616    *
617    * @param proc Stored procedure to schedule
618    * @exception SQLException if a timeout occurs
619    * @exception RollbackException if the transaction must be rollbacked
620    */

621   protected abstract void scheduleNonSuspendedStoredProcedure(
622       StoredProcedure proc) throws SQLException JavaDoc, RollbackException;
623
624   /**
625    * Notify the completion of a stored procedure.
626    * <p>
627    * This method updates the number of pending writes and calls the
628    * implementation specific notifyStoredProcedureCompleted function.
629    * <p>
630    * Finally, the suspendWrites() function is notified if needed.
631    *
632    * @param proc the completed stored procedure
633    * @throws SQLException if the stored procedure was not scheduled before (not
634    * in the active request list)
635    * @see #notifyStoredProcedureCompleted(StoredProcedure)
636    * @see #checkPendingWrites()
637    */

638   public final void storedProcedureCompleted(StoredProcedure proc)
639       throws SQLException JavaDoc
640   {
641     Long JavaDoc id = new Long JavaDoc(proc.getId());
642
643     synchronized (writesSync)
644     {
645       synchronized (activeWriteRequests)
646       {
647         if (activeWriteRequests.remove(id) == null)
648           throw new SQLException JavaDoc("Query " + id
649               + " is not in the list of currently scheduled queries");
650
651         activeWriteRequests.notifyAll();
652       }
653
654       pendingWrites--;
655
656       if (pendingWrites < 0)
657       {
658         logger
659             .error("Negative pending writes detected on stored procedure completion ("
660                 + proc + ")");
661         pendingWrites = 0;
662       }
663
664       if (logger.isDebugEnabled())
665         logger.debug("Stored procedure completed, remaining pending writes: "
666             + pendingWrites);
667
668       notifyStoredProcedureCompleted(proc);
669
670       checkPendingWrites();
671     }
672     numberWrite++;
673   }
674
675   /**
676    * Notify the completion of a stored procedure. This method does not need to
677    * be synchronized, it is enforced by the caller.
678    *
679    * @param proc the completed stored procedure
680    * @see #storedProcedureCompleted(StoredProcedure)
681    */

682   protected abstract void notifyStoredProcedureCompleted(StoredProcedure proc);
683
684   /**
685    * Suspend write requests if suspendedWrites is active. Adds the request to
686    * the list of active requests after successful scheduling.
687    *
688    * @param request the request to suspend (a write request or a stored
689    * procedure)
690    * @throws SQLException if the request timeout has expired or a query with the
691    * same id has already been scheduled.
692    */

693   private void suspendWriteIfNeededAndAddQueryToActiveRequests(
694       AbstractRequest request) throws SQLException JavaDoc
695   {
696     Long JavaDoc id = new Long JavaDoc(request.getId());
697
698     synchronized (writesSync)
699     {
700       if (suspendedWrites > 0)
701       {
702         // Let requests in active transactions to execute since they might
703
// unblock queries of other transactions.
704
boolean mustBeSuspended = !request.isPersistentConnection()
705             && (request.isAutoCommit() || !activeTransactions
706                 .contains(new TransactionMetaData(request.getTransactionId(),
707                     0, request.getLogin(), request.isPersistentConnection(),
708                     request.getPersistentConnectionId())));
709
710         if (mustBeSuspended)
711         {
712           addSuspendedRequest(request);
713           try
714           {
715             // Wait on writesSync
716
int timeout = request.getTimeout();
717             if (timeout > 0)
718             {
719               long start = System.currentTimeMillis();
720               long lTimeout = timeout * 1000L;
721               writesSync.wait(lTimeout);
722               long end = System.currentTimeMillis();
723               int remaining = (int) (lTimeout - (end - start));
724               if (remaining > 0)
725                 request.setTimeout(remaining);
726               else
727               {
728                 String JavaDoc msg = Translate.get("scheduler.request.timeout",
729                     new String JavaDoc[]{String.valueOf(request.getId()),
730                         String.valueOf(request.getTimeout()),
731                         String.valueOf(pendingWrites)});
732                 logger.warn(msg);
733                 throw new SQLException JavaDoc(msg);
734               }
735             }
736             else
737               this.writesSync.wait();
738           }
739           catch (InterruptedException JavaDoc e)
740           {
741             String JavaDoc msg = Translate.get("scheduler.request.timeout.failed", e);
742             logger.warn(msg);
743             throw new SQLException JavaDoc(msg);
744           }
745         }
746       }
747
748       synchronized (activeWriteRequests)
749       {
750         if (activeWriteRequests.containsKey(id))
751           throw new SQLException JavaDoc("A query with id " + id
752               + " has already been scheduled");
753
754         activeWriteRequests.put(id, request);
755       }
756       pendingWrites++;
757
758       if (logger.isDebugEnabled())
759         logger.debug("Schedule " + request.getUniqueKey()
760             + " - Current pending writes: " + pendingWrites);
761     }
762   }
763
764   /**
765    * Signals the start of a persistent connection opening operation.
766    *
767    * @param dmsg distributed message which triggered this operation
768    */

769   public void scheduleOpenPersistentConnection(
770       DistributedOpenPersistentConnection dmsg)
771   {
772     checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount();
773
774     // Underlying Hashtable is synchronized and we systematically overwrite
775
// any previous value, it is as fast as checking first.
776
// Check if persistent connections creation is suspended
777
synchronized (persistentConnectionsSync)
778     {
779       if (suspendedNewPersistentConnections > 0)
780       {
781         addSuspendedRequest(dmsg);
782         try
783         {
784           persistentConnectionsSync.wait();
785         }
786         catch (InterruptedException JavaDoc e)
787         {
788           e.printStackTrace();
789         }
790       }
791       persistentConnections.put(new Long JavaDoc(dmsg.getPersistentConnectionId()),
792           dmsg.getLogin());
793     }
794   }
795
796   /**
797    * Signals the start of a persistent connection opening operation.
798    *
799    * @param persistentConnectionId persistent connection id of the connection
800    * that is being opened
801    * @param login login used to open this new persistent connection
802    * @see #scheduleOpenPersistentConnection(DistributedOpenPersistentConnection)
803    */

804   public void scheduleOpenPersistentConnection(long persistentConnectionId,
805       String JavaDoc login)
806   {
807     // Creating a fake DistributedOpenPersistentConnection here, as it is used
808
// in non distributed VDB
809
scheduleOpenPersistentConnection(new DistributedOpenPersistentConnection(
810         login, persistentConnectionId));
811   }
812
813   /**
814    * Schedule a close persistent connection.
815    */

816   public void scheduleClosePersistentConnection()
817   {
818     checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount();
819   }
820
821   private void checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount()
822   {
823     synchronized (suspendOpenClosePersistentConnectionSync)
824     {
825       while (suspendedOpenClosePersistentConnections > 0)
826       {
827         try
828         {
829           suspendOpenClosePersistentConnectionSync.wait();
830         }
831         catch (InterruptedException JavaDoc e)
832         {
833         }
834       }
835       pendingOpenClosePersistentConnections++;
836     }
837   }
838
839   private void decrementOpenClosePersistentConnectionCount()
840   {
841     synchronized (suspendOpenClosePersistentConnectionSync)
842     {
843       pendingOpenClosePersistentConnections--;
844       if (pendingOpenClosePersistentConnections < 0)
845       {
846         logger
847             .error("Negative count of pending open/close persistent connections");
848         pendingOpenClosePersistentConnections = 0;
849       }
850       if (suspendedOpenClosePersistentConnections == 0)
851         suspendOpenClosePersistentConnectionSync.notifyAll();
852     }
853   }
854
855   /**
856    * Notify open persistent connection completion. If it failed the connection
857    * is removed from the persistentConnections table.
858    *
859    * @param persistentConnectionId id of the opened persistent connection
860    * @param success true if connection opening was successful in which case the
861    * connection is added to the persistent connection list
862    */

863   public void openPersistentConnectionCompleted(long persistentConnectionId,
864       boolean success)
865   {
866     decrementOpenClosePersistentConnectionCount();
867     if (!success)
868       synchronized (endOfCurrentPersistentConnections)
869       {
870         persistentConnections.remove(new Long JavaDoc(persistentConnectionId));
871         endOfCurrentPersistentConnections.notifyAll();
872       }
873   }
874
875   /**
876    * Signals the completion of a persistent connection closing operation.
877    *
878    * @param persistentConnectionId id of the closed persistent connection
879    */

880   public void closePersistentConnectionCompleted(long persistentConnectionId)
881   {
882     decrementOpenClosePersistentConnectionCount();
883     synchronized (endOfCurrentPersistentConnections)
884     {
885       persistentConnections.remove(new Long JavaDoc(persistentConnectionId));
886       endOfCurrentPersistentConnections.notifyAll();
887     }
888   }
889
890   /**
891    * Returns the login associated with a given persistent connection.
892    *
893    * @param persistentConnectionId the id of the persistent connection
894    * @return the associated login
895    */

896   public String JavaDoc getPersistentConnectionLogin(Long JavaDoc persistentConnectionId)
897   {
898     return (String JavaDoc) persistentConnections.get(persistentConnectionId);
899   }
900
901   /**
902    * @see org.continuent.sequoia.controller.requestmanager.RequestManager#hasPersistentConnection(long)
903    */

904   public boolean hasPersistentConnection(long persistentConnectionId)
905   {
906     return persistentConnections.contains(new Long JavaDoc(persistentConnectionId));
907   }
908
909   /**
910    * Returns a hashtable of all the open persistent connections (and their
911    * associated login).
912    *
913    * @return persistent connection hashtable
914    */

915   public Hashtable JavaDoc getOpenPersistentConnections()
916   {
917     return persistentConnections;
918   }
919
920   //
921
// Transaction management
922
//
923

924   /**
925    * Begin a new transaction with the transaction identifier provided in the
926    * transaction meta data parameter. Note that this id must retrieve beforehand
927    * by calling getNextTransactionId(). This method is called from the driver
928    * when setAutoCommit(false) is called.
929    *
930    * @param tm The transaction marker metadata
931    * @param isLazyStart true if this begin is triggered by a lazy transaction
932    * start of a transaction initiated by a remote controller. In that
933    * case, suspended transactions will be ignored (but not suspended
934    * writes)
935    * @param request request which triggered this operation
936    * @throws SQLException if an error occurs
937    */

938   public final void begin(TransactionMetaData tm, boolean isLazyStart,
939       AbstractRequest request) throws SQLException JavaDoc
940   {
941     // Check if transactions are suspended
942
boolean retry;
943     do
944     {
945       retry = false;
946       synchronized (transactionSync)
947       {
948         if ((suspendedTransactions > 0) && !isLazyStart
949             && !tm.isPersistentConnection())
950         {
951           addSuspendedRequest(request);
952           try
953           {
954             // Wait on transactionSync
955
long timeout = tm.getTimeout();
956             if (timeout > 0)
957             {
958               long start = System.currentTimeMillis();
959               transactionSync.wait(timeout);
960               long end = System.currentTimeMillis();
961               long remaining = timeout - (end - start);
962               if (remaining > 0)
963                 tm.setTimeout(remaining);
964               else
965               {
966                 String JavaDoc msg = Translate.get(
967                     "scheduler.begin.timeout.transactionSync",
968                     pendingTransactions);
969                 logger.warn(msg);
970                 throw new SQLException JavaDoc(msg);
971               }
972             }
973             else
974               transactionSync.wait();
975           }
976           catch (InterruptedException JavaDoc e)
977           {
978             String JavaDoc msg = Translate.get(
979                 "scheduler.begin.timeout.transactionSync", pendingTransactions)
980                 + " (" + e + ")";
981             logger.error(msg);
982             throw new SQLException JavaDoc(msg);
983           }
984         }
985         if (vdb != null && vdb.isRejectingNewTransaction())
986           throw new VDBisShuttingDownException(
987               "VDB is shutting down... can't start a new transaction");
988
989         pendingTransactions++;
990
991         if (logger.isDebugEnabled())
992           logger.debug("Begin scheduled - current pending transactions: "
993               + pendingTransactions);
994       }
995
996       // Check if writes are suspended
997
synchronized (writesSync)
998       {
999         /*
1000         * If suspendedTransaction changed after we left the block above, we
1001         * need to go back and wait there.
1002         */

1003        synchronized (transactionSync)
1004        {
1005          if ((suspendedTransactions > 0) && !isLazyStart
1006              && !tm.isPersistentConnection())
1007          {
1008            retry = true;
1009            pendingTransactions--;
1010            checkPendingTransactions();
1011            continue;
1012          }
1013        }
1014        if ((suspendedWrites > 0) && !isLazyStart
1015            && !tm.isPersistentConnection())
1016        {
1017          addSuspendedRequest(request);
1018          try
1019          {
1020            // Wait on writesSync
1021
long timeout = tm.getTimeout();
1022            if (timeout > 0)
1023            {
1024              long start = System.currentTimeMillis();
1025              writesSync.wait(timeout);
1026              long end = System.currentTimeMillis();
1027              long remaining = timeout - (end - start);
1028              if (remaining > 0)
1029                tm.setTimeout(remaining);
1030              else
1031              {
1032                String JavaDoc msg = Translate.get(
1033                    "scheduler.begin.timeout.writesSync", pendingWrites);
1034                logger.warn(msg);
1035                synchronized (transactionSync)
1036                {
1037                  pendingTransactions--;
1038                }
1039                checkPendingTransactions();
1040                throw new SQLException JavaDoc(msg);
1041              }
1042            }
1043            else
1044              writesSync.wait();
1045          }
1046          catch (InterruptedException JavaDoc e)
1047          {
1048            String JavaDoc msg = Translate.get("scheduler.begin.timeout.writesSync",
1049                pendingWrites)
1050                + " (" + e + ")";
1051            logger.error(msg);
1052            synchronized (transactionSync)
1053            {
1054              pendingTransactions--;
1055            }
1056            checkPendingTransactions();
1057            throw new SQLException JavaDoc(msg);
1058          }
1059        }
1060        pendingWrites++;
1061
1062        if (logger.isDebugEnabled())
1063          logger.debug("Begin scheduled - current pending writes: "
1064              + pendingWrites);
1065
1066        // Check if the transaction has not already been started and add it to
1067
// the active transaction list
1068
if (activeTransactions.contains(tm))
1069        {
1070          logger.error("Trying to start twice transaction "
1071              + tm.getTransactionId());
1072        }
1073        else
1074          activeTransactions.add(tm);
1075      }
1076    }
1077    while (retry);
1078  }
1079
1080  /**
1081   * Notify the completion of a begin command.
1082   *
1083   * @param transactionId of the completed begin
1084   */

1085  public final void beginCompleted(long transactionId)
1086  {
1087    // Take care of suspended write
1088
synchronized (writesSync)
1089    {
1090      pendingWrites--;
1091      if (pendingWrites < 0)
1092      {
1093        logger
1094            .error("Negative pending writes detected on begin completion for transaction "
1095                + transactionId);
1096        pendingWrites = 0;
1097      }
1098
1099      if (logger.isDebugEnabled())
1100        logger.debug("Begin completed, remaining pending writes: "
1101            + pendingWrites);
1102
1103      checkPendingWrites();
1104    }
1105  }
1106
1107  /**
1108   * Commit a transaction.
1109   * <p>
1110   * Calls the implementation specific commitTransaction()
1111   *
1112   * @param tm The transaction marker metadata
1113   * @param emptyTransaction true if we are committing a transaction that did
1114   * not execute any query
1115   * @param dmsg distributed message which triggered this operation
1116   * @throws SQLException if an error occurs
1117   * @see #commitTransaction(long)
1118   */

1119  public final void commit(TransactionMetaData tm, boolean emptyTransaction,
1120      DistributedCommit dmsg) throws SQLException JavaDoc
1121  {
1122    // Check if writes are suspended
1123
synchronized (writesSync)
1124    {
1125      if (!activeTransactions.contains(tm))
1126        throw new SQLException JavaDoc("Transaction " + tm.getTransactionId()
1127            + " is not active, rejecting the commit.");
1128
1129      // if ((suspendedWrites > 0) && !tm.isPersistentConnection())
1130
if (false) // never suspend a commit
1131
{
1132        addSuspendedRequest(dmsg);
1133        try
1134        {
1135          // Wait on writesSync
1136
long timeout = tm.getTimeout();
1137          if (timeout > 0)
1138          {
1139            long start = System.currentTimeMillis();
1140            writesSync.wait(timeout);
1141            long end = System.currentTimeMillis();
1142            long remaining = timeout - (end - start);
1143            if (remaining > 0)
1144              tm.setTimeout(remaining);
1145            else
1146            {
1147              String JavaDoc msg = Translate.get("scheduler.commit.timeout.writesSync",
1148                  pendingWrites);
1149              logger.warn(msg);
1150              throw new SQLException JavaDoc(msg);
1151            }
1152          }
1153          else
1154            writesSync.wait();
1155        }
1156        catch (InterruptedException JavaDoc e)
1157        {
1158          String JavaDoc msg = Translate.get("scheduler.commit.timeout.writesSync",
1159              pendingWrites)
1160              + " (" + e + ")";
1161          logger.error(msg);
1162          throw new SQLException JavaDoc(msg);
1163        }
1164      }
1165      pendingWrites++;
1166
1167      if (logger.isDebugEnabled())
1168        logger.debug("Commit scheduled - current pending writes: "
1169            + pendingWrites);
1170    }
1171    if (!emptyTransaction)
1172      commitTransaction(tm.getTransactionId());
1173  }
1174
1175  /**
1176   * Commit a transaction given its id.
1177   *
1178   * @param transactionId the transaction id
1179   */

1180  protected abstract void commitTransaction(long transactionId);
1181
1182  /**
1183   * Notify the completion of a commit command.
1184   *
1185   * @param tm The transaction marker metadata
1186   * @param isSuccess true if commit was successful, false otherwise
1187   */

1188  public final void commitCompleted(TransactionMetaData tm, boolean isSuccess)
1189  {
1190    boolean transactionIsActive = false;
1191    synchronized (writesSync)
1192    {
1193      if (isSuccess)
1194      {
1195        transactionIsActive = activeTransactions.remove(tm);
1196      }
1197    }
1198    if (transactionIsActive)
1199    {
1200      // Take care of suspended transactions
1201
synchronized (transactionSync)
1202      {
1203        pendingTransactions--;
1204        if (pendingTransactions < 0)
1205        {
1206          logger
1207              .error("Negative pending transactions detected on commit completion for transaction "
1208                  + tm.getTransactionId());
1209          pendingTransactions = 0;
1210        }
1211
1212        if (logger.isDebugEnabled())
1213          logger.debug("Commit completed, remaining pending transactions: "
1214              + pendingTransactions);
1215
1216        checkPendingTransactions();
1217      }
1218    }
1219    else if ((isSuccess) && (logger.isDebugEnabled()))
1220      logger.debug("Transaction " + tm.getTransactionId()
1221          + " has already completed.");
1222
1223    // Take care of suspended write
1224
synchronized (writesSync)
1225    {
1226      pendingWrites--;
1227      if (pendingWrites < 0)
1228      {
1229        logger
1230            .error("Negative pending writes detected on commit completion for transaction"
1231                + tm.getTransactionId());
1232        pendingWrites = 0;
1233      }
1234
1235      if (logger.isDebugEnabled())
1236        logger.debug("Commit completed, remaining pending writes: "
1237            + pendingWrites);
1238
1239      checkPendingWrites();
1240    }
1241  }
1242
1243  /**
1244   * Rollback a transaction.
1245   * <p>
1246   * Calls the implementation specific rollbackTransaction()
1247   *
1248   * @param tm The transaction marker metadata
1249   * @param dmsg distributed message which triggered this operation
1250   * @exception SQLException if an error occurs
1251   * @see #rollbackTransaction(long)
1252   */

1253  public final void rollback(TransactionMetaData tm, DistributedRollback dmsg)
1254      throws SQLException JavaDoc
1255  {
1256    // Check if writes are suspended
1257
synchronized (writesSync)
1258    {
1259      if (!activeTransactions.contains(tm))
1260        throw new SQLException JavaDoc("Transaction " + tm.getTransactionId()
1261            + " is not active, rejecting the rollback.");
1262
1263      // if ((suspendedWrites > 0) && !tm.isPersistentConnection())
1264
if (false) // never suspend a rollback
1265
{
1266        addSuspendedRequest(dmsg);
1267        try
1268        {
1269          // Wait on writesSync
1270
long timeout = tm.getTimeout();
1271          if (timeout > 0)
1272          {
1273            long start = System.currentTimeMillis();
1274            writesSync.wait(timeout);
1275            long end = System.currentTimeMillis();
1276            long remaining = timeout - (end - start);
1277            if (remaining > 0)
1278              tm.setTimeout(remaining);
1279            else
1280            {
1281              String JavaDoc msg = Translate.get(
1282                  "scheduler.rollback.timeout.writesSync", pendingWrites);
1283              logger.warn(msg);
1284              throw new SQLException JavaDoc(msg);
1285            }
1286          }
1287          else
1288            writesSync.wait();
1289        }
1290        catch (InterruptedException JavaDoc e)
1291        {
1292          String JavaDoc msg = Translate.get("scheduler.rollback.timeout.writesSync",
1293              pendingWrites)
1294              + " (" + e + ")";
1295          logger.error(msg);
1296          throw new SQLException JavaDoc(msg);
1297        }
1298      }
1299      pendingWrites++;
1300
1301      if (logger.isDebugEnabled())
1302        logger.debug("Rollback scheduled - current pending writes: "
1303            + pendingWrites);
1304    }
1305    rollbackTransaction(tm.getTransactionId());
1306  }
1307
1308  /**
1309   * Rollback a transaction to a savepoint.
1310   * <p>
1311   * Calls the implementation specific rollbackTransaction()
1312   *
1313   * @param tm transaction marker metadata
1314   * @param savepointName name of the savepoint
1315   * @param dmsg distributed message which triggered this operation
1316   * @throws SQLException if an error occurs
1317   */

1318  public final void rollback(TransactionMetaData tm, String JavaDoc savepointName,
1319      DistributedRollbackToSavepoint dmsg) throws SQLException JavaDoc
1320  {
1321    // Check if writes are suspended
1322
synchronized (writesSync)
1323    {
1324      // if ((suspendedWrites > 0) && !tm.isPersistentConnection())
1325
if (false) // never suspend a rollback
1326
{
1327        addSuspendedRequest(dmsg);
1328        try
1329        {
1330          // Wait on writesSync
1331
long timeout = tm.getTimeout();
1332          if (timeout > 0)
1333          {
1334            long start = System.currentTimeMillis();
1335            writesSync.wait(timeout);
1336            long end = System.currentTimeMillis();
1337            long remaining = timeout - (end - start);
1338            if (remaining > 0)
1339              tm.setTimeout(remaining);
1340            else
1341            {
1342              String JavaDoc msg = Translate.get(
1343                  "scheduler.rollbacksavepoint.timeout.writeSync",
1344                  pendingWrites);
1345              logger.warn(msg);
1346              throw new SQLException JavaDoc(msg);
1347            }
1348          }
1349          else
1350            writesSync.wait();
1351        }
1352        catch (InterruptedException JavaDoc e)
1353        {
1354          String JavaDoc msg = Translate.get(
1355              "scheduler.rollbacksavepoint.timeout.writeSync", pendingWrites)
1356              + " (" + e + ")";
1357          logger.error(msg);
1358          throw new SQLException JavaDoc(msg);
1359        }
1360      }
1361      pendingWrites++;
1362
1363      if (logger.isDebugEnabled())
1364        logger.debug("Rollback " + savepointName
1365            + " scheduled - current pending writes: " + pendingWrites);
1366    }
1367
1368    this.rollbackTransaction(tm.getTransactionId(), savepointName);
1369  }
1370
1371  /**
1372   * Rollback a transaction given its id.
1373   *
1374   * @param transactionId the transaction id
1375   */

1376  protected abstract void rollbackTransaction(long transactionId);
1377
1378  /**
1379   * Rollback a transaction given its id to a savepoint given its name.
1380   *
1381   * @param transactionId the transaction id
1382   * @param savepointName the name of the savepoint
1383   */

1384  protected abstract void rollbackTransaction(long transactionId,
1385      String JavaDoc savepointName);
1386
1387  /**
1388   * Notify the completion of a rollback command.
1389   *
1390   * @param tm The transaction marker metadata
1391   * @param isSuccess true if commit was successful, false otherwise
1392   */

1393  public final void rollbackCompleted(TransactionMetaData tm, boolean isSuccess)
1394  {
1395    boolean transactionIsActive = false;
1396    synchronized (writesSync)
1397    {
1398      if (isSuccess)
1399      {
1400        transactionIsActive = activeTransactions.remove(tm);
1401      }
1402    }
1403    if (transactionIsActive)
1404    {
1405      // Take care of suspended transactions
1406
synchronized (transactionSync)
1407      {
1408        pendingTransactions--;
1409        if (pendingTransactions < 0)
1410        {
1411          logger
1412              .error("Negative pending transactions detected on rollback completion for transaction "
1413                  + tm.getTransactionId());
1414          pendingTransactions = 0;
1415        }
1416
1417        if (logger.isDebugEnabled())
1418          logger.debug("Rollback completed, remaining pending transactions: "
1419              + pendingTransactions);
1420
1421        checkPendingTransactions();
1422      }
1423    }
1424    else if ((isSuccess) && (logger.isDebugEnabled()))
1425      logger.debug("Transaction " + tm.getTransactionId()
1426          + " has already completed.");
1427
1428    // Take care of suspended write
1429
synchronized (writesSync)
1430    {
1431      pendingWrites--;
1432
1433      if (pendingWrites < 0)
1434      {
1435        logger
1436            .error("Negative pending writes detected on rollback completion for transaction "
1437                + tm.getTransactionId());
1438        pendingWrites = 0;
1439      }
1440
1441      if (logger.isDebugEnabled())
1442        logger.debug("Rollback completed, remaining pending writes: "
1443            + pendingWrites);
1444
1445      checkPendingWrites();
1446    }
1447  }
1448
1449  /**
1450   * Set an unnamed savepoint.
1451   * <p>
1452   * Calls the implementation specific setSavepointTransaction()
1453   *
1454   * @param tm transaction marker metadata
1455   * @return savepoint Id
1456   * @throws SQLException if an error occurs
1457   */

1458  public final int setSavepoint(TransactionMetaData tm) throws SQLException JavaDoc
1459  {
1460    // Check if writes are suspended
1461
synchronized (writesSync)
1462    {
1463      if (suspendedWrites > 0)
1464      {
1465        try
1466        {
1467          // Wait on writesSync
1468
long timeout = tm.getTimeout();
1469          if (timeout > 0)
1470          {
1471            long start = System.currentTimeMillis();
1472            writesSync.wait(timeout);
1473            long end = System.currentTimeMillis();
1474            long remaining = timeout - (end - start);
1475            if (remaining > 0)
1476              tm.setTimeout(remaining);
1477            else
1478            {
1479              String JavaDoc msg = Translate.get(
1480                  "scheduler.setsavepoint.timeout.writeSync", pendingWrites);
1481              logger.warn(msg);
1482              throw new SQLException JavaDoc(msg);
1483            }
1484          }
1485          else
1486            writesSync.wait();
1487        }
1488        catch (InterruptedException JavaDoc e)
1489        {
1490          String JavaDoc msg = Translate.get(
1491              "scheduler.setsavepoint.timeout.writeSync", pendingWrites)
1492              + " (" + e + ")";
1493          logger.error(msg);
1494          throw new SQLException JavaDoc(msg);
1495        }
1496      }
1497      pendingWrites++;
1498
1499      if (logger.isDebugEnabled())
1500        logger.debug("Set savepoint scheduled - current pending writes: "
1501            + pendingWrites);
1502    }
1503
1504    int savepointId = this.incrementSavepointId();
1505    this.setSavepointTransaction(tm.getTransactionId(), String
1506        .valueOf(savepointId));
1507    return savepointId;
1508  }
1509
1510  /**
1511   * Set a named savepoint.
1512   * <p>
1513   * Calls the implementation specific setSavepointTransaction()
1514   *
1515   * @param tm transaction marker metadata
1516   * @param name name of the savepoint
1517   * @param dmsg distributed message which triggered this operation
1518   * @throws SQLException if an error occurs
1519   */

1520  public final void setSavepoint(TransactionMetaData tm, String JavaDoc name,
1521      DistributedSetSavepoint dmsg) throws SQLException JavaDoc
1522  {
1523    // Check if writes are suspended
1524
synchronized (writesSync)
1525    {
1526      if (suspendedWrites > 0)
1527      {
1528        addSuspendedRequest(dmsg);
1529        try
1530        {
1531          // Wait on writesSync
1532
long timeout = tm.getTimeout();
1533          if (timeout > 0)
1534          {
1535            long start = System.currentTimeMillis();
1536            writesSync.wait(timeout);
1537            long end = System.currentTimeMillis();
1538            long remaining = timeout - (end - start);
1539            if (remaining > 0)
1540              tm.setTimeout(remaining);
1541            else
1542            {
1543              String JavaDoc msg = Translate.get(
1544                  "scheduler.setsavepoint.timeout.writeSync", pendingWrites);
1545              logger.warn(msg);
1546              throw new SQLException JavaDoc(msg);
1547            }
1548          }
1549          else
1550            writesSync.wait();
1551        }
1552        catch (InterruptedException JavaDoc e)
1553        {
1554          String JavaDoc msg = Translate.get(
1555              "scheduler.setsavepoint.timeout.writeSync", pendingWrites)
1556              + " (" + e + ")";
1557          logger.error(msg);
1558          throw new SQLException JavaDoc(msg);
1559        }
1560      }
1561      pendingWrites++;
1562
1563      if (logger.isDebugEnabled())
1564        logger.debug("Set savepoint " + name
1565            + " scheduled - current pending writes: " + pendingWrites);
1566    }
1567
1568    this.setSavepointTransaction(tm.getTransactionId(), name);
1569  }
1570
1571  /**
1572   * Set a savepoint given its name to a transaction given its id.
1573   *
1574   * @param transactionId the transaction id
1575   * @param name the name of the savepoint
1576   */

1577  protected abstract void setSavepointTransaction(long transactionId,
1578      String JavaDoc name);
1579
1580  /**
1581   * Release a savepoint.
1582   * <p>
1583   * Calls the implementation specific releaseSavepointTransaction()
1584   *
1585   * @param tm transaction marker metadata
1586   * @param name name of the savepoint
1587   * @param dmsg distributed message which triggered this operation
1588   * @throws SQLException if an error occurs
1589   */

1590  public final void releaseSavepoint(TransactionMetaData tm, String JavaDoc name,
1591      DistributedReleaseSavepoint dmsg) throws SQLException JavaDoc
1592  {
1593    // Check if writes are suspended
1594
synchronized (writesSync)
1595    {
1596      if (suspendedWrites > 0)
1597      {
1598        addSuspendedRequest(dmsg);
1599        try
1600        {
1601          // Wait on writesSync
1602
long timeout = tm.getTimeout();
1603          if (timeout > 0)
1604          {
1605            long start = System.currentTimeMillis();
1606            writesSync.wait(timeout);
1607            long end = System.currentTimeMillis();
1608            long remaining = timeout - (end - start);
1609            if (remaining > 0)
1610              tm.setTimeout(remaining);
1611            else
1612            {
1613              String JavaDoc msg = Translate
1614                  .get("scheduler.releasesavepoint.timeout.writeSync",
1615                      pendingWrites);
1616              logger.warn(msg);
1617              throw new SQLException JavaDoc(msg);
1618            }
1619          }
1620          else
1621            writesSync.wait();
1622        }
1623        catch (InterruptedException JavaDoc e)
1624        {
1625          String JavaDoc msg = Translate.get(
1626              "scheduler.releasesavepoint.timeout.writeSync", pendingWrites)
1627              + " (" + e + ")";
1628          logger.error(msg);
1629          throw new SQLException JavaDoc(msg);
1630        }
1631      }
1632      pendingWrites++;
1633
1634      if (logger.isDebugEnabled())
1635        logger.debug("Release savepoint " + name
1636            + " scheduled - current pending writes: " + pendingWrites);
1637    }
1638
1639    this.releaseSavepointTransaction(tm.getTransactionId(), name);
1640  }
1641
1642  /**
1643   * Release a savepoint given its name from a transaction given its id.
1644   *
1645   * @param transactionId the transaction id
1646   * @param name the name of the savepoint
1647   */

1648  protected abstract void releaseSavepointTransaction(long transactionId,
1649      String JavaDoc name);
1650
1651  /**
1652   * Notify the conpletion of a savepoint action.
1653   *
1654   * @param transactionId the transaction identifier
1655   */

1656  public final void savepointCompleted(long transactionId)
1657  {
1658    synchronized (writesSync)
1659    {
1660      pendingWrites--;
1661
1662      if (pendingWrites < 0)
1663      {
1664        logger
1665            .error("Negative pending writes detected on savepoint completion for transaction"
1666                + transactionId);
1667        pendingWrites = 0;
1668      }
1669
1670      if (logger.isDebugEnabled())
1671        logger.debug("Savepoint completed, remaining pending writes: "
1672            + pendingWrites);
1673
1674      checkPendingWrites();
1675    }
1676  }
1677
1678  /**
1679   * Resume new transactions that were suspended by
1680   * suspendNewTransactionsForCheckpoint().
1681   *
1682   * @see #suspendNewTransactions()
1683   */

1684  public final void resumeNewTransactions()
1685  {
1686    if (logger.isDebugEnabled())
1687      logger.debug("Resuming new transactions");
1688
1689    synchronized (transactionSync)
1690    {
1691      suspendedTransactions--;
1692      if (suspendedTransactions < 0)
1693      {
1694        suspendedTransactions = 0;
1695        logger
1696            .error("Unexpected negative suspendedTransactions in AbstractScheduler.resumeNewTransactions()");
1697      }
1698      if (suspendedTransactions == 0)
1699      {
1700        // Wake up all pending begin statements
1701
transactionSync.notifyAll();
1702      }
1703    }
1704  }
1705
1706  /**
1707   * Suspend all calls to begin() until until resumeWrites() is called. This
1708   * method does not block and returns immediately. To synchronize on suspended
1709   * writes completion, you must call waitForSuspendedWritesToComplete().
1710   * <p>
1711   * New transactions remain suspended until resumeNewTransactions() is called.
1712   *
1713   * @see #resumeNewTransactions()
1714   * @see #waitForSuspendedTransactionsToComplete()
1715   */

1716  public final void suspendNewTransactions()
1717  {
1718    if (logger.isDebugEnabled())
1719      logger.debug("Suspending new transactions");
1720
1721    synchronized (transactionSync)
1722    {
1723      suspendedTransactions++;
1724    }
1725  }
1726
1727  /**
1728   * Suspend all calls to begin() until until resumeWrites() and
1729   * resumeNewTransactions are called. This method does not block and returns
1730   * immediately. To synchronize on suspended writes completion, you must call
1731   * waitForSuspendedWritesToComplete(). Suspending writes and transactions is
1732   * done atomically in order to close a window in begin().
1733   * <p>
1734   * New transactions remain suspended until resumeNewTransactions() and
1735   * resumeWrites are called.
1736   *
1737   * @see #resumeNewTransactions()
1738   * @see #waitForSuspendedTransactionsToComplete()
1739   */

1740  public void suspendNewTransactionsAndWrites()
1741  {
1742    if (logger.isDebugEnabled())
1743      logger.debug("Suspending new transactions and writes");
1744
1745    synchronized (writesSync)
1746    {
1747      synchronized (transactionSync)
1748      {
1749        suspendedTransactions++;
1750        suspendedWrites++;
1751      }
1752    }
1753  }
1754
1755  /**
1756   * Wait for suspended transactions to complete. Returns as soon as number of
1757   * pending transactions has reached 0.
1758   *
1759   * @throws SQLException if an error occured during wait
1760   */

1761  public void waitForSuspendedTransactionsToComplete() throws SQLException JavaDoc
1762  {
1763    synchronized (transactionSync)
1764    {
1765      if (pendingTransactions == 0)
1766      {
1767        if (logger.isDebugEnabled())
1768          logger.debug("All transactions suspended");
1769        return;
1770      }
1771    }
1772    long waitTime = 15000;
1773    while (true)
1774    {
1775
1776      synchronized (endOfCurrentTransactions)
1777      {
1778        // Here we have a potential synchronization problem since the last
1779
// transaction completion could have happened before we entered this
1780
// synchronized block. Therefore we recheck if there is effectively
1781
// still pending transactions. If this is not the case, we don't have
1782
// to sleep and we can immediately return.
1783
if (pendingTransactions == 0)
1784        {
1785          if (logger.isDebugEnabled())
1786            logger.debug("All new transactions suspended");
1787          return;
1788        }
1789
1790        if (logger.isDebugEnabled())
1791          logger.debug("Waiting for " + pendingTransactions
1792              + " transactions to complete.");
1793
1794        // Wait for pending transactions to end
1795
try
1796        {
1797          endOfCurrentTransactions.wait(waitTime);
1798        }
1799        catch (InterruptedException JavaDoc e)
1800        {
1801          String JavaDoc msg = Translate.get("scheduler.suspend.transaction.failed", e);
1802          logger.error(msg);
1803          throw new SQLException JavaDoc(msg);
1804        }
1805      }
1806      synchronized (transactionSync)
1807      {
1808        if (pendingTransactions == 0)
1809          break;
1810        else
1811        {
1812          logger.warn("Waiting for " + pendingTransactions
1813              + " open transactions");
1814          waitTime *= 2;
1815        }
1816
1817      }
1818    }
1819
1820    if (logger.isDebugEnabled())
1821      logger.debug("All new transactions suspended");
1822  }
1823
1824  /**
1825   * Resume the execution of the <em>new write queries</em> that were
1826   * suspended by <code>suspendNewWrites()</code>.
1827   *
1828   * @see #suspendNewWrites()
1829   */

1830  public void resumeWrites()
1831  {
1832    if (logger.isDebugEnabled())
1833      logger.debug("Resuming writes");
1834
1835    synchronized (writesSync)
1836    {
1837      suspendedWrites--;
1838      if (suspendedWrites < 0)
1839      {
1840        suspendedWrites = 0;
1841        logger
1842            .error("Unexpected negative suspendedWrites in AbstractScheduler.resumeWrites()");
1843      }
1844      if (suspendedWrites == 0)
1845      {
1846        // Wake up all waiting writes
1847
writesSync.notifyAll();
1848      }
1849    }
1850  }
1851
1852  /**
1853   * Checks if the write queries are suspended and there is no remaining pending
1854   * writes. In that case, notify <code>endOcCurrentWrites</code>
1855   */

1856  private void checkPendingWrites()
1857  {
1858    synchronized (writesSync)
1859    {
1860      // If this is the last write to complete and writes are
1861
// suspended we have to notify suspendedWrites()
1862
if ((suspendedWrites > 0) && (pendingWrites == 0))
1863      {
1864        synchronized (endOfCurrentWrites)
1865        {
1866          endOfCurrentWrites.notifyAll();
1867        }
1868      }
1869    }
1870  }
1871
1872  /**
1873   * Checks if the transactions are suspended and that there is no remaining
1874   * pending transactions. In that case, notify
1875   * <code>endOfCurrentTransactions</code>
1876   *
1877   * @see #suspendNewTransactions()
1878   */

1879  private void checkPendingTransactions()
1880  {
1881    synchronized (transactionSync)
1882    {
1883      // If it is the last pending transaction to complete and we
1884
// are waiting for pending transactions to complete, then wake
1885
// up suspendNewTransactionsForCheckpoint()
1886
if ((suspendedTransactions > 0) && (pendingTransactions == 0))
1887      {
1888        synchronized (endOfCurrentTransactions)
1889        {
1890          endOfCurrentTransactions.notifyAll();
1891        }
1892      }
1893    }
1894  }
1895
1896  /**
1897   * Resume suspended writes, transactions and persistent connections (in this
1898   * order).
1899   */

1900  public void resumeWritesTransactionsAndPersistentConnections()
1901  {
1902    clearSuspendedRequests();
1903    resumeWrites();
1904    resumeNewTransactions();
1905    resumeNewPersistentConnections();
1906  }
1907
1908  //
1909
// Transaction management
1910
//
1911

1912  /**
1913   * Suspend all <em>new write queries</em> until resumeWrites() is called.
1914   * This method does not block and returns immediately. To synchronize on
1915   * suspended writes completion, you must call
1916   * waitForSuspendedWritesToComplete().
1917   *
1918   * @see #resumeWrites()
1919   * @see #waitForSuspendedWritesToComplete()
1920   */

1921  public void suspendNewWrites()
1922  {
1923    if (logger.isDebugEnabled())
1924      logger.debug("Suspending new writes");
1925
1926    synchronized (writesSync)
1927    {
1928      suspendedWrites++;
1929    }
1930  }
1931
1932  /**
1933   * Wait for suspended writes to complete. Returns as soon as number of pending
1934   * writes has reached 0.
1935   *
1936   * @throws SQLException if an error occured during wait
1937   */

1938  public void waitForSuspendedWritesToComplete() throws SQLException JavaDoc
1939  {
1940    synchronized (writesSync)
1941    {
1942      if (pendingWrites == 0)
1943      {
1944        if (logger.isDebugEnabled())
1945          logger.debug("All writes suspended");
1946        return;
1947      }
1948    }
1949
1950    long waitTime = 15000;
1951    while (true)
1952    {
1953      synchronized (endOfCurrentWrites)
1954      {
1955        // Here we have a potential synchronization problem since the last
1956
// write completion could have happened before we entered this
1957
// synchronized block. Therefore we recheck if there is effectively
1958
// still pending writes. If this is not the case, we don't have
1959
// to sleep and we can immediately return.
1960
if (pendingWrites == 0)
1961        {
1962          if (logger.isDebugEnabled())
1963            logger.debug("All writes suspended");
1964          return;
1965        }
1966
1967        if (logger.isDebugEnabled())
1968          logger.debug("Wait for " + pendingWrites + " writes to complete.");
1969
1970        // Wait for pending writes to end
1971
try
1972        {
1973          endOfCurrentWrites.wait(waitTime);
1974        }
1975        catch (InterruptedException JavaDoc e)
1976        {
1977          String JavaDoc msg = Translate.get("scheduler.suspend.writes.failed", e);
1978          logger.error(msg);
1979          throw new SQLException JavaDoc(msg);
1980        }
1981      }
1982      synchronized (writesSync)
1983      {
1984        if (pendingWrites == 0)
1985          break;
1986        else
1987        {
1988          logger.warn("Waiting for " + pendingWrites + " pending writes");
1989          waitTime *= 2;
1990        }
1991      }
1992    }
1993
1994    if (logger.isDebugEnabled())
1995      logger.debug("All writes suspended");
1996  }
1997
1998  /**
1999   * Resumes openinh and closing of persistent connections.
2000   */

2001  public void resumeOpenClosePersistentConnection()
2002  {
2003    synchronized (suspendOpenClosePersistentConnectionSync)
2004    {
2005      suspendedOpenClosePersistentConnections--;
2006      if (suspendedOpenClosePersistentConnections == 0)
2007        suspendOpenClosePersistentConnectionSync.notifyAll();
2008    }
2009  }
2010
2011  /**
2012   * Resume new persistent connections creations that were suspended by
2013   * suspendNewPersistentConnections().
2014   *
2015   * @see #suspendNewPersistentConnections()
2016   */

2017  public final void resumeNewPersistentConnections()
2018  {
2019    if (logger.isDebugEnabled())
2020      logger.debug("Resuming new persistent connections");
2021
2022    synchronized (persistentConnectionsSync)
2023    {
2024      suspendedNewPersistentConnections--;
2025      if (suspendedNewPersistentConnections < 0)
2026      {
2027        suspendedNewPersistentConnections = 0;
2028        logger
2029            .error("Unexpected negative suspendedPersistentConnections in AbstractScheduler.resumeNewPersistentConnections()");
2030      }
2031      if (suspendedNewPersistentConnections == 0)
2032      {
2033        // Wake up all pending persistent connections creation
2034
persistentConnectionsSync.notifyAll();
2035      }
2036    }
2037  }
2038
2039  /**
2040   * Suspends open and closing of persistent connections.
2041   *
2042   * @see org.continuent.sequoia.controller.requestmanager.RequestManager#closePersistentConnection(String,
2043   * long)
2044   */

2045  public void suspendOpenClosePersistentConnection()
2046  {
2047    synchronized (suspendOpenClosePersistentConnectionSync)
2048    {
2049      suspendedOpenClosePersistentConnections++;
2050    }
2051  }
2052
2053  /**
2054   * Suspend all new persistent connections creation. This method does not block
2055   * and returns immediately. New connections remain suspended until
2056   * resumeNewPersistentConnections() is called.
2057   *
2058   * @see #resumeNewPersistentConnections()
2059   * @see #waitForPersistentConnectionsToComplete()
2060   */

2061  public void suspendNewPersistentConnections()
2062  {
2063    if (logger.isDebugEnabled())
2064      logger.debug("Suspending new persistent connections");
2065
2066    synchronized (persistentConnectionsSync)
2067    {
2068      suspendedNewPersistentConnections++;
2069    }
2070  }
2071
2072  /**
2073   * Wait for opened persistent connections to complete. Returns as soon as
2074   * number of pending persistent connections has reached 0.
2075   *
2076   * @throws SQLException if an error occured during wait
2077   */

2078  public void waitForPersistentConnectionsToComplete() throws SQLException JavaDoc
2079  {
2080    synchronized (persistentConnectionsSync)
2081    {
2082      if (persistentConnections.isEmpty())
2083      {
2084        if (logger.isDebugEnabled())
2085          logger.debug("All persistent connections closed");
2086        return;
2087      }
2088    }
2089    long waitTime = 15000;
2090    synchronized (endOfCurrentPersistentConnections)
2091    {
2092      if (persistentConnections.isEmpty())
2093      {
2094        if (logger.isDebugEnabled())
2095          logger.debug("All persistent connections closed");
2096        return;
2097      }
2098
2099      if (logger.isDebugEnabled())
2100        logger.debug("Waiting for " + persistentConnections.size()
2101            + " persistent connections to be closed.");
2102
2103      // Wait for persistent connections to end
2104
long startTime = System.currentTimeMillis();
2105      while (!persistentConnections.isEmpty())
2106        try
2107        {
2108          endOfCurrentPersistentConnections.wait(waitTime);
2109          if (!persistentConnections.isEmpty()
2110              && (System.currentTimeMillis() - startTime) > waitTime)
2111          {
2112            logger.warn("Waiting for " + persistentConnections.size()
2113                + " open persistent connections");
2114            waitTime *= 2;
2115            startTime = System.currentTimeMillis();
2116          }
2117        }
2118        catch (InterruptedException JavaDoc e)
2119        {
2120          String JavaDoc msg = Translate.get("scheduler.suspend.transaction.failed", e);
2121          logger.error(msg);
2122          throw new SQLException JavaDoc(msg);
2123        }
2124
2125    }
2126
2127    if (logger.isDebugEnabled())
2128      logger.debug("All persistent connections closed");
2129  }
2130
2131  /**
2132   * Blocks until all pending open/close persistent connections operations are
2133   * completed.
2134   */

2135  public void waitForPendingOpenClosePersistentConnection()
2136  {
2137    synchronized (suspendOpenClosePersistentConnectionSync)
2138    {
2139      while (pendingOpenClosePersistentConnections > 0)
2140      {
2141        try
2142        {
2143          suspendOpenClosePersistentConnectionSync.wait();
2144        }
2145        catch (InterruptedException JavaDoc ignore)
2146        {
2147        }
2148      }
2149    }
2150  }
2151
2152  /**
2153   * Adds an object to the suspended requests list.
2154   *
2155   * @param obj suspended request.
2156   */

2157  private void addSuspendedRequest(Object JavaDoc obj)
2158  {
2159    synchronized (suspendedRequests)
2160    {
2161      suspendedRequests.add(obj);
2162    }
2163    if (totalOrderQueue != null)
2164    { // Distributed virtual database only
2165
synchronized (totalOrderQueue)
2166      {
2167        totalOrderQueue.notifyAll();
2168      }
2169    }
2170  }
2171
2172  /**
2173   * Checks if an object is in the suspended requests list.
2174   *
2175   * @param obj request to be checked
2176   * @return true if the request is suspended, false otherwise
2177   */

2178  public boolean isSuspendedRequest(Object JavaDoc obj)
2179  {
2180    synchronized (suspendedRequests)
2181    {
2182      return suspendedRequests.contains(obj);
2183    }
2184  }
2185
2186  /**
2187   * Removes all objects from the suspended requests list.
2188   */

2189  private void clearSuspendedRequests()
2190  {
2191    synchronized (suspendedRequests)
2192    {
2193      suspendedRequests.clear();
2194      if (totalOrderQueue != null)
2195      { // Distributed virtual database only
2196
synchronized (totalOrderQueue)
2197        {
2198          totalOrderQueue.notifyAll();
2199        }
2200      }
2201    }
2202  }
2203
2204  //
2205
// Debug/Monitoring
2206
//
2207

2208  protected abstract String JavaDoc getXmlImpl();
2209
2210  /**
2211   * Get information about the Request Scheduler in xml format
2212   *
2213   * @return <code>String</code> containing information in xml
2214   */

2215  public String JavaDoc getXml()
2216  {
2217    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
2218    info.append("<" + DatabasesXmlTags.ELT_RequestScheduler + ">");
2219    info.append(this.getXmlImpl());
2220    info.append("</" + DatabasesXmlTags.ELT_RequestScheduler + ">");
2221    return info.toString();
2222  }
2223
2224  /**
2225   * Returns live information on the scheduler
2226   *
2227   * @return array of data
2228   */

2229  public String JavaDoc[] getSchedulerData()
2230  {
2231    String JavaDoc[] data = new String JavaDoc[7];
2232    data[0] = String.valueOf(numberRead);
2233    data[1] = String.valueOf(numberWrite);
2234    data[2] = String.valueOf(pendingTransactions);
2235    data[3] = String.valueOf(pendingWrites);
2236    data[4] = String.valueOf(numberRead + numberWrite);
2237    data[5] = String.valueOf(suspendedTransactions);
2238    data[6] = String.valueOf(suspendedWrites);
2239    return data;
2240  }
2241
2242  /**
2243   * @return Returns the numberRead.
2244   */

2245  public int getNumberRead()
2246  {
2247    return numberRead;
2248  }
2249
2250  /**
2251   * @return Returns the numberWrite.
2252   */

2253  public int getNumberWrite()
2254  {
2255    return numberWrite;
2256  }
2257
2258  /**
2259   * @return Returns the pendingTransactions.
2260   */

2261  public int getPendingTransactions()
2262  {
2263    return pendingTransactions;
2264  }
2265
2266  /**
2267   * @return Returns the suspendedTransactions.
2268   */

2269  public boolean isSuspendedTransactions()
2270  {
2271    return suspendedTransactions > 0;
2272  }
2273
2274  /**
2275   * @return Returns the suspendedWrites.
2276   */

2277  public boolean isSuspendedWrites()
2278  {
2279    return suspendedWrites > 0;
2280  }
2281
2282}
Popular Tags