KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > loadbalancer > BackendWorkerThread


/**
 * C-JDBC: Clustered JDBC.
 * Copyright (C) 2002-2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Contact: c-jdbc@objectweb.org
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by the
 * Free Software Foundation; either version 2.1 of the License, or any later
 * version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s):
 */

24
25 package org.objectweb.cjdbc.controller.loadbalancer;
26
27 import java.sql.SQLException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29
30 import org.objectweb.cjdbc.common.i18n.Translate;
31 import org.objectweb.cjdbc.common.log.Trace;
32 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
33 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
34
35 /**
36  * Process sequentially a set of tasks and send them to a backend.
37  *
38  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
39  * @version 1.0
40  */

41 public class BackendWorkerThread extends Thread JavaDoc
42 {
43   //
44
// How the code is organized ?
45
// 1. Member variables
46
// 2. Constructor(s)
47
// 3. Task management
48
// 4. Getter/Setters
49
//
50

51   private AbstractLoadBalancer loadBalancer;
52   private DatabaseBackend backend;
53   private ArrayList JavaDoc taskList;
54   private ArrayList JavaDoc tidList;
55   private boolean isKilled = false;
56
57   // not null if we are currently processing a task
58
private AbstractTask currentlyProcessingTask;
59   // Tid of the current task if currentlyProcessingTask.hasTid()==true
60
private Long JavaDoc currentTaskTid;
61
62   private Trace logger = null;
63
64   /*
65    * Constructor
66    */

67
68   /**
69    * Creates a new <code>BackendWorkerThread</code>.
70    *
71    * @param backend the backend this thread is associated to.
72    * @param loadBalancer the load balancer instanciating this thread
73    * @throws SQLException if an error occurs
74    */

75   public BackendWorkerThread(DatabaseBackend backend,
76       AbstractLoadBalancer loadBalancer) throws SQLException JavaDoc
77   {
78     this("BackendWorkerThread for backend '" + backend.getName()
79         + "' with RAIDb level:" + loadBalancer.getRAIDbLevel(), backend,
80         loadBalancer);
81   }
82
83   /**
84    * Creates a new <code>BackendWorkerThread</code>.
85    *
86    * @param name the name to give to the thread
87    * @param backend the backend this thread is associated to.
88    * @param loadBalancer the load balancer instanciating this thread
89    * @throws SQLException if an error occurs
90    */

91   public BackendWorkerThread(String JavaDoc name, DatabaseBackend backend,
92       AbstractLoadBalancer loadBalancer) throws SQLException JavaDoc
93   {
94     super(name);
95     // Sanity checks
96
if (backend == null)
97     {
98       String JavaDoc msg = Translate.get("backendworkerthread.null.backend");
99       logger = Trace
100           .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend");
101       logger.error(msg);
102       throw new SQLException JavaDoc(msg);
103     }
104
105     backend.checkDriverCompliance();
106
107     logger = Trace
108         .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend."
109             + backend.getName());
110
111     if (loadBalancer == null)
112     {
113       String JavaDoc msg = Translate.get("backendworkerthread.null.loadbalancer");
114       logger.error(msg);
115       throw new SQLException JavaDoc(msg);
116     }
117
118     this.backend = backend;
119     this.loadBalancer = loadBalancer;
120     taskList = new ArrayList JavaDoc();
121     tidList = new ArrayList JavaDoc();
122   }
123
124   /*
125    * Task Management
126    */

127
128   /**
129    * Adds a task at the end of the task list. Warning! This method is not
130    * synchronized and the caller must synchronize on the thread before calling
131    * this method.
132    *
133    * @param task the task to add
134    */

135   public void addTask(AbstractTask task)
136   {
137     if (!isKilled)
138     {
139       taskList.add(task);
140       // We assume that all requests here are writes
141
backend.addPendingWriteRequest(task);
142     }
143     else
144       task.notifyCompletion();
145   }
146
147   /**
148    * Adds a task at the end of the task list. Warning! This method is not
149    * synchronized and the caller must synchronize on the thread before calling
150    * this method.
151    *
152    * @param task the task to add
153    * @param transactionId transaction id in which this task execute
154    */

155   public void addTask(AbstractTask task, long transactionId)
156   {
157     if (!isKilled)
158     {
159       tidList.add(new Long JavaDoc(transactionId));
160       task.setHasTid(true);
161       addTask(task);
162     }
163     else
164       task.notifyCompletion();
165   }
166
167   /**
168    * Adds a task just after the last write task for the given transaction in the
169    * task list. Warning! This method is not synchronized and the caller must
170    * synchronize on the thread before calling this method.
171    * <p>
172    * This method is usually used to insert a commit/rollback task when
173    * asynchrony is allowed between backends.
174    *
175    * @param task the task to add
176    * @param transactionId transaction id in which this task execute
177    */

178   public void insertTaskAfterLastWriteForTransaction(AbstractTask task,
179       Long JavaDoc transactionId)
180   {
181     if (!isKilled)
182     {
183       task.setHasTid(true);
184
185       // Find the last task index in the tid queue
186
int lastTidIndex = tidList.lastIndexOf(transactionId);
187       if (lastTidIndex == -1)
188       { // Not found, add in last position
189
taskList.add(task);
190         tidList.add(transactionId);
191         backend.addPendingWriteRequest(task);
192         return;
193       }
194
195       // Find the corresponding task in the task list (we have to skip
196
// autocommit tasks)
197
int lastRequestIndex = 0;
198       while (lastTidIndex >= 0)
199       {
200         AbstractTask t = (AbstractTask) taskList.get(lastRequestIndex);
201         if (t.hasTid())
202           lastTidIndex--;
203         lastRequestIndex++;
204       }
205
206       // Add the task after the last write task and the tid in the tid list.
207
taskList.add(lastRequestIndex, task);
208       tidList.add(lastTidIndex + 1, transactionId);
209       // Warning, the task is added in queue (not sorted) in the backend pending
210
// request list.
211
backend.addPendingWriteRequest(task);
212     }
213     else
214       task.notifyCompletion();
215   }
216
217   /**
218    * Adds a task upfront to the task list so that this task will be the next
219    * executed task. Warning! This method is not synchronized and the caller must
220    * synchronize on the thread before calling this method.
221    *
222    * @param task the task to add
223    */

224   public void addPriorityTask(AbstractTask task)
225   {
226     if (!isKilled)
227     {
228       taskList.add(0, task);
229       // We assume that all requests here are writes
230
backend.addPendingWriteRequest(task);
231     }
232     else
233       task.notifyCompletion();
234   }
235
236   /**
237    * Adds a task upfront to the task list so that this task will be the next
238    * executed task. Warning! This method is not synchronized and the caller must
239    * synchronize on the thread before calling this method
240    *
241    * @param task the task to add
242    * @param transactionId transaction id in which this task execute
243    */

244   public void addPriorityTask(AbstractTask task, long transactionId)
245   {
246     if (!isKilled)
247     {
248       task.setHasTid(true);
249       addPriorityTask(task);
250       tidList.add(0, new Long JavaDoc(transactionId));
251     }
252     else
253       task.notifyCompletion();
254   }
255
256   /**
257    * Returns true if the thread has pending tasks for the given transaction.
258    *
259    * @param tid the transaction identifier
260    * @return true if the task list contains task(s) for transaction tid.
261    */

262   public boolean hasTaskForTransaction(Long JavaDoc tid)
263   {
264     synchronized (this)
265     {
266       if ((currentTaskTid != null) && (currentTaskTid.equals(tid)))
267         // Currently executing task belong to this transaction
268
return true;
269       else
270         return tidList.contains(tid);
271     }
272   }
273
274   /**
275    * Waits for all tasks of the specified transaction to complete.
276    *
277    * @param transactionId the transaction identifier
278    */

279   public void waitForAllTasksToComplete(long transactionId)
280   {
281     if ((transactionId == 0) || (tidList == null))
282       return;
283
284     Long JavaDoc tid = new Long JavaDoc(transactionId);
285     synchronized (this)
286     {
287       if (!tidList.contains(tid))
288       {
289         if ((currentTaskTid != null)
290             && (currentTaskTid.longValue() == transactionId))
291         {
292           try
293           {
294             if (logger.isDebugEnabled())
295               logger.debug(Translate.get("backendworkerthread.waiting.task"));
296             wait();
297           }
298           catch (InterruptedException JavaDoc ignore)
299           {
300           }
301           return;
302         }
303         else
304           return;
305       }
306
307       while (tidList.contains(tid))
308       {
309         if (logger.isDebugEnabled())
310           logger.debug(Translate.get("backendworkerthread.waiting.transaction",
311               String.valueOf(tid)));
312
313         try
314         {
315           wait();
316         }
317         catch (InterruptedException JavaDoc ignore)
318         {
319         }
320       }
321     }
322   }
323
324   /**
325    * Waits for all current tasks to complete.
326    */

327   public void waitForAllTasksToComplete()
328   {
329     synchronized (this)
330     {
331       Object JavaDoc current;
332       if (taskList.size() == 0)
333       {
334         if (currentlyProcessingTask != null)
335         {
336           try
337           {
338             if (logger.isDebugEnabled())
339               logger.debug(Translate.get("backendworkerthread.waiting.task"));
340             wait();
341           }
342           catch (InterruptedException JavaDoc ignore)
343           {
344             logger.warn(Translate
345                 .get("backendworkerthread.no.full.task.synchronization"));
346           }
347           return;
348         }
349         else
350         { // No task currently executing
351
return;
352         }
353       }
354       else
355         current = taskList.get(taskList.size() - 1);
356
357       if (logger.isDebugEnabled())
358         logger.debug(Translate.get("backendworkerthread.waiting.request",
359             current.toString()));
360
361       while (taskList.contains(current))
362       {
363         try
364         {
365           wait();
366         }
367         catch (InterruptedException JavaDoc ignore)
368         {
369         }
370       }
371     }
372   }
373
374   /**
375    * Kills this thread after the next task processing and forces the load
376    * balancer to disable the backend. It also marks all remaining tasks in the
377    * task list as failed.
378    */

379   public void kill()
380   {
381     kill(true);
382   }
383
384   /**
385    * Kills this thread after the next task processing. It also marks all
386    * remaining tasks in the task list as failed.
387    */

388   public void killWithoutDisablingBackend()
389   {
390     kill(false);
391   }
392
393   /**
394    * Kills this thread after the next task processing. It also marks all
395    * remaining tasks in the task list as failed.
396    *
397    * @param forceDisable true if the task must call the load balancer to disable
398    * the backend
399    */

400   private void kill(boolean forceDisable)
401   {
402     synchronized (this)
403     {
404       if (backend.isKilled())
405         return;
406
407       String JavaDoc msg = this.getName() + " is shutting down";
408       logger.info(msg);
409
410       // Remove all tasks
411
while (!taskList.isEmpty())
412       {
413         AbstractTask task = (AbstractTask) taskList.remove(0);
414         try
415         {
416           task.notifyFailure(this, 1, new SQLException JavaDoc(msg));
417         }
418         catch (SQLException JavaDoc ignore)
419         {
420         }
421       }
422       isKilled = true;
423       notify(); // Wake up thread
424
}
425     if (forceDisable)
426     {
427       try
428       {
429         // This ensure that all worker threads get removed from the load
430
// balancer
431
// list and that the backend state is set to disable.
432
loadBalancer.disableBackend(backend);
433       }
434       catch (SQLException JavaDoc ignore)
435       {
436       }
437     }
438   }
439
440   /**
441    * Process the tasklist and call <code>wait()</code> (on itself) when the
442    * tasklist becomes empty.
443    */

444   public void run()
445   {
446     currentlyProcessingTask = null;
447
448     while (!isKilled)
449     {
450       synchronized (this)
451       {
452         while (taskList.isEmpty() && !isKilled)
453         { // Nothing to do, go to bed!
454
try
455           {
456             wait();
457           }
458           catch (InterruptedException JavaDoc e)
459           {
460             logger.warn(Translate.get("backendworkerthread.wait.interrupted"));
461             break;
462           }
463         }
464         try
465         { // Take the 1st task from the list
466
currentlyProcessingTask = (AbstractTask) taskList.remove(0);
467           if (currentlyProcessingTask.hasTid())
468             currentTaskTid = (Long JavaDoc) tidList.remove(0);
469           else
470             currentTaskTid = null;
471         }
472         catch (IndexOutOfBoundsException JavaDoc oob)
473         {
474           // Should only happen if the thread was interrupted (see above)
475
logger.warn(Translate.get("backendworkerthread.no.task"), oob);
476           currentlyProcessingTask = null;
477         }
478       }
479       // Execute the task out of the sync block
480
try
481       {
482         if (currentlyProcessingTask == null)
483         {
484           logger.warn("Null task in BackendWorkerThread");
485           continue;
486         }
487         if (logger.isDebugEnabled())
488           logger.debug(Translate.get("backendworkerthread.execute.task",
489               currentlyProcessingTask.toString()));
490         currentlyProcessingTask.execute(this);
491       }
492       catch (SQLException JavaDoc e)
493       {
494         // Task should have notified of failure
495
logger.warn(Translate.get("backendworkerthread.task.failed", e));
496       }
497       catch (RuntimeException JavaDoc re)
498       {
499         // We can't know for sure if the task has notified the failure or not.
500
// To prevent a deadlock, we force the failure notification here.
501
try
502         {
503           currentlyProcessingTask.notifyFailure(this, 1, new SQLException JavaDoc(re
504               .getMessage()));
505         }
506         catch (SQLException JavaDoc e1)
507         {
508           // just notify
509
}
510         logger.fatal(Translate.get(
511             "backendworkerthread.task.runtime.exception",
512             currentlyProcessingTask.toString()), re);
513       }
514       finally
515       {
516         try
517         {
518           backend.removePendingRequest(currentlyProcessingTask);
519         }
520         catch (RuntimeException JavaDoc e)
521         {
522           logger.warn(
523               Translate.get("backendworkerthread.remove.task.error", e), e);
524         }
525       }
526
527       // Notify the completion of the task if someone is waiting for
528
// the completion of this transaction.
529
// @see #waitForAllTasksToComplete()
530
// @see #waitForAllTasksToComplete(long)
531
synchronized (this)
532       {
533         notifyAll();
534         currentlyProcessingTask = null;
535         currentTaskTid = null;
536       }
537     } // end while (!isKilled)
538

539     // Automatically disable the backend when the thread dies
540
try
541     {
542       if (backend.isReadEnabled() || backend.isWriteEnabled())
543         loadBalancer.disableBackend(backend);
544     }
545     catch (SQLException JavaDoc e)
546     {
547       logger.error(Translate.get("backendworkerthread.backend.disable.failed",
548           new String JavaDoc[]{backend.getName(), e.getMessage()}));
549     }
550   }
551
552   /*
553    * Getter/Setter
554    */

555
556   /**
557    * Returns the backend.
558    *
559    * @return a <code>DatabaseBackend</code> instance
560    */

561   public DatabaseBackend getBackend()
562   {
563     return backend;
564   }
565
566   /**
567    * Returns the logger for tracing.
568    *
569    * @return a <code>Trace</code> instance
570    */

571   public Trace getLogger()
572   {
573     return logger;
574   }
575
576 }
Popular Tags