KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > loadbalancer > BackendTaskQueues


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
4  * Copyright (C) 2005-2006 Continuent, Inc.
5  * Contact: sequoia@continuent.org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * Initial developer(s): Emmanuel Cecchet.
20  * Contributor(s): ______________________.
21  */

22
23 package org.continuent.sequoia.controller.loadbalancer;
24
25 import java.sql.SQLException JavaDoc;
26 import java.sql.Statement JavaDoc;
27 import java.util.ArrayList JavaDoc;
28 import java.util.ConcurrentModificationException JavaDoc;
29 import java.util.Iterator JavaDoc;
30 import java.util.LinkedList JavaDoc;
31 import java.util.List JavaDoc;
32 import java.util.SortedSet JavaDoc;
33
34 import org.continuent.sequoia.common.log.Trace;
35 import org.continuent.sequoia.controller.backend.DatabaseBackend;
36 import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
37 import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
38 import org.continuent.sequoia.controller.loadbalancer.tasks.BeginTask;
39 import org.continuent.sequoia.controller.loadbalancer.tasks.KillThreadTask;
40 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
41 import org.continuent.sequoia.controller.locks.DeadlockDetectionThread;
42 import org.continuent.sequoia.controller.locks.TransactionLogicalLock;
43 import org.continuent.sequoia.controller.requestmanager.RequestManager;
44 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
45 import org.continuent.sequoia.controller.requests.AbstractRequest;
46 import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
47 import org.continuent.sequoia.controller.requests.ParsingGranularities;
48 import org.continuent.sequoia.controller.requests.SelectRequest;
49 import org.continuent.sequoia.controller.requests.StoredProcedure;
50 import org.continuent.sequoia.controller.semantic.SemanticBehavior;
51 import org.continuent.sequoia.controller.semantic.SemanticManager;
52 import org.continuent.sequoia.controller.sql.schema.DatabaseSchema;
53 import org.continuent.sequoia.controller.sql.schema.DatabaseTable;
54 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
55
56 /**
57  * This class defines task queues that stores the requests to be executed on a
58  * database backend.
59  *
60  * @author <a HREF="mailto:emmanuel.cecchet@emicnetworks.com">Emmanuel Cecchet</a>
61  * @version 1.0
62  */

63 public class BackendTaskQueues
64 {
65   /** Queue in which queries arrive in total order */
66   private LinkedList JavaDoc totalOrderQueue;
67   /**
68    * Queue for stored procedures without semantic information (locking the whole
69    * database)
70    */

71   private LinkedList JavaDoc storedProcedureQueue;
72   /**
73    * Queue for conflicting requests (only first request of the queue can be
74    * executed)
75    */

76   private LinkedList JavaDoc conflictingRequestsQueue;
77   /**
78    * Queue for non-conflicting requests that can be executed in parallel, in any
79    * order.
80    */

81   private LinkedList JavaDoc nonConflictingRequestsQueue;
82   /** Backend these queues are attached to */
83   private DatabaseBackend backend;
84   private WaitForCompletionPolicy waitForCompletionPolicy;
85   private RequestManager requestManager;
86   private boolean allowTasksToBePosted;
87   private static final Object JavaDoc ALLOW_TASKS_SYNC = new Object JavaDoc();
88
89   private DeadlockDetectionThread deadlockDetectionThread;
90
91   // Number of stored procedures that have been posted in the queue and that
92
// have not completed yet
93
private int storedProcedureInQueue = 0;
94
95   private int writesWithMultipleLocks = 0;
96   private Trace logger;
97
98   /**
99    * Creates a new <code>BackendTaskQueues</code> object
100    *
101    * @param backend DatabaseBackend associated to these queues
102    * @param waitForCompletionPolicy the load balancer wait for completion policy
103    * @param requestManager the request manager associated with these queues
104    */

105   public BackendTaskQueues(DatabaseBackend backend,
106       WaitForCompletionPolicy waitForCompletionPolicy,
107       RequestManager requestManager)
108   {
109     this.backend = backend;
110     this.logger = backend.getLogger();
111     this.waitForCompletionPolicy = waitForCompletionPolicy;
112     this.requestManager = requestManager;
113     totalOrderQueue = new LinkedList JavaDoc();
114     storedProcedureQueue = new LinkedList JavaDoc();
115     conflictingRequestsQueue = new LinkedList JavaDoc();
116     nonConflictingRequestsQueue = new LinkedList JavaDoc();
117     allowTasksToBePosted = false;
118   }
119
120   /**
121    * Abort all queries belonging to the provided transaction.
122    *
123    * @param tid the transaction identifier
124    * @return true if a rollback is already in progress
125    */

126   public boolean abortAllQueriesForTransaction(long tid)
127   {
128     synchronized (this)
129     {
130       boolean rollbackInProgress = abortAllQueriesEvenRunningInTransaction(tid,
131           storedProcedureQueue);
132       if (abortAllQueriesEvenRunningInTransaction(tid, conflictingRequestsQueue))
133         rollbackInProgress = true;
134       if (abortAllQueriesEvenRunningInTransaction(tid,
135           nonConflictingRequestsQueue))
136         rollbackInProgress = true;
137       return rollbackInProgress;
138     }
139   }
140
141   /**
142    * Abort all queries belonging to the given transaction even if they are
143    * currently processed by a BackendWorkerThread.
144    *
145    * @param tid transaction identifier
146    * @param queue queue to scan for queries to abort
147    * @return true if a rollback is already in progress
148    */

149   private boolean abortAllQueriesEvenRunningInTransaction(long tid,
150       LinkedList JavaDoc queue)
151   {
152     boolean rollbackInProgress = false;
153     synchronized (queue)
154     {
155       Long JavaDoc lTid = new Long JavaDoc(tid);
156       for (Iterator JavaDoc iter = queue.iterator(); iter.hasNext();)
157       {
158         BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next();
159         boolean isProcessing = false;
160         AbstractTask task = entry.getTask();
161         if (task.getTransactionId() == tid)
162         {
163           if (task instanceof RollbackTask)
164             rollbackInProgress = true;
165           else
166           { /*
167              * If we cancel a task in a transaction that was supposed to be
168              * lazily started by that task, then we have to fake the transaction
169              * start on this backend so that the transaction rollback and
170              * subsequent check for priority inversion occurs on that backend.
171              */

172             if (!task.isAutoCommit() && !backend.isStartedTransaction(lTid))
173               backend.startTransaction(lTid);
174
175             if (logger.isDebugEnabled())
176               logger.debug("Aborting request " + task.getRequest()
177                   + " on backend " + backend.getName());
178
179             BackendWorkerThread processingThread = entry.getProcessingThread();
180             if (processingThread != null)
181             { // A thread is working on it, cancel the task
182
isProcessing = true;
183               Statement JavaDoc s = processingThread.getCurrentStatement();
184               if (s != null)
185               {
186                 try
187                 {
188                   s.cancel();
189                 }
190                 catch (SQLException JavaDoc e)
191                 {
192                   logger.warn("Unable to cancel execution of request", e);
193                 }
194                 catch (NullPointerException JavaDoc e)
195                 {
196                   if (logger.isWarnEnabled())
197                     logger
198                         .warn(
199                             "Ignoring NullPointerException caused by Connector/J 5.0.4 bug #24721",
200                             e);
201                 }
202               }
203             }
204             if (!task.hasCompleted())
205             {
206               try
207               {
208                 if (processingThread == null)
209                 { // abort has been called on a non-processed query, use a
210
// random worker thread for notification
211
processingThread = backend
212                       .getBackendWorkerThreadForNotification();
213                   if (processingThread == null)
214                   { // No worker thread left, should never happen.
215
// Backend already disabled?
216
logger
217                         .warn("No worker thread found for request abort notification, creating fake worker thread");
218                     processingThread = new BackendWorkerThread(backend,
219                         requestManager.getLoadBalancer());
220                   }
221                 }
222                 task.notifyFailure(processingThread, -1L, new SQLException JavaDoc(
223                     "Transaction aborted due to deadlock"));
224               }
225               catch (SQLException JavaDoc ignore)
226               {
227               }
228             }
229             if (!isProcessing)
230             {
231               /*
232                * If the task was being processed by a thread, the completion
233                * will be notified by the thread itself
234                */

235               completedEntryExecution(entry, iter);
236             }
237           }
238         }
239       }
240     }
241     return rollbackInProgress;
242   }
243
244   /**
245    * Abort all requests remaining in the queues. This is usually called when the
246    * backend is disabled and no backend worker thread should be processing any
247    * request any more (this will generate a warning otherwise).
248    */

249   public void abortRemainingRequests()
250   {
251     setAllowTasksToBePosted(false);
252     abortRemainingRequests(storedProcedureQueue);
253     abortRemainingRequests(conflictingRequestsQueue);
254     abortRemainingRequests(nonConflictingRequestsQueue);
255   }
256
257   /**
258    * Abort the remaining request in the given queue
259    *
260    * @param queue the queue to purge
261    */

262   private void abortRemainingRequests(LinkedList JavaDoc queue)
263   {
264     synchronized (queue)
265     {
266       for (Iterator JavaDoc iter = queue.iterator(); iter.hasNext();)
267       {
268         BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next();
269         AbstractTask task = entry.getTask();
270
271         // Do not cancel KillThreadTasks
272
if (task instanceof KillThreadTask)
273           continue;
274
275         if (entry.getProcessingThread() != null)
276         { // A thread is working on it, warn and cancel the task
277
logger.warn("A worker thread was still processing task " + task
278               + ", aborting the request execution.");
279           Statement JavaDoc s = entry.getProcessingThread().getCurrentStatement();
280           if (s != null)
281           {
282             try
283             {
284               s.cancel();
285             }
286             catch (SQLException JavaDoc e)
287             {
288               logger.warn("Unable to cancel execution of request", e);
289             }
290           }
291         }
292         if (!task.hasCompleted())
293         {
294           if (logger.isDebugEnabled())
295             logger.debug("Cancelling task " + task);
296           task.notifyCompletion(entry.getProcessingThread());
297         }
298         completedEntryExecution(entry, iter);
299       }
300     }
301   }
302
303   /**
304    * Add a task at the end of the backend total order queue
305    *
306    * @param task the task to add
307    */

308   public final void addTaskToBackendTotalOrderQueue(AbstractTask task)
309   {
310     synchronized (this)
311     {
312       synchronized (totalOrderQueue)
313       {
314         totalOrderQueue.addLast(task);
315       }
316
317       /*
318        * Wake up all worker threads in case we post multiple tasks before a
319        * thread had time to take this task into account (this would result in a
320        * lost notified event).
321        */

322       this.notifyAll();
323     }
324   }
325
326   /**
327    * Add a task at the end of the backend total order queue. Block as long as
328    * the total order queue size if over the indicated queue size.
329    *
330    * @param task the task to add
331    * @param queueSize the maximum queue size
332    */

333   public final void addTaskToBackendTotalOrderQueue(AbstractTask task,
334       int queueSize)
335   {
336     synchronized (this)
337     {
338       boolean mustNotify = false;
339       do
340       {
341         synchronized (totalOrderQueue)
342         {
343           if (totalOrderQueue.size() < queueSize)
344           {
345             totalOrderQueue.addLast(task);
346             mustNotify = true;
347           }
348         }
349
350         if (mustNotify)
351         {
352           /*
353            * Wake up all worker threads in case we post multiple tasks before a
354            * thread had time to take this task into account (this would result
355            * in a lost notified event).
356            */

357           this.notifyAll();
358           return; // exit method here
359
}
360         else
361         {
362           try
363           { // Wait for queue to free an entry
364
this.wait();
365           }
366           catch (InterruptedException JavaDoc e)
367           {
368           }
369         }
370       }
371       while (!mustNotify);
372     }
373   }
374
375   /**
376    * Add a task in the ConflictingRequestsQueue.
377    *
378    * @param task task to add
379    */

380   private void addTaskInConflictingRequestsQueue(AbstractTask task)
381   {
382     addTaskToQueue(conflictingRequestsQueue, task, false);
383   }
384
385   /**
386    * Add a task in the NonConflictingRequestsQueue.
387    *
388    * @param task task to add
389    * @param isACommitOrRollback true if the task is a commit or a rollback
390    */

391   private void addTaskInNonConflictingRequestsQueue(AbstractTask task,
392       boolean isACommitOrRollback)
393   {
394     addTaskToQueue(nonConflictingRequestsQueue, task, isACommitOrRollback);
395   }
396
397   /**
398    * Add a task in the StoredProcedureQueue.
399    *
400    * @param task task to add
401    */

402   private void addTaskInStoredProcedureQueue(AbstractTask task)
403   {
404     addTaskToQueue(storedProcedureQueue, task, false);
405   }
406
407   /**
408    * Add the task in the given queue and notify the queue. Note that the task is
409    * also added to the backend pending write request queue. addTaskToQueue
410    *
411    * @param queue queue in which the task must be added
412    * @param task the task to add
413    * @param isACommitOrRollback true if the task is a commit or a rollback
414    */

415   private void addTaskToQueue(LinkedList JavaDoc queue, AbstractTask task,
416       boolean isACommitOrRollback)
417   {
418     if (!allowTasksToBePosted())
419     {
420       if (logger.isDebugEnabled())
421         logger.debug("Cancelling task " + task);
422       task.notifyCompletion(null);
423       return;
424     }
425
426     // We assume that all requests here are writes
427
backend.addPendingTask(task);
428     if (logger.isDebugEnabled())
429       logger.debug("Adding task " + task + " to pending request queue");
430
431     synchronized (this)
432     {
433       // Add to the queue
434
synchronized (queue)
435       {
436         queue.addLast(new BackendTaskQueueEntry(task, queue,
437             isACommitOrRollback));
438       }
439
440       /*
441        * Wake up all worker threads in case we post multiple tasks before a
442        * thread had time to take this task into account (this would result in a
443        * lost notified event).
444        */

445       this.notifyAll();
446     }
447   }
448
449   /**
450    * Check for priority inversion in the conflicting queue or possibly stored
451    * procedure queue flushing if a transaction executing a stored procedure has
452    * just completed. remove this entry and possibly re-arrange the queues
453    */

454   public final void checkForPriorityInversion()
455   {
456     DatabaseSchema schema = backend.getDatabaseSchema();
457
458     // Let's check the conflicting queue for priority inversion
459
synchronized (conflictingRequestsQueue)
460     {
461       for (Iterator JavaDoc iter = conflictingRequestsQueue.iterator(); iter.hasNext();)
462       {
463         BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next();
464
465         // If the entry is currently processed, don't try to move it else it
466
// would be duplicated in the non-conflicting queue!!!
467
if (entry.processingThread != null)
468           continue;
469
470         AbstractTask task = entry.getTask();
471         if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
472         {
473           if (task.getSuccess() + task.getFailed() > 0)
474           { // Task has already been started by other nodes, just proceed with
475
// it, we are late!
476
if (logger.isDebugEnabled())
477               logger.debug("Priority inversion for already started request "
478                   + task.getRequest());
479             moveToNonConflictingQueue(iter, entry);
480             continue;
481           }
482         }
483
484         AbstractRequest request = task.getRequest();
485         SortedSet JavaDoc lockedTables = request.getWriteLockedDatabaseTables();
486         if (lockedTables != null)
487         {
488           boolean queryIsConflicting = false;
489           for (Iterator JavaDoc iterator = lockedTables.iterator(); iterator.hasNext()
490               && !queryIsConflicting;)
491           {
492             String JavaDoc tableName = (String JavaDoc) iterator.next();
493             DatabaseTable table = schema.getTable(tableName, false);
494             if (table == null)
495             { // No table found, stay in the conflicting queue
496
logger
497                   .warn("Unable to find table "
498                       + tableName
499                       + " in database schema, when checking priority inversion for query "
500                       + request.toStringShortForm(requestManager
501                           .getVirtualDatabase().getSqlShortFormLength()));
502             }
503             else
504             {
505               /*
506                * If the table we are conflicting with now belongs to us then we
507                * can go in the non-conflicting queue. Note that it is not
508                * possible for the lock to be free since we have acquired it
509                * earlier and we are waiting for our turn.
510                */

511               TransactionLogicalLock lock = table.getLock();
512               if (!lock.isLocked())
513                 logger.warn("Unexpected free lock on table " + table);
514               else
515               { // Check that the lock belong to our transaction
516
queryIsConflicting = lock.getLocker() != task
517                     .getTransactionId();
518               }
519             }
520           }
521           if (!queryIsConflicting)
522           { // Locks are now free, move to the non-conflicting queue
523
// Do not try to take the lock again else it will not be released
524
if (logger.isDebugEnabled())
525               logger.debug("Priority inversion for request "
526                   + task.getRequest());
527             moveToNonConflictingQueue(iter, entry);
528           }
529         }
530         else
531         { // Query does not lock anything, it should not have been posted in the
532
// conflicting queue
533
logger.warn("Non-locking task " + task
534               + " was posted in conflicting queue");
535           if (logger.isDebugEnabled())
536             logger.debug("Priority inversion for request " + task.getRequest());
537           moveToNonConflictingQueue(iter, entry);
538         }
539       }
540     }
541
542     // Look at the stored procedure queue
543
synchronized (storedProcedureQueue)
544     {
545       for (Iterator JavaDoc iter = storedProcedureQueue.iterator(); iter.hasNext();)
546       {
547         BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next();
548
549         TransactionLogicalLock globalLock = schema.getLock();
550         AbstractTask task = entry.getTask();
551         AbstractRequest request = task.getRequest();
552         if (globalLock.isLocked())
553         { // Stored procedure is executing
554
if (task.getTransactionId() == globalLock.getLocker())
555           {
556             // Just wait for current transactions to complete if all locks are
557
// not free.
558
if (!schema.allTablesAreUnlockedOrLockedByTransaction(request))
559               return;
560
561             /*
562              * We belong to the transaction that executes the stored procedure
563              * (or to the auto commit request that holds the lock), let's go in
564              * the non-conflicting request queue.
565              */

566             moveToNonConflictingQueue(iter, entry);
567             // if we are in auto commit, it means that we are the stored
568
// procedure which has acquired the lock during the atomic post
569
if (task.isAutoCommit())
570               return;
571             continue;
572           }
573           else
574           { // Check if the stored procedure currently executing is not
575
// somewhere in the stored procedure queue.
576

577             boolean currentStoredProcedureInQueue = false;
578             for (Iterator JavaDoc iter2 = storedProcedureQueue.iterator(); iter2
579                 .hasNext();)
580             {
581               BackendTaskQueueEntry entry2 = (BackendTaskQueueEntry) iter2
582                   .next();
583               AbstractTask task2 = entry2.getTask();
584               if ((task2 != null)
585                   && (task2.getTransactionId() == globalLock.getLocker()))
586                 currentStoredProcedureInQueue = true;
587             }
588
589             // If the stored procedure is not in the queue then it is currently
590
// executing and we have to wait for its completion
591
if (!currentStoredProcedureInQueue)
592               return;
593           }
594         }
595
596         // Schema is not locked, no stored procedure currently executes
597
TransactionMetaData tm = getTransactionMetaData(request);
598
599         if ((request instanceof SelectRequest)
600             || (request instanceof AbstractWriteRequest))
601         {
602           SortedSet JavaDoc writeLockedTables = request.getWriteLockedDatabaseTables();
603
604           if (writeLockedTables == null || writeLockedTables.isEmpty())
605           { // This request does not lock anything
606
moveToNonConflictingQueue(iter, entry);
607             continue;
608           }
609
610           moveMultipleWriteLocksQuery(schema, iter, entry, task, request, tm);
611         }
612         else
613         {
614           if (request instanceof StoredProcedure)
615           {
616             StoredProcedure sp = (StoredProcedure) request;
617             SemanticBehavior semantic = sp.getSemantic();
618             if (semantic != null)
619             {
620               // Try to optimize the stored procedure execution based on its
621
// semantic information
622
if (semantic.canExecuteOutOfOrder() || semantic.isReadOnly()
623                   || (request.getWriteLockedDatabaseTables() == null))
624                 moveToNonConflictingQueue(iter, entry);
625               else
626                 moveMultipleWriteLocksQuery(schema, iter, entry, task, request,
627                     tm);
628               continue;
629             }
630           }
631
632           // Stored procedure or unknown query, take the global lock and proceed
633
// if all other locks are free.
634

635           globalLock.acquire(request);
636           if (tm != null)
637           {
638             List JavaDoc acquiredLocks = tm.getAcquiredLocks(backend);
639             if ((acquiredLocks == null) || !acquiredLocks.contains(globalLock))
640               tm.addAcquiredLock(backend, globalLock);
641           }
642           else
643           {
644             ArrayList JavaDoc globalLockList = new ArrayList JavaDoc();
645             globalLockList.add(globalLock);
646             task.setLocks(backend, globalLockList);
647           }
648
649           // Just wait for current transactions to complete if all locks are not
650
// free.
651
if (!schema.allTablesAreUnlockedOrLockedByTransaction(request))
652             return;
653
654           // Clear to go, all locks are free. Acquire the global lock and move
655
// to the non-conflicting queue.
656
moveToNonConflictingQueue(iter, entry);
657           continue;
658         }
659       }
660     }
661   }
662
663   private void moveMultipleWriteLocksQuery(DatabaseSchema schema,
664       Iterator JavaDoc iter, BackendTaskQueueEntry entry, AbstractTask task,
665       AbstractRequest request, TransactionMetaData tm)
666   {
667     /*
668      * Assume that we will get all locks and that we will execute in the
669      * non-conflicting queue. If there is any issue, the queue will be set to
670      * conflicting queue.
671      */

672     boolean allLocksAcquired = true;
673     for (Iterator JavaDoc lockIter = request.getWriteLockedDatabaseTables().iterator(); lockIter
674         .hasNext();)
675     {
676       String JavaDoc tableName = (String JavaDoc) lockIter.next();
677       DatabaseTable table = schema.getTable(tableName, false);
678       if (table == null)
679       { // No table found, let's go for the conflicting queue
680
logger.warn("Unable to find table "
681             + tableName
682             + " in database schema, scheduling query "
683             + request.toStringShortForm(requestManager.getVirtualDatabase()
684                 .getSqlShortFormLength()) + " in conflicting queue.");
685         allLocksAcquired = false;
686       }
687       else
688       { /*
689          * If we get the lock we go in the non conflicting queue else we go in
690          * the conflicting queue
691          */

692         TransactionLogicalLock tableLock = table.getLock();
693         if (!tableLock.acquire(request))
694           allLocksAcquired = false;
695         /*
696          * Make sure that the lock is added only once to the list especially if
697          * multiple backends execute this piece of code when checking for
698          * priority inversion in their own queue (if the lock was already
699          * acquired, tableLock.acquire() returns directly true)
700          */

701         if (tm != null)
702         {
703           List JavaDoc acquiredLocks = tm.getAcquiredLocks(backend);
704           if ((acquiredLocks == null) || !acquiredLocks.contains(tableLock))
705             tm.addAcquiredLock(backend, tableLock);
706         }
707         else
708         {
709           List JavaDoc tableLockList = task.getLocks(backend);
710           if (tableLockList == null)
711             tableLockList = new ArrayList JavaDoc();
712           // There is no need to synchronize on task.getLocks because we are in
713
// mutual exclusion here in a synchronized block on synchronized
714
// (atomicPostSyncObject) that has been taken by the caller of this
715
// method.
716
if (!tableLockList.contains(tableLock))
717           {
718             tableLockList.add(tableLock);
719             task.setLocks(backend, tableLockList);
720           }
721         }
722       }
723     }
724     // if we acquired all locks, we can go to the non conflicting queue
725
if (allLocksAcquired)
726       moveToNonConflictingQueue(iter, entry);
727     else
728       moveToConflictingQueue(iter, entry);
729   }
730
731   private void moveToConflictingQueue(Iterator JavaDoc iter, BackendTaskQueueEntry entry)
732   {
733     iter.remove();
734     if (logger.isDebugEnabled())
735       logger.debug("Moving " + entry.getTask() + " to conflicting queue");
736     synchronized (conflictingRequestsQueue)
737     {
738       entry.setQueue(conflictingRequestsQueue);
739       conflictingRequestsQueue.addLast(entry);
740     }
741   }
742
743   private void moveToNonConflictingQueue(Iterator JavaDoc iter,
744       BackendTaskQueueEntry entry)
745   {
746     iter.remove();
747     if (logger.isDebugEnabled())
748       logger.debug("Moving " + entry.getTask() + " to non conflicting queue");
749     synchronized (nonConflictingRequestsQueue)
750     {
751       entry.setQueue(nonConflictingRequestsQueue);
752       nonConflictingRequestsQueue.addLast(entry);
753     }
754   }
755
756   private static final int UNASSIGNED_QUEUE = -1;
757   private static final int CONFLICTING_QUEUE = 0;
758   private static final int NON_CONFLICTING_QUEUE = 1;
759   private static final int STORED_PROCEDURE_QUEUE = 2;
760   private final Object JavaDoc atomicPostSyncObject = new Object JavaDoc();
761
762   /**
763    * Lock list variable is set by getQueueAndWriteLockTables and retrieved by
764    * atomicTaskPostInQueueAndReleaseLock. This is safe since this happens in the
765    * synchronized (ATOMIC_POST_SYNC_OBJECT) block.
766    */

767   private ArrayList JavaDoc lockList = null;
768
769   /**
770    * Fetch the next task from the backend total order queue and post it to one
771    * of the queues (conflicting or not).
772    * <p>
773    * Note that this method must be called within a synchronized block on this.
774    *
775    * @return true if an entry was processed, false if there is the total order
776    * queue is empty.
777    */

778   private boolean fetchNextQueryFromBackendTotalOrderQueue()
779   {
780     DatabaseSchema schema = backend.getDatabaseSchema();
781     TransactionMetaData tm = null;
782     int queueToUse = UNASSIGNED_QUEUE;
783
784     AbstractTask task;
785     AbstractRequest request;
786
787     // Fetch first task from queue
788
synchronized (totalOrderQueue)
789     {
790       if (totalOrderQueue.isEmpty())
791         return false;
792       task = (AbstractTask) totalOrderQueue.removeFirst();
793
794       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
795       { /*
796          * If asynchronous execution is allowed, we have to ensure that queries
797          * of the same transaction are executed in order that is only one at a
798          * time. We also have to ensure that late queries execute before new
799          * queries accessing the same resources.
800          */

801
802         /*
803          * SYNCHRONIZATION: this check has to be performed in a synchronized
804          * block to avoid race conditions with terminating taks that perform
805          * priority inversions. Such operations move tasks across the backend
806          * queues and may invalidate the check.
807          */

808         synchronized (atomicPostSyncObject)
809         {
810           while (mustWaitForLateTask(task))
811           {
812             totalOrderQueue.addFirst(task); // Put back request in queue
813
// Behave as an empty queue, we will be notified when the blocking
814
// query has completed
815
return false;
816           }
817         }
818       }
819
820       // Now process the task
821
request = task.getRequest();
822       if (request == null || task instanceof BeginTask)
823       {
824         addTaskInNonConflictingRequestsQueue(task, !task.isAutoCommit());
825         return true;
826       }
827       else
828       { // Parse the request if needed, should only happen at recover time
829
try
830         {
831           if (!request.isParsed())
832           {
833             SemanticManager semanticManager = requestManager
834                 .getVirtualDatabase().getSemanticManager();
835             // Check if there is semantic information
836
request.setSemanticManager(semanticManager);
837             SemanticBehavior semantic = semanticManager
838                 .getRequestSemantic(request);
839             if (semantic != null)
840               request.setSemantic(semantic);
841             else
842             { // No semantic, parse it
843
request.parse(backend.getDatabaseSchema(),
844                   ParsingGranularities.TABLE, false);
845             }
846           }
847         }
848         catch (SQLException JavaDoc e)
849         {
850           logger.warn("Parsing of request " + request
851               + " failed in recovery process", e);
852         }
853       }
854       if (backend.isReplaying())
855       {
856         /*
857          * Read-only stored procedures can get logged when the schema is
858          * unavailable, because no backends are enabled. They do not need to be
859          * replayed and they may slow down recovery significantly.
860          */

861         if (request instanceof StoredProcedure)
862         {
863           StoredProcedure sp = (StoredProcedure) request;
864           SemanticBehavior semantic = sp.getSemantic();
865           if (semantic != null && semantic.isReadOnly())
866           {
867             task.notifySuccess(null);
868             synchronized (this)
869             {// The RecoverThread may be waiting to add a request
870
notifyAll();
871             }
872             return true;
873           }
874         }
875       }
876       if (!request.isAutoCommit())
877       { // Retrieve the transaction marker metadata
878
try
879         {
880           tm = requestManager.getTransactionMetaData(new Long JavaDoc(request
881               .getTransactionId()));
882         }
883         catch (SQLException JavaDoc e)
884         {
885           // We didn't start or lazy start the transaction
886
if (logger.isDebugEnabled())
887             logger.debug("No transaction medatada found for transaction "
888                 + request.getTransactionId());
889         }
890       }
891
892       if (schema == null)
893       {
894         try
895         {
896           task.notifyFailure((BackendWorkerThread) Thread.currentThread(), 0,
897               new SQLException JavaDoc(
898                   "No schema available to perform request locking on backend "
899                       + backend.getName()));
900         }
901         catch (SQLException JavaDoc ignore)
902         {
903           // Wait interrupted in notifyFailure
904
}
905         return true;
906       }
907
908       synchronized (atomicPostSyncObject)
909       {
910         lockList = null;
911
912         boolean requestIsAStoredProcedure = request instanceof StoredProcedure;
913         if (requestIsAStoredProcedure)
914           storedProcedureInQueue++;
915
916         SortedSet JavaDoc writeLockedTables = request.getWriteLockedDatabaseTables();
917         if ((writeLockedTables != null) && (writeLockedTables.size() > 1))
918           writesWithMultipleLocks++;
919
920         // Check if a stored procedure is locking the database
921
TransactionLogicalLock globalLock = schema.getLock();
922         if (globalLock.isLocked())
923         {
924           if (request.isAutoCommit())
925           {
926             queueToUse = STORED_PROCEDURE_QUEUE;
927           }
928           else
929           {
930             /*
931              * If we are the transaction executing the stored procedure, then we
932              * can proceed in the conflicting queue.
933              */

934             if (globalLock.getLocker() == request.getTransactionId())
935               queueToUse = NON_CONFLICTING_QUEUE;
936             else
937             {
938               /*
939                * If we are one of the transactions that already has acquired
940                * locks then we should try to complete our transaction else we
941                * stack in the stored procedure queue.
942                */

943               if ((tm == null) || (tm.getAcquiredLocks(backend) == null))
944               {
945                 // No locks taken so far, or transaction not [lazy] started =>
946
// go in the stored procedure queue
947
queueToUse = STORED_PROCEDURE_QUEUE;
948               }
949             }
950           }
951         }
952
953         if (queueToUse == UNASSIGNED_QUEUE)
954         { // No stored procedure or transaction that started before the stored
955
// procedure was posted
956
if (request instanceof AbstractWriteRequest
957               && !((AbstractWriteRequest) request).requiresGlobalLock())
958           {
959             queueToUse = getQueueAndWriteLockTables(request, schema, tm);
960           }
961           else if (request instanceof SelectRequest)
962           {
963             /*
964              * Note that SelectRequest scheduling is a little bit tricky to
965              * understand. Basically, we should just allow one select request at
966              * a time. If they are in different transactions, this is fine they
967              * will be properly isolated by the underlying database and queries
968              * from the same transaction are guaranteed to be executed in order
969              * (therefore they will go to the non-conflicting queue since their
970              * write lock set is null). If SELECT is in autocommit, we ensure
971              * that only one autocommit request is executed at a time, so
972              * finally we are safe in all cases. SELECT...FOR UPDATE are treated
973              * as writes since their write lock tables is set accordingly.
974              */

975             queueToUse = getQueueAndWriteLockTables(request, schema, tm);
976           }
977           else
978           {
979             if (requestIsAStoredProcedure)
980             {
981               StoredProcedure sp = (StoredProcedure) request;
982               SemanticBehavior semantic = sp.getSemantic();
983               if (semantic != null)
984               {
985                 // Try to optimize the stored procedure execution based on its
986
// semantic information
987
if (semantic.isReadOnly()
988                     || (request.getWriteLockedDatabaseTables() == null))
989                   queueToUse = NON_CONFLICTING_QUEUE;
990                 else
991                 {
992                   queueToUse = getQueueAndWriteLockTables(request, schema, tm);
993                   if (semantic.canExecuteOutOfOrder())
994                     queueToUse = NON_CONFLICTING_QUEUE;
995                 }
996               }
997             }
998
999             if (queueToUse == UNASSIGNED_QUEUE)
1000            {
1001              /*
1002               * Stored procedure or unknown query, let's assume it blocks the
1003               * whole database. Check if we can lock everything else we wait
1004               * for all locks to be free.
1005               */

1006              if (!globalLock.isLocked())
1007              { // Lock the whole database so that we can execute when all
1008
// locks are released
1009
globalLock.acquire(request);
1010                if (tm != null)
1011                  tm.addAcquiredLock(backend, globalLock);
1012                else
1013                {
1014                  if (lockList == null)
1015                    lockList = new ArrayList JavaDoc();
1016                  lockList.add(globalLock);
1017                }
1018                if (schema.allTablesAreUnlockedOrLockedByTransaction(request))
1019                  // Clear to go, all locks are free
1020
queueToUse = NON_CONFLICTING_QUEUE;
1021                else
1022                  // We will have to wait for everyone to release its locks
1023
queueToUse = STORED_PROCEDURE_QUEUE;
1024              }
1025              else
1026              { /*
1027                 * A stored procedure is holding the lock but we are in a
1028                 * transaction that already acquired locks so we are authorized
1029                 * to complete.
1030                 */

1031                if (schema.allTablesAreUnlockedOrLockedByTransaction(request))
1032                {
1033                  queueToUse = NON_CONFLICTING_QUEUE;
1034                  List JavaDoc locks = schema.lockAllTables(request);
1035                  if (tm != null)
1036                    tm.addAcquiredLocks(backend, locks);
1037                  else
1038                  {
1039                    if (lockList == null)
1040                      lockList = new ArrayList JavaDoc();
1041                    lockList.add(locks);
1042                  }
1043                }
1044                else
1045                { /*
1046                   * We will have to wait for the completion of the transaction
1047                   * of the stored procedure currently holding the global lock.
1048                   */

1049                  queueToUse = STORED_PROCEDURE_QUEUE;
1050                }
1051              }
1052            }
1053          }
1054        }
1055
1056        if (queueToUse == NON_CONFLICTING_QUEUE)
1057        {
1058          if (logger.isDebugEnabled())
1059            logger.debug("Scheduling request " + request
1060                + " in non conflicting queue");
1061          addTaskInNonConflictingRequestsQueue(task, false);
1062        }
1063        else if (queueToUse == CONFLICTING_QUEUE)
1064        {
1065          if (logger.isDebugEnabled())
1066            logger.debug("Scheduling request " + request
1067                + " in conflicting queue");
1068          addTaskInConflictingRequestsQueue(task);
1069        }
1070        else if (queueToUse == STORED_PROCEDURE_QUEUE)
1071        {
1072          if (logger.isDebugEnabled())
1073            logger.debug("Scheduling request " + request
1074                + " in stored procedure queue");
1075          addTaskInStoredProcedureQueue(task);
1076        }
1077
1078        task.setLocks(backend, lockList);
1079      } // synchronized (atomicPostSyncObject)
1080
} // synchronized (totalOrderQueue)
1081

1082    return true;
1083  }
1084
1085  /**
1086   * Schedule a query that takes write on multiple tables and tell in which
1087   * queue the task should be posted. This updates the conflictingTable above if
1088   * the query must be posted in the CONFLICTING_QUEUE and this always update
1089   * the list of locks taken by this request (either directly in tm lock list if
1090   * tm is not null, or by updating lockList defined above)
1091   *
1092   * @param request the request to schedule
1093   * @param schema the current database schema containing lock information
1094   * @param tm the transaction marker metadata (null if request is autocommit)
1095   * @return the queue to use (NON_CONFLICTING_QUEUE or CONFLICTING_QUEUE)
1096   */

1097  private int getQueueAndWriteLockTables(AbstractRequest request,
1098      DatabaseSchema schema, TransactionMetaData tm)
1099  {
1100    SortedSet JavaDoc writeLockedTables = request.getWriteLockedDatabaseTables();
1101
1102    if (writeLockedTables == null || writeLockedTables.isEmpty())
1103    { // This request does not lock anything
1104
return NON_CONFLICTING_QUEUE;
1105    }
1106    else if (request.isCreate() && writeLockedTables.size() == 1)
1107    { // This request does not lock anything
1108
// create table : we do not need to execute
1109
// in conflicting queue, but we have to lock the table for recovery
1110
// operations (that are done in a parallel way)
1111
return NON_CONFLICTING_QUEUE;
1112    }
1113
1114    /*
1115     * Assume that we will get all locks and that we will execute in the
1116     * non-conflicting queue. If there is any issue, the queue will be set to
1117     * conflicting queue.
1118     */

1119    int queueToUse = NON_CONFLICTING_QUEUE;
1120    for (Iterator JavaDoc iter = writeLockedTables.iterator(); iter.hasNext();)
1121    {
1122      String JavaDoc tableName = (String JavaDoc) iter.next();
1123      DatabaseTable table = schema.getTable(tableName, false);
1124      if (table == null)
1125      { // No table found, let's go for the conflicting queue
1126
logger.warn("Unable to find table "
1127            + tableName
1128            + " in database schema, scheduling query "
1129            + request.toStringShortForm(requestManager.getVirtualDatabase()
1130                .getSqlShortFormLength()) + " in conflicting queue.");
1131        queueToUse = CONFLICTING_QUEUE;
1132      }
1133      else
1134      { /*
1135         * If we get the lock we go in the non conflicting queue else we go in
1136         * the conflicting queue
1137         */

1138        if (!table.getLock().acquire(request))
1139        {
1140          queueToUse = CONFLICTING_QUEUE;
1141          if (logger.isDebugEnabled())
1142            logger.debug("Request " + request + " waits for lock on table "
1143                + table);
1144        }
1145        if (tm != null)
1146          tm.addAcquiredLock(backend, table.getLock());
1147        else
1148        {
1149          if (lockList == null)
1150            lockList = new ArrayList JavaDoc();
1151          lockList.add(table.getLock());
1152        }
1153      }
1154    }
1155    return queueToUse;
1156  }
1157
1158  /**
1159   * Release the locks acquired by a request executed in autocommit mode.
1160   *
1161   * @param locks the list of locks acquired by the request
1162   * @param transactionId the "fake" transaction id assign to the autocommit
1163   * request releasing the locks
1164   */

1165  private void releaseLocksForAutoCommitRequest(List JavaDoc locks, long transactionId)
1166  {
1167    if (locks == null)
1168      return; // No locks acquired
1169
for (Iterator JavaDoc iter = locks.iterator(); iter.hasNext();)
1170    {
1171      TransactionLogicalLock lock = (TransactionLogicalLock) iter.next();
1172      if (lock == null)
1173        logger.warn("Unexpected null lock for transaction " + transactionId
1174            + " when releasing " + locks.toArray());
1175      else
1176        lock.release(transactionId);
1177    }
1178  }
1179
1180  /**
1181   * Release the locks held by the given transaction at commit/rollback time.
1182   *
1183   * @param transactionId the transaction releasing the locks
1184   */

1185  private void releaseLocksForTransaction(long transactionId)
1186  {
1187    try
1188    {
1189      TransactionMetaData tm = requestManager.getTransactionMetaData(new Long JavaDoc(
1190          transactionId));
1191      releaseLocksForAutoCommitRequest(tm.removeBackendLocks(backend),
1192          transactionId);
1193    }
1194    catch (SQLException JavaDoc e)
1195    {
1196      /*
1197       * this is expected to fail when replaying the recovery log, since the
1198       * request manager won't have any transaction metadatas for transactions
1199       * we are replaying => we don't log warnings in this case.
1200       */

1201      if (!backend.isReplaying())
1202        if (logger.isWarnEnabled())
1203          logger.warn("No transaction medatada found for transaction "
1204              + transactionId + " releasing locks manually");
1205      if (backend.getDatabaseSchema() != null)
1206        backend.getDatabaseSchema().releaseLocksOnAllTables(transactionId);
1207      else
1208      {
1209        /*
1210         * At this point, schema can be null, for example if the backend is down
1211         */

1212        if (logger.isWarnEnabled())
1213          logger
1214              .warn("Cannot release locks, as no schema is available on this backend. "
1215                  + "This backend is problably not available anymore.");
1216      }
1217    }
1218  }
1219
1220  private TransactionMetaData getTransactionMetaData(AbstractRequest request)
1221  {
1222    TransactionMetaData tm = null;
1223    if ((request != null) && !request.isAutoCommit())
1224    { // Retrieve the transaction marker metadata
1225
try
1226      {
1227        tm = requestManager.getTransactionMetaData(new Long JavaDoc(request
1228            .getTransactionId()));
1229      }
1230      catch (SQLException JavaDoc e)
1231      {
1232        // We didn't start or lazy start the transaction
1233
if (logger.isDebugEnabled())
1234          logger.debug("No transaction medatada found for transaction "
1235              + request.getTransactionId());
1236      }
1237    }
1238    return tm;
1239  }
1240
1241  /**
1242   * Notify the completion of the given entry. The corresponding task completion
1243   * is notified to the backend.
1244   *
1245   * @param entry the executed entry
1246   */

1247  public final void completedEntryExecution(BackendTaskQueueEntry entry)
1248  {
1249    completedEntryExecution(entry, null);
1250  }
1251
1252  /**
1253   * Perform the cleanup to release locks and priority inversion checkings after
1254   * a stored procedure execution
1255   *
1256   * @param task the task that completed
1257   */

1258  public void completeStoredProcedureExecution(AbstractTask task)
1259  {
1260    AbstractRequest request = task.getRequest();
1261    long transactionId = request.getTransactionId();
1262    synchronized (atomicPostSyncObject)
1263    {
1264      if (request.isAutoCommit())
1265      {
1266        releaseLocksForAutoCommitRequest(task.getLocks(backend), transactionId);
1267        checkForPriorityInversion();
1268        SortedSet JavaDoc writeLockedTables = request.getWriteLockedDatabaseTables();
1269        if ((writeLockedTables != null) && (writeLockedTables.size() > 1))
1270          writesWithMultipleLocks--;
1271      }
1272      storedProcedureInQueue--;
1273    }
1274  }
1275
1276  /**
1277   * Perform the cleanup to release locks and priority inversion checkings after
1278   * a write query execution
1279   *
1280   * @param task the task that completed
1281   */

1282  public void completeWriteRequestExecution(AbstractTask task)
1283  {
1284    AbstractRequest request = task.getRequest();
1285    SortedSet JavaDoc writeLockedTables = request.getWriteLockedDatabaseTables();
1286    if ((writeLockedTables != null) && (writeLockedTables.size() > 1))
1287      synchronized (atomicPostSyncObject)
1288      {
1289        writesWithMultipleLocks--;
1290      }
1291
1292    long transactionId = request.getTransactionId();
1293    if (request.isAutoCommit())
1294    {
1295      synchronized (atomicPostSyncObject)
1296      {
1297        releaseLocksForAutoCommitRequest(task.getLocks(backend), transactionId);
1298        // Make sure we release the requests locking multiple tables or the
1299
// stored procedures that are blocked if any
1300
if (writesWithMultipleLocks > 0
1301            || waitForCompletionPolicy.isEnforceTableLocking())
1302          checkForPriorityInversion();
1303        else if (storedProcedureInQueue > 0)
1304          checkForPriorityInversion();
1305      }
1306    }
1307  }
1308
1309  /**
1310   * Releasing locks and checking for priority inversion. Usually used at commit
1311   * or rollback completion time.
1312   *
1313   * @param tm the transaction metadata
1314   */

1315  public void releaseLocksAndCheckForPriorityInversion(TransactionMetaData tm)
1316  {
1317    synchronized (atomicPostSyncObject)
1318    {
1319      releaseLocksForTransaction(tm.getTransactionId());
1320      checkForPriorityInversion();
1321    }
1322  }
1323
1324  /**
1325   * Removes the specified entry from its queue and notifies threads waiting on
1326   * this backend task queue. The removal is performed using the iterator, if
1327   * specified (non-null), or directly on the queue otherwize.
1328   *
1329   * @param entry the entry to remove from its queue
1330   * @param iter the iterator on which to call remove(), or null if not
1331   * applicable.
1332   */

1333  private void completedEntryExecution(BackendTaskQueueEntry entry,
1334      Iterator JavaDoc iter)
1335  {
1336    if (entry == null)
1337      return;
1338
1339    // Notify the backend that this query execution is complete
1340
AbstractTask task = entry.getTask();
1341    if (!backend.removePendingTask(task))
1342      logger.warn("Unable to remove task " + task
1343          + " from pending request queue");
1344
1345    synchronized (this)
1346    {
1347      // Remove the entry from its queue
1348
LinkedList JavaDoc queue = entry.getQueue();
1349      synchronized (queue)
1350      {
1351        if (iter != null)
1352          iter.remove();
1353        else
1354        {
1355          if (!queue.remove(entry))
1356            logger.error("Failed to remove task " + task + " from " + queue);
1357        }
1358      }
1359
1360      // Notify the queues to unblock queries waiting in getNextEntryToExecute
1361
// for the completion of the current request.
1362
this.notifyAll();
1363    }
1364  }
1365
1366  /**
1367   * Return the first entry in the conflicting requests queue (does not remove
1368   * it from the list).
1369   *
1370   * @return the first entry in the conflicting queue
1371   */

1372  public final BackendTaskQueueEntry getFirstConflictingRequestQueueOrStoredProcedureQueueEntry()
1373  {
1374    synchronized (conflictingRequestsQueue)
1375    {
1376      if (conflictingRequestsQueue.isEmpty())
1377      {
1378        synchronized (storedProcedureQueue)
1379        {
1380          if (storedProcedureQueue.isEmpty())
1381            return null;
1382          return (BackendTaskQueueEntry) storedProcedureQueue.getFirst();
1383        }
1384      }
1385      return (BackendTaskQueueEntry) conflictingRequestsQueue.getFirst();
1386    }
1387  }
1388
1389  /**
1390   * Returns the stored procedure queue. This is needed for deadlock detection
1391   * but clearly does break the abstarction layer as it exposes a private field
1392   * in an un-controlled way.
1393   *
1394   * @return the stored procedure queue.
1395   */

1396  public List JavaDoc getStoredProcedureQueue()
1397  {
1398    return storedProcedureQueue;
1399  }
1400
1401  /**
1402   * Get the next available task entry to process from the queues. If the
1403   * backend is killed, this method will return a KillThreadTask else it will
1404   * wait for a task to be ready to be executed. Note that the task is left in
1405   * the queue and flagged as processed by the thread given as a parameter. The
1406   * task will only be removed from the queue when the thread notifies the
1407   * completion of the task.
1408   *
1409   * @param thread the thread that will execute the task
1410   * @return the task to execute
1411   */

1412  public final BackendTaskQueueEntry getNextEntryToExecute(
1413      BackendWorkerThread thread)
1414  {
1415    BackendTaskQueueEntry entry = null;
1416
1417    /*
1418     * The strategy is to look first for the non-conflicting queue so that
1419     * non-conflicting transactions could progress as fast as possible. Then we
1420     * check the conflicting queue if we did not find a task to execute.<p> If
1421     * we failed to find something to execute in the active queues, we process
1422     * everything available in the total order queue to push the tasks in the
1423     * active queues.
1424     */

1425
1426    while (true)
1427    {
1428      Object JavaDoc firstNonConflictingTask = null;
1429      Object JavaDoc lastNonConflictingTask = null;
1430      // Check the non-conflicting queue first
1431
synchronized (nonConflictingRequestsQueue)
1432      {
1433        if (!nonConflictingRequestsQueue.isEmpty())
1434        {
1435          firstNonConflictingTask = nonConflictingRequestsQueue.getFirst();
1436          lastNonConflictingTask = nonConflictingRequestsQueue.getLast();
1437          for (Iterator JavaDoc iter = nonConflictingRequestsQueue.iterator(); iter
1438              .hasNext();)
1439          {
1440            entry = (BackendTaskQueueEntry) iter.next();
1441            if (entry.getProcessingThread() == null)
1442            { // This task is not currently processed, let's execute it
1443
entry.setProcessingThread(thread);
1444              return entry;
1445            }
1446          }
1447        }
1448      }
1449
1450      // Nothing to be executed now in the non-conflicting queue, check the
1451
// conflicting queue
1452
Object JavaDoc firstConflictingTask = null;
1453      Object JavaDoc lastConflictingTask = null;
1454      synchronized (conflictingRequestsQueue)
1455      {
1456        if (!conflictingRequestsQueue.isEmpty())
1457        {
1458          firstConflictingTask = conflictingRequestsQueue.getFirst();
1459          lastConflictingTask = conflictingRequestsQueue.getLast();
1460          // Only check the first task since we must execute them only one at a
1461
// time.
1462
entry = (BackendTaskQueueEntry) conflictingRequestsQueue.getFirst();
1463          if (entry.getProcessingThread() == null)
1464          { // The task is not currently processed.
1465
AbstractRequest request = entry.getTask().getRequest();
1466            SortedSet JavaDoc lockedTables = request.getWriteLockedDatabaseTables();
1467            if ((lockedTables != null) && (lockedTables.size() > 0))
1468            {
1469              /**
1470               * Check if there are requests in the non-conflicting queue that
1471               * belongs to a transaction that is holding a lock on which we
1472               * conflict.
1473               * <p>
1474               * Note that if we need to lock multiple tables and that we are in
1475               * the conflicting queue, we are going to wait until all locks are
1476               * free or a deadlock detection occurs.
1477               */

1478              boolean conflictingQueryDetected = false;
1479              synchronized (nonConflictingRequestsQueue)
1480              {
1481                if (!nonConflictingRequestsQueue.isEmpty()
1482                    || waitForCompletionPolicy.isEnforceTableLocking())
1483                { // Check for a potential conflict
1484
int locksNotOwnedByMe = 0;
1485                  long transactionId = entry.getTask().getTransactionId();
1486                  DatabaseSchema schema = backend.getDatabaseSchema();
1487                  for (Iterator JavaDoc iterator = lockedTables.iterator(); iterator
1488                      .hasNext()
1489                      && !conflictingQueryDetected;)
1490                  {
1491                    String JavaDoc tableName = (String JavaDoc) iterator.next();
1492                    DatabaseTable table = schema.getTable(tableName, false);
1493                    if (table == null)
1494                    { // No table found, let's go for the conflicting queue
1495
logger
1496                          .warn("Unable to find table "
1497                              + tableName
1498                              + " in database schema, when getting next entry to execute : "
1499                              + request
1500                                  .toStringShortForm(requestManager
1501                                      .getVirtualDatabase()
1502                                      .getSqlShortFormLength()));
1503
1504                      // Assume conflict since non-conflicting queue is not
1505
// empty
1506
conflictingQueryDetected = true;
1507                    }
1508                    else
1509                    {
1510                      TransactionLogicalLock lock = table.getLock();
1511                      if (lock.isLocked())
1512                      {
1513                        if (lock.getLocker() != transactionId)
1514                          locksNotOwnedByMe++;
1515
1516                        /*
1517                         * Check if we find a query in the conflicting queue
1518                         * that owns the lock or waits for the lock we need
1519                         */

1520                        for (Iterator JavaDoc iter = nonConflictingRequestsQueue
1521                            .iterator(); iter.hasNext();)
1522                        {
1523                          BackendTaskQueueEntry nonConflictingEntry = (BackendTaskQueueEntry) iter
1524                              .next();
1525                          long nonConflictingRequestTransactionId = nonConflictingEntry
1526                              .getTask().getTransactionId();
1527                          if ((lock.getLocker() == nonConflictingRequestTransactionId)
1528                              || lock
1529                                  .isWaiting(nonConflictingRequestTransactionId))
1530                          {
1531                            conflictingQueryDetected = true;
1532                            break;
1533                          }
1534                        }
1535                      }
1536                    }
1537                  }
1538
1539                  /*
1540                   * If table level locking is enforced, we don't allow a
1541                   * request to execute before it has all its locks
1542                   */

1543                  if (waitForCompletionPolicy.isEnforceTableLocking())
1544                    conflictingQueryDetected = locksNotOwnedByMe > 0;
1545
1546                  /*
1547                   * If we don't own a single lock (in case of multiple locks)
1548                   * needed by this query then we wait for the locks to be
1549                   * released or the deadlock detection to abort a transaction
1550                   * that is holding at least one of the locks that we need.
1551                   */

1552                  conflictingQueryDetected = conflictingQueryDetected
1553                      || ((locksNotOwnedByMe > 1) && (locksNotOwnedByMe == lockedTables
1554                          .size()));
1555                }
1556              }
1557
1558              // If everyone is done in the non-conflicting queue, then
1559
// let's go with this conflicting request
1560
if (!conflictingQueryDetected)
1561              {
1562                entry.setProcessingThread(thread);
1563                return entry;
1564              }
1565            }
1566            else
1567            {
1568              if (logger.isWarnEnabled())
1569                logger.warn("Detected non-locking task " + entry.getTask()
1570                    + " in conflicting queue");
1571
1572              /*
1573               * No clue on where the conflict happens, it might well be that we
1574               * don't access any table but in that case we shouldn't have ended
1575               * up in the conflicting queue. To be safer, let's wait for the
1576               * non-conflicting queue to be empty.
1577               */

1578              synchronized (nonConflictingRequestsQueue)
1579              {
1580                if (nonConflictingRequestsQueue.isEmpty())
1581                {
1582                  entry.setProcessingThread(thread);
1583                  return entry;
1584                }
1585              }
1586            }
1587          }
1588        }
1589      }
1590
1591      synchronized (this)
1592      {
1593        // No entry in the queues or all entries are currently processed,
1594
// process the total order queue.
1595
if (fetchNextQueryFromBackendTotalOrderQueue())
1596          continue;
1597
1598        // Nothing in the total order queue either !
1599
// Double-check that something was not posted in the queue after we
1600
// scanned it
1601
synchronized (nonConflictingRequestsQueue)
1602        {
1603          if (!nonConflictingRequestsQueue.isEmpty())
1604          {
1605            if (firstNonConflictingTask != nonConflictingRequestsQueue
1606                .getFirst())
1607              continue;
1608            if (lastNonConflictingTask != nonConflictingRequestsQueue.getLast())
1609              continue;
1610          }
1611          else if (firstNonConflictingTask != null)
1612            continue; // The queue was emptied all at once
1613
}
1614        synchronized (conflictingRequestsQueue)
1615        {
1616          if (!conflictingRequestsQueue.isEmpty())
1617          {
1618            if (firstConflictingTask != conflictingRequestsQueue.getFirst())
1619              continue;
1620            if (lastConflictingTask != conflictingRequestsQueue.getLast())
1621              continue;
1622          }
1623          else if (firstConflictingTask != null)
1624            continue; // The queue was emptied all at once
1625
}
1626
1627        // Wait until a new task is posted
1628
try
1629        {
1630          this.wait();
1631        }
1632        catch (InterruptedException JavaDoc ignore)
1633        {
1634        }
1635      }
1636
1637    }
1638  }
1639
1640  /**
1641   * Get the next available commit or rollback task to process from the queues.
1642   * If the backend is killed, this method will return a KillThreadTask else it
1643   * will wait for a task to be ready to be executed. Note that the task is left
1644   * in the queue and flagged as processed by the thread given as a parameter.
1645   * The task will only be removed from the queue when the thread notifies the
1646   * completion of the task.
1647   *
1648   * @param thread the thread that will execute the task
1649   * @return the commmit or rollback task to execute
1650   */

1651  public BackendTaskQueueEntry getNextCommitRollbackToExecute(
1652      BackendWorkerThread thread)
1653  {
1654    boolean found = false;
1655    BackendTaskQueueEntry entry = null;
1656    while (!found)
1657    {
1658      Object JavaDoc firstNonConflictingTask = null;
1659      Object JavaDoc lastNonConflictingTask = null;
1660      // Check the non-conflicting queue first
1661
synchronized (nonConflictingRequestsQueue)
1662      {
1663        if (!nonConflictingRequestsQueue.isEmpty())
1664        {
1665          firstNonConflictingTask = nonConflictingRequestsQueue.getFirst();
1666          lastNonConflictingTask = nonConflictingRequestsQueue.getLast();
1667          for (Iterator JavaDoc iter = nonConflictingRequestsQueue.iterator(); iter
1668              .hasNext();)
1669          {
1670            entry = (BackendTaskQueueEntry) iter.next();
1671            if ((entry.isACommitOrRollback() || (entry.getTask() instanceof KillThreadTask))
1672                && (entry.getProcessingThread() == null))
1673            { // This task is not currently processed, let's execute it
1674
entry.setProcessingThread(thread);
1675              return entry;
1676            }
1677          }
1678        }
1679      }
1680
1681      synchronized (this)
1682      {
1683        // No entry in the queues or all entries are currently processed,
1684
// process the total order queue.
1685
if (fetchNextQueryFromBackendTotalOrderQueue())
1686          continue;
1687
1688        // Double-check that something was not posted in the queue after we
1689
// scanned it
1690
synchronized (nonConflictingRequestsQueue)
1691        {
1692          if (!nonConflictingRequestsQueue.isEmpty())
1693          {
1694            if (firstNonConflictingTask != nonConflictingRequestsQueue
1695                .getFirst())
1696              continue;
1697            if (lastNonConflictingTask != nonConflictingRequestsQueue.getLast())
1698              continue;
1699          }
1700        }
1701
1702        try
1703        {
1704          this.wait();
1705        }
1706        catch (InterruptedException JavaDoc ignore)
1707        {
1708        }
1709      }
1710
1711    }
1712    // We should never reach this point
1713
return null;
1714  }
1715
1716  /**
1717   * Checks if the current entry needs to wait for a later entry before being
1718   * able to execute.
1719   *
1720   * @param currentTask the current <code>AbstractTask</code> candidate for
1721   * scheduling
1722   * @return <code>true</code> if the current task needs to wait for a late
1723   * task before being able to execute, <code>false</code> else
1724   */

1725  private boolean mustWaitForLateTask(AbstractTask currentTask)
1726  {
1727    if (currentTask.isPersistentConnection())
1728    {
1729      long currentCid = currentTask.getPersistentConnectionId();
1730      // Check if there are other requests for this transaction in
1731
// the queue
1732
if (hasTaskForPersistentConnectionInQueue(nonConflictingRequestsQueue,
1733          currentCid)
1734          || hasTaskForPersistentConnectionInQueue(conflictingRequestsQueue,
1735              currentCid)
1736          || hasTaskForPersistentConnectionInQueue(storedProcedureQueue,
1737              currentCid))
1738        // Skip this commit/rollback until the conflicting request completes
1739
return true;
1740    }
1741
1742    if (!currentTask.isAutoCommit())
1743    {
1744      long currentTid = currentTask.getTransactionId();
1745      // Check if there are other requests for this transaction in
1746
// the queue
1747
if (hasTaskForTransactionInQueue(nonConflictingRequestsQueue, currentTid)
1748          || hasTaskForTransactionInQueue(conflictingRequestsQueue, currentTid)
1749          || hasTaskForTransactionInQueue(storedProcedureQueue, currentTid))
1750        // Skip this commit/rollback until the conflicting request completes
1751
return true;
1752    }
1753
1754    return hasDDLTaskInQueue(nonConflictingRequestsQueue)
1755        || hasDDLTaskInQueue(conflictingRequestsQueue)
1756        || hasDDLTaskInQueue(storedProcedureQueue);
1757  }
1758
1759  private boolean hasDDLTaskInQueue(List JavaDoc queue)
1760  {
1761    boolean retry;
1762    do
1763    {
1764      retry = false;
1765      try
1766      {
1767        for (Iterator JavaDoc iter = queue.iterator(); iter.hasNext();)
1768        {
1769          BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter
1770              .next();
1771          AbstractTask otherTask = otherEntry.getTask();
1772          AbstractRequest request = otherTask.getRequest();
1773          /**
1774           * For the moment just check if this is a create, drop or alter
1775           * statement, we could also check
1776           * AbstractRequest#altersDatabaseSchema() but we don't want to block
1777           * if this is not a DDL (a stored procedure might alter the schema
1778           * because of its default semantic but still might need other queries
1779           * to be executed before it can really execute).
1780           */

1781          if ((request != null)
1782              && (request.isCreate() || request.isAlter() || request.isDrop()))
1783          {
1784            return true;
1785          }
1786        }
1787      }
1788      catch (ConcurrentModificationException JavaDoc e)
1789      {
1790        retry = true;
1791      }
1792    }
1793    while (retry);
1794    return false;
1795  }
1796
1797  private boolean hasTaskForPersistentConnectionInQueue(List JavaDoc queue, long cid)
1798  {
1799    boolean retry;
1800    do
1801    {
1802      retry = false;
1803      try
1804      {
1805        for (Iterator JavaDoc iter = queue.iterator(); iter.hasNext();)
1806        {
1807          BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter
1808              .next();
1809
1810          AbstractTask otherTask = otherEntry.getTask();
1811
1812          // Check if the query is in the same transaction
1813
if (otherTask.isPersistentConnection()
1814              && (otherTask.getPersistentConnectionId() == cid))
1815          {
1816            return true;
1817          }
1818        }
1819      }
1820      catch (ConcurrentModificationException JavaDoc e)
1821      {
1822        retry = true;
1823      }
1824    }
1825    while (retry);
1826    return false;
1827  }
1828
1829  private boolean hasTaskForTransactionInQueue(List JavaDoc queue, long tid)
1830  {
1831    boolean retry;
1832    do
1833    {
1834      retry = false;
1835      try
1836      {
1837        for (Iterator JavaDoc iter = queue.iterator(); iter.hasNext();)
1838        {
1839          BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter
1840              .next();
1841
1842          AbstractTask otherTask = otherEntry.getTask();
1843
1844          // Check if the query is in the same transaction
1845
if (!otherTask.isAutoCommit()
1846              && (otherTask.getTransactionId() == tid))
1847          {
1848            return true;
1849          }
1850        }
1851      }
1852      catch (ConcurrentModificationException JavaDoc e)
1853      {
1854        retry = true;
1855      }
1856    }
1857    while (retry);
1858    return false;
1859  }
1860
1861  /**
1862   * Return true if tasks are allowed to be posted to the queue. If false, all
1863   * tasks posted to the queue are systematically notified for completion
1864   * without being executed (abort behavior)
1865   *
1866   * @return Returns the allowTasksToBePosted.
1867   */

1868  public boolean allowTasksToBePosted()
1869  {
1870    synchronized (ALLOW_TASKS_SYNC)
1871    {
1872      return allowTasksToBePosted;
1873    }
1874  }
1875
1876  /**
1877   * Set to true if tasks are allowed to be posted to the queue else, all tasks
1878   * posted to the queue are systematically notified for completion without
1879   * being executed (abort behavior)
1880   *
1881   * @param allowTasksToBePosted The allowTasksToBePosted to set.
1882   */

1883  public void setAllowTasksToBePosted(boolean allowTasksToBePosted)
1884  {
1885    synchronized (ALLOW_TASKS_SYNC)
1886    {
1887      this.allowTasksToBePosted = allowTasksToBePosted;
1888    }
1889  }
1890
1891  /**
1892   * Start a new Deadlock Detection Thread (throws a RuntimeException if called
1893   * twice without stopping the thread before the second call).
1894   *
1895   * @param vdb the virtual database the backend is attached to
1896   */

1897  public void startDeadlockDetectionThread(VirtualDatabase vdb)
1898  {
1899    if (deadlockDetectionThread != null)
1900      throw new RuntimeException JavaDoc(
1901          "Trying to start multiple times a deadlock detection thread on the same backend "
1902              + backend.getName());
1903
1904    deadlockDetectionThread = new DeadlockDetectionThread(backend, vdb,
1905        atomicPostSyncObject, waitForCompletionPolicy.getDeadlockTimeoutInMs());
1906    deadlockDetectionThread.start();
1907  }
1908
1909  /**
1910   * Terminate the Deadlock Detection Thread. Throws a RuntimeException is the
1911   * thread was already stopped (or not started).
1912   */

1913  public void terminateDeadlockDetectionThread()
1914  {
1915    if (deadlockDetectionThread == null)
1916      throw new RuntimeException JavaDoc(
1917          "No deadlock detection thread to stop on backend "
1918              + backend.getName());
1919
1920    deadlockDetectionThread.kill();
1921    deadlockDetectionThread = null;
1922  }
1923
1924  /**
1925   * Returns a <code>String</code> corresponding to the dump of the internal
1926   * state of this BackendTaskQueues.<br />
1927   * This method is synchronized to provided a consistent snapshots of the
1928   * queues.
1929   *
1930   * @return a <code>String</code> representing the internal state of this
1931   * BackendTaskQueues
1932   */

1933  protected synchronized String JavaDoc dump()
1934  {
1935    StringBuffer JavaDoc buff = new StringBuffer JavaDoc();
1936    buff.append("Non Conflicting Requests Queue ("
1937        + nonConflictingRequestsQueue.size() + ")\n");
1938    for (Iterator JavaDoc iter = nonConflictingRequestsQueue.iterator(); iter.hasNext();)
1939    {
1940      BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next();
1941      buff.append("\t" + entry + "\n");
1942    }
1943    buff.append("Conflicting Requests Queue ("
1944        + conflictingRequestsQueue.size() + ")\n");
1945    for (Iterator JavaDoc iter = conflictingRequestsQueue.iterator(); iter.hasNext();)
1946    {
1947      BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next();
1948      buff.append("\t" + entry + "\n");
1949    }
1950    buff.append("Stored Procedures Queue (" + storedProcedureQueue.size()
1951        + ")\n");
1952    for (Iterator JavaDoc iter = storedProcedureQueue.iterator(); iter.hasNext();)
1953    {
1954      BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next();
1955      buff.append("\t" + entry + "\n");
1956    }
1957    return buff.toString();
1958  }
1959}
1960
Popular Tags