KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > loadbalancer > tasks > CallableStatementExecuteQueryTask


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Copyright (C) 2005-2006 Continuent, Inc.
7  * Contact: sequoia@continuent.org
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): ______________________.
23  */

24
25 package org.continuent.sequoia.controller.loadbalancer.tasks;
26
27 import java.sql.Connection JavaDoc;
28 import java.sql.SQLException JavaDoc;
29
30 import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
31 import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
32 import org.continuent.sequoia.common.i18n.Translate;
33 import org.continuent.sequoia.common.log.Trace;
34 import org.continuent.sequoia.controller.backend.DatabaseBackend;
35 import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
36 import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
37 import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
38 import org.continuent.sequoia.controller.connection.PooledConnection;
39 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
40 import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
41 import org.continuent.sequoia.controller.requests.AbstractRequest;
42 import org.continuent.sequoia.controller.requests.StoredProcedure;
43
44 /**
45  * Executes a <code>StoredProcedure</code> call using
46  * CallableStatement.executeQuery() and returns a ResultSet.
47  *
48  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
49  * @version 1.0
50  */

51 public class CallableStatementExecuteQueryTask extends AbstractTask
52 {
53   private StoredProcedure proc;
54   private ControllerResultSet result = null;
55   private MetadataCache metadataCache;
56
57   static Trace endUserLogger = Trace
58                                                 .getLogger("org.continuent.sequoia.enduser");
59
60   /**
61    * Creates a new <code>CallableStatementExecuteQueryTask</code>.
62    *
63    * @param nbToComplete number of threads that must succeed before returning
64    * @param totalNb total number of threads
65    * @param proc the <code>StoredProcedure</code> to call
66    * @param metadataCache the metadataCache if any or null
67    */

68   public CallableStatementExecuteQueryTask(int nbToComplete, int totalNb,
69       StoredProcedure proc, MetadataCache metadataCache)
70   {
71     super(nbToComplete, totalNb, proc.isPersistentConnection(), proc
72         .getPersistentConnectionId());
73     this.proc = proc;
74     this.metadataCache = metadataCache;
75   }
76
77   /**
78    * Call a stored procedure that returns a ResultSet on the given backend
79    * thread.
80    *
81    * @param backendThread the backend thread that will execute the task
82    * @throws SQLException if an error occurs
83    */

84   public void executeTask(BackendWorkerThread backendThread)
85       throws SQLException JavaDoc
86   {
87     DatabaseBackend backend = backendThread.getBackend();
88
89     try
90     {
91       AbstractConnectionManager cm = backend.getConnectionManager(proc
92           .getLogin());
93       if (cm == null)
94       {
95         SQLException JavaDoc se = new SQLException JavaDoc(
96             "No Connection Manager for Virtual Login:" + proc.getLogin());
97         try
98         {
99           notifyFailure(backendThread, -1, se);
100         }
101         catch (SQLException JavaDoc ignore)
102         {
103
104         }
105         throw se;
106       }
107
108       Trace logger = backendThread.getLogger();
109       if (proc.isAutoCommit())
110         executeInAutoCommit(backendThread, backend, cm, logger);
111       else
112         executeInTransaction(backendThread, backend, cm, logger);
113
114       if (result != null)
115         notifySuccess(backendThread);
116     }
117     finally
118     {
119       backend.getTaskQueues().completeStoredProcedureExecution(this);
120     }
121   }
122
123   private void executeInAutoCommit(BackendWorkerThread backendThread,
124       DatabaseBackend backend, AbstractConnectionManager cm, Trace logger)
125       throws SQLException JavaDoc
126   {
127     if (!backend.canAcceptTasks(proc))
128     {
129       // Backend is disabling, we do not execute queries except the one in
130
// the
131
// transaction we already started. Just notify the completion for the
132
// others.
133
notifyCompletion(backendThread);
134       return;
135     }
136
137     // Use a connection just for this request
138
PooledConnection c = null;
139     try
140     {
141       c = cm.retrieveConnectionInAutoCommit(proc);
142     }
143     catch (UnreachableBackendException e1)
144     {
145       SQLException JavaDoc se = new SQLException JavaDoc("Backend " + backend.getName()
146           + " is no more reachable.");
147       try
148       {
149         notifyFailure(backendThread, -1, se);
150       }
151       catch (SQLException JavaDoc ignore)
152       {
153       }
154       // Disable this backend (it is no more in sync) by killing the backend
155
// thread
156
backendThread.getLoadBalancer().disableBackend(backend, true);
157       String JavaDoc msg = Translate.get("loadbalancer.backend.disabling.unreachable",
158           backend.getName());
159       logger.error(msg);
160       endUserLogger.error(msg);
161       throw se;
162     }
163
164     // Sanity check
165
if (c == null)
166     {
167       SQLException JavaDoc se = new SQLException JavaDoc("No more connections");
168       try
169       { // All backends failed, just ignore
170
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se))
171           return;
172       }
173       catch (SQLException JavaDoc ignore)
174       {
175       }
176       // Disable this backend (it is no more in sync) by killing the backend
177
// thread
178
backendThread.getLoadBalancer().disableBackend(backend, true);
179       String JavaDoc msg = "Stored procedure '"
180           + proc.getSqlShortForm(backend.getSqlShortFormLength())
181           + "' failed on backend " + backend.getName() + " but " + getSuccess()
182           + " succeeded (" + se + ")";
183       logger.error(msg);
184       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
185           backend.getName()));
186       throw new SQLException JavaDoc(msg);
187     }
188
189     // Execute Query
190
try
191     {
192       result = AbstractLoadBalancer
193           .executeCallableStatementExecuteQueryOnBackend(proc, backend,
194               backendThread, c.getConnection(), metadataCache);
195
196       backend.setSchemaIsDirtyIfNeeded(proc);
197     }
198     catch (Exception JavaDoc e)
199     {
200       try
201       { // All backends failed, just ignore
202
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, e))
203         {
204           result = null;
205           return;
206         }
207       }
208       catch (SQLException JavaDoc ignore)
209       {
210       }
211       // Disable this backend (it is no more in sync) by killing the backend
212
// thread
213
backendThread.getLoadBalancer().disableBackend(backend, true);
214       String JavaDoc msg = "Stored procedure '"
215           + proc.getSqlShortForm(backend.getSqlShortFormLength())
216           + "' failed on backend " + backend.getName() + " but " + getSuccess()
217           + " succeeded (" + e + ")";
218       logger.error(msg);
219       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
220           backend.getName()));
221       throw new SQLException JavaDoc(msg);
222     }
223     finally
224     {
225       cm.releaseConnectionInAutoCommit(proc, c);
226     }
227   }
228
229   private void executeInTransaction(BackendWorkerThread backendThread,
230       DatabaseBackend backend, AbstractConnectionManager cm, Trace logger)
231       throws SQLException JavaDoc
232   {
233     // Re-use the connection used by this transaction
234
Connection JavaDoc c;
235     long tid = proc.getTransactionId();
236
237     try
238     {
239       c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(proc, cm);
240     }
241     catch (UnreachableBackendException ube)
242     {
243       SQLException JavaDoc se = new SQLException JavaDoc("Backend " + backend.getName()
244           + " is no more reachable.");
245       try
246       {
247         notifyFailure(backendThread, -1, se);
248       }
249       catch (SQLException JavaDoc ignore)
250       {
251       }
252       // Disable this backend (it is no more in sync) by killing the backend
253
// thread
254
backendThread.getLoadBalancer().disableBackend(backend, true);
255       String JavaDoc msg = Translate.get("loadbalancer.backend.disabling.unreachable",
256           backend.getName());
257       logger.error(msg);
258       endUserLogger.error(msg);
259       throw se;
260     }
261     catch (NoTransactionStartWhenDisablingException e)
262     {
263       // Backend is disabling, we do not execute queries except the one in
264
// the
265
// transaction we already started. Just notify the completion for the
266
// others.
267
notifyCompletion(backendThread);
268       return;
269     }
270     catch (SQLException JavaDoc e1)
271     {
272       SQLException JavaDoc se = new SQLException JavaDoc(
273           "Unable to get connection for transaction " + tid);
274       try
275       { // All backends failed, just ignore
276
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se))
277           return;
278       }
279       catch (SQLException JavaDoc ignore)
280       {
281       }
282       // Disable this backend (it is no more in sync) by killing the
283
// backend thread
284
backendThread.getLoadBalancer().disableBackend(backend, true);
285       String JavaDoc msg = "Request '"
286           + proc.getSqlShortForm(backend.getSqlShortFormLength())
287           + "' failed on backend " + backend.getName() + " but " + getSuccess()
288           + " succeeded (" + se + ")";
289       logger.error(msg);
290       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
291           backend.getName()));
292       throw new SQLException JavaDoc(msg);
293     }
294
295     // Sanity check
296
if (c == null)
297     { // Bad connection
298
SQLException JavaDoc se = new SQLException JavaDoc(
299           "Unable to retrieve connection for transaction " + tid);
300       try
301       { // All backends failed, just ignore
302
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se))
303           return;
304       }
305       catch (SQLException JavaDoc ignore)
306       {
307       }
308       // Disable this backend (it is no more in sync) by killing the
309
// backend thread
310
backendThread.getLoadBalancer().disableBackend(backend, true);
311       String JavaDoc msg = "Request '"
312           + proc.getSqlShortForm(backend.getSqlShortFormLength())
313           + "' failed on backend " + backend.getName() + " but " + getSuccess()
314           + " succeeded (" + se + ")";
315       logger.error(msg);
316       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
317           backend.getName()));
318       throw new SQLException JavaDoc(msg);
319     }
320
321     // Execute Query
322
try
323     {
324       result = AbstractLoadBalancer
325           .executeCallableStatementExecuteQueryOnBackend(proc, backend,
326               backendThread, c, metadataCache);
327
328       backend.setSchemaIsDirtyIfNeeded(proc);
329     }
330     catch (Exception JavaDoc e)
331     {
332       try
333       { // All backends failed, just ignore
334
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, e))
335         {
336           result = null;
337           return;
338         }
339       }
340       catch (SQLException JavaDoc ignore)
341       {
342       }
343       // Disable this backend (it is no more in sync) by killing the backend
344
// thread
345
backendThread.getLoadBalancer().disableBackend(backend, true);
346       String JavaDoc msg = "Stored procedure '"
347           + proc.getSqlShortForm(backend.getSqlShortFormLength())
348           + "' failed on backend " + backend.getName() + " but " + getSuccess()
349           + " succeeded (" + e + ")";
350       logger.error(msg);
351       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
352           backend.getName()));
353       throw new SQLException JavaDoc(msg);
354     }
355   }
356
357   /**
358    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
359    */

360   public AbstractRequest getRequest()
361   {
362     return proc;
363   }
364
365   /**
366    * Returns the result.
367    *
368    * @return a <code>ResultSet</code>
369    */

370   public ControllerResultSet getResult()
371   {
372     return result;
373   }
374
375   /**
376    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
377    */

378   public long getTransactionId()
379   {
380     return proc.getTransactionId();
381   }
382
383   /**
384    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
385    */

386   public boolean isAutoCommit()
387   {
388     return proc.isAutoCommit();
389   }
390
391   /**
392    * @see java.lang.Object#equals(java.lang.Object)
393    */

394   public boolean equals(Object JavaDoc other)
395   {
396     if ((other == null)
397         || !(other instanceof CallableStatementExecuteQueryTask))
398       return false;
399
400     CallableStatementExecuteQueryTask cseqt = (CallableStatementExecuteQueryTask) other;
401     if (proc == null)
402       return cseqt.getRequest() == null;
403     return proc.equals(cseqt.getRequest());
404   }
405
406   /**
407    * @see java.lang.Object#hashCode()
408    */

409   public int hashCode()
410   {
411     return (int) proc.getId();
412   }
413
414   /**
415    * @see java.lang.Object#toString()
416    */

417   public String JavaDoc toString()
418   {
419     if (proc.isAutoCommit())
420       return "Autocommit CallableStatementExecuteQueryTask "
421           + proc.getTransactionId() + " (" + proc.getUniqueKey() + ")";
422     else
423       return "CallableStatementExecuteQueryTask for transaction "
424           + proc.getTransactionId() + " (" + proc.getUniqueKey() + ")";
425   }
426
427 }
Popular Tags