KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > loadbalancer > raidb2 > RAIDb2ec


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): ______________________.
23  */

24
25 package org.objectweb.cjdbc.controller.loadbalancer.raidb2;
26
27 import java.sql.SQLException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29
30 import org.objectweb.cjdbc.common.i18n.Translate;
31 import org.objectweb.cjdbc.common.log.Trace;
32 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
33 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
34 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
35 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
36 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
37 import org.objectweb.cjdbc.controller.loadbalancer.policies.errorchecking.ErrorCheckingPolicy;
38 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
39 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
40
41 /**
42  * RAIDb-2ec load balancer.
43  * <p>
44  * This class is an abstract call because the read requests coming from the
45  * request manager are NOT treated here but in the subclasses. This class deals
46  * with backend enable/disable for backendReadThreads creation/termination.
47  *
48  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
49  * @version 1.0
50  */

51 public abstract class RAIDb2ec extends RAIDb2
52 {
53   /*
54    * How the code is organized ? 1. Member variables 2. Constructor(s) 3.
55    * Request handling 4. Transaction handling 5. Backend management
56    */

57
58   protected ArrayList JavaDoc backendReadThreads;
59   protected int nbOfConcurrentReads;
60   protected ErrorCheckingPolicy errorCheckingPolicy;
61
62   protected static Trace logger = Trace
63                                            .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb2ec");
64
65   /*
66    * Constructors
67    */

68
69   /**
70    * Creates a new RAIDb-1 Round Robin request load balancer. A new backend
71    * worker thread is created for each backend.
72    *
73    * @param vdb the virtual database this load balancer belongs to.
74    * @param waitForCompletionPolicy how many backends must complete before
75    * returning the result?
76    * @param createTablePolicy the policy defining how 'create table' statements
77    * should be handled
78    * @param errorCheckingPolicy policy to apply for error checking.
79    * @param nbOfConcurrentReads number of concurrent reads allowed
80    * @exception Exception if an error occurs
81    */

82   public RAIDb2ec(VirtualDatabase vdb,
83       WaitForCompletionPolicy waitForCompletionPolicy,
84       CreateTablePolicy createTablePolicy,
85       ErrorCheckingPolicy errorCheckingPolicy, int nbOfConcurrentReads)
86       throws Exception JavaDoc
87   {
88     super(vdb, waitForCompletionPolicy, createTablePolicy);
89     backendReadThreads = new ArrayList JavaDoc();
90     this.errorCheckingPolicy = errorCheckingPolicy;
91     this.nbOfConcurrentReads = nbOfConcurrentReads;
92   }
93
94   /*
95    * Backends management
96    */

97
98   /**
99    * Enables a backend that was previously disabled.
100    * <p>
101    * Ask the corresponding connection manager to initialize the connections if
102    * needed.
103    * <p>
104    * No sanity checks are performed by this function.
105    *
106    * @param db the database backend to enable
107    * @param writeEnabled True if the backend must be enabled for writes
108    * @throws SQLException if an error occurs
109    */

110   public void enableBackend(DatabaseBackend db, boolean writeEnabled)
111       throws SQLException JavaDoc
112   {
113     // Create 2 worker threads for writes
114
BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
115     BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
116
117     // Add first to the blocking thread list
118
try
119     {
120       backendBlockingThreadsRWLock.acquireWrite();
121     }
122     catch (InterruptedException JavaDoc e)
123     {
124       String JavaDoc msg = Translate.get(
125           "loadbalancer.backendlist.acquire.writelock.failed", e);
126       logger.error(msg);
127       throw new SQLException JavaDoc(msg);
128     }
129     backendBlockingThreads.add(blockingThread);
130     backendBlockingThreadsRWLock.releaseWrite();
131     blockingThread.start();
132     logger.info(Translate.get("loadbalancer.backend.workerthread.blocking.add",
133         db.getName()));
134
135     // Then add to the non-blocking thread list
136
try
137     {
138       backendNonBlockingThreadsRWLock.acquireWrite();
139     }
140     catch (InterruptedException JavaDoc e)
141     {
142       String JavaDoc msg = Translate.get(
143           "loadbalancer.backendlist.acquire.writelock.failed", e);
144       logger.error(msg);
145       throw new SQLException JavaDoc(msg);
146     }
147     backendNonBlockingThreads.add(nonBlockingThread);
148     backendNonBlockingThreadsRWLock.releaseWrite();
149     nonBlockingThread.start();
150     logger.info(Translate.get(
151         "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
152
153     if (!db.isInitialized())
154       db.initializeConnections();
155     db.enableRead();
156     if (writeEnabled)
157       db.enableWrite();
158   }
159
160   /**
161    * Disables a backend that was previously enabled.
162    * <p>
163    * Ask the corresponding connection manager to finalize the connections if
164    * needed.
165    * <p>
166    * No sanity checks are performed by this function.
167    *
168    * @param db the database backend to disable
169    * @throws SQLException if an error occurs
170    */

171   public synchronized void disableBackend(DatabaseBackend db)
172       throws SQLException JavaDoc
173   {
174     int nbOfThreads = backendBlockingThreads.size();
175
176     // Find the right blocking thread
177
for (int i = 0; i < nbOfThreads; i++)
178     {
179       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
180           .get(i);
181       if (thread.getBackend().equals(db))
182       {
183         logger.info(Translate.get(
184             "loadbalancer.backend.workerthread.blocking.remove", db.getName()));
185
186         // Remove it from the backendBlockingThread list
187
try
188         {
189           backendBlockingThreadsRWLock.acquireWrite();
190         }
191         catch (InterruptedException JavaDoc e)
192         {
193           String JavaDoc msg = Translate.get(
194               "loadbalancer.backendlist.acquire.writelock.failed", e);
195           logger.error(msg);
196           throw new SQLException JavaDoc(msg);
197         }
198         backendBlockingThreads.remove(thread);
199         backendBlockingThreadsRWLock.releaseWrite();
200
201         synchronized (thread)
202         {
203           // Kill the thread
204
thread.addPriorityTask(new KillThreadTask(1, 1));
205           thread.notify();
206         }
207         break;
208       }
209     }
210
211     // Find the right non-blocking thread
212
nbOfThreads = backendNonBlockingThreads.size();
213     for (int i = 0; i < nbOfThreads; i++)
214     {
215       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
216           .get(i);
217       if (thread.getBackend().equals(db))
218       {
219         logger.info(Translate.get(
220             "loadbalancer.backend.workerthread.non.blocking.remove", db
221                 .getName()));
222
223         // Remove it from the backendNonBlockingThreads list
224
try
225         {
226           backendNonBlockingThreadsRWLock.acquireWrite();
227         }
228         catch (InterruptedException JavaDoc e)
229         {
230           String JavaDoc msg = Translate.get(
231               "loadbalancer.backendlist.acquire.writelock.failed", e);
232           logger.error(msg);
233           throw new SQLException JavaDoc(msg);
234         }
235         backendNonBlockingThreads.remove(thread);
236         backendNonBlockingThreadsRWLock.releaseWrite();
237
238         synchronized (thread)
239         {
240           // Kill the thread
241
thread.addPriorityTask(new KillThreadTask(1, 1));
242           thread.notify();
243         }
244         break;
245       }
246     }
247
248     db.disable();
249     if (db.isInitialized())
250       db.finalizeConnections();
251   }
252
253   /**
254    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
255    */

256   public String JavaDoc getXmlImpl()
257   {
258     StringBuffer JavaDoc info = new StringBuffer JavaDoc();
259     info.append("<" + DatabasesXmlTags.ELT_RAIDb_2ec + " "
260         + DatabasesXmlTags.ATT_nbOfConcurrentReads + "=\""
261         + this.nbOfConcurrentReads + "\">");
262     this.getRaidb2Xml();
263     if (waitForCompletionPolicy != null)
264       info.append(waitForCompletionPolicy.getXml());
265     info.append("</" + DatabasesXmlTags.ELT_RAIDb_2ec + ">");
266     return info.toString();
267   }
268 }
Popular Tags