KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > loadbalancer > tasks > CallableStatementExecuteTask


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Copyright (C) 2005-2006 Continuent, Inc.
7  * Contact: sequoia@continuent.org
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): ______________________.
23  */

24
25 package org.continuent.sequoia.controller.loadbalancer.tasks;
26
27 import java.sql.Connection JavaDoc;
28 import java.sql.SQLException JavaDoc;
29
30 import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
31 import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
32 import org.continuent.sequoia.common.i18n.Translate;
33 import org.continuent.sequoia.common.log.Trace;
34 import org.continuent.sequoia.controller.backend.DatabaseBackend;
35 import org.continuent.sequoia.controller.backend.result.ExecuteResult;
36 import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
37 import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
38 import org.continuent.sequoia.controller.connection.PooledConnection;
39 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
40 import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
41 import org.continuent.sequoia.controller.requests.AbstractRequest;
42 import org.continuent.sequoia.controller.requests.StoredProcedure;
43
44 /**
45  * Executes a <code>StoredProcedure</code> call using
46  * CallableStatement.execute() and returns multiple results stored in an
47  * ExecuteResult object.
48  *
49  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
50  * @version 1.0
51  */

52 public class CallableStatementExecuteTask extends AbstractTask
53 {
54   private StoredProcedure proc;
55   private ExecuteResult result;
56   private MetadataCache metadataCache;
57
58   static Trace endUserLogger = Trace
59                                             .getLogger("org.continuent.sequoia.enduser");
60
61   /**
62    * Creates a new <code>CallableStatementExecuteQueryTask</code>.
63    *
64    * @param nbToComplete number of threads that must succeed before returning
65    * @param totalNb total number of threads
66    * @param proc the <code>StoredProcedure</code> to call
67    * @param metadataCache the metadataCache if any or null
68    */

69   public CallableStatementExecuteTask(int nbToComplete, int totalNb,
70       StoredProcedure proc, MetadataCache metadataCache)
71   {
72     super(nbToComplete, totalNb, proc.isPersistentConnection(), proc
73         .getPersistentConnectionId());
74     this.proc = proc;
75     this.metadataCache = metadataCache;
76   }
77
78   /**
79    * Call a stored procedure that returns a ResultSet on the given backend
80    * thread.
81    *
82    * @param backendThread the backend thread that will execute the task
83    * @throws SQLException if an error occurs
84    */

85   public void executeTask(BackendWorkerThread backendThread)
86       throws SQLException JavaDoc
87   {
88     DatabaseBackend backend = backendThread.getBackend();
89
90     try
91     {
92       AbstractConnectionManager cm = backend.getConnectionManager(proc
93           .getLogin());
94       if (cm == null)
95       {
96         SQLException JavaDoc se = new SQLException JavaDoc(
97             "No Connection Manager for Virtual Login:" + proc.getLogin());
98         try
99         {
100           notifyFailure(backendThread, -1, se);
101         }
102         catch (SQLException JavaDoc ignore)
103         {
104
105         }
106         throw se;
107       }
108
109       Trace logger = backendThread.getLogger();
110       if (proc.isAutoCommit())
111         executeInAutoCommit(backendThread, backend, cm, logger);
112       else
113         executeInTransaction(backendThread, backend, cm, logger);
114
115       if (result != null)
116         notifySuccess(backendThread);
117     }
118     finally
119     {
120       backend.getTaskQueues().completeStoredProcedureExecution(this);
121     }
122   }
123
124   private void executeInAutoCommit(BackendWorkerThread backendThread,
125       DatabaseBackend backend, AbstractConnectionManager cm, Trace logger)
126       throws SQLException JavaDoc
127   {
128     if (!backend.canAcceptTasks(proc))
129     {
130       // Backend is disabling, we do not execute queries except the one in the
131
// transaction we already started. Just notify the completion for the
132
// others.
133
notifyCompletion(backendThread);
134       return;
135     }
136
137     // Use a connection just for this request
138
PooledConnection c = null;
139     try
140     {
141       c = cm.retrieveConnectionInAutoCommit(proc);
142     }
143     catch (UnreachableBackendException e1)
144     {
145       SQLException JavaDoc se = new SQLException JavaDoc("Backend " + backend.getName()
146           + " is no more reachable.");
147       try
148       {
149         notifyFailure(backendThread, -1, se);
150       }
151       catch (SQLException JavaDoc ignore)
152       {
153       }
154       // Disable this backend (it is no more in sync) by killing the backend
155
// thread
156
backendThread.getLoadBalancer().disableBackend(backend, true);
157       String JavaDoc msg = Translate.get("loadbalancer.backend.disabling.unreachable",
158           backend.getName());
159       logger.error(msg);
160       endUserLogger.error(msg);
161       throw se;
162     }
163
164     // Sanity check
165
if (c == null)
166     {
167       SQLException JavaDoc se = new SQLException JavaDoc("No more connections");
168       try
169       { // All backends failed, just ignore
170
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se))
171           return;
172       }
173       catch (SQLException JavaDoc ignore)
174       {
175       }
176       // Disable this backend (it is no more in sync) by killing the backend
177
// thread
178
backendThread.getLoadBalancer().disableBackend(backend, true);
179       String JavaDoc msg = "Stored procedure '"
180           + proc.getSqlShortForm(backend.getSqlShortFormLength())
181           + "' failed on backend " + backend.getName() + " but " + getSuccess()
182           + " succeeded (" + se + ")";
183       logger.error(msg, se);
184       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
185           backend.getName()));
186       throw (SQLException JavaDoc) new SQLException JavaDoc(msg).initCause(se);
187     }
188
189     // Execute Query
190
try
191     {
192       result = AbstractLoadBalancer.executeCallableStatementExecuteOnBackend(
193           proc, backend, backendThread, c.getConnection(), metadataCache);
194
195       backend.setSchemaIsDirtyIfNeeded(proc);
196     }
197     catch (Exception JavaDoc e)
198     {
199       try
200       { // All backends failed, just ignore
201
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, e))
202         {
203           result = null;
204           return;
205         }
206       }
207       catch (SQLException JavaDoc ignore)
208       {
209       }
210       // Disable this backend (it is no more in sync) by killing the backend
211
// thread
212
backendThread.getLoadBalancer().disableBackend(backend, true);
213       String JavaDoc msg = "Stored procedure '"
214           + proc.getSqlShortForm(backend.getSqlShortFormLength())
215           + "' failed on backend " + backend.getName() + " but " + getSuccess()
216           + " succeeded (" + e + ")";
217       logger.error(msg, e);
218       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
219           backend.getName()));
220       throw (SQLException JavaDoc) new SQLException JavaDoc(msg).initCause(e);
221     }
222     finally
223     {
224       cm.releaseConnectionInAutoCommit(proc, c);
225     }
226   }
227
228   private void executeInTransaction(BackendWorkerThread backendThread,
229       DatabaseBackend backend, AbstractConnectionManager cm, Trace logger)
230       throws SQLException JavaDoc
231   {
232     // Re-use the connection used by this transaction
233
Connection JavaDoc c;
234     long tid = proc.getTransactionId();
235
236     try
237     {
238       c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(proc, cm);
239     }
240     catch (UnreachableBackendException ube)
241     {
242       SQLException JavaDoc se = new SQLException JavaDoc("Backend " + backend.getName()
243           + " is no more reachable.");
244       try
245       {
246         notifyFailure(backendThread, -1, se);
247       }
248       catch (SQLException JavaDoc ignore)
249       {
250       }
251       // Disable this backend (it is no more in sync) by killing the backend
252
// thread
253
backendThread.getLoadBalancer().disableBackend(backend, true);
254       logger.error("Disabling backend " + backend.getName()
255           + " because it is no more reachable.");
256       endUserLogger.error(Translate.get(
257           "loadbalancer.backend.disabling.unreachable", backend.getName()));
258       throw se;
259     }
260     catch (NoTransactionStartWhenDisablingException e)
261     {
262       // Backend is disabling, we do not execute queries except the one in the
263
// transaction we already started. Just notify the completion for the
264
// others.
265
notifyCompletion(backendThread);
266       return;
267     }
268     catch (SQLException JavaDoc e1)
269     {
270       SQLException JavaDoc se = (SQLException JavaDoc) new SQLException JavaDoc(
271           "Unable to get connection for transaction " + tid).initCause(e1);
272       try
273       { // All backends failed, just ignore
274
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se))
275           return;
276       }
277       catch (SQLException JavaDoc ignore)
278       {
279       }
280       // Disable this backend (it is no more in sync) by killing the
281
// backend thread
282
backendThread.getLoadBalancer().disableBackend(backend, true);
283       String JavaDoc msg = "Request '"
284           + proc.getSqlShortForm(backend.getSqlShortFormLength())
285           + "' failed on backend " + backend.getName() + " but " + getSuccess()
286           + " succeeded (" + se + ")";
287       logger.error(msg, se);
288       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
289           backend.getName()));
290       throw (SQLException JavaDoc) new SQLException JavaDoc(msg).initCause(se);
291     }
292
293     // Sanity check
294
if (c == null)
295     { // Bad connection
296
SQLException JavaDoc se = new SQLException JavaDoc(
297           "Unable to retrieve connection (null) for transaction " + tid);
298       try
299       { // All backends failed, just ignore
300
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se))
301           return;
302       }
303       catch (SQLException JavaDoc ignore)
304       {
305       }
306       // Disable this backend (it is no more in sync) by killing the
307
// backend thread
308
backendThread.getLoadBalancer().disableBackend(backend, true);
309       String JavaDoc msg = "Request '"
310           + proc.getSqlShortForm(backend.getSqlShortFormLength())
311           + "' failed on backend " + backend.getName() + " but " + getSuccess()
312           + " succeeded (" + se + ")";
313       logger.error(msg, se);
314       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
315           backend.getName()));
316       throw (SQLException JavaDoc) new SQLException JavaDoc(msg).initCause(se);
317     }
318
319     // Execute Query
320
try
321     {
322       result = AbstractLoadBalancer.executeCallableStatementExecuteOnBackend(
323           proc, backend, backendThread, c, metadataCache);
324
325       backend.setSchemaIsDirtyIfNeeded(proc);
326     }
327     catch (Exception JavaDoc e)
328     {
329       try
330       { // All backends failed, just ignore
331
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, e))
332         {
333           result = null;
334           return;
335         }
336       }
337       catch (SQLException JavaDoc ignore)
338       {
339       }
340       // Disable this backend (it is no more in sync) by killing the backend
341
// thread
342
backendThread.getLoadBalancer().disableBackend(backend, true);
343       String JavaDoc msg = "Stored procedure '"
344           + proc.getSqlShortForm(backend.getSqlShortFormLength())
345           + "' failed on backend " + backend.getName() + " but " + getSuccess()
346           + " succeeded (" + e + ")";
347       logger.error(msg, e);
348       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
349           backend.getName()));
350       throw (SQLException JavaDoc) new SQLException JavaDoc(msg).initCause(e);
351     }
352   }
353
354   /**
355    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
356    */

357   public AbstractRequest getRequest()
358   {
359     return proc;
360   }
361
362   /**
363    * Returns the results.
364    *
365    * @return an <code>ExecuteResult</code> object
366    */

367   public ExecuteResult getResult()
368   {
369     return result;
370   }
371
372   /**
373    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
374    */

375   public long getTransactionId()
376   {
377     return proc.getTransactionId();
378   }
379
380   /**
381    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
382    */

383   public boolean isAutoCommit()
384   {
385     return proc.isAutoCommit();
386   }
387
388   /**
389    * @see java.lang.Object#equals(java.lang.Object)
390    */

391   public boolean equals(Object JavaDoc other)
392   {
393     if ((other == null) || !(other instanceof CallableStatementExecuteTask))
394       return false;
395
396     CallableStatementExecuteTask cset = (CallableStatementExecuteTask) other;
397     if (proc == null)
398       return cset.getRequest() == null;
399     return proc.equals(cset.getRequest());
400   }
401
402   /**
403    * @see java.lang.Object#hashCode()
404    */

405   public int hashCode()
406   {
407     return (int) proc.getId();
408   }
409
410   /**
411    * @see java.lang.Object#toString()
412    */

413   public String JavaDoc toString()
414   {
415     if (proc.isAutoCommit())
416       return "Autocommit CallableStatementExecuteTask "
417           + proc.getTransactionId() + " (" + proc.getUniqueKey() + ")";
418     else
419       return "CallableStatementExecuteTask for transaction "
420           + proc.getTransactionId() + " (" + proc.getUniqueKey() + ")";
421   }
422
423 }
Popular Tags