KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > loadbalancer > raidb1 > RAIDb1ec


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): ______________________.
23  */

24
25 package org.objectweb.cjdbc.controller.loadbalancer.raidb1;
26
27 import java.sql.SQLException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29
30 import org.objectweb.cjdbc.common.i18n.Translate;
31 import org.objectweb.cjdbc.common.log.Trace;
32 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
33 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
34 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
35 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
36 import org.objectweb.cjdbc.controller.loadbalancer.policies.errorchecking.ErrorCheckingPolicy;
37 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
38 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
39
40 /**
41  * RAIDb-1 load balancer.
42  * <p>
43  * This class is an abstract call because the read requests coming from the
44  * request manager are NOT treated here but in the subclasses. Transaction
45  * management and write requests are broadcasted to all backends.
46  *
47  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
48  * @version 1.0
49  */

50 public abstract class RAIDb1ec extends RAIDb1
51 {
52   /*
53    * How the code is organized ? 1. Member variables 2. Constructor(s) 3.
54    * Request handling 4. Transaction handling 5. Backend management
55    */

56
57   protected ArrayList JavaDoc backendReadThreads;
58   protected int nbOfConcurrentReads;
59   protected ErrorCheckingPolicy errorCheckingPolicy;
60
61   protected static Trace logger = Trace
62                                            .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1ec");
63
64   /*
65    * Constructors
66    */

67
68   /**
69    * Creates a new RAIDb-1 Round Robin request load balancer. A new backend
70    * worker thread is created for each backend.
71    *
72    * @param vdb the virtual database this load balancer belongs to
73    * @param waitForCompletionPolicy how many backends must complete before
74    * returning the result?
75    * @param errorCheckingPolicy policy to apply for error checking.
76    * @param nbOfConcurrentReads number of concurrent reads allowed
77    * @exception Exception if an error occurs
78    */

79   public RAIDb1ec(VirtualDatabase vdb,
80       WaitForCompletionPolicy waitForCompletionPolicy,
81       ErrorCheckingPolicy errorCheckingPolicy, int nbOfConcurrentReads)
82       throws Exception JavaDoc
83   {
84     super(vdb, waitForCompletionPolicy);
85     backendReadThreads = new ArrayList JavaDoc();
86     this.errorCheckingPolicy = errorCheckingPolicy;
87     this.nbOfConcurrentReads = nbOfConcurrentReads;
88   }
89
90   /*
91    * Backends management
92    */

93
94   /**
95    * Enables a backend that was previously disabled.
96    * <p>
97    * Ask the corresponding connection manager to initialize the connections if
98    * needed.
99    * <p>
100    * No sanity checks are performed by this function.
101    *
102    * @param db the database backend to enable
103    * @param writeEnabled True if the backend must be enabled for writes
104    * @throws SQLException if an error occurs
105    */

106   public void enableBackend(DatabaseBackend db, boolean writeEnabled)
107       throws SQLException JavaDoc
108   {
109     // Create 2 worker threads for writes
110
BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
111     BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
112
113     // Add first to the blocking thread list
114
try
115     {
116       backendBlockingThreadsRWLock.acquireWrite();
117     }
118     catch (InterruptedException JavaDoc e)
119     {
120       String JavaDoc msg = Translate.get(
121           "loadbalancer.backendlist.acquire.writelock.failed", e);
122       logger.error(msg);
123       throw new SQLException JavaDoc(msg);
124     }
125     backendBlockingThreads.add(blockingThread);
126     backendBlockingThreadsRWLock.releaseWrite();
127     blockingThread.start();
128     logger.info(Translate.get("loadbalancer.backend.workerthread.blocking.add",
129         db.getName()));
130
131     // Then add to the non-blocking thread list
132
try
133     {
134       backendNonBlockingThreadsRWLock.acquireWrite();
135     }
136     catch (InterruptedException JavaDoc e)
137     {
138       String JavaDoc msg = Translate.get(
139           "loadbalancer.backendlist.acquire.writelock.failed", e);
140       logger.error(msg);
141       throw new SQLException JavaDoc(msg);
142     }
143     backendNonBlockingThreads.add(nonBlockingThread);
144     backendNonBlockingThreadsRWLock.releaseWrite();
145     nonBlockingThread.start();
146     logger.info(Translate.get(
147         "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
148
149     if (!db.isInitialized())
150       db.initializeConnections();
151     db.enableRead();
152     if (writeEnabled)
153       db.enableWrite();
154   }
155
156   /**
157    * Disables a backend that was previously enabled.
158    * <p>
159    * Ask the corresponding connection manager to finalize the connections if
160    * needed.
161    * <p>
162    * No sanity checks are performed by this function.
163    *
164    * @param db the database backend to disable
165    * @throws SQLException if an error occurs
166    */

167   public synchronized void disableBackend(DatabaseBackend db)
168       throws SQLException JavaDoc
169   {
170     int nbOfThreads = backendBlockingThreads.size();
171
172     // Find the right blocking thread
173
for (int i = 0; i < nbOfThreads; i++)
174     {
175       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
176           .get(i);
177       if (thread.getBackend().equals(db))
178       {
179         logger.info(Translate.get(
180             "loadbalancer.backend.workerthread.blocking.remove", db.getName()));
181
182         // Remove it from the backendBlockingThread list
183
try
184         {
185           backendBlockingThreadsRWLock.acquireWrite();
186         }
187         catch (InterruptedException JavaDoc e)
188         {
189           String JavaDoc msg = Translate.get(
190               "loadbalancer.backendlist.acquire.writelock.failed", e);
191           logger.error(msg);
192           throw new SQLException JavaDoc(msg);
193         }
194         backendBlockingThreads.remove(thread);
195         backendBlockingThreadsRWLock.releaseWrite();
196
197         synchronized (thread)
198         {
199           // Kill the thread
200
thread.addPriorityTask(new KillThreadTask(1, 1));
201           thread.notify();
202         }
203         break;
204       }
205     }
206
207     // Find the right non-blocking thread
208
nbOfThreads = backendNonBlockingThreads.size();
209     for (int i = 0; i < nbOfThreads; i++)
210     {
211       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
212           .get(i);
213       if (thread.getBackend().equals(db))
214       {
215         logger.info(Translate.get(
216             "loadbalancer.backend.workerthread.non.blocking.remove", db
217                 .getName()));
218
219         // Remove it from the backendNonBlockingThreads list
220
try
221         {
222           backendNonBlockingThreadsRWLock.acquireWrite();
223         }
224         catch (InterruptedException JavaDoc e)
225         {
226           String JavaDoc msg = Translate.get(
227               "loadbalancer.backendlist.acquire.writelock.failed", e);
228           logger.error(msg);
229           throw new SQLException JavaDoc(msg);
230         }
231         backendNonBlockingThreads.remove(thread);
232         backendNonBlockingThreadsRWLock.releaseWrite();
233
234         synchronized (thread)
235         {
236           // Kill the thread
237
thread.addPriorityTask(new KillThreadTask(1, 1));
238           thread.notify();
239         }
240         break;
241       }
242     }
243
244     db.disable();
245     if (db.isInitialized())
246       db.finalizeConnections();
247   }
248
249   /**
250    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
251    */

252   public String JavaDoc getXmlImpl()
253   {
254     StringBuffer JavaDoc info = new StringBuffer JavaDoc();
255     info.append("<" + DatabasesXmlTags.ELT_RAIDb_1ec + " "
256         + DatabasesXmlTags.ATT_nbOfConcurrentReads + "=\""
257         + this.nbOfConcurrentReads + "\">");
258     this.getRaidb1Xml();
259     if (waitForCompletionPolicy != null)
260       info.append(waitForCompletionPolicy.getXml());
261     info.append("</" + DatabasesXmlTags.ELT_RAIDb_1ec + ">");
262     return info.toString();
263   }
264 }
Popular Tags