KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > loadbalancer > tasks > StatementExecuteUpdateTask


/**
 * Sequoia: Database clustering technology.
 * Copyright (C) 2002-2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
 * Copyright (C) 2005-2006 Continuent, Inc.
 * Contact: sequoia@continuent.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s): Julie Marguerite, Jaco Swart.
 */

24
25 package org.continuent.sequoia.controller.loadbalancer.tasks;
26
27 import java.sql.Connection JavaDoc;
28 import java.sql.SQLException JavaDoc;
29
30 import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
31 import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
32 import org.continuent.sequoia.common.i18n.Translate;
33 import org.continuent.sequoia.common.log.Trace;
34 import org.continuent.sequoia.controller.backend.DatabaseBackend;
35 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
36 import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
37 import org.continuent.sequoia.controller.connection.PooledConnection;
38 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
39 import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
40 import org.continuent.sequoia.controller.requests.AbstractRequest;
41 import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
42
43 /**
44  * Executes an <code>AbstractWriteRequest</code> statement.
45  *
46  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
47  * @author <a HREF="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
48  * @author <a HREF="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
49  * @version 1.0
50  */

51 public class StatementExecuteUpdateTask extends AbstractTask
52 {
53   private AbstractWriteRequest request;
54   private ExecuteUpdateResult result = null;
55
56   static Trace endUserLogger = Trace
57                                                  .getLogger("org.continuent.sequoia.enduser");
58
59   /**
60    * Creates a new <code>StatementExecuteUpdateTask</code>.
61    *
62    * @param nbToComplete number of threads that must succeed before returning
63    * @param totalNb total number of threads
64    * @param request an <code>AbstractWriteRequest</code>
65    */

66   public StatementExecuteUpdateTask(int nbToComplete, int totalNb,
67       AbstractWriteRequest request)
68   {
69     super(nbToComplete, totalNb, request.isPersistentConnection(), request
70         .getPersistentConnectionId());
71     this.request = request;
72   }
73
74   /**
75    * Executes a write request with the given backend thread.
76    *
77    * @param backendThread the backend thread that will execute the task
78    * @throws SQLException if an error occurs
79    */

80   public void executeTask(BackendWorkerThread backendThread)
81       throws SQLException JavaDoc
82   {
83     DatabaseBackend backend = backendThread.getBackend();
84
85     try
86     {
87       AbstractConnectionManager cm = backend.getConnectionManager(request
88           .getLogin());
89       if (cm == null)
90       {
91         SQLException JavaDoc se = new SQLException JavaDoc(
92             "No Connection Manager for Virtual Login:" + request.getLogin());
93         try
94         {
95           notifyFailure(backendThread, -1, se);
96         }
97         catch (SQLException JavaDoc ignore)
98         {
99         }
100         throw se;
101       }
102
103       Trace logger = backendThread.getLogger();
104       if (request.isAutoCommit())
105         executeInAutoCommit(backendThread, backend, cm, logger);
106       else
107         executeInTransaction(backendThread, backend, cm, logger);
108
109       /*
110        * If the backend is disabling, no result is retrieved. The notification
111        * has alreay been handled.
112        */

113       if (result != null)
114       {
115         int resultOnFirstBackendToSucceed = notifySuccess(backendThread, result
116             .getUpdateCount());
117         if (resultOnFirstBackendToSucceed != result.getUpdateCount())
118         {
119           String JavaDoc msg = "Disabling backend " + backend.getName()
120               + " that reports a different number of updated rows ("
121               + result.getUpdateCount() + ") than first backend to succeed ("
122               + resultOnFirstBackendToSucceed + ") for request " + request;
123           logger.error(msg);
124           // Disable this backend (it is no more in sync)
125
backendThread.getLoadBalancer().disableBackend(backend, true);
126           endUserLogger.error(Translate.get(
127               "loadbalancer.backend.disabling", backend.getName()));
128           throw new SQLException JavaDoc(msg);
129         }
130       }
131     }
132     finally
133     {
134       backend.getTaskQueues().completeWriteRequestExecution(this);
135     }
136   }
137
138   private void executeInAutoCommit(BackendWorkerThread backendThread,
139       DatabaseBackend backend, AbstractConnectionManager cm, Trace logger)
140       throws SQLException JavaDoc
141   {
142     if (!backend.canAcceptTasks(request))
143     {
144       // Backend is disabling, we do not execute queries except the one in
145
// the
146
// transaction we already started. Just notify the completion for the
147
// others.
148
notifyCompletion(backendThread);
149       return;
150     }
151
152     // Use a connection just for this request
153
PooledConnection c = null;
154     try
155     {
156       c = cm.retrieveConnectionInAutoCommit(request);
157     }
158     catch (UnreachableBackendException e1)
159     {
160       SQLException JavaDoc se = new SQLException JavaDoc("Backend " + backend.getName()
161           + " is no more reachable.");
162       try
163       {
164         notifyFailure(backendThread, -1, se);
165       }
166       catch (SQLException JavaDoc ignore)
167       {
168       }
169       // Disable this backend (it is no more in sync) by killing the backend
170
// thread
171

172       backendThread.getLoadBalancer().disableBackend(backend, true);
173       String JavaDoc msg = Translate.get(
174           "loadbalancer.backend.disabling.unreachable", backend.getName());
175       logger.error(msg);
176       endUserLogger.error(msg);
177       throw se;
178     }
179
180     // Sanity check
181
if (c == null)
182     {
183       SQLException JavaDoc se = new SQLException JavaDoc("No more connections");
184       try
185       { // All backends failed, just ignore
186
if (!notifyFailure(backendThread, request.getTimeout() * 1000L, se))
187           return;
188       }
189       catch (SQLException JavaDoc ignore)
190       {
191       }
192       // Disable this backend (it is no more in sync) by killing the backend
193
// thread
194
backendThread.getLoadBalancer().disableBackend(backend, true);
195       String JavaDoc msg = "Request '"
196           + request.getSqlShortForm(backend.getSqlShortFormLength())
197           + "' failed on backend " + backend.getName() + " but " + getSuccess()
198           + " succeeded (" + se + ")";
199       logger.error(msg);
200       endUserLogger.error(Translate.get(
201           "loadbalancer.backend.disabling", backend.getName()));
202       throw new SQLException JavaDoc(msg);
203     }
204
205     // Execute Query
206
try
207     {
208       result = AbstractLoadBalancer.executeStatementExecuteUpdateOnBackend(
209           request, backend, backendThread, c.getConnection());
210
211       backend.updateDatabaseBackendSchema(request);
212     }
213     catch (Exception JavaDoc e)
214     {
215       try
216       { // All backends failed, just ignore
217
if (!notifyFailure(backendThread, request.getTimeout() * 1000L, e))
218         {
219           result = null;
220           return;
221         }
222       }
223       catch (SQLException JavaDoc ignore)
224       {
225       }
226       // Disable this backend (it is no more in sync) by killing the backend
227
// thread
228
backendThread.getLoadBalancer().disableBackend(backend, true);
229       String JavaDoc msg = "Request '"
230           + request.getSqlShortForm(backend.getSqlShortFormLength())
231           + "' failed on backend " + backend.getName() + " but " + getSuccess()
232           + " succeeded (" + e + ")";
233
234       if (logger.isDebugEnabled())
235         logger.debug(msg, e);
236       else
237         logger.error(msg);
238       endUserLogger.error(Translate.get(
239           "loadbalancer.backend.disabling", backend.getName()));
240       throw new SQLException JavaDoc(msg);
241     }
242     finally
243     {
244       cm.releaseConnectionInAutoCommit(request, c);
245     }
246   }
247
248   private void executeInTransaction(BackendWorkerThread backendThread,
249       DatabaseBackend backend, AbstractConnectionManager cm, Trace logger)
250       throws SQLException JavaDoc
251   {
252     // Re-use the connection used by this transaction
253
Connection JavaDoc c;
254     long tid = request.getTransactionId();
255
256     try
257     {
258       c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(request, cm);
259     }
260     catch (UnreachableBackendException ube)
261     {
262       SQLException JavaDoc se = new SQLException JavaDoc("Backend " + backend.getName()
263           + " is no more reachable.");
264       try
265       {
266         notifyFailure(backendThread, -1, se);
267       }
268       catch (SQLException JavaDoc ignore)
269       {
270       }
271       // Disable this backend (it is no more in sync) by killing the backend
272
// thread
273
backendThread.getLoadBalancer().disableBackend(backend, true);
274       String JavaDoc msg = Translate.get(
275           "loadbalancer.backend.disabling.unreachable", backend.getName());
276       logger.error(msg);
277       endUserLogger.error(msg);
278       throw se;
279     }
280     catch (NoTransactionStartWhenDisablingException e)
281     {
282       // Backend is disabling, we do not execute queries except the one in
283
// the
284
// transaction we already started. Just notify the completion for the
285
// others.
286
notifyCompletion(backendThread);
287       return;
288     }
289     catch (SQLException JavaDoc e1)
290     {
291       SQLException JavaDoc se = new SQLException JavaDoc(
292           "Unable to get connection for transaction " + tid);
293       try
294       { // All backends failed, just ignore
295
if (!notifyFailure(backendThread, request.getTimeout() * 1000L, se))
296           return;
297       }
298       catch (SQLException JavaDoc ignore)
299       {
300       }
301       // Disable this backend (it is no more in sync) by killing the
302
// backend thread
303
backendThread.getLoadBalancer().disableBackend(backend, true);
304       String JavaDoc msg = "Request '"
305           + request.getSqlShortForm(backend.getSqlShortFormLength())
306           + "' failed on backend " + backend.getName() + " but " + getSuccess()
307           + " succeeded (" + se + ")";
308       logger.error(msg);
309       endUserLogger.error(Translate.get(
310           "loadbalancer.backend.disabling", backend.getName()));
311       throw new SQLException JavaDoc(msg);
312     }
313
314     // Sanity check
315
if (c == null)
316     { // Bad connection
317
SQLException JavaDoc se = new SQLException JavaDoc(
318           "Unable to retrieve connection for transaction " + tid);
319       try
320       { // All backends failed, just ignore
321
if (!notifyFailure(backendThread, request.getTimeout() * 1000L, se))
322           return;
323       }
324       catch (SQLException JavaDoc ignore)
325       {
326       }
327       // Disable this backend (it is no more in sync) by killing the
328
// backend thread
329
backendThread.getLoadBalancer().disableBackend(backend, true);
330       String JavaDoc msg = "Request '"
331           + request.getSqlShortForm(backend.getSqlShortFormLength())
332           + "' failed on backend " + backend.getName() + " but " + getSuccess()
333           + " succeeded (" + se + ")";
334       logger.error(msg);
335       endUserLogger.error(Translate.get(
336           "loadbalancer.backend.disabling", backend.getName()));
337       throw new SQLException JavaDoc(msg);
338     }
339
340     // Execute Query
341
try
342     {
343       result = AbstractLoadBalancer.executeStatementExecuteUpdateOnBackend(
344           request, backend, backendThread, c);
345
346       backend.updateDatabaseBackendSchema(request);
347     }
348     catch (Exception JavaDoc e)
349     {
350       try
351       { // All backends failed, just ignore
352
if (!notifyFailure(backendThread, request.getTimeout() * 1000L, e))
353         {
354           result = null;
355           return;
356         }
357       }
358       catch (SQLException JavaDoc ignore)
359       {
360       }
361       // Disable this backend (it is no more in sync)
362
backendThread.getLoadBalancer().disableBackend(backend, true);
363       String JavaDoc msg = "Request '"
364           + request.getSqlShortForm(backend.getSqlShortFormLength())
365           + "' failed on backend " + backend.getName() + " but " + getSuccess()
366           + " succeeded (" + e + ")";
367       if (logger.isDebugEnabled())
368         logger.debug(msg, e);
369       else
370         logger.error(msg);
371       endUserLogger.error(Translate.get(
372           "loadbalancer.backend.disabling", backend.getName()));
373       throw new SQLException JavaDoc(msg);
374     }
375   }
376
377   /**
378    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
379    */

380   public AbstractRequest getRequest()
381   {
382     return request;
383   }
384
385   /**
386    * Returns the result.
387    *
388    * @return int
389    */

390   public ExecuteUpdateResult getResult()
391   {
392     return result;
393   }
394
395   /**
396    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
397    */

398   public long getTransactionId()
399   {
400     return request.getTransactionId();
401   }
402
403   /**
404    * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
405    */

406   public boolean isAutoCommit()
407   {
408     return request.isAutoCommit();
409   }
410
411   /**
412    * @see java.lang.Object#equals(java.lang.Object)
413    */

414   public boolean equals(Object JavaDoc other)
415   {
416     if ((other == null) || !(other instanceof StatementExecuteUpdateTask))
417       return false;
418
419     StatementExecuteUpdateTask seut = (StatementExecuteUpdateTask) other;
420     return this.request.equals(seut.getRequest());
421   }
422
423   /**
424    * @see java.lang.Object#hashCode()
425    */

426   public int hashCode()
427   {
428     return (int) request.getId();
429   }
430
431   /**
432    * @see java.lang.Object#toString()
433    */

434   public String JavaDoc toString()
435   {
436     if (request.isAutoCommit())
437       return "Autocommit StatementExecuteUpdateTask "
438           + request.getTransactionId() + " (" + request.getUniqueKey() + ")";
439     else
440       return "StatementExecuteUpdateTask from transaction "
441           + request.getTransactionId() + " (" + request.getUniqueKey() + ")";
442   }
443
444 }
Popular Tags