KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > loadbalancer > tasks > CallableStatementExecuteUpdateTask


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005-2006 Continuent, Inc.
6  * Contact: sequoia@continuent.org
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * Initial developer(s): Emmanuel Cecchet.
21  * Contributor(s): ______________________.
22  */

23
24 package org.continuent.sequoia.controller.loadbalancer.tasks;
25
26 import java.sql.Connection JavaDoc;
27 import java.sql.SQLException JavaDoc;
28
29 import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
30 import org.continuent.sequoia.common.i18n.Translate;
31 import org.continuent.sequoia.common.log.Trace;
32 import org.continuent.sequoia.controller.backend.DatabaseBackend;
33 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
34 import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
35 import org.continuent.sequoia.controller.connection.PooledConnection;
36 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
37 import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
38 import org.continuent.sequoia.controller.requests.AbstractRequest;
39 import org.continuent.sequoia.controller.requests.StoredProcedure;
40
41 /**
42  * Executes a write <code>StoredProcedure</code> call.
43  *
44  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
45  * @version 1.0
46  */

47 public class CallableStatementExecuteUpdateTask extends AbstractTask
48 {
49   private StoredProcedure proc;
50   private ExecuteUpdateResult result;
51
52   static Trace endUserLogger = Trace
53                                                 .getLogger("org.continuent.sequoia.enduser");
54
55   /**
56    * Creates a new <code>CallableStatementExecuteUpdateTask</code>.
57    *
58    * @param nbToComplete number of threads that must succeed before returning
59    * @param totalNb total number of threads
60    * @param proc the <code>StoredProcedure</code> to call
61    */

62   public CallableStatementExecuteUpdateTask(int nbToComplete, int totalNb,
63       StoredProcedure proc)
64   {
65     super(nbToComplete, totalNb, proc.isPersistentConnection(), proc
66         .getPersistentConnectionId());
67     this.proc = proc;
68   }
69
70   /**
71    * Executes a write request with the given backend thread.
72    *
73    * @param backendThread the backend thread that will execute the task
74    * @throws SQLException if an error occurs
75    */

76   public void executeTask(BackendWorkerThread backendThread)
77       throws SQLException JavaDoc
78   {
79     DatabaseBackend backend = backendThread.getBackend();
80
81     try
82     {
83       AbstractConnectionManager cm = backend.getConnectionManager(proc
84           .getLogin());
85       if (cm == null)
86       {
87         SQLException JavaDoc se = new SQLException JavaDoc(
88             "No Connection Manager for Virtual Login:" + proc.getLogin());
89         try
90         {
91           notifyFailure(backendThread, -1, se);
92         }
93         catch (SQLException JavaDoc ignore)
94         {
95
96         }
97         throw se;
98       }
99
100       Trace logger = backendThread.getLogger();
101       if (proc.isAutoCommit())
102         executeInAutoCommit(backendThread, backend, cm, logger);
103       else
104         executeInTransaction(backendThread, backend, cm, logger);
105
106       if (result == null)
107         return; // failure already handled, just return
108

109       int resultOnFirstBackendToSucceed = notifySuccess(backendThread, result
110           .getUpdateCount());
111       if (resultOnFirstBackendToSucceed != result.getUpdateCount())
112       {
113         String JavaDoc msg = "Disabling backend " + backend.getName()
114             + " that reports a different number of updated rows ("
115             + result.getUpdateCount() + ") than first backend to succeed ("
116             + resultOnFirstBackendToSucceed + ") for stored procedure " + proc;
117         logger.error(msg);
118         // Disable this backend (it is no more in sync)
119
backendThread.getLoadBalancer().disableBackend(backend, true);
120         endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
121             backend.getName()));
122         throw new SQLException JavaDoc(msg);
123       }
124     }
125     finally
126     {
127       backend.getTaskQueues().completeStoredProcedureExecution(this);
128     }
129   }
130
131   private void executeInAutoCommit(BackendWorkerThread backendThread,
132       DatabaseBackend backend, AbstractConnectionManager cm, Trace logger)
133       throws SQLException JavaDoc
134   {
135     if (!backend.canAcceptTasks(proc))
136     {
137       // Backend is disabling, we do not execute queries except the one in the
138
// transaction we already started. Just notify the completion for the
139
// others.
140
notifyCompletion(backendThread);
141       return;
142     }
143
144     // Use a connection just for this request
145
PooledConnection c = null;
146     try
147     {
148       c = cm.retrieveConnectionInAutoCommit(proc);
149     }
150     catch (UnreachableBackendException e1)
151     {
152       SQLException JavaDoc se = new SQLException JavaDoc("Backend " + backend.getName()
153           + " is no more reachable.");
154       try
155       {
156         notifyFailure(backendThread, -1, se);
157       }
158       catch (SQLException JavaDoc ignore)
159       {
160       }
161       // Disable this backend (it is no more in sync) by killing the backend
162
// thread
163
backendThread.getLoadBalancer().disableBackend(backend, true);
164       String JavaDoc msg = Translate.get("loadbalancer.backend.disabling.unreachable",
165           backend.getName());
166       logger.error(msg);
167       endUserLogger.error(msg);
168       throw se;
169     }
170
171     // Sanity check
172
if (c == null)
173     {
174       SQLException JavaDoc se = new SQLException JavaDoc("No more connections");
175       try
176       { // All backends failed, just ignore
177
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se))
178           return;
179       }
180       catch (SQLException JavaDoc ignore)
181       {
182       }
183       // Disable this backend (it is no more in sync) by killing the backend
184
// thread
185
backendThread.getLoadBalancer().disableBackend(backend, true);
186       String JavaDoc msg = "Stored procedure '"
187           + proc.getSqlShortForm(backend.getSqlShortFormLength())
188           + "' failed on backend " + backend.getName() + " but " + getSuccess()
189           + " succeeded (" + se + ")";
190       logger.error(msg);
191       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
192           backend.getName()));
193       throw new SQLException JavaDoc(msg);
194     }
195
196     // Execute Query
197
try
198     {
199       result = AbstractLoadBalancer
200           .executeCallableStatementExecuteUpdateOnBackend(proc, backend,
201               backendThread, c.getConnection());
202
203       backend.setSchemaIsDirtyIfNeeded(proc);
204     }
205     catch (Exception JavaDoc e)
206     {
207       try
208       { // All backends failed, just ignore
209
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, e))
210         {
211           result = null;
212           return;
213         }
214       }
215       catch (SQLException JavaDoc ignore)
216       {
217       }
218       // Disable this backend (it is no more in sync) by killing the backend
219
// thread
220
backendThread.getLoadBalancer().disableBackend(backend, true);
221       String JavaDoc msg = "Stored procedure '"
222           + proc.getSqlShortForm(backend.getSqlShortFormLength())
223           + "' failed on backend " + backend.getName() + " but " + getSuccess()
224           + " succeeded (" + e + ")";
225       logger.error(msg);
226       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
227           backend.getName()));
228       throw new SQLException JavaDoc(msg);
229     }
230     finally
231     {
232       cm.releaseConnectionInAutoCommit(proc, c);
233     }
234   }
235
236   private void executeInTransaction(BackendWorkerThread backendThread,
237       DatabaseBackend backend, AbstractConnectionManager cm, Trace logger)
238       throws SQLException JavaDoc
239   {
240     // Re-use the connection used by this transaction
241
Connection JavaDoc c;
242     long tid = proc.getTransactionId();
243
244     try
245     {
246       c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(proc, cm);
247     }
248     catch (UnreachableBackendException ube)
249     {
250       SQLException JavaDoc se = new SQLException JavaDoc("Backend " + backend.getName()
251           + " is no more reachable.");
252       try
253       {
254         notifyFailure(backendThread, -1, se);
255       }
256       catch (SQLException JavaDoc ignore)
257       {
258       }
259       // Disable this backend (it is no more in sync) by killing the backend
260
// thread
261
backendThread.getLoadBalancer().disableBackend(backend, true);
262       String JavaDoc msg = Translate.get("loadbalancer.backend.disabling.unreachable",
263           backend.getName());
264       logger.error(msg);
265       endUserLogger.error(msg);
266       throw se;
267     }
268     catch (SQLException JavaDoc e1)
269     {
270       SQLException JavaDoc se = new SQLException JavaDoc(
271           "Unable to get connection for transaction " + tid);
272       try
273       { // All backends failed, just ignore
274
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se))
275           return;
276       }
277       catch (SQLException JavaDoc ignore)
278       {
279       }
280       // Disable this backend (it is no more in sync) by killing the
281
// backend thread
282
backendThread.getLoadBalancer().disableBackend(backend, true);
283       String JavaDoc msg = "Request '"
284           + proc.getSqlShortForm(backend.getSqlShortFormLength())
285           + "' failed on backend " + backend.getName() + " but " + getSuccess()
286           + " succeeded (" + se + ")";
287       logger.error(msg);
288       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
289           backend.getName()));
290       throw new SQLException JavaDoc(msg);
291     }
292
293     // Sanity check
294
if (c == null)
295     { // Bad connection
296
SQLException JavaDoc se = new SQLException JavaDoc(
297           "Unable to retrieve connection for transaction " + tid);
298       try
299       { // All backends failed, just ignore
300
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se))
301           return;
302       }
303       catch (SQLException JavaDoc ignore)
304       {
305       }
306       // Disable this backend (it is no more in sync) by killing the
307
// backend thread
308
backendThread.getLoadBalancer().disableBackend(backend, true);
309       String JavaDoc msg = "Request '"
310           + proc.getSqlShortForm(backend.getSqlShortFormLength())
311           + "' failed on backend " + backend.getName() + " but " + getSuccess()
312           + " succeeded (" + se + ")";
313       logger.error(msg);
314       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
315           backend.getName()));
316       throw new SQLException JavaDoc(msg);
317     }
318
319     // Execute Query
320
try
321     {
322       result = AbstractLoadBalancer
323           .executeCallableStatementExecuteUpdateOnBackend(proc, backend,
324               backendThread, c);
325
326       backend.setSchemaIsDirtyIfNeeded(proc);
327     }
328     catch (Exception JavaDoc e)
329     {
330       try
331       { // All backends failed, just ignore
332
if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, e))
333         {
334           result = null;
335           return;
336         }
337       }
338       catch (SQLException JavaDoc ignore)
339       {
340       }
341       // Disable this backend (it is no more in sync) by killing the backend
342
// thread
343
backendThread.getLoadBalancer().disableBackend(backend, true);
344       String JavaDoc msg = "Stored procedure '"
345           + proc.getSqlShortForm(backend.getSqlShortFormLength())
346           + "' failed on backend " + backend.getName() + " but " + getSuccess()
347           + " succeeded (" + e + ")";
348       logger.error(msg);
349       endUserLogger.error(Translate.get("loadbalancer.backend.disabling",
350           backend.getName()));
351       throw new SQLException JavaDoc(msg);
352     }
353   }
354
355   /**
356    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
357    */

358   public AbstractRequest getRequest()
359   {
360     return proc;
361   }
362
363   /**
364    * Returns the result.
365    *
366    * @return updateCount wrapped into a <code>ExecuteUpdateResult</code>
367    */

368   public ExecuteUpdateResult getResult()
369   {
370     return result;
371   }
372
373   /**
374    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
375    */

376   public long getTransactionId()
377   {
378     return proc.getTransactionId();
379   }
380
381   /**
382    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
383    */

384   public boolean isAutoCommit()
385   {
386     return proc.isAutoCommit();
387   }
388
389   /**
390    * @see java.lang.Object#equals(java.lang.Object)
391    */

392   public boolean equals(Object JavaDoc other)
393   {
394     if ((other == null)
395         || !(other instanceof CallableStatementExecuteUpdateTask))
396       return false;
397
398     CallableStatementExecuteUpdateTask cseut = (CallableStatementExecuteUpdateTask) other;
399     if (proc == null)
400       return cseut.getRequest() == null;
401     return proc.equals(cseut.getRequest());
402   }
403
404   /**
405    * @see java.lang.Object#hashCode()
406    */

407   public int hashCode()
408   {
409     return (int) proc.getId();
410   }
411
412   /**
413    * @see java.lang.Object#toString()
414    */

415   public String JavaDoc toString()
416   {
417     if (proc.isAutoCommit())
418       return "Autocommit CallableStatementExecuteUpdateTask "
419           + proc.getTransactionId() + " (" + proc.getUniqueKey() + ")";
420     else
421       return "CallableStatementExecuteUpdateTask for transaction "
422           + proc.getTransactionId() + " (" + proc.getUniqueKey() + ")";
423   }
424
425 }
Popular Tags