KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > loadbalancer > tasks > AbstractTask


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2006 Continuent, Inc.
6  * Contact: sequoia@continuent.org
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * Initial developer(s): Emmanuel Cecchet.
21  * Contributor(s): Jaco Swart.
22  */

23
24 package org.continuent.sequoia.controller.loadbalancer.tasks;
25
26 import java.sql.ResultSet JavaDoc;
27 import java.sql.SQLException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.HashMap JavaDoc;
30 import java.util.List JavaDoc;
31 import java.util.Map JavaDoc;
32
33 import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
34 import org.continuent.sequoia.controller.backend.DatabaseBackend;
35 import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
36 import org.continuent.sequoia.controller.requests.AbstractRequest;
37
38 /**
39  * Defines an abstract task to be processed by a
40  * <code>BackendWorkerThread</code>.
41  *
42  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
43  * @author <a HREF="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
44  * @version 1.0
45  */

46 public abstract class AbstractTask
47 {
48   //
49
// How the code is organized ?
50
// 1. Member variables
51
// 2. Constructor(s)
52
// 3. Task management
53
// 4. Getter/Setter
54
//
55

56   /** Total number of threads. */
57   private int totalNb;
58
59   /** Number of threads that must succeed before returning. */
60   private int nbToComplete;
61
62   /** Number of thread that have started the execution of the task */
63   private int executionStarted;
64   /** Number of backendThread that have succeeded */
65   private int success = 0;
66   /** Number of backendThread that have failed */
67   private int failed = 0;
68   /** List of backendThread that have notified this task */
69   private List JavaDoc notifications = null;
70   /** List of exceptions of failed nodes */
71   private List JavaDoc exceptions = null;
72   /** Map of Lists of locksMap taken by this task, indexed by backend */
73   private Map JavaDoc locksMap = new HashMap JavaDoc();
74
75   // ResultSet for getting back autogenerated keys in an update with keys
76
private ResultSet JavaDoc generatedKeysResultSet;
77
78   // True if the timeout has expired on this task
79
private boolean timeoutExpired = false;
80
81   /** Query result on the fist backend to succeed. Used for sanity check */
82   private int resultOnFirstBackendToSucceed;
83
84   /** True if this task executes on a persistent connection */
85   private boolean persistentConnection;
86
87   /** Persistent connection id if persistentConnection is true */
88   private long persistentConnectionId;
89
90   /*
91    * Constructor
92    */

93
94   /**
95    * Sets the number of threads among the total number of threads that must
96    * successfully complete the execution of this AbstractTask before returning.
97    *
98    * @param nbToComplete number of threads that must succeed before returning
99    * @param totalNb total number of threads
100    * @param isPersistentConnection true if this task executes on a persistent
101    * connection
102    * @param persistentConnectionId persistent connection id if
103    * persistentConnection is true
104    */

105   public AbstractTask(int nbToComplete, int totalNb,
106       boolean isPersistentConnection, long persistentConnectionId)
107   {
108     this.nbToComplete = nbToComplete;
109     this.totalNb = totalNb;
110     success = 0;
111     failed = 0;
112     executionStarted = 0;
113     notifications = new ArrayList JavaDoc(nbToComplete);
114     this.persistentConnection = isPersistentConnection;
115     this.persistentConnectionId = persistentConnectionId;
116   }
117
118   /*
119    * Task management
120    */

121
122   /**
123    * The task code executed by the backendThread.
124    *
125    * @param backendThread The backend thread executing this task
126    * @throws SQLException if an error occurs
127    */

128   public void execute(BackendWorkerThread backendThread) throws SQLException JavaDoc
129   {
130     synchronized (this)
131     {
132       // If the task has expired and nobody has executed it yet, we ignore it
133
// else we have to play it.
134
// Note that the exception corresponding to the timeout is set by the
135
// caller of setExpiredTimeout.
136
if (timeoutExpired && (executionStarted == 0))
137         return;
138       this.executionStarted++;
139     }
140     executeTask(backendThread);
141     // Completed executions are handled by the task internal code that calls
142
// notifyFailure or notifySuccess.
143
}
144
145   /**
146    * The implementation specific task code to be executed by backendThread.
147    *
148    * @param backendThread The backend thread executing this task
149    * @throws SQLException if an error occurs
150    */

151   public abstract void executeTask(BackendWorkerThread backendThread)
152       throws SQLException JavaDoc;
153
154   /**
155    * This is used to notify the completion of this task without success or
156    * failure. This is usually used when the task has been discarded for example
157    * by a backend that is currently disabling but still needs to execute the
158    * remaining queries in open transactions.
159    * <p>
160    * Therefore, this only decrements by one the number of threads that needs to
161    * complete.
162    *
163    * @param backendThread The backend worker thread notifying this task (null in
164    * case this task is cancelled before being processed by a worker
165    * thread)
166    */

167   public synchronized void notifyCompletion(BackendWorkerThread backendThread)
168   {
169     if ((backendThread != null) && !addNotification(backendThread))
170       return;
171
172     totalNb--;
173     // Notify if needed
174
if (success + failed >= totalNb)
175     {
176       notifyAll(); // Notify all failed threads
177
}
178   }
179
180   /**
181    * Notifies that the specified backendThread failed to execute this task. If
182    * all nodes failed, this method return <code>false</code> meaning that the
183    * problem was due to the task and not to the thread. If the method returns
184    * <code>true</code>, it can mean that this thread failed and is no more
185    * coherent, therefore the backend associated to this thread should be
186    * disabled.
187    *
188    * @param backendThread The backend thread notifying this task
189    * @param timeout time in milliseconds to wait for other threads to signal
190    * success or failure (use -1 if you don't want to wait)
191    * @param e the exception causing the failure
192    * @return <code>true</code> if at least one node succeeded to execute this
193    * task, <code>false</code> if all threads failed
194    * @throws SQLException if an error occured in the notification process
195    */

196   public synchronized boolean notifyFailure(BackendWorkerThread backendThread,
197       long timeout, Throwable JavaDoc e) throws SQLException JavaDoc
198   {
199     if (!addNotification(backendThread))
200     {
201       if (backendThread != null)
202         backendThread.getLogger().info(
203             "Backend " + backendThread.getBackend() + " already notified task "
204                 + toString());
205       return success > 0;
206     }
207
208     failed++;
209
210     // Log the exception
211
if (exceptions == null)
212       exceptions = new ArrayList JavaDoc();
213     String JavaDoc backendName;
214     if (backendThread == null)
215     { // Happens in case of cascade abort (see SEQUOIA-469)
216
backendName = "Query not processed";
217     }
218     else
219       backendName = backendThread.getName();
220
221     if (e instanceof SQLException JavaDoc)
222     {
223       SQLException JavaDoc sqlEx = (SQLException JavaDoc) e;
224       exceptions.add(SQLExceptionFactory.getSQLException(sqlEx, "Backend "
225           + backendName + " failed (" + sqlEx.getLocalizedMessage() + ")"));
226     }
227     else
228       exceptions.add(new SQLException JavaDoc("Backend " + backendName + " failed ("
229           + e.getLocalizedMessage() + ")").initCause(e));
230
231     // Notify if needed
232
if (success + failed >= totalNb)
233     {
234       notifyAll(); // Notify all failed threads
235
}
236     else
237     {
238       if ((timeout > -1) && (success == 0))
239       {
240         try
241         { // Wait to check if all other threads failed or not
242
wait(timeout);
243         }
244         catch (InterruptedException JavaDoc ie)
245         {
246           throw (SQLException JavaDoc) new SQLException JavaDoc(
247               "Wait interrupted() in failed task of backend " + backendName
248                   + " (" + e.getLocalizedMessage() + ")").initCause(e);
249         }
250       }
251     }
252     return success > 0;
253   }
254
255   /**
256    * Notifies the successful completion of this task.
257    *
258    * @param backendThread The backend thread notifying this task
259    */

260   public synchronized void notifySuccess(BackendWorkerThread backendThread)
261
262   {
263     if (!addNotification(backendThread))
264       return;
265
266     doNotifySuccess();
267   }
268
269   /**
270    *
271    */

272   private void doNotifySuccess()
273   {
274     success++;
275
276     // Notify if needed
277
if ((success == nbToComplete) || (success + failed >= totalNb))
278     {
279       if (failed > 0)
280         notifyAll(); // Notify all failed threads too
281
else
282         notify();
283     }
284   }
285
286   /**
287    * Notifies the successful completion of this task and provide the
288    * resultOnFirstBackendToSucceed for checking.
289    *
290    * @param backendThread The backend thread notifying this task
291    * @param result the result of the query on the backend that notify the
292    * success
293    * @return the result returned by the query on the first backend which
294    * succeeded
295    */

296   public synchronized int notifySuccess(BackendWorkerThread backendThread,
297       int result)
298   {
299     if (success == 0)
300     {
301       /*
302        * we keep only the result of the first backend to succeed which can be
303        * used as a basis to check that success on other backends will be
304        * consistent with that first result.
305        */

306       resultOnFirstBackendToSucceed = result;
307     }
308
309     /*
310      * There is a nasty case if the database does not support
311      * Statement.cancel(). When a deadlock is detected, the query will be
312      * aborted and failure will be notified. But if the query cannot be
313      * cancelled, it might be notified as a success later on. In this case, we
314      * have to update resultOnFirstBackendToSucceed first (this is why the check
315      * on addNotification is done after the update of
316      * resultOnFirstBackendToSucceed.
317      */

318     if (!addNotification(backendThread))
319       return resultOnFirstBackendToSucceed;
320
321     doNotifySuccess();
322
323     return resultOnFirstBackendToSucceed;
324   }
325
326   /**
327    * Add a backend worker thread to the notification list
328    *
329    * @param backendThread the backend worker thread to add
330    * @return false if this thread already notified the task
331    */

332   private boolean addNotification(BackendWorkerThread backendThread)
333   {
334     if (notifications.contains(backendThread))
335       return false;
336     notifications.add(backendThread);
337     return true;
338   }
339
340   //
341
// Getter/Setter
342
//
343

344   /**
345    * Returns true if this task is in autocommit mode, false if it is in a
346    * transaction.
347    *
348    * @return Returns true if task must be executed in autoCommit mode.
349    * @see #getTransactionId()
350    */

351   public abstract boolean isAutoCommit();
352
353   /**
354    * Returns the exceptions lists.
355    *
356    * @return an <code>List</code>
357    */

358   public List JavaDoc getExceptions()
359   {
360     return exceptions;
361   }
362
363   /**
364    * Returns the number of threads that have started the execution of the task.
365    *
366    * @return Returns the number of started executions.
367    */

368   public synchronized int getExecutionStarted()
369   {
370     return executionStarted;
371   }
372
373   /**
374    * Set the flag to tell that the timeout has expired on this task. If no
375    * backend has started the task execution then the task will be canceled and
376    * the method will return true. Otherwise, all backends will execute the
377    * request and the method will return false.
378    *
379    * @return true if BackendThreads will ignore the task, false if all backends
380    * will execute the task.
381    */

382   public synchronized boolean setExpiredTimeout()
383   {
384     this.timeoutExpired = true;
385     return executionStarted == 0;
386   }
387
388   /**
389    * Returns the failed.
390    *
391    * @return an <code>int</code> value
392    */

393   public int getFailed()
394   {
395     return failed;
396   }
397
398   /**
399    * Returns the generatedKeysResultSet value.
400    *
401    * @return Returns the generatedKeysResultSet.
402    */

403   public ResultSet JavaDoc getGeneratedKeysResultSet()
404   {
405     return generatedKeysResultSet;
406   }
407
408   /**
409    * Sets the generatedKeysResultSet value.
410    *
411    * @param generatedKeysResultSet The generatedKeysResultSet to set.
412    */

413   public void setGeneratedKeysResultSet(ResultSet JavaDoc generatedKeysResultSet)
414   {
415     this.generatedKeysResultSet = generatedKeysResultSet;
416   }
417
418   /**
419    * Returns the locksMap taken by this task for a given backend.
420    *
421    * @param backend backend for which to get the locksMap
422    * @return Returns the locksMap.
423    */

424   public List JavaDoc getLocks(DatabaseBackend backend)
425   {
426     synchronized (locksMap)
427     {
428       return (List JavaDoc) locksMap.get(backend);
429     }
430   }
431
432   /**
433    * Sets the locksMap taken by this task for a given backend (ignored if the
434    * locksMap were already set once).
435    *
436    * @param backend backend for which to set the locksMap
437    * @param locks The locks map taken.
438    */

439   public synchronized void setLocks(DatabaseBackend backend, List JavaDoc locks)
440   {
441     synchronized (locksMap)
442     {
443       /*
444        * Use get instead of contains key, because sometimes a null entry is
445        * added.
446        */

447       if (locksMap.get(backend) == null)
448         locksMap.put(backend, locks);
449       else
450       {
451         backend.getLogger().fatal(
452             "Double locks entry: " + locks + " and " + locksMap.get(backend),
453             new Exception JavaDoc());
454       }
455     }
456   }
457
458   /**
459    * Returns the number of threads that must succeed before returning.
460    *
461    * @return an <code>int</code> value
462    */

463   public int getNbToComplete()
464   {
465     return nbToComplete;
466   }
467
468   /**
469    * Returns the persistentConnection value.
470    *
471    * @return Returns the persistentConnection.
472    */

473   public final boolean isPersistentConnection()
474   {
475     return persistentConnection;
476   }
477
478   /**
479    * Sets the persistentConnection value.
480    *
481    * @param persistentConnection The persistentConnection to set.
482    */

483   public final void setPersistentConnection(boolean persistentConnection)
484   {
485     this.persistentConnection = persistentConnection;
486   }
487
488   /**
489    * Returns the persistentConnectionId value.
490    *
491    * @return Returns the persistentConnectionId.
492    */

493   public final long getPersistentConnectionId()
494   {
495     return persistentConnectionId;
496   }
497
498   /**
499    * Sets the persistentConnectionId value.
500    *
501    * @param persistentConnectionId The persistentConnectionId to set.
502    */

503   public final void setPersistentConnectionId(long persistentConnectionId)
504   {
505     this.persistentConnectionId = persistentConnectionId;
506   }
507
508   /**
509    * Return the request associated with this task if any, returns null
510    * otherwise.
511    *
512    * @return the request associated with this task or null
513    */

514   public abstract AbstractRequest getRequest();
515
516   /**
517    * Returns the success.
518    *
519    * @return an <code>int</code> value
520    */

521   public int getSuccess()
522   {
523     return success;
524   }
525
526   /**
527    * Returns the total number of threads.
528    *
529    * @return an <code>int</code> value
530    * @see #setTotalNb
531    */

532   public int getTotalNb()
533   {
534     return totalNb;
535   }
536
537   /**
538    * Sets the total number of threads.
539    *
540    * @param totalNb the total number of threads to set
541    * @see #getTotalNb
542    */

543   public void setTotalNb(int totalNb)
544   {
545     this.totalNb = totalNb;
546   }
547
548   /**
549    * Returns the transaction identifier of this task if any (isAutoCommit
550    * returns false).
551    *
552    * @return Returns the transaction identifier.
553    */

554   public abstract long getTransactionId();
555
556   /**
557    * Returns true if the task has been sucessfully completed by nbToComplete
558    * nodes (set in the constructor) of if everyone has completed (successfully
559    * or not), false otherwise.
560    *
561    * @return true if the task execution is complete
562    * @see AbstractTask#AbstractTask(int, int, boolean, long)
563    */

564   public synchronized boolean hasCompleted()
565   {
566     return ((success >= nbToComplete) || (success + failed == totalNb));
567   }
568
569   /**
570    * Returns true if the task has completed (successfully or not) or false if we
571    * are still expecting answers from some backends.
572    *
573    * @return true if the task execution is complete
574    */

575   public synchronized boolean hasFullyCompleted()
576   {
577     return success + failed == totalNb;
578   }
579
580 }
581
Popular Tags