KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > loadbalancer > tasks > StatementExecuteQueryTask


/**
 * Sequoia: Database clustering technology.
 * Copyright (C) 2002-2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Contact: sequoia@continuent.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *
 * Free Software Foundation; either version 2.1 of the License, or any later
 * version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s): Julie Marguerite, Jaco Swart.
 */

35
package org.continuent.sequoia.controller.loadbalancer.tasks;

import java.sql.Connection;
import java.sql.SQLException;

import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
import org.continuent.sequoia.common.i18n.Translate;
import org.continuent.sequoia.common.log.Trace;
import org.continuent.sequoia.controller.backend.DatabaseBackend;
import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
import org.continuent.sequoia.controller.connection.PooledConnection;
import org.continuent.sequoia.controller.core.ControllerConstants;
import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
import org.continuent.sequoia.controller.requests.AbstractRequest;
import org.continuent.sequoia.controller.requests.SelectRequest;

56 /**
57  * Executes a <code>SELECT</code> statement.
58  *
59  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
60  * @author <a HREF="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
61  * @author <a HREF="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
62  * @version 1.0
63  */

64 public class StatementExecuteQueryTask extends AbstractTask
65 {
66   private SelectRequest request;
67   private MetadataCache metadataCache;
68   private ControllerResultSet result = null;
69
70   static Trace endUserLogger = Trace
71                                                 .getLogger("org.continuent.sequoia.enduser");
72
73   /**
74    * Creates a new <code>StatementExecuteUpdateTask</code> instance.
75    *
76    * @param nbToComplete number of threads that must succeed before returning
77    * @param totalNb total number of threads
78    * @param request an <code>AbstractWriteRequest</code>
79    * @param metadataCache MetadataCache (null if none)
80    */

81   public StatementExecuteQueryTask(int nbToComplete, int totalNb,
82       SelectRequest request, MetadataCache metadataCache)
83   {
84     super(nbToComplete, totalNb, request.isPersistentConnection(), request
85         .getPersistentConnectionId());
86     this.request = request;
87     this.metadataCache = metadataCache;
88   }
89
90   /**
91    * Executes a write request with the given backend thread
92    *
93    * @param backendThread the backend thread that will execute the task
94    * @throws SQLException if an error occurs
95    */

96   public void executeTask(BackendWorkerThread backendThread)
97       throws SQLException JavaDoc
98   {
99     DatabaseBackend backend = backendThread.getBackend();
100
101     try
102     {
103       AbstractConnectionManager cm = backend.getConnectionManager(request
104           .getLogin());
105       if (cm == null)
106       {
107         SQLException JavaDoc se = new SQLException JavaDoc(
108             "No Connection Manager for Virtual Login:" + request.getLogin());
109         try
110         {
111           notifyFailure(backendThread, -1, se);
112         }
113         catch (SQLException JavaDoc ignore)
114         {
115
116         }
117         throw se;
118       }
119
120       Trace logger = backendThread.getLogger();
121       if (request.isAutoCommit())
122         executeInAutoCommit(backendThread, backend, cm, logger);
123       else
124         executeInTransaction(backendThread, backend, cm, logger);
125
126       if (result != null)
127         notifySuccess(backendThread);
128     }
129     finally
130     {
131       backend.getTaskQueues().completeWriteRequestExecution(this);
132     }
133   }
134
135   private void executeInAutoCommit(BackendWorkerThread backendThread,
136       DatabaseBackend backend, AbstractConnectionManager cm, Trace logger)
137       throws SQLException JavaDoc
138   {
139     if (!backend.canAcceptTasks(request))
140     {
141       // Backend is disabling, we do not execute queries except the one in the
142
// transaction we already started. Just notify the completion for the
143
// others.
144
notifyCompletion(backendThread);
145       return;
146     }
147
148     // Use a connection just for this request
149
PooledConnection c = null;
150     try
151     {
152       c = cm.retrieveConnectionInAutoCommit(request);
153     }
154     catch (UnreachableBackendException e1)
155     {
156       SQLException JavaDoc se = new SQLException JavaDoc("Backend " + backend.getName()
157           + " is no more reachable.");
158       try
159       {
160         notifyFailure(backendThread, -1, se);
161       }
162       catch (SQLException JavaDoc ignore)
163       {
164       }
165       // Disable this backend (it is no more in sync) by killing the backend
166
// thread
167
backendThread.getLoadBalancer().disableBackend(backend, true);
168       String JavaDoc msg = Translate.get(
169           "loadbalancer.backend.disabling.unreachable", backend.getName());
170       logger.error(msg);
171       endUserLogger.error(msg);
172       throw se;
173     }
174
175     // Sanity check
176
if (c == null)
177     {
178       SQLException JavaDoc se = new SQLException JavaDoc("No more connections");
179       try
180       { // All backends failed, just ignore
181
if (!notifyFailure(backendThread, request.getTimeout() * 1000L, se))
182           return;
183       }
184       catch (SQLException JavaDoc ignore)
185       {
186       }
187       // Disable this backend (it is no more in sync) by killing the backend
188
// thread
189
backendThread.getLoadBalancer().disableBackend(backend, true);
190       endUserLogger.error(Translate.get(
191           "loadbalancer.backend.disabling", backend.getName()));
192       throw new SQLException JavaDoc("Request '"
193           + request.getSqlShortForm(ControllerConstants.SQL_SHORT_FORM_LENGTH)
194           + "' failed on backend " + backend.getName() + " (" + se + ")");
195     }
196
197     // Execute Query
198
try
199     {
200       result = AbstractLoadBalancer.executeStatementExecuteQueryOnBackend(
201           request, backend, backendThread, c.getConnection(), metadataCache);
202     }
203     catch (Throwable JavaDoc e)
204     {
205       try
206       { // All backends failed, just ignore
207
if (!notifyFailure(backendThread, request.getTimeout() * 1000L, e))
208         {
209           result = null;
210           return;
211         }
212       }
213       catch (SQLException JavaDoc ignore)
214       {
215       }
216       throw new SQLException JavaDoc("Request '"
217           + request.getSqlShortForm(ControllerConstants.SQL_SHORT_FORM_LENGTH)
218           + "' failed on backend " + backend.getName() + " (" + e + ")");
219     }
220     finally
221     {
222       cm.releaseConnectionInAutoCommit(request, c);
223     }
224   }
225
226   private void executeInTransaction(BackendWorkerThread backendThread,
227       DatabaseBackend backend, AbstractConnectionManager cm, Trace logger)
228       throws SQLException JavaDoc
229   {
230     Connection JavaDoc c;
231     long tid = request.getTransactionId();
232
233     try
234     {
235       c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(request, cm);
236     }
237     catch (UnreachableBackendException ube)
238     {
239       SQLException JavaDoc se = new SQLException JavaDoc("Backend " + backend.getName()
240           + " is no more reachable.");
241       try
242       {
243         notifyFailure(backendThread, -1, se);
244       }
245       catch (SQLException JavaDoc ignore)
246       {
247       }
248       // Disable this backend (it is no more in sync) by killing the backend
249
// thread
250
backendThread.getLoadBalancer().disableBackend(backend, true);
251       String JavaDoc msg = Translate.get(
252           "loadbalancer.backend.disabling.unreachable", backend.getName());
253       logger.error(msg);
254       endUserLogger.error(msg);
255       throw se;
256     }
257     catch (NoTransactionStartWhenDisablingException e)
258     {
259       // Backend is disabling, we do not execute queries except the one in the
260
// transaction we already started. Just notify the completion for the
261
// others.
262
notifyCompletion(backendThread);
263       return;
264     }
265     catch (SQLException JavaDoc e1)
266     {
267       SQLException JavaDoc se = new SQLException JavaDoc(
268           "Unable to get connection for transaction " + tid);
269       try
270       { // All backends failed, just ignore
271
if (!notifyFailure(backendThread, request.getTimeout() * 1000L, se))
272           return;
273       }
274       catch (SQLException JavaDoc ignore)
275       {
276       }
277       // Disable this backend (it is no more in sync) by killing the
278
// backend thread
279
backendThread.getLoadBalancer().disableBackend(backend, true);
280       String JavaDoc msg = "Request '"
281           + request.getSqlShortForm(backend.getSqlShortFormLength())
282           + "' failed on backend " + backend.getName() + " but " + getSuccess()
283           + " succeeded (" + se + ")";
284       logger.error(msg);
285       endUserLogger.error(Translate.get(
286           "loadbalancer.backend.disabling", backend.getName()));
287       throw new SQLException JavaDoc(msg);
288     }
289
290     // Sanity check
291
if (c == null)
292     { // Bad connection
293
SQLException JavaDoc se = new SQLException JavaDoc(
294           "Unable to retrieve connection for transaction " + tid);
295       try
296       { // All backends failed, just ignore
297
if (!notifyFailure(backendThread, request.getTimeout() * 1000L, se))
298           return;
299       }
300       catch (SQLException JavaDoc ignore)
301       {
302       }
303       // Disable this backend (it is no more in sync) by killing the
304
// backend thread
305
backendThread.getLoadBalancer().disableBackend(backend, true);
306       String JavaDoc msg = "Request '"
307           + request.getSqlShortForm(backend.getSqlShortFormLength())
308           + "' failed on backend " + backend.getName() + " but " + getSuccess()
309           + " succeeded (" + se + ")";
310       logger.error(msg);
311       endUserLogger.error(Translate.get(
312           "loadbalancer.backend.disabling", backend.getName()));
313       throw new SQLException JavaDoc(msg);
314     }
315
316     // Execute Query
317
try
318     {
319       result = AbstractLoadBalancer.executeStatementExecuteQueryOnBackend(
320           request, backend, backendThread, c, metadataCache);
321     }
322     catch (Throwable JavaDoc e)
323     {
324       try
325       { // All backends failed, just ignore
326
if (!notifyFailure(backendThread, request.getTimeout() * 1000L, e))
327         {
328           result = null;
329           return;
330         }
331       }
332       catch (SQLException JavaDoc ignore)
333       {
334       }
335       throw new SQLException JavaDoc("Request '"
336           + request.getSqlShortForm(ControllerConstants.SQL_SHORT_FORM_LENGTH)
337           + "' failed on backend " + backend.getName() + " (" + e + ")");
338     }
339   }
340
341   /**
342    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
343    */

344   public AbstractRequest getRequest()
345   {
346     return request;
347   }
348
349   /**
350    * Returns the result.
351    *
352    * @return a <code>ResultSet</code>
353    */

354   public ControllerResultSet getResult()
355   {
356     return result;
357   }
358
359   /**
360    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
361    */

362   public long getTransactionId()
363   {
364     return request.getTransactionId();
365   }
366
367   /**
368    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
369    */

370   public boolean isAutoCommit()
371   {
372     return request.isAutoCommit();
373   }
374
375   /**
376    * @see java.lang.Object#equals(java.lang.Object)
377    */

378   public boolean equals(Object JavaDoc other)
379   {
380     if ((other == null) || !(other instanceof StatementExecuteQueryTask))
381       return false;
382
383     StatementExecuteQueryTask seqt = (StatementExecuteQueryTask) other;
384     return this.request.equals(seqt.getRequest());
385   }
386
387   /**
388    * @see java.lang.Object#hashCode()
389    */

390   public int hashCode()
391   {
392     return (int) request.getId();
393   }
394
395   /**
396    * @see java.lang.Object#toString()
397    */

398   public String JavaDoc toString()
399   {
400     if (request.isAutoCommit())
401       return "Autocommit StatementExecuteQueryTask (" + request.getUniqueKey()
402           + ")";
403     else
404       return "StatementExecuteQueryTask from transaction "
405           + request.getTransactionId() + " (" + request.getUniqueKey() + ")";
406   }
407 }
Popular Tags