KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > virtualdatabase > VirtualDatabaseWorkerThread


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Copyright (C) 2005-2006 Continuent, Inc.
7  * Contact: sequoia@continuent.org
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): Nicolas Modrzyk, Jean-Bernard van Zuylen, Damian Arregui.
23  * Refactored by Marc Herbert to remove the use of Java serialization.
24  */

25
26 package org.continuent.sequoia.controller.virtualdatabase;
27
28 import java.io.EOFException JavaDoc;
29 import java.io.IOException JavaDoc;
30 import java.io.Serializable JavaDoc;
31 import java.net.InetAddress JavaDoc;
32 import java.net.SocketException JavaDoc;
33 import java.sql.SQLException JavaDoc;
34 import java.sql.SQLWarning JavaDoc;
35 import java.util.ArrayList JavaDoc;
36 import java.util.HashMap JavaDoc;
37 import java.util.Iterator JavaDoc;
38 import java.util.List JavaDoc;
39
40 import org.continuent.sequoia.common.exceptions.BadJDBCApiUsageException;
41 import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
42 import org.continuent.sequoia.common.exceptions.NoMoreControllerException;
43 import org.continuent.sequoia.common.exceptions.NotImplementedException;
44 import org.continuent.sequoia.common.exceptions.ProtocolException;
45 import org.continuent.sequoia.common.exceptions.VDBisShuttingDownException;
46 import org.continuent.sequoia.common.exceptions.driver.protocol.BackendDriverException;
47 import org.continuent.sequoia.common.exceptions.driver.protocol.ControllerCoreException;
48 import org.continuent.sequoia.common.i18n.Translate;
49 import org.continuent.sequoia.common.log.Trace;
50 import org.continuent.sequoia.common.protocol.Commands;
51 import org.continuent.sequoia.common.protocol.SQLDataSerialization;
52 import org.continuent.sequoia.common.protocol.TypeTag;
53 import org.continuent.sequoia.common.protocol.SQLDataSerialization.Serializer;
54 import org.continuent.sequoia.common.sql.Request;
55 import org.continuent.sequoia.common.sql.RequestWithResultSetParameters;
56 import org.continuent.sequoia.common.sql.metadata.MetadataContainer;
57 import org.continuent.sequoia.common.sql.metadata.MetadataDescription;
58 import org.continuent.sequoia.common.stream.DriverBufferedInputStream;
59 import org.continuent.sequoia.common.stream.DriverBufferedOutputStream;
60 import org.continuent.sequoia.common.users.VirtualDatabaseUser;
61 import org.continuent.sequoia.common.util.Constants;
62 import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
63 import org.continuent.sequoia.controller.backend.result.ExecuteResult;
64 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
65 import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
66 import org.continuent.sequoia.controller.core.Controller;
67 import org.continuent.sequoia.controller.core.ControllerConstants;
68 import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
69 import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
70 import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
71 import org.continuent.sequoia.controller.requests.AbstractRequest;
72 import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
73 import org.continuent.sequoia.controller.requests.RequestFactory;
74 import org.continuent.sequoia.controller.requests.SelectRequest;
75 import org.continuent.sequoia.controller.requests.StoredProcedure;
76 import org.continuent.sequoia.controller.requests.StoredProcedureCallResult;
77 import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
78 import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
79 import org.continuent.sequoia.driver.Connection;
80
81 /**
82  * This class handles a connection with a Sequoia driver.
83  *
84  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
85  * @author <a HREF="mailto:Nicolas.Modrzyk@inria.fr">Nicolas Modrzyk </a>
86  * @author <a HREF="mailto:Marc.Herbert@emicnetworks.com">Marc Herbert </a>
87  * @author <a HREF="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
88  * </a>
89  * @author <a HREF="mailto:damian.arregui@continuent.com">Damian Arregui
90  * @version 2.0
91  */

92 public class VirtualDatabaseWorkerThread extends Thread JavaDoc
93 {
94   //
95
// How the code is organized?
96
//
97
// 1. Member variables
98
// 2. Constructor(s)
99
// 3. Request management
100
// 4. Getter/Setters
101

102   /** <code>true</code> if this thread has been killed. */
103   private boolean isKilled = false;
104
105   /** Virtual database instantiating this thread. */
106   private VirtualDatabase vdb;
107
108   /** Logger instance. */
109   private Trace logger = null;
110
111   private DriverBufferedInputStream in = null;
112   private DriverBufferedOutputStream out = null;
113
114   private VirtualDatabaseUser user;
115
116   private Controller controller;
117
118   private boolean waitForCommand;
119
120   private HashMap JavaDoc streamedResultSets;
121
122   private RequestFactory requestFactory = ControllerConstants.CONTROLLER_FACTORY
123                                                                  .getRequestFactory();
124   /**
125    * The following variables represent the state of the connection with the
126    * client
127    */

128   private boolean persistentConnection;
129   private long persistentConnectionId;
130   private boolean retrieveSQLWarnings;
131   private long currentTid;
132   private boolean transactionStarted;
133   private boolean transactionHasAborted;
134   private boolean queryExecutedInThisTransaction;
135   private boolean writeQueryExecutedInThisTransaction;
136   // Number of savepoints in the current transaction
137
private int hasSavepoint;
138   private String JavaDoc clientIpAddress;
139   private String JavaDoc login;
140   private boolean closed;
141   private int transactionIsolation = Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL;
142   private boolean isReadOnly = false;
143   private String JavaDoc connectionLineSeparator = null;
144
145   /* end user logger */
146   static Trace endUserLogger = Trace
147                                                                  .getLogger("org.continuent.sequoia.enduser");
148
149   /*
150    * Constructor
151    */

152
153   /**
154    * Creates a new <code>VirtualDatabaseWorkerThread</code> instance.
155    *
156    * @param controller the thread was originated from
157    * @param vdb the virtual database instantiating this thread.
158    */

159   public VirtualDatabaseWorkerThread(Controller controller, VirtualDatabase vdb)
160   {
161     super("VirtualDatabaseWorkerThread-" + vdb.getVirtualDatabaseName());
162     this.vdb = vdb;
163     this.controller = controller;
164     try
165     {
166       this.logger = Trace
167           .getLogger("org.continuent.sequoia.controller.virtualdatabase.VirtualDatabaseWorkerThread."
168               + vdb.getVirtualDatabaseName());
169     }
170     catch (Exception JavaDoc e)
171     {
172       this.logger = vdb.logger;
173     }
174   }
175
176   //
177
// Decoding commands from the stream
178
//
179

180   /**
181    * Gets a connection from the connection queue and process it.
182    */

183   public void run()
184   {
185     ArrayList JavaDoc vdbActiveThreads = vdb.getActiveThreads();
186     ArrayList JavaDoc vdbPendingQueue = vdb.getPendingConnections();
187     // List of open ResultSets for streaming. This is not synchronized since the
188
// connection does only handle one request at a time
189
streamedResultSets = new HashMap JavaDoc();
190     boolean isActive = true;
191
192     if (vdbActiveThreads == null)
193     {
194       logger
195           .error("Got null active threads queue in VirtualDatabaseWorkerThread");
196       isKilled = true;
197     }
198     if (vdbPendingQueue == null)
199     {
200       logger.error("Got null connection queue in VirtualDatabaseWorkerThread");
201       isKilled = true;
202     }
203
204     // Main loop
205
while (!isKilled)
206     {
207       // Get a connection from the pending queue
208
synchronized (vdbPendingQueue)
209       {
210         while (vdbPendingQueue.isEmpty())
211         {
212           if (!vdb.poolConnectionThreads)
213           { // User does not want thread pooling, kill this thread!
214
isKilled = true;
215             break;
216           }
217           boolean timeout = false;
218           try
219           {
220             if (isActive)
221             {
222               isActive = false;
223               // Remove ourselves from the active thread list
224
synchronized (vdbActiveThreads)
225               {
226                 vdbActiveThreads.remove(this);
227                 vdb.incrementIdleThreadCount();
228               }
229             }
230             long before = System.currentTimeMillis();
231             vdbPendingQueue.wait(vdb.getMaxThreadIdleTime());
232             long now = System.currentTimeMillis();
233             // Check if timeout has expired
234
timeout = now - before >= vdb.getMaxThreadIdleTime();
235           }
236           catch (InterruptedException JavaDoc e)
237           {
238             logger.warn("VirtualDatabaseWorkerThread wait() interrupted");
239           }
240           if (timeout && vdbPendingQueue.isEmpty())
241           {
242             if (vdb.currentNbOfThreads > vdb.minNbOfThreads)
243             { // We have enough threads, kill this one
244
isKilled = true;
245               break;
246             }
247           }
248         }
249
250         if (isKilled)
251         { // Cleaning up
252
synchronized (vdbActiveThreads)
253           { // Remove ourselves from the appropriate thread list
254
if (isActive)
255             {
256               vdbActiveThreads.remove(this);
257               vdb.decreaseCurrentNbOfThread();
258             }
259             else
260               vdb.decreaseIdleThread();
261           }
262           // Get out of the while loop
263
continue;
264         }
265
266         // Get a connection
267
try
268         {
269           in = (DriverBufferedInputStream) vdbPendingQueue.remove(0);
270           out = (DriverBufferedOutputStream) vdbPendingQueue.remove(0);
271         }
272         catch (Exception JavaDoc e)
273         {
274           logger.error("Error while getting streams from connection");
275           continue;
276         }
277
278         synchronized (vdbActiveThreads)
279         {
280           if (!isActive)
281           {
282             vdb.decreaseIdleThread();
283             isActive = true;
284             // Add this thread to the active thread list
285
vdbActiveThreads.add(this);
286           }
287         }
288       }
289
290       closed = false;
291
292       // Handle connection
293
// Read the user information and check authentication
294
/**
295        * @see org.continuent.sequoia.driver.Driver#connectToController(Properties,
296        * SequoiaUrl, ControllerInfo)
297        */

298       boolean success = false;
299       try
300       {
301         login = in.readLongUTF();
302         String JavaDoc password = in.readLongUTF();
303         user = new VirtualDatabaseUser(login, password);
304
305         // Pre-check for transparent login
306
if (vdb.getAuthenticationManager().isTransparentLoginEnabled())
307         {
308           if (!vdb.getAuthenticationManager().isValidVirtualUser(user))
309           {
310             vdb.checkAndAddVirtualDatabaseUser(user);
311           }
312         }
313
314         if (vdb.getAuthenticationManager().isValidVirtualUser(user))
315         { // Authentication ok, check access control
316
InetAddress JavaDoc inetAddress;
317           try
318           { // Check IP address
319
inetAddress = in.getSocket().getInetAddress();
320           }
321           catch (NullPointerException JavaDoc e) // no method above throws anything
322
{
323             inetAddress = null;
324           }
325
326           boolean authorized;
327           if (inetAddress == null)
328           { // Check if "Unresolved address" is allowed
329
clientIpAddress = "Unresolved address";
330             authorized = vdb.getAuthenticationManager()
331                 .userIsAuthorizedToConnectFrom(user, clientIpAddress);
332           }
333           else
334           { // Try with IP address
335
clientIpAddress = inetAddress.getHostAddress();
336             authorized = vdb.getAuthenticationManager()
337                 .userIsAuthorizedToConnectFrom(user, clientIpAddress);
338             if (!authorized)
339             { // Failed, try with host name
340
clientIpAddress = inetAddress.getHostName();
341               authorized = vdb.getAuthenticationManager()
342                   .userIsAuthorizedToConnectFrom(user, clientIpAddress);
343             }
344             if (!authorized)
345             { // Failed, try with canonical host name (naming service lookup
346
// forced here)
347
clientIpAddress = inetAddress.getCanonicalHostName();
348               authorized = vdb.getAuthenticationManager()
349                   .userIsAuthorizedToConnectFrom(user, clientIpAddress);
350             }
351           }
352
353           if (!authorized)
354           { // Access denied, close the connection
355
String JavaDoc msg = "Access denied for user '" + login + "' from "
356                 + clientIpAddress;
357             out.writeBoolean(false); // access denied
358
out.writeLongUTF(msg); // error message
359
if (logger.isDebugEnabled())
360               logger.debug(msg);
361             continue;
362           }
363
364           if (logger.isDebugEnabled())
365             logger.debug("Login accepted for " + login + " from "
366                 + clientIpAddress);
367
368           out.writeBoolean(true); // success code
369
out.flush();
370           success = true;
371
372           connectionLineSeparator = in.readLongUTF();
373           persistentConnection = in.readBoolean();
374           if (persistentConnection)
375           {
376             persistentConnectionId = vdb.getNextConnectionId();
377             try
378             {
379               vdb.openPersistentConnection(login, persistentConnectionId);
380               out.writeBoolean(true);
381               out.writeLong(persistentConnectionId);
382               out.flush();
383             }
384             catch (SQLException JavaDoc e)
385             {
386               success = false;
387               out.writeBoolean(false);
388               out.flush();
389               continue;
390             }
391           }
392           retrieveSQLWarnings = in.readBoolean();
393         }
394         else
395         { // Authentication failed, close the connection
396
String JavaDoc msg = "Authentication failed for user '" + login + "'";
397           out.writeBoolean(false); // authentication failed
398
out.writeLongUTF(msg); // error message
399
if (logger.isDebugEnabled())
400             logger.debug(msg);
401           endUserLogger.error(Translate.get(
402               "virtualdatabase.authentication.failed", login));
403           continue;
404         }
405       }
406       catch (IOException JavaDoc e)
407       {
408         logger.error("I/O error during user authentication (" + e + ")");
409         closed = true;
410       }
411       finally
412       {
413         if (!success)
414         {
415           try
416           {
417             out.close();
418             in.close();
419           }
420           catch (IOException JavaDoc ignore)
421           {
422           }
423         }
424       }
425
426       currentTid = 0;
427       transactionStarted = false;
428       transactionHasAborted = false;
429       transactionIsolation = Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL;
430       queryExecutedInThisTransaction = false;
431       writeQueryExecutedInThisTransaction = false;
432       hasSavepoint = 0;
433
434       int command = -1;
435       while (!closed && !isKilled)
436       {
437         try
438         {
439           // Get the query
440
waitForCommand = true;
441           out.writeInt(Commands.ControllerPrompt);
442           out.flush();
443           if (Commands.CommandPrefix != in.readInt())
444           {
445             logger.error("Protocol corruption with client " + login
446                 + ", last command was:" + command + ". Closing.");
447             // FIXME: because of the protocol corruption, this has very little
448
// chance to actually close the connection. We need something more
449
// rude here, like shutting down ourselves.
450
command = Commands.Close;
451           }
452           else
453           {
454             try
455             {
456               command = in.readInt();
457             }
458             catch (NullPointerException JavaDoc e)
459             {
460               // SEQUOIA-777: this NPE happens when this thread is blocked
461
// on in.read() and 'in' gets closed either explicitly
462
// by the shutdown thread or by an unexpected client socket death.
463
// The NPE is a known issue with jdk < 1.5, see SEQUOIA-777.
464
// In the shutdown case, the flag isKilled is set
465
// and we can exit the while loop. Otherwize, the client socket
466
// died unexpectedly and we consider this is a CLOSE.
467
if (isKilled)
468                 continue;
469               logger.warn("Client unexpectedly dropped connection. Closing.");
470               command = Commands.Close;
471             }
472           }
473
474           waitForCommand = false;
475
476           // Process it
477
switch (command)
478           {
479             case Commands.StatementExecuteQuery :
480               statementExecuteQuery(null);
481               break;
482             case Commands.StatementExecuteUpdate :
483               statementExecuteUpdate(null);
484               break;
485             case Commands.StatementExecuteUpdateWithKeys :
486               statementExecuteUpdateWithKeys();
487               break;
488             case Commands.CallableStatementExecuteQuery :
489               callableStatementExecuteQuery(null, false);
490               break;
491             case Commands.CallableStatementExecuteUpdate :
492               callableStatementExecuteUpdate(null, false);
493               break;
494             case Commands.CallableStatementExecute :
495               callableStatementExecute(null, false);
496               break;
497             case Commands.CallableStatementExecuteQueryWithParameters :
498               callableStatementExecuteQuery(null, true);
499               break;
500             case Commands.CallableStatementExecuteUpdateWithParameters :
501               callableStatementExecuteUpdate(null, true);
502               break;
503             case Commands.CallableStatementExecuteWithParameters :
504               callableStatementExecute(null, true);
505               break;
506             case Commands.StatementExecute :
507               statementExecute(null);
508               break;
509             case Commands.Begin :
510               begin();
511               break;
512             case Commands.Commit :
513               commit();
514               break;
515             case Commands.Rollback :
516               rollback();
517               break;
518             case Commands.SetNamedSavepoint :
519               setNamedSavepoint();
520               break;
521             case Commands.SetUnnamedSavepoint :
522               setUnnamedSavepoint();
523               break;
524             case Commands.ReleaseSavepoint :
525               releaseSavepoint();
526               break;
527             case Commands.RollbackToSavepoint :
528               rollbackToSavepoint();
529               break;
530             case Commands.SetTransactionIsolation :
531               connectionSetTransactionIsolation();
532               break;
533             case Commands.SetReadOnly :
534               connectionSetReadOnly();
535               break;
536             case Commands.ConnectionGetWarnings :
537               connectionGetWarnings();
538               break;
539             case Commands.ConnectionClearWarnings :
540               connectionClearWarnings();
541               break;
542             case Commands.GetVirtualDatabaseName :
543               getVirtualDatabaseName();
544               break;
545             case Commands.DatabaseMetaDataGetDatabaseProductName :
546               databaseMetaDataGetDatabaseProductName();
547               break;
548             case Commands.GetControllerVersionNumber :
549               getControllerVersionNumber();
550               break;
551             case Commands.DatabaseMetaDataGetTables :
552               databaseMetaDataGetTables();
553               break;
554             case Commands.DatabaseMetaDataGetColumns :
555               databaseMetaDataGetColumns();
556               break;
557             case Commands.DatabaseMetaDataGetPrimaryKeys :
558               databaseMetaDataGetPrimaryKeys();
559               break;
560             case Commands.DatabaseMetaDataGetProcedures :
561               databaseMetaDataGetProcedures();
562               break;
563             case Commands.DatabaseMetaDataGetProcedureColumns :
564               databaseMetaDataGetProcedureColumns();
565               break;
566             case Commands.ConnectionGetCatalogs :
567               connectionGetCatalogs();
568               break;
569             case Commands.ConnectionGetCatalog :
570               connectionGetCatalog();
571               break;
572             case Commands.DatabaseMetaDataGetTableTypes :
573               databaseMetaDataGetTableTypes();
574               break;
575             case Commands.DatabaseMetaDataGetSchemas :
576               databaseMetaDataGetSchemas();
577               break;
578             case Commands.DatabaseMetaDataGetTablePrivileges :
579               databaseMetaDataGetTablePrivileges();
580               break;
581             case Commands.DatabaseMetaDataGetAttributes :
582               databaseMetaDataGetAttributes();
583               break;
584             case Commands.DatabaseMetaDataGetBestRowIdentifier :
585               databaseMetaDataGetBestRowIdentifier();
586               break;
587             case Commands.DatabaseMetaDataGetColumnPrivileges :
588               databaseMetaDataGetColumnPrivileges();
589               break;
590             case Commands.DatabaseMetaDataGetCrossReference :
591               databaseMetaDataGetCrossReference();
592               break;
593             case Commands.DatabaseMetaDataGetExportedKeys :
594               databaseMetaDataGetExportedKeys();
595               break;
596             case Commands.DatabaseMetaDataGetImportedKeys :
597               databaseMetaDataGetImportedKeys();
598               break;
599             case Commands.DatabaseMetaDataGetIndexInfo :
600               databaseMetaDataGetIndexInfo();
601               break;
602             case Commands.DatabaseMetaDataGetSuperTables :
603               databaseMetaDataGetSuperTables();
604               break;
605             case Commands.DatabaseMetaDataGetSuperTypes :
606               databaseMetaDataGetSuperTypes();
607               break;
608             case Commands.DatabaseMetaDataGetTypeInfo :
609               databaseMetaDataGetTypeInfo();
610               break;
611             case Commands.DatabaseMetaDataGetUDTs :
612               databaseMetaDataGetUDTs();
613               break;
614             case Commands.DatabaseMetaDataGetVersionColumns :
615               databaseMetaDataGetVersionColumns();
616               break;
617             case Commands.PreparedStatementGetMetaData :
618               preparedStatementGetMetaData();
619               break;
620             case Commands.ConnectionSetCatalog :
621               connectionSetCatalog();
622               break;
623             case Commands.Close :
624               close();
625               break;
626             case Commands.Reset :
627               reset();
628               break;
629             case Commands.FetchNextResultSetRows :
630               fetchNextResultSetRows();
631               break;
632             case Commands.CloseRemoteResultSet :
633               closeRemoteResultSet();
634               break;
635             case Commands.DatabaseStaticMetadata :
636               databaseStaticMetadata();
637               break;
638             case Commands.RestoreConnectionState :
639               restoreConnectionState();
640               break;
641             case Commands.RetrieveExecuteQueryResult :
642               retrieveExecuteQueryResult();
643               break;
644             case Commands.RetrieveExecuteResult :
645               retrieveExecuteResult();
646               break;
647             case Commands.RetrieveExecuteUpdateResult :
648               retrieveExecuteUpdateResult();
649               break;
650             case Commands.RetrieveExecuteUpdateWithKeysResult :
651               retrieveExecuteUpdateWithKeysResult();
652               break;
653             case Commands.RetrieveExecuteQueryResultWithParameters :
654               retrieveExecuteQueryResultWithParameters();
655               break;
656             case Commands.RetrieveExecuteUpdateResultWithParameters :
657               retrieveExecuteUpdateResultWithParameters();
658               break;
659             case Commands.RetrieveExecuteResultWithParameters :
660               retrieveExecuteResultWithParameters();
661               break;
662             case Commands.RetrieveCommitResult :
663               retrieveCommitResult();
664               break;
665             case Commands.RetrieveRollbackResult :
666               retrieveRollbackResult();
667               break;
668             case Commands.RetrieveReleaseSavepoint :
669               retrieveReleaseSavepoint();
670               break;
671             default :
672               String JavaDoc errorMsg = "Unsupported protocol command: " + command;
673               logger.error(errorMsg);
674               sendToDriver(new RuntimeException JavaDoc(errorMsg));
675               break;
676           }
677         }
678         catch (EOFException JavaDoc e)
679         {
680           logger.warn("Client (login:" + login + ",host:"
681               + in.getSocket().getInetAddress().getHostName()
682               + " closed connection with server)");
683           closed = true;
684         }
685         catch (SocketException JavaDoc e)
686         {
687           // shutting down
688
closed = true;
689         }
690         catch (IOException JavaDoc e)
691         {
692           closed = true;
693           logger.warn("Closing connection with client " + login
694               + " because of IOException.(" + e + ")");
695         }
696         catch (VDBisShuttingDownException e)
697         {
698           isKilled = true;
699         }
700         catch (SQLException JavaDoc e)
701         {
702           // This maybe just invalid SQL, see SEQUOIA-809
703
logger.debug("Error during command execution (" + e.getMessage()
704               + ")");
705           if (transactionStarted && !transactionHasAborted)
706           { // Failure of a query within a transaction automatically aborts the
707
// transaction
708
transactionHasAborted = (hasSavepoint == 0)
709                 && ((command == Commands.StatementExecuteUpdate)
710                     || (command == Commands.StatementExecuteUpdateWithKeys)
711                     || (command == Commands.StatementExecute)
712                     || (command == Commands.CallableStatementExecuteWithParameters)
713                     || (command == Commands.CallableStatementExecuteQueryWithParameters)
714                     || (command == Commands.CallableStatementExecuteUpdateWithParameters)
715                     || (command == Commands.CallableStatementExecuteQuery) || (command == Commands.CallableStatementExecuteUpdate));
716           }
717           try
718           {
719             sendToDriver(e);
720           }
721           catch (IOException JavaDoc ignore)
722           {
723           }
724         }
725         catch (BadJDBCApiUsageException e)
726         {
727           logger
728               .warn("Error during command execution (" + e.getMessage() + ")");
729           try
730           {
731             sendToDriver(e);
732           }
733           catch (IOException JavaDoc ignore)
734           {
735           }
736         }
737         catch (Throwable JavaDoc e)
738         {
739           logger.warn("Runtime error during command execution ("
740               + e.getMessage() + ")", e);
741           if (transactionStarted)
742           { // Failure of a query within a transaction automatically aborts the
743
// transaction
744
transactionHasAborted = (hasSavepoint == 0)
745                 && ((command == Commands.StatementExecuteQuery)
746                     || (command == Commands.StatementExecuteUpdate)
747                     || (command == Commands.StatementExecuteUpdateWithKeys)
748                     || (command == Commands.StatementExecute)
749                     || (command == Commands.CallableStatementExecute)
750                     || (command == Commands.CallableStatementExecuteQuery) || (command == Commands.CallableStatementExecuteUpdate));
751           }
752           try
753           {
754             sendToDriver((SQLException JavaDoc) new SQLException JavaDoc(e
755                 .getLocalizedMessage()).initCause(e));
756           }
757           catch (IOException JavaDoc ignore)
758           {
759           }
760         }
761       } // while (!closed && !isKilled) get and process command from driver
762

763       // Do the cleanup
764
if (!streamedResultSets.isEmpty())
765       {
766         for (Iterator JavaDoc iter = streamedResultSets.values().iterator(); iter
767             .hasNext();)
768         {
769           ControllerResultSet crs = (ControllerResultSet) iter.next();
770           crs.closeResultSet();
771         }
772         streamedResultSets.clear();
773       }
774
775       if (!isKilled)
776       {
777         // Abort in-flight transaction
778
if (transactionStarted && !transactionHasAborted)
779         {
780           if (logger.isDebugEnabled())
781             logger.debug("Aborting transaction " + currentTid);
782           try
783           {
784             vdb.abort(currentTid, writeQueryExecutedInThisTransaction, true);
785           }
786           catch (Throwable JavaDoc e)
787           {
788             if (logger.isDebugEnabled())
789               logger.debug("Error during abort of transaction " + currentTid
790                   + "(" + e + ")");
791           }
792         }
793
794         // Close persistent connections
795
if (persistentConnection)
796         {
797           vdb.closePersistentConnection(login, persistentConnectionId);
798         }
799       }
800       else
801       {
802         // FIXME: debug message for safe mode parallel shutdown of controllers
803
// (note that parallel shutdown should be avoided)
804
if (logger.isInfoEnabled())
805         {
806           logger
807               .info("VirtualDatabaseWorkerThread killed by shutdown, no clean-up"
808                   + " done. Number of pending transaction in scheduler: "
809                   + vdb.getRequestManager().getScheduler()
810                       .getPendingTransactions());
811         }
812       }
813
814       // Close streams and underlying socket
815
try
816       {
817         in.close();
818       }
819       catch (IOException JavaDoc ignore)
820       {
821       }
822       try
823       {
824         out.close();
825       }
826       catch (IOException JavaDoc ignore)
827       {
828       }
829     }
830
831     synchronized (vdbActiveThreads)
832     { // Remove ourselves from the appropriate thread list
833
if (vdbActiveThreads.remove(this))
834         vdb.decreaseCurrentNbOfThread();
835     }
836
837     if (logger.isDebugEnabled())
838       logger.debug("VirtualDatabaseWorkerThread associated to login: "
839           + this.getUser() + " terminating.");
840   }
841
842   private void close() throws IOException JavaDoc
843   {
844     if (logger.isDebugEnabled())
845       logger.debug("Close command");
846
847     cleanup();
848
849     sendToDriver(true);
850
851     closed = true;
852   }
853
854   private void closeRemoteResultSet() throws IOException JavaDoc
855   {
856     if (logger.isDebugEnabled())
857       logger.debug("CloseRemoteResultSet command");
858
859     String JavaDoc cursor = in.readLongUTF();
860     ControllerResultSet crsToClose = (ControllerResultSet) streamedResultSets
861         .remove(cursor);
862     if (crsToClose == null)
863     {
864       sendToDriver(new SQLException JavaDoc("No valid RemoteResultSet to close."));
865     }
866     else
867     {
868       crsToClose.closeResultSet();
869       sendToDriver(true);
870     }
871   }
872
873   private void reset() throws IOException JavaDoc
874   {
875     // The client application has closed the connection but it is kept
876
// open in case the transparent connection pooling reuses it.
877
if (logger.isDebugEnabled())
878       logger.debug("Reset command");
879
880     cleanup();
881
882     currentTid = 0;
883     transactionStarted = false;
884     transactionHasAborted = false;
885     transactionIsolation = Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL;
886     queryExecutedInThisTransaction = false;
887     hasSavepoint = 0;
888     sendToDriver(true);
889   }
890
891   private void cleanup()
892   {
893     // Do the cleanup
894
if (transactionStarted && !transactionHasAborted)
895     {
896       /*
897        * We need to abort the begin to cleanup the metadata associated with the
898        * started transaction.
899        */

900       if (logger.isDebugEnabled())
901         logger.debug("Aborting transaction " + currentTid);
902       try
903       {
904         vdb.abort(currentTid, writeQueryExecutedInThisTransaction, true);
905       }
906       catch (Exception JavaDoc e)
907       {
908         if (logger.isDebugEnabled())
909           logger.debug("Error while aborting transaction " + currentTid + "("
910               + e + ")", e);
911       }
912     }
913   }
914
915   private void restoreConnectionState() throws IOException JavaDoc, SQLException JavaDoc
916   {
917     if (logger.isDebugEnabled())
918       logger.debug("RestoreConnectionState command");
919
920     // Re-connect has opened a new persistent connection that will not be used
921
if (persistentConnection)
922     {
923       vdb.closePersistentConnection(login, persistentConnectionId);
924     }
925
926     writeQueryExecutedInThisTransaction = in.readBoolean();
927     // We receive autocommit from driver
928
transactionStarted = !in.readBoolean();
929     if (transactionStarted)
930       currentTid = in.readLong();
931     persistentConnection = in.readBoolean();
932     if (persistentConnection)
933       persistentConnectionId = in.readLong();
934
935     // Restore the persistent connection first (if any) before trying to perform
936
// any operation on the transaction
937
if (persistentConnection)
938     {
939       if (!vdb.hasPersistentConnection(persistentConnectionId))
940         vdb.failoverForPersistentConnection(persistentConnectionId);
941     }
942
943     retrieveSQLWarnings = in.readBoolean();
944
945     // Acknowledge driver
946
out.writeBoolean(true);
947     out.flush();
948
949     if (transactionStarted)
950     {
951       try
952       {
953         // Check if the transaction exists here
954
vdb.requestManager.getTransactionMetaData(new Long JavaDoc(currentTid));
955         // Only notify failover if we have the transaction in our context
956
vdb.failoverForTransaction(currentTid);
957         /*
958          * Transaction is started on this controller... it was either a
959          * transaction that contained write statements, either a read-only
960          * transaction with broadcasted statements, so we force
961          * writeQueryExecutedInThisTransaction to true.
962          */

963         writeQueryExecutedInThisTransaction = true;
964       }
965       catch (SQLException JavaDoc e)
966       {
967         /*
968          * Transaction has not been found because it either already
969          * committed/rollbacked or it was not started (no request played so far
970          * in the transaction or just read queries on the controller that has
971          * failed). Check first if we can find a trace of commit/rollback in the
972          * recovery log and if not start the transaction now. This is needed
973          * only if it was a write transaction or a transaction with broadcasted
974          * read requests.
975          */

976         RecoveryLog recoveryLog = vdb.getRequestManager().getRecoveryLog();
977         if (writeQueryExecutedInThisTransaction)
978         {
979           if (!recoveryLog.findCommitForTransaction(currentTid)
980               && !recoveryLog.findRollbackForTransaction(currentTid))
981           {
982             vdb.requestManager.doBegin(login, currentTid, persistentConnection,
983                 persistentConnectionId);
984           }
985         }
986         else
987         {
988           vdb.requestManager.doBegin(login, currentTid, persistentConnection,
989               persistentConnectionId);
990           writeQueryExecutedInThisTransaction = true;
991         }
992       }
993     }
994   }
995
996   //
997
// Catalog
998
//
999

1000  private void connectionSetCatalog() throws IOException JavaDoc
1001  {
1002    // Warning! This could bypass the security checkings based on client IP
1003
// address. If a user has access to a virtual database, through setCatalog()
1004
// is will be able to access all other virtual databases where his
1005
// login/password is valid regardless of the IP filtering settings.
1006
if (logger.isDebugEnabled())
1007      logger.debug("ConnectionSetCatalog command");
1008    String JavaDoc catalog = in.readLongUTF();
1009    boolean change = controller.hasVirtualDatabase(catalog);
1010    if (change)
1011    {
1012      VirtualDatabase tempvdb = controller.getVirtualDatabase(catalog);
1013      if (!tempvdb.getAuthenticationManager().isValidVirtualUser(user))
1014        sendToDriver(new SQLException JavaDoc(
1015            "User authentication has failed for asked catalog. No change"));
1016      else
1017      {
1018        this.vdb = tempvdb;
1019        sendToDriver(true);
1020      }
1021    }
1022    else
1023      sendToDriver(false);
1024
1025  }
1026
1027  private void connectionGetCatalog() throws IOException JavaDoc
1028  {
1029    if (logger.isDebugEnabled())
1030      logger.debug("ConnectionGetCatalog command");
1031
1032    sendToDriver(vdb.getVirtualDatabaseName());
1033  }
1034
1035  private void connectionGetCatalogs() throws IOException JavaDoc
1036  {
1037    if (logger.isDebugEnabled())
1038      logger.debug("ConnectionGetCatalogs command");
1039    ArrayList JavaDoc list = controller.getVirtualDatabaseNames();
1040    sendToDriver(vdb.getDynamicMetaData().getCatalogs(list));
1041  }
1042
1043  private void connectionSetTransactionIsolation() throws IOException JavaDoc
1044  {
1045    int level = in.readInt();
1046    if (logger.isDebugEnabled())
1047      logger.debug("SetTransactionIsolation command (level=" + level + ")");
1048
1049    // Check that we are not in a running transaction
1050
if (transactionStarted && queryExecutedInThisTransaction)
1051    {
1052      sendToDriver(new SQLException JavaDoc(
1053          "Cannot change the transaction isolation in a running transaction"));
1054      return;
1055    }
1056
1057    MetadataContainer metadataContainer = vdb.getStaticMetaData()
1058        .getMetadataContainer();
1059    if (metadataContainer != null)
1060    {
1061      Object JavaDoc value = metadataContainer.get(MetadataContainer.getContainerKey(
1062          MetadataDescription.SUPPORTS_TRANSACTION_ISOLATION_LEVEL,
1063          new Class JavaDoc[]{Integer.TYPE}, new Object JavaDoc[]{new Integer JavaDoc(level)}));
1064
1065      if (value != null)
1066      {
1067        if (!((Boolean JavaDoc) value).booleanValue())
1068        {
1069          sendToDriver(new SQLException JavaDoc("Transaction isolation level " + level
1070              + " is not supported by the database"));
1071          return;
1072        }
1073      }
1074      else
1075        logger.warn("Unable to check validity of transaction isolation level "
1076            + level);
1077    }
1078    else
1079      logger.warn("Unable to check validity of transaction isolation level "
1080          + level);
1081    transactionIsolation = level;
1082    sendToDriver(true);
1083  }
1084
1085  private void connectionSetReadOnly() throws IOException JavaDoc
1086  {
1087    isReadOnly = in.readBoolean();
1088    if (logger.isDebugEnabled())
1089      logger.debug("SetReadOnly command (value=" + true + ")");
1090
1091    sendToDriver(true);
1092  }
1093
1094  private void connectionGetWarnings() throws IOException JavaDoc
1095  {
1096    long persistentConnId = in.readLong();
1097    try
1098    {
1099      sendToDriver(vdb.getConnectionWarnings(persistentConnId));
1100    }
1101    catch (SQLException JavaDoc e)
1102    {
1103      sendToDriver(e);
1104    }
1105  }
1106
1107  private void connectionClearWarnings() throws IOException JavaDoc
1108  {
1109    long persistentConnId = in.readLong();
1110    try
1111    {
1112      vdb.clearConnectionWarnings(persistentConnId);
1113      sendToDriver(true);
1114    }
1115    catch (SQLException JavaDoc e)
1116    {
1117      sendToDriver(e);
1118    }
1119  }
1120
1121  //
1122
// Database MetaData
1123
//
1124

1125  /**
1126   * @see java.sql.DatabaseMetaData#getAttributes(java.lang.String,
1127   * java.lang.String, java.lang.String, java.lang.String)
1128   */

1129  private void databaseMetaDataGetAttributes() throws IOException JavaDoc
1130  {
1131    if (logger.isDebugEnabled())
1132      logger.debug("DatabaseMetaDataGetAttributes command");
1133    String JavaDoc catalog = in.readLongUTF();
1134    String JavaDoc schemaPattern = in.readLongUTF();
1135    String JavaDoc typeNamePattern = in.readLongUTF();
1136    String JavaDoc attributeNamePattern = in.readLongUTF();
1137
1138    try
1139    {
1140      sendToDriver(vdb.getDynamicMetaData().getAttributes(
1141          new ConnectionContext(login, transactionStarted, currentTid,
1142              persistentConnection, persistentConnectionId), catalog,
1143          schemaPattern, typeNamePattern, attributeNamePattern));
1144    }
1145    catch (SQLException JavaDoc e)
1146    {
1147      if (logger.isWarnEnabled())
1148        logger.warn("Error while calling databaseMetaDataGetAttributes", e);
1149      sendToDriver(e);
1150    }
1151  }
1152
1153  /**
1154   * @see java.sql.DatabaseMetaData#getBestRowIdentifier(java.lang.String,
1155   * java.lang.String, java.lang.String, int, boolean)
1156   */

1157  private void databaseMetaDataGetBestRowIdentifier() throws IOException JavaDoc
1158  {
1159    if (logger.isDebugEnabled())
1160      logger.debug("DatabaseMetaDataGetBestRowIdentifier command");
1161
1162    String JavaDoc catalog = in.readLongUTF();
1163    String JavaDoc schema = in.readLongUTF();
1164    String JavaDoc table = in.readLongUTF();
1165    int scope = in.readInt();
1166    boolean nullable = in.readBoolean();
1167
1168    try
1169    {
1170      sendToDriver(vdb.getDynamicMetaData().getBestRowIdentifier(
1171          new ConnectionContext(login, transactionStarted, currentTid,
1172              persistentConnection, persistentConnectionId), catalog, schema,
1173          table, scope, nullable));
1174    }
1175    catch (SQLException JavaDoc e)
1176    {
1177      if (logger.isWarnEnabled())
1178        logger.warn("Error while calling databaseMetaDataGetBestRowIdentifier",
1179            e);
1180      sendToDriver(e);
1181    }
1182  }
1183
1184  /**
1185   * @see java.sql.DatabaseMetaData#getColumnPrivileges(java.lang.String,
1186   * java.lang.String, java.lang.String, java.lang.String)
1187   */

1188  private void databaseMetaDataGetColumnPrivileges() throws IOException JavaDoc
1189  {
1190    if (logger.isDebugEnabled())
1191      logger.debug("DatabaseMetaDataGetColumnPrivileges command");
1192
1193    String JavaDoc catalog = in.readLongUTF();
1194    String JavaDoc schema = in.readLongUTF();
1195    String JavaDoc table = in.readLongUTF();
1196    String JavaDoc columnNamePattern = in.readLongUTF();
1197
1198    try
1199    {
1200      sendToDriver(vdb.getDynamicMetaData().getColumnPrivileges(
1201          new ConnectionContext(login, transactionStarted, currentTid,
1202              persistentConnection, persistentConnectionId), catalog, schema,
1203          table, columnNamePattern));
1204    }
1205    catch (SQLException JavaDoc e)
1206    {
1207      if (logger.isWarnEnabled())
1208        logger.warn("Error while calling databaseMetaDataGetColumnPrivileges",
1209            e);
1210      sendToDriver(e);
1211    }
1212  }
1213
1214  /**
1215   * @see java.sql.DatabaseMetaData#getColumns(java.lang.String,
1216   * java.lang.String, java.lang.String, java.lang.String)
1217   */

1218  private void databaseMetaDataGetColumns() throws IOException JavaDoc
1219  {
1220    if (logger.isDebugEnabled())
1221      logger.debug("DatabaseMetaDataGetColumns command");
1222    String JavaDoc ccatalog = in.readLongUTF();
1223    String JavaDoc cschemaPattern = in.readLongUTF();
1224    String JavaDoc ctableNamePattern = in.readLongUTF();
1225    String JavaDoc ccolumnNamePattern = in.readLongUTF();
1226
1227    try
1228    {
1229      sendToDriver(vdb.getDynamicMetaData().getColumns(
1230          new ConnectionContext(login, transactionStarted, currentTid,
1231              persistentConnection, persistentConnectionId), ccatalog,
1232          cschemaPattern, ctableNamePattern, ccolumnNamePattern));
1233    }
1234    catch (SQLException JavaDoc e)
1235    {
1236      if (logger.isWarnEnabled())
1237        logger.warn("Error while calling databaseMetaDataGetColumns", e);
1238      sendToDriver(e);
1239    }
1240  }
1241
1242  /**
1243   * @see java.sql.DatabaseMetaData#getCrossReference(java.lang.String,
1244   * java.lang.String, java.lang.String, java.lang.String,
1245   * java.lang.String, java.lang.String)
1246   */

1247  private void databaseMetaDataGetCrossReference() throws IOException JavaDoc
1248  {
1249    if (logger.isDebugEnabled())
1250      logger.debug("DatabaseMetaDataGetCrossReference command");
1251
1252    String JavaDoc primaryCatalog = in.readLongUTF();
1253    String JavaDoc primarySchema = in.readLongUTF();
1254    String JavaDoc primaryTable = in.readLongUTF();
1255    String JavaDoc foreignCatalog = in.readLongUTF();
1256    String JavaDoc foreignSchema = in.readLongUTF();
1257    String JavaDoc foreignTable = in.readLongUTF();
1258
1259    try
1260    {
1261      sendToDriver(vdb.getDynamicMetaData().getCrossReference(
1262          new ConnectionContext(login, transactionStarted, currentTid,
1263              persistentConnection, persistentConnectionId), primaryCatalog,
1264          primarySchema, primaryTable, foreignCatalog, foreignSchema,
1265          foreignTable));
1266    }
1267    catch (SQLException JavaDoc e)
1268    {
1269      if (logger.isWarnEnabled())
1270        logger.warn("Error while calling databaseMetaDataGetCrossReference", e);
1271      sendToDriver(e);
1272    }
1273  }
1274
1275  /**
1276   * @see java.sql.DatabaseMetaData#getDatabaseProductName()
1277   */

1278  private void databaseMetaDataGetDatabaseProductName() throws IOException JavaDoc
1279  {
1280    if (logger.isDebugEnabled())
1281      logger.debug("GetDatabaseProductName command");
1282
1283    sendToDriver(vdb.getDatabaseProductName());
1284  }
1285
1286  /**
1287   * @see java.sql.DatabaseMetaData#getExportedKeys(java.lang.String,
1288   * java.lang.String, java.lang.String)
1289   */

1290  private void databaseMetaDataGetExportedKeys() throws IOException JavaDoc
1291  {
1292    if (logger.isDebugEnabled())
1293      logger.debug("DatabaseMetaDataGetExportedKeys command");
1294
1295    String JavaDoc catalog = in.readLongUTF();
1296    String JavaDoc schema = in.readLongUTF();
1297    String JavaDoc table = in.readLongUTF();
1298
1299    try
1300    {
1301      sendToDriver(vdb.getDynamicMetaData().getExportedKeys(
1302          new ConnectionContext(login, transactionStarted, currentTid,
1303              persistentConnection, persistentConnectionId), catalog, schema,
1304          table));
1305    }
1306    catch (SQLException JavaDoc e)
1307    {
1308      if (logger.isWarnEnabled())
1309        logger.warn("Error while calling databaseMetaDataGetExportedKeys", e);
1310      sendToDriver(e);
1311    }
1312  }
1313
1314  /**
1315   * @see java.sql.DatabaseMetaData#getImportedKeys(java.lang.String,
1316   * java.lang.String, java.lang.String)
1317   */

1318  private void databaseMetaDataGetImportedKeys() throws IOException JavaDoc
1319  {
1320    if (logger.isDebugEnabled())
1321      logger.debug("DatabaseMetaDataGetImportedKeys command");
1322
1323    String JavaDoc catalog = in.readLongUTF();
1324    String JavaDoc schema = in.readLongUTF();
1325    String JavaDoc table = in.readLongUTF();
1326
1327    try
1328    {
1329      sendToDriver(vdb.getDynamicMetaData().getImportedKeys(
1330          new ConnectionContext(login, transactionStarted, currentTid,
1331              persistentConnection, persistentConnectionId), catalog, schema,
1332          table));
1333    }
1334    catch (SQLException JavaDoc e)
1335    {
1336      if (logger.isWarnEnabled())
1337        logger.warn("Error while calling databaseMetaDataGetImportedKeys", e);
1338      sendToDriver(e);
1339    }
1340  }
1341
1342  /**
1343   * @see java.sql.DatabaseMetaData#getIndexInfo(java.lang.String,
1344   * java.lang.String, java.lang.String, boolean, boolean)
1345   */

1346  private void databaseMetaDataGetIndexInfo() throws IOException JavaDoc
1347  {
1348    if (logger.isDebugEnabled())
1349      logger.debug("databaseMetaDataGetIndexInfo command");
1350
1351    String JavaDoc catalog = in.readLongUTF();
1352    String JavaDoc schema = in.readLongUTF();
1353    String JavaDoc table = in.readLongUTF();
1354    boolean unique = in.readBoolean();
1355    boolean approximate = in.readBoolean();
1356
1357    try
1358    {
1359      sendToDriver(vdb.getDynamicMetaData().getIndexInfo(
1360          new ConnectionContext(login, transactionStarted, currentTid,
1361              persistentConnection, persistentConnectionId), catalog, schema,
1362          table, unique, approximate));
1363    }
1364    catch (SQLException JavaDoc e)
1365    {
1366      if (logger.isWarnEnabled())
1367        logger.warn("Error while calling databaseMetaDataGetIndexInfo", e);
1368      sendToDriver(e);
1369    }
1370  }
1371
1372  /**
1373   * @see java.sql.DatabaseMetaData#getPrimaryKeys(java.lang.String,
1374   * java.lang.String, java.lang.String)
1375   */

1376  private void databaseMetaDataGetPrimaryKeys() throws IOException JavaDoc
1377  {
1378    if (logger.isDebugEnabled())
1379      logger.debug("DatabaseMetaDataGetPrimaryKeys command");
1380
1381    String JavaDoc pcatalog = in.readLongUTF();
1382    String JavaDoc pschemaPattern = in.readLongUTF();
1383    String JavaDoc ptableNamePattern = in.readLongUTF();
1384
1385    try
1386    {
1387      sendToDriver(vdb.getDynamicMetaData().getPrimaryKeys(
1388          new ConnectionContext(login, transactionStarted, currentTid,
1389              persistentConnection, persistentConnectionId), pcatalog,
1390          pschemaPattern, ptableNamePattern));
1391    }
1392    catch (SQLException JavaDoc e)
1393    {
1394      if (logger.isWarnEnabled())
1395        logger.warn("Error while calling databaseMetaDataGetPrimaryKeys", e);
1396      sendToDriver(e);
1397    }
1398  }
1399
1400  /**
1401   * @see java.sql.DatabaseMetaData#getProcedureColumns(java.lang.String,
1402   * java.lang.String, java.lang.String, java.lang.String)
1403   */

1404  private void databaseMetaDataGetProcedureColumns() throws IOException JavaDoc
1405  {
1406    if (logger.isDebugEnabled())
1407      logger.debug("DatabaseMetaDataGetProcedureColumns command");
1408
1409    String JavaDoc pccatalog = in.readLongUTF();
1410    String JavaDoc pcschemaPattern = in.readLongUTF();
1411    String JavaDoc pcprocedureNamePattern = in.readLongUTF();
1412    String JavaDoc pccolumnNamePattern = in.readLongUTF();
1413
1414    try
1415    {
1416      sendToDriver(vdb.getDynamicMetaData().getProcedureColumns(
1417          new ConnectionContext(login, transactionStarted, currentTid,
1418              persistentConnection, persistentConnectionId), pccatalog,
1419          pcschemaPattern, pcprocedureNamePattern, pccolumnNamePattern));
1420    }
1421    catch (SQLException JavaDoc e)
1422    {
1423      if (logger.isWarnEnabled())
1424        logger.warn("Error while calling databaseMetaDataGetProcedureColumns",
1425            e);
1426      sendToDriver(e);
1427    }
1428  }
1429
1430  /**
1431   * @see java.sql.DatabaseMetaData#getProcedures(java.lang.String,
1432   * java.lang.String, java.lang.String)
1433   */

1434  private void databaseMetaDataGetProcedures() throws IOException JavaDoc
1435  {
1436    if (logger.isDebugEnabled())
1437      logger.debug("DatabaseMetaDataGetProcedures command");
1438
1439    String JavaDoc rcatalog = in.readLongUTF();
1440    String JavaDoc rschemaPattern = in.readLongUTF();
1441    String JavaDoc procedureNamePattern = in.readLongUTF();
1442
1443    try
1444    {
1445      sendToDriver(vdb.getDynamicMetaData().getProcedures(
1446          new ConnectionContext(login, transactionStarted, currentTid,
1447              persistentConnection, persistentConnectionId), rcatalog,
1448          rschemaPattern, procedureNamePattern));
1449    }
1450    catch (SQLException JavaDoc e)
1451    {
1452      if (logger.isWarnEnabled())
1453        logger.warn("Error while calling databaseMetaDataGetProcedures", e);
1454      sendToDriver(e);
1455    }
1456  }
1457
1458  /**
1459   * @see java.sql.DatabaseMetaData#getSchemas()
1460   */

1461  private void databaseMetaDataGetSchemas() throws IOException JavaDoc
1462  {
1463    if (logger.isDebugEnabled())
1464      logger.debug("DatabaseMetaDataGetSchemas Types command");
1465
1466    try
1467    {
1468      sendToDriver(vdb.getDynamicMetaData().getSchemas(
1469          new ConnectionContext(login, transactionStarted, currentTid,
1470              persistentConnection, persistentConnectionId)));
1471    }
1472    catch (SQLException JavaDoc e)
1473    {
1474      if (logger.isWarnEnabled())
1475        logger.warn("Error while calling databaseMetaDataGetSchemas", e);
1476      sendToDriver(e);
1477    }
1478  }
1479
1480  /**
1481   * @see java.sql.DatabaseMetaData#getSuperTables(java.lang.String,
1482   * java.lang.String, java.lang.String)
1483   */

1484  private void databaseMetaDataGetSuperTables() throws IOException JavaDoc
1485  {
1486    if (logger.isDebugEnabled())
1487      logger.debug("DatabaseMetaDataGetSuperTables command");
1488
1489    String JavaDoc catalog = in.readLongUTF();
1490    String JavaDoc schemaPattern = in.readLongUTF();
1491    String JavaDoc tableNamePattern = in.readLongUTF();
1492
1493    try
1494    {
1495      sendToDriver(vdb.getDynamicMetaData().getSuperTables(
1496          new ConnectionContext(login, transactionStarted, currentTid,
1497              persistentConnection, persistentConnectionId), catalog,
1498          schemaPattern, tableNamePattern));
1499    }
1500    catch (SQLException JavaDoc e)
1501    {
1502      if (logger.isWarnEnabled())
1503        logger.warn("Error while calling databaseMetaDataGetSuperTables", e);
1504      sendToDriver(e);
1505    }
1506  }
1507
1508  /**
1509   * @see java.sql.DatabaseMetaData#getSuperTypes(java.lang.String,
1510   * java.lang.String, java.lang.String)
1511   */

1512  private void databaseMetaDataGetSuperTypes() throws IOException JavaDoc
1513  {
1514    if (logger.isDebugEnabled())
1515      logger.debug("DatabaseMetaDataGetSuperTables command");
1516
1517    String JavaDoc catalog = in.readLongUTF();
1518    String JavaDoc schemaPattern = in.readLongUTF();
1519    String JavaDoc tableNamePattern = in.readLongUTF();
1520
1521    try
1522    {
1523      sendToDriver(vdb.getDynamicMetaData().getSuperTypes(
1524          new ConnectionContext(login, transactionStarted, currentTid,
1525              persistentConnection, persistentConnectionId), catalog,
1526          schemaPattern, tableNamePattern));
1527    }
1528    catch (SQLException JavaDoc e)
1529    {
1530      if (logger.isWarnEnabled())
1531        logger.warn("Error while calling databaseMetaDataGetSuperTypes", e);
1532      sendToDriver(e);
1533    }
1534  }
1535
1536  /**
1537   * @see java.sql.DatabaseMetaData#getTablePrivileges(java.lang.String,
1538   * java.lang.String, java.lang.String)
1539   */

1540  private void databaseMetaDataGetTablePrivileges() throws IOException JavaDoc
1541  {
1542    if (logger.isDebugEnabled())
1543      logger.debug("DatabaseMetaDataGetTablePrivileges command");
1544
1545    String JavaDoc tpcatalog = in.readLongUTF();
1546    String JavaDoc tpschemaPattern = in.readLongUTF();
1547    String JavaDoc tptablePattern = in.readLongUTF();
1548
1549    try
1550    {
1551      sendToDriver(vdb.getDynamicMetaData().getTablePrivileges(
1552          new ConnectionContext(login, transactionStarted, currentTid,
1553              persistentConnection, persistentConnectionId), tpcatalog,
1554          tpschemaPattern, tptablePattern));
1555    }
1556    catch (SQLException JavaDoc e)
1557    {
1558      if (logger.isWarnEnabled())
1559        logger
1560            .warn("Error while calling databaseMetaDataGetTablePrivileges", e);
1561      sendToDriver(e);
1562    }
1563  }
1564
1565  /**
1566   * @see java.sql.DatabaseMetaData#getTables(java.lang.String,
1567   * java.lang.String, java.lang.String, java.lang.String[])
1568   */

1569  private void databaseMetaDataGetTables() throws IOException JavaDoc
1570  {
1571    if (logger.isDebugEnabled())
1572      logger.debug("DatabaseMetaDataGetTables command");
1573
1574    String JavaDoc tcatalog = in.readLongUTF();
1575    String JavaDoc tschemaPattern = in.readLongUTF();
1576    String JavaDoc ttableNamePattern = in.readLongUTF();
1577
1578    String JavaDoc[] ttypes = null;
1579    if (in.readBoolean())
1580    {
1581      int size = in.readInt();
1582      ttypes = new String JavaDoc[size];
1583      for (int i = 0; i < size; i++)
1584        ttypes[i] = in.readLongUTF();
1585    }
1586
1587    try
1588    {
1589      sendToDriver(vdb.getDynamicMetaData().getTables(
1590          new ConnectionContext(login, transactionStarted, currentTid,
1591              persistentConnection, persistentConnectionId), tcatalog,
1592          tschemaPattern, ttableNamePattern, ttypes));
1593    }
1594    catch (SQLException JavaDoc e)
1595    {
1596      if (logger.isWarnEnabled())
1597        logger.warn("Error while calling databaseMetaDataGetTables", e);
1598      sendToDriver(e);
1599    }
1600  }
1601
1602  /**
1603   * @see java.sql.DatabaseMetaData#getTableTypes()
1604   */

1605  private void databaseMetaDataGetTableTypes() throws IOException JavaDoc
1606  {
1607    if (logger.isDebugEnabled())
1608      logger.debug("DatabaseMetaDataGetTableTypes command");
1609
1610    try
1611    {
1612      sendToDriver(vdb.getDynamicMetaData().getTableTypes(
1613          new ConnectionContext(login, transactionStarted, currentTid,
1614              persistentConnection, persistentConnectionId)));
1615    }
1616    catch (SQLException JavaDoc e)
1617    {
1618      if (logger.isWarnEnabled())
1619        logger.warn("Error while calling databaseMetaDataGetTableTypes", e);
1620      sendToDriver(e);
1621    }
1622  }
1623
1624  /**
1625   * @see java.sql.DatabaseMetaData#getTypeInfo()
1626   */

1627  private void databaseMetaDataGetTypeInfo() throws IOException JavaDoc
1628  {
1629    if (logger.isDebugEnabled())
1630      logger.debug("DatabaseMetaDataGetTypeInfo command");
1631
1632    try
1633    {
1634      sendToDriver(vdb.getDynamicMetaData().getTypeInfo(
1635          new ConnectionContext(login, transactionStarted, currentTid,
1636              persistentConnection, persistentConnectionId)));
1637    }
1638    catch (SQLException JavaDoc e)
1639    {
1640      if (logger.isWarnEnabled())
1641        logger.warn("Error while calling databaseMetaDataGetTypeInfo", e);
1642      sendToDriver(e);
1643    }
1644  }
1645
1646  /**
1647   * @see java.sql.DatabaseMetaData#getUDTs(java.lang.String, java.lang.String,
1648   * java.lang.String, int[])
1649   */

1650  private void databaseMetaDataGetUDTs() throws IOException JavaDoc
1651  {
1652    if (logger.isDebugEnabled())
1653      logger.debug("DatabaseMetaDataGetUDTs command");
1654
1655    String JavaDoc catalog = in.readLongUTF();
1656    String JavaDoc schemaPattern = in.readLongUTF();
1657    String JavaDoc tableNamePattern = in.readLongUTF();
1658
1659    int[] types = null;
1660    if (in.readBoolean())
1661    {
1662      int size = in.readInt();
1663      types = new int[size];
1664      for (int i = 0; i < size; i++)
1665        types[i] = in.readInt();
1666    }
1667
1668    try
1669    {
1670      sendToDriver(vdb.getDynamicMetaData().getUDTs(
1671          new ConnectionContext(login, transactionStarted, currentTid,
1672              persistentConnection, persistentConnectionId), catalog,
1673          schemaPattern, tableNamePattern, types));
1674    }
1675    catch (SQLException JavaDoc e)
1676    {
1677      if (logger.isWarnEnabled())
1678        logger.warn("Error while calling databaseMetaDataGetUDTs", e);
1679      sendToDriver(e);
1680    }
1681  }
1682
1683  /**
1684   * @see java.sql.DatabaseMetaData#getVersionColumns(java.lang.String,
1685   * java.lang.String, java.lang.String)
1686   */

1687  private void databaseMetaDataGetVersionColumns() throws IOException JavaDoc
1688  {
1689    if (logger.isDebugEnabled())
1690      logger.debug("DatabaseMetaDataGetVersionColumns command");
1691
1692    String JavaDoc catalog = in.readLongUTF();
1693    String JavaDoc schema = in.readLongUTF();
1694    String JavaDoc table = in.readLongUTF();
1695
1696    try
1697    {
1698      sendToDriver(vdb.getDynamicMetaData().getVersionColumns(
1699          new ConnectionContext(login, transactionStarted, currentTid,
1700              persistentConnection, persistentConnectionId), catalog, schema,
1701          table));
1702    }
1703    catch (SQLException JavaDoc e)
1704    {
1705      if (logger.isWarnEnabled())
1706        logger.warn("Error while calling databaseMetaDataGetVersionColumns", e);
1707      sendToDriver(e);
1708    }
1709  }
1710
1711  /**
1712   * Get the static metadata key from the socket and return the corresponding
1713   * metadata.
1714   *
1715   * @throws IOException if an IO error occurs
1716   * @throws NotImplementedException if the underlying metadata access method is
1717   * not implemented
1718   */

1719  private void databaseStaticMetadata() throws IOException JavaDoc,
1720      NotImplementedException
1721  {
1722    // the "getXXX(Y,Z,...)" hash key of the metadata
1723
// query called by the client using the driver.
1724
String JavaDoc key = in.readLongUTF();
1725    if (logger.isDebugEnabled())
1726      logger.debug("DatabaseStaticMetadata command for " + key);
1727    MetadataContainer container = vdb.getStaticMetaData()
1728        .getMetadataContainer();
1729    if (container == null) // no metadata has been gathered yet from backends
1730
{
1731      String JavaDoc msg = "No metadata is available probably because no backend is enabled on that controller.";
1732      logger.info(msg);
1733      sendToDriver(new SQLException JavaDoc(msg));
1734    }
1735    else
1736    {
1737      /**
1738       * To get an exhaustive list of all the types of java objects stored in
1739       * this hash table, search for all callers of
1740       * {@link org.continuent.sequoia.driver.DatabaseMetaData#getMetadata(String, Class[], Object[], boolean)}
1741       * and see also
1742       * {@link org.continuent.sequoia.controller.backend.DatabaseBackendMetaData#retrieveDatabaseMetadata()}
1743       * At this time it's limited to the following types: String, int and
1744       * boolean. boolean is the most frequent.
1745       */

1746      /*
1747       * Since we don't expect that any of these metadata methods will ever
1748       * return a non- java.sql.Types, we re-use here the serialization
1749       * implemented for SQL Data/ResultSets elements.
1750       */

1751
1752      SQLDataSerialization.Serializer serializer;
1753      Object JavaDoc result = container.get(key);
1754
1755      try
1756      {
1757        serializer = SQLDataSerialization.getSerializer(result);
1758        // TODO: clean-up this.
1759
if (serializer.isUndefined()) // <=> result == null
1760
throw new NotImplementedException();
1761      }
1762      catch (NotImplementedException innerEx)
1763      { // Should we just print a warning in case result == null ?
1764
// This should never happen with decent drivers.
1765
String JavaDoc msg;
1766        if (null == result)
1767          msg = " returned a null object.";
1768        else
1769          msg = " returned an object of an unsupported java type:"
1770              + result.getClass().getName() + ".";
1771
1772        NotImplementedException outerEx = new NotImplementedException(
1773            "Backend driver method " + key + msg);
1774        outerEx.initCause(innerEx);
1775        throw outerEx;
1776      }
1777
1778      TypeTag.NOT_EXCEPTION.sendToStream(out);
1779      serializer.getTypeTag().sendToStream(out);
1780      serializer.sendToStream(result, out);
1781    }
1782
1783    out.flush();
1784  }
1785
1786  private void preparedStatementGetMetaData() throws IOException JavaDoc
1787  {
1788    if (logger.isDebugEnabled())
1789      logger.debug("PreparedStatementGetMetaData command");
1790
1791    String JavaDoc sqlTemplate = in.readLongUTF();
1792
1793    try
1794    {
1795      AbstractRequest request = new UnknownWriteRequest(sqlTemplate, false, 0,
1796          "");
1797      request.setIsAutoCommit(!transactionStarted);
1798      setRequestParametersAndTransactionStarted(request);
1799      sendToDriver(vdb.getPreparedStatementGetMetaData(request));
1800    }
1801    catch (SQLException JavaDoc e)
1802    {
1803      if (logger.isWarnEnabled())
1804        logger.warn("Error while calling databaseMetaDataGetVersionColumns", e);
1805      sendToDriver(e);
1806    }
1807  }
1808
1809  private void getControllerVersionNumber() throws IOException JavaDoc
1810  {
1811    if (logger.isDebugEnabled())
1812      logger.debug("GetControllerVersionNumber command");
1813
1814    sendToDriver(Constants.VERSION);
1815  }
1816
1817  private void getVirtualDatabaseName() throws IOException JavaDoc
1818  {
1819    if (logger.isDebugEnabled())
1820      logger.debug("GetVirtualDatabaseName command");
1821
1822    sendToDriver(vdb.getDatabaseName());
1823  }
1824
1825  //
1826
// Transaction management
1827
//
1828

1829  /**
1830   * Check that we did not get a concurrent abort due to deadlock detection.
1831   *
1832   * @param request request that was executing
1833   * @throws SQLException if a concurrent abort has been detected
1834   */

1835  private void checkForConcurrentAbort(AbstractRequest request)
1836      throws SQLException JavaDoc
1837  {
1838    if (transactionStarted)
1839    {
1840      //
1841
synchronized (this)
1842      {
1843        if (transactionHasAborted)
1844        {
1845          /*
1846           * If the transaction was aborted before we execute we would never
1847           * have reached this point and vdb.execWriteRequest(write) would have
1848           * thrown a SQLException. Now we have to force a rollback because we
1849           * have probably lazily re-started the transaction and that has to be
1850           * cleaned up.
1851           */

1852          vdb.rollback(currentTid, writeQueryExecutedInThisTransaction);
1853          throw new SQLException JavaDoc("Transaction " + currentTid
1854              + " aborted, request " + request + "failed.");
1855        }
1856      }
1857    }
1858  }
1859
1860  /**
1861   * Commit the current transaction and reset the transaction state. If
1862   * sendTransactionId is true, the current transaction id is send back to the
1863   * driver else 'true' is sent back. See SEQUOIA-703.
1864   *
1865   * @throws SQLException if an error occurs at commit time
1866   * @throws IOException if an error occurs when sending the value to the driver
1867   */

1868  private void commit() throws SQLException JavaDoc, IOException JavaDoc
1869  {
1870    if (logger.isDebugEnabled())
1871      logger.debug("Commit command");
1872
1873    if (!transactionHasAborted)
1874      vdb.commit(currentTid, writeQueryExecutedInThisTransaction,
1875          !queryExecutedInThisTransaction);
1876    else if (logger.isWarnEnabled())
1877    {
1878      logger.warn("Transaction " + currentTid + " was aborted by database");
1879    }
1880
1881    // acknowledged the commit (even if transaction is aborted)
1882
sendToDriver(currentTid);
1883
1884    resetTransactionState();
1885  }
1886
1887  private void begin() throws SQLException JavaDoc, IOException JavaDoc
1888  {
1889    if (logger.isDebugEnabled())
1890      logger.debug("Begin command");
1891
1892    currentTid = vdb.begin(login, persistentConnection, persistentConnectionId);
1893    sendToDriver(currentTid);
1894
1895    transactionStarted = true;
1896    transactionHasAborted = false;
1897    queryExecutedInThisTransaction = false;
1898    writeQueryExecutedInThisTransaction = false;
1899    hasSavepoint = 0;
1900  }
1901
1902  /*
1903   * reset transaction State, begin will be initiated by driver
1904   */

1905  private void resetTransactionState()
1906  {
1907    currentTid = 0;
1908    transactionStarted = false;
1909    transactionHasAborted = false;
1910    queryExecutedInThisTransaction = false;
1911    writeQueryExecutedInThisTransaction = false;
1912    hasSavepoint = 0;
1913  }
1914
1915  private void rollback() throws SQLException JavaDoc, IOException JavaDoc
1916  {
1917    if (logger.isDebugEnabled())
1918      logger.debug("Rollback command");
1919
1920    if (!transactionHasAborted)
1921      vdb.rollback(currentTid, writeQueryExecutedInThisTransaction);
1922    else if (logger.isWarnEnabled())
1923    {
1924      logger.warn("Transaction " + currentTid + " was aborted by database");
1925    }
1926
1927    // acknowledged the rollback (even if transaction is aborted)
1928
sendToDriver(currentTid);
1929
1930    resetTransactionState();
1931  }
1932
1933  private void setNamedSavepoint() throws SQLException JavaDoc, IOException JavaDoc
1934  {
1935    if (logger.isDebugEnabled())
1936      logger.debug("Set named savepoint command");
1937
1938    String JavaDoc savepointName = in.readLongUTF();
1939
1940    // Check if this is not a duplicate savepoints
1941
if (vdb.getRequestManager().hasSavepoint(new Long JavaDoc(currentTid),
1942        savepointName))
1943      throw new SQLException JavaDoc("A savepoint named " + savepointName
1944          + " already exists for transaction " + currentTid);
1945
1946    vdb.setSavepoint(currentTid, savepointName);
1947    writeQueryExecutedInThisTransaction = true;
1948    hasSavepoint++;
1949    sendToDriver(true);
1950  }
1951
1952  private void setUnnamedSavepoint() throws SQLException JavaDoc, IOException JavaDoc
1953  {
1954    if (logger.isDebugEnabled())
1955      logger.debug("Set unnamed savepoint command");
1956
1957    int savepointId = vdb.setSavepoint(currentTid);
1958    writeQueryExecutedInThisTransaction = true;
1959    hasSavepoint++;
1960    sendToDriver(savepointId);
1961  }
1962
1963  private void releaseSavepoint() throws SQLException JavaDoc, IOException JavaDoc
1964  {
1965    if (logger.isDebugEnabled())
1966      logger.debug("Release savepoint command");
1967    String JavaDoc savepointName = in.readLongUTF();
1968    vdb.releaseSavepoint(currentTid, savepointName);
1969    hasSavepoint--;
1970    sendToDriver(true);
1971  }
1972
1973  private void rollbackToSavepoint() throws SQLException JavaDoc, IOException JavaDoc
1974  {
1975    if (logger.isDebugEnabled())
1976      logger.debug("Rollback to savepoint command");
1977    String JavaDoc savepointName = in.readLongUTF();
1978    vdb.rollback(currentTid, savepointName);
1979    hasSavepoint = vdb.getNumberOfSavepointsInTransaction(currentTid);
1980    sendToDriver(true);
1981  }
1982
1983  private void retrieveReleaseSavepoint() throws IOException JavaDoc
1984  {
1985    if (logger.isDebugEnabled())
1986      logger.debug("Retrieve release savepoint command");
1987
1988    // Wait for failover to be authorized
1989
waitForWritesFlushed(currentTid);
1990
1991    String JavaDoc savepointName = in.readLongUTF();
1992    sendToDriver(!vdb.getRequestManager().hasSavepoint(new Long JavaDoc(currentTid),
1993        savepointName));
1994  }
1995
1996  //
1997
// Decoding commands from the stream
1998
//
1999

2000  /**
2001   * Read a request (without ResultSet parameters) send by the
2002   * <code>Connection</code> object.
2003   *
2004   * @return an instance of <code>AbstractRequest</code>
2005   * @throws IOException if an error occurs in the procotol
2006   * @throws BadJDBCApiUsageException if the decoded request does not match
2007   * anything we can handle
2008   * @see Request#Request(DriverBufferedInputStream)
2009   */

2010  private AbstractRequest decodeRequestFromStream() throws IOException JavaDoc,
2011      BadJDBCApiUsageException
2012  {
2013    // Get request from the socket
2014
Request driverRequest = new Request(in);
2015
2016    String JavaDoc sqlQuery = driverRequest.getSqlQueryOrTemplate();
2017
2018    if (!requestFactory.isAuthorizedRequest(sqlQuery))
2019      throw new BadJDBCApiUsageException(
2020          "The following statement is not authorized to execute on the cluster (check your user documentation): "
2021              + sqlQuery);
2022
2023    AbstractRequest decodedRequest = requestFactory.requestFromString(sqlQuery,
2024        false, driverRequest.isEscapeProcessing(), driverRequest
2025            .getTimeoutInSeconds(), connectionLineSeparator);
2026    if (decodedRequest == null)
2027      throw new BadJDBCApiUsageException(
2028          "SQL statement does not match a query returning an update count ("
2029              + sqlQuery + ")");
2030
2031    decodedRequest.setPreparedStatementParameters(driverRequest
2032        .getPreparedStatementParameters());
2033    decodedRequest.setIsAutoCommit(driverRequest.isAutoCommit());
2034    return decodedRequest;
2035  }
2036
2037  /**
2038   * Read a request with ResultSet parameters send by the
2039   * <code>Connection</code> object.
2040   *
2041   * @param isExecuteQuery set to true if the received query is probably a read
2042   * statement (i.e. called by an executeQuery-like statement). This
2043   * will give priority to the parsing of read requests.
2044   * @return an instance of <code>AbstractRequest</code>
2045   * @throws IOException if an error occurs in the procotol
2046   * @throws BadJDBCApiUsageException if the request is not authorized to
2047   * execute
2048   * @see RequestWithResultSetParameters#RequestWithResultSetParameters(DriverBufferedInputStream)
2049   */

2050  private AbstractRequest decodeRequestWithResultSetParametersFromStream(
2051      boolean isExecuteQuery) throws IOException JavaDoc, BadJDBCApiUsageException
2052  {
2053    RequestWithResultSetParameters driverRequest = new RequestWithResultSetParameters(
2054        in);
2055
2056    String JavaDoc sqlQuery = driverRequest.getSqlQueryOrTemplate();
2057
2058    if (!requestFactory.isAuthorizedRequest(sqlQuery))
2059      throw new BadJDBCApiUsageException(
2060          "The following statement is not authorized to execute on the cluster (check your user documentation): "
2061              + sqlQuery);
2062
2063    AbstractRequest decodedRequest = requestFactory.requestFromString(sqlQuery,
2064        isExecuteQuery, driverRequest.isEscapeProcessing(), driverRequest
2065            .getTimeoutInSeconds(), connectionLineSeparator);
2066    if (decodedRequest == null)
2067    {
2068      decodedRequest = new UnknownWriteRequest(sqlQuery, driverRequest
2069          .isEscapeProcessing(), driverRequest.getTimeoutInSeconds(),
2070          connectionLineSeparator);
2071    }
2072    decodedRequest.setPreparedStatementParameters(driverRequest
2073        .getPreparedStatementParameters());
2074    decodedRequest.setIsAutoCommit(driverRequest.isAutoCommit());
2075    decodedRequest.setMaxRows(driverRequest.getMaxRows());
2076    decodedRequest.setFetchSize(driverRequest.getFetchSize());
2077    decodedRequest.setCursorName(driverRequest.getCursorName());
2078    return decodedRequest;
2079  }
2080
2081  /**
2082   * Log a transaction begin if needed for the AbstractRequest.<br />
2083   * The transaction is started only if needed (if the request is the first
2084   * write request for the current transaction)
2085   *
2086   * @param request a request
2087   * @throws SQLException if the transaction has aborted
2088   */

2089  private synchronized void logTransactionBegin(AbstractRequest request)
2090      throws SQLException JavaDoc
2091  {
2092    transactionStarted = setRequestParameters(request, login, currentTid,
2093        transactionStarted);
2094
2095    if (transactionHasAborted)
2096      throw new SQLException JavaDoc("Transaction is aborted, cannot execute query "
2097          + request);
2098
2099    if (!transactionStarted)
2100      currentTid = 0;
2101    else
2102    {
2103      // Transaction not started, check if we should do a lazy start
2104
queryExecutedInThisTransaction = true;
2105      writeQueryExecutedInThisTransaction = true;
2106    }
2107  }
2108
2109  /**
2110   * Set the login and transaction id on the given request. If the request is
2111   * autocommit and a transaction was started, the transaction is first commited
2112   * to return in autocommit mode.
2113   *
2114   * @param request The request to set
2115   * @param login user login to set
2116   * @param tid the transaction id to set
2117   * @return new value of transaction started
2118   */

2119  private boolean setRequestParameters(AbstractRequest request, String JavaDoc login,
2120      long tid, boolean transactionStarted) throws SQLException JavaDoc
2121  {
2122    request.setClientIpAddress(clientIpAddress);
2123    request.setLogin(login);
2124    request.setTransactionIsolation(transactionIsolation);
2125    request.setLineSeparator(connectionLineSeparator);
2126    request.setPersistentConnection(persistentConnection);
2127    request.setPersistentConnectionId(persistentConnectionId);
2128    request.setRetrieveSQLWarnings(retrieveSQLWarnings);
2129    request.setIsReadOnly(isReadOnly);
2130    if (request.isAutoCommit() && transactionStarted)
2131    {
2132      vdb.commit(tid, writeQueryExecutedInThisTransaction,
2133          !queryExecutedInThisTransaction);
2134      return false;
2135    }
2136    else
2137      request.setTransactionId(tid);
2138    request.setId(vdb.getNextRequestId());
2139    return transactionStarted;
2140  }
2141
2142  private void setRequestParametersAndTransactionStarted(AbstractRequest request)
2143      throws SQLException JavaDoc
2144  {
2145    synchronized (this)
2146    {
2147      transactionStarted = setRequestParameters(request, login, currentTid,
2148          transactionStarted);
2149
2150      if (transactionHasAborted)
2151        throw new SQLException JavaDoc("Transaction is aborted, cannot execute query "
2152            + request);
2153
2154      if (!transactionStarted)
2155        currentTid = 0;
2156      else
2157        queryExecutedInThisTransaction = true;
2158    }
2159  }
2160
2161  //
2162
// Request execution
2163
//
2164

2165  private void statementExecuteQuery(SelectRequest decodedRequest)
2166      throws IOException JavaDoc, SQLException JavaDoc, BadJDBCApiUsageException
2167  {
2168    if (logger.isDebugEnabled())
2169      logger.debug("StatementExecuteQuery command");
2170    AbstractRequest request = decodedRequest;
2171    if (decodedRequest == null)
2172      request = decodeRequestWithResultSetParametersFromStream(true);
2173
2174    if (request instanceof SelectRequest)
2175    {
2176      SelectRequest select = (SelectRequest) request;
2177      setRequestParametersAndTransactionStarted(select);
2178
2179      // Here, if the transaction isolation level was set to SERIALIZABLE, we
2180
// need to broadcast the request to all controllers
2181
if (!request.isAutoCommit()
2182          && requestFactory.isBroadcastRequired(transactionIsolation))
2183      {
2184        select.setMustBroadcast(true);
2185        writeQueryExecutedInThisTransaction = true;
2186      }
2187
2188      // send the resultset
2189
ControllerResultSet crs = vdb.statementExecuteQuery(select);
2190
2191      checkForConcurrentAbort(select);
2192
2193      // If this is a remapping of the call, we have to send the id back
2194
if (decodedRequest != null)
2195        sendToDriver(select.getId());
2196
2197      // send statement warnings
2198
sendToDriver(crs.getStatementWarnings());
2199
2200      sendToDriver(crs);
2201
2202      // streaming
2203
if (crs.hasMoreData())
2204        streamedResultSets.put(crs.getCursorName(), crs);
2205    }
2206    else if (request instanceof StoredProcedure)
2207    { // This is a stored procedure
2208
if (logger.isInfoEnabled())
2209        logger.info("Statement.executeQuery() detected a stored procedure ("
2210            + request
2211            + ") remapping the call to CallableStatement.executeQuery()");
2212      callableStatementExecuteQuery((StoredProcedure) request, false);
2213      return;
2214    }
2215    else
2216      throw new BadJDBCApiUsageException(
2217          "Statement.executeQuery() not allowed for requests returning an update count ("
2218              + request + ")");
2219  }
2220
2221  /**
2222   * Execute a write request that returns an int.
2223   *
2224   * @param decodedRequest an already decoded request or null
2225   * @throws IOException if an error occurs with the socket
2226   * @throws SQLException if an error occurs while executing the request
2227   * @throws BadJDBCApiUsageException if a query returning a ResultSet is called
2228   */

2229  private void statementExecuteUpdate(AbstractWriteRequest decodedRequest)
2230      throws IOException JavaDoc, SQLException JavaDoc, BadJDBCApiUsageException
2231  {
2232    if (logger.isDebugEnabled())
2233      logger.debug("StatementExecuteUpdate command");
2234
2235    AbstractRequest request = decodedRequest;
2236    if (request == null)
2237    {
2238      try
2239      {
2240        request = decodeRequestFromStream();
2241      }
2242      catch (BadJDBCApiUsageException e)
2243      {
2244        throw new BadJDBCApiUsageException(
2245            "Statement.executeUpdate() not allowed for requests returning a ResultSet",
2246            e);
2247      }
2248      logTransactionBegin(request);
2249    }
2250
2251    try
2252    {
2253      AbstractWriteRequest write = (AbstractWriteRequest) request;
2254
2255      // At this point we don't have a stored procedure
2256
// Send query id to driver for failover
2257
sendToDriver(request.getId());
2258
2259      // Execute the request
2260
ExecuteUpdateResult result = vdb.statementExecuteUpdate(write);
2261      // Check if there was an issue with deadlock detection
2262
checkForConcurrentAbort(write);
2263      // Send SQL Warnings if any
2264
sendToDriver(result.getStatementWarnings());
2265      // Send result back
2266
sendToDriver(result.getUpdateCount());
2267    }
2268    catch (ClassCastException JavaDoc e)
2269    {
2270      if (request instanceof StoredProcedure)
2271      {
2272        if (logger.isInfoEnabled())
2273          logger.info("Statement.executeUpdate() detected a stored procedure ("
2274              + request
2275              + ") remapping the call to CallableStatement.executeUpdate()");
2276        callableStatementExecuteUpdate((StoredProcedure) request, false);
2277        return;
2278      }
2279      else
2280        throw new BadJDBCApiUsageException(
2281            "Statement.executeUpdate() not allowed for requests returning a ResultSet ("
2282                + request + ")");
2283    }
2284  }
2285
2286  private void statementExecuteUpdateWithKeys() throws IOException JavaDoc,
2287      SQLException JavaDoc, BadJDBCApiUsageException
2288  {
2289    if (logger.isDebugEnabled())
2290      logger.debug("StatementExecuteUpdateWithKeys command");
2291    try
2292    {
2293      // Get the request from the socket
2294
AbstractWriteRequest writeWithKeys;
2295      try
2296      {
2297        writeWithKeys = (AbstractWriteRequest) decodeRequestFromStream();
2298      }
2299      catch (BadJDBCApiUsageException e)
2300      {
2301        throw new BadJDBCApiUsageException(
2302            "Statement.executeUpdate() not allowed for requests returning a ResultSet",
2303            e);
2304      }
2305      logTransactionBegin(writeWithKeys);
2306
2307      // Send query id to driver for failover
2308
sendToDriver(writeWithKeys.getId());
2309
2310      // Execute the request
2311
GeneratedKeysResult updateCountWithKeys = vdb
2312          .statementExecuteUpdateWithKeys(writeWithKeys);
2313      // Check if there was an issue with deadlock detection
2314
checkForConcurrentAbort(writeWithKeys);
2315      // Send SQL Warnings if any
2316
sendToDriver(updateCountWithKeys.getStatementWarnings());
2317      // Send result back
2318
sendToDriver(updateCountWithKeys.getUpdateCount());
2319      ControllerResultSet rs = updateCountWithKeys.getControllerResultSet();
2320      sendToDriver(rs);
2321
2322      // streaming
2323
if (rs.hasMoreData())
2324        streamedResultSets.put(rs.getCursorName(), updateCountWithKeys);
2325    }
2326    catch (ClassCastException JavaDoc e)
2327    {
2328      throw new BadJDBCApiUsageException(
2329          "RETURN_GENERATED_KEYS is not supported for stored procedures");
2330    }
2331
2332  }
2333
2334  private void statementExecute(AbstractRequest decodedRequest)
2335      throws IOException JavaDoc, SQLException JavaDoc
2336  {
2337    if (logger.isDebugEnabled())
2338      logger.debug("statementExecute command");
2339
2340    AbstractRequest request = decodedRequest;
2341    if (decodedRequest == null)
2342      try
2343      {
2344        request = decodeRequestWithResultSetParametersFromStream(false);
2345      }
2346      catch (BadJDBCApiUsageException e)
2347      {
2348        throw new SQLException JavaDoc(e.getMessage());
2349      }
2350
2351    synchronized (this)
2352    {
2353      transactionStarted = setRequestParameters(request, login, currentTid,
2354          transactionStarted);
2355
2356      if (transactionHasAborted)
2357        throw new SQLException JavaDoc("Transaction is aborted, cannot execute query "
2358            + request);
2359
2360      if (!transactionStarted)
2361        currentTid = 0;
2362      else
2363        queryExecutedInThisTransaction = true;
2364    }
2365
2366    ExecuteResult result;
2367    // Direct to Statement.execute() if this is an inline batch
2368
// or if matching some statements (such as EXPLAIN ANALYZE in postgres)
2369
if (requestFactory.requestNeedsExecute(request))
2370    {
2371      // Send query id to driver for failover
2372
sendToDriver(request.getId());
2373
2374      writeQueryExecutedInThisTransaction = true;
2375
2376      if (request instanceof SelectRequest)
2377      { // Convert to an unknown write request as expected by underlying
2378
// components (relates to SEQUOIA-674)
2379
UnknownWriteRequest writeRequest = new UnknownWriteRequest(request
2380            .getSqlOrTemplate(), request.getEscapeProcessing(), request
2381            .getTimeout(), request.getLineSeparator());
2382        writeRequest.setIsAutoCommit(request.isAutoCommit());
2383        writeRequest.setTransactionId(request.getTransactionId());
2384        writeRequest.setTransactionIsolation(request.getTransactionIsolation());
2385        writeRequest.setId(request.getId());
2386        writeRequest.setLogin(request.getLogin());
2387        writeRequest.setPreparedStatementParameters(request
2388            .getPreparedStatementParameters());
2389        writeRequest.setTimeout(request.getTimeout());
2390        writeRequest.setMaxRows(request.getMaxRows());
2391        writeRequest.setPersistentConnection(request.isPersistentConnection());
2392        writeRequest.setPersistentConnectionId(request
2393            .getPersistentConnectionId());
2394        request = writeRequest;
2395      }
2396
2397      result = vdb.statementExecute(request);
2398    }
2399    // Route to CallableStatement.execute() if this is a stored procedure
2400
else if (request instanceof StoredProcedure)
2401    {
2402      if (logger.isInfoEnabled())
2403        logger.info("Statement.execute() did detect a stored procedure ("
2404            + request + ") remapping the call to CallableStatement.execute()");
2405
2406      writeQueryExecutedInThisTransaction = true;
2407
2408      callableStatementExecute((StoredProcedure) request, false);
2409      return;
2410    }
2411    else
2412    { // Route SELECT to Statement.executeQuery() and others to
2413
// Statement.executeUpdate()
2414

2415      // Send query id to driver for failover (driver still expect a
2416
// Statement.execute() protocol)
2417
sendToDriver(request.getId());
2418
2419      result = new ExecuteResult();
2420      if (request instanceof SelectRequest)
2421      {
2422        request.setFetchSize(0); // disable streaming, fixes SEQUOIA-233
2423

2424        // Here, if the transaction isolation level was set to SERIALIZABLE, we
2425
// need to broadcast the select request to all controllers
2426
if (!request.isAutoCommit()
2427            && requestFactory.isBroadcastRequired(transactionIsolation))
2428        {
2429          ((SelectRequest) request).setMustBroadcast(true);
2430          writeQueryExecutedInThisTransaction = true;
2431        }
2432
2433        ControllerResultSet crs = vdb
2434            .statementExecuteQuery((SelectRequest) request);
2435        // call remapping: construct a ExecuteResult from a ExecuteUpdateResult
2436
result.addResult(crs);
2437        result.setStatementWarnings(crs.getStatementWarnings());
2438        result.addResult(-1);
2439      }
2440      else
2441      {
2442        writeQueryExecutedInThisTransaction = true;
2443
2444        ExecuteUpdateResult updateCount = vdb
2445            .statementExecuteUpdate((AbstractWriteRequest) request);
2446        // call remapping: construct a ExecuteResult from a ExecuteUpdateResult
2447
result.setStatementWarnings(updateCount.getStatementWarnings());
2448        result.addResult(updateCount.getUpdateCount());
2449        // end of result list marker
2450
if (updateCount.getUpdateCount() != -1)
2451          result.addResult(-1);
2452      }
2453    }
2454
2455    checkForConcurrentAbort(request);
2456
2457    // Send SQL Warnings if any
2458
sendToDriver(result.getStatementWarnings());
2459
2460    for (Iterator JavaDoc iter = result.getResults().iterator(); iter.hasNext();)
2461    {
2462      Object JavaDoc r = iter.next();
2463      if (r instanceof Integer JavaDoc)
2464      {
2465        sendToDriver(false);
2466        sendToDriver(((Integer JavaDoc) r).intValue());
2467      }
2468      else if (r instanceof ControllerResultSet)
2469      {
2470        sendToDriver(true);
2471        sendToDriver((ControllerResultSet) r);
2472      }
2473      else
2474        logger.error("Unexpected result " + r
2475            + " in statementExecute for request " + request);
2476    }
2477  }
2478
2479  /**
2480   * @param decodedProc Stored procedure if called from statementExecuteQuery(),
2481   * otherwise null
2482   * @param returnsOutParameters true if the call must return out/named
2483   * parameters
2484   * @throws BadJDBCApiUsageException
2485   */

2486  private void callableStatementExecuteQuery(StoredProcedure decodedProc,
2487      boolean returnsOutParameters) throws IOException JavaDoc, SQLException JavaDoc,
2488      BadJDBCApiUsageException
2489  {
2490    if (logger.isDebugEnabled())
2491      logger.debug("CallableStatementExecuteQuery command");
2492
2493    StoredProcedure proc = decodedProc;
2494    if (proc == null)
2495    {
2496      AbstractRequest request = decodeRequestWithResultSetParametersFromStream(true);
2497      if (request == null)
2498        throw new ProtocolException("Failed to decode stored procedure");
2499
2500      try
2501      {
2502        // Fetch the query from the socket
2503
proc = (StoredProcedure) request;
2504
2505        // Parse the query first to update the semantic information
2506
vdb.getRequestManager().getParsingFromCacheOrParse(proc);
2507
2508        // If procedure is read-only, we don't log lazy begin
2509
if (!proc.isReadOnly())
2510          logTransactionBegin(proc);
2511      }
2512      catch (ClassCastException JavaDoc e)
2513      {
2514        if (request instanceof SelectRequest)
2515        {
2516          if (logger.isInfoEnabled())
2517            logger
2518                .info("CallableStatement.executeQuery() did not detect a stored procedure ("
2519                    + request
2520                    + ") remapping the call to Statement.executeQuery()");
2521          statementExecuteQuery((SelectRequest) request);
2522          if (returnsOutParameters)
2523            sendNamedAndOutParametersToDriver(request);
2524          return;
2525        }
2526        throw new BadJDBCApiUsageException(
2527            "Unhandled stored procedure call in " + request);
2528      }
2529    }
2530
2531    setRequestParametersAndTransactionStarted(proc);
2532
2533    if (decodedProc == null)
2534    { // Send query id to driver for failover
2535
sendToDriver(proc.getId());
2536    }
2537    // else we come from statement.executeQuery and we should not send the id
2538

2539    // Execute the stored procedure
2540
ControllerResultSet sprs = vdb.callableStatementExecuteQuery(proc);
2541    checkForConcurrentAbort(proc);
2542
2543    // Send SQL Warnings if any
2544
sendToDriver(sprs.getStatementWarnings());
2545
2546    sendToDriver(sprs);
2547
2548    // streaming
2549
if (sprs.hasMoreData())
2550      streamedResultSets.put(sprs.getCursorName(), sprs);
2551
2552    if (returnsOutParameters)
2553      sendNamedAndOutParametersToDriver(proc);
2554  }
2555
2556  /**
2557   * @param sp Stored procedure if called from statementExecuteUpdate(),
2558   * otherwise null
2559   * @param returnsOutParameters true if the call must return out/named
2560   * parameters
2561   */

2562  private void callableStatementExecuteUpdate(StoredProcedure sp,
2563      boolean returnsOutParameters) throws IOException JavaDoc, SQLException JavaDoc,
2564      BadJDBCApiUsageException
2565  {
2566    if (logger.isDebugEnabled())
2567      logger.debug("CallableStatementExecuteUpdate command");
2568
2569    if (sp == null)
2570    {
2571      AbstractRequest request;
2572      try
2573      {
2574        request = decodeRequestFromStream();
2575      }
2576      catch (BadJDBCApiUsageException e)
2577      {
2578        throw new BadJDBCApiUsageException(
2579            "CallableStatement.executeUpdate() not allowed for requests returning a ResultSet ",
2580            e);
2581      }
2582      logTransactionBegin(request);
2583
2584      try
2585      {
2586        // Fetch the query from the socket
2587
sp = (StoredProcedure) request;
2588      }
2589      catch (ClassCastException JavaDoc e)
2590      {
2591        if (request instanceof AbstractWriteRequest)
2592        {
2593          if (logger.isInfoEnabled())
2594            logger
2595                .info("CallableStatement.executeUpdate() did not detect a stored procedure ("
2596                    + request
2597                    + ") remapping the call to Statement.executeUpdate()");
2598          statementExecuteUpdate((AbstractWriteRequest) request);
2599          if (returnsOutParameters)
2600            sendNamedAndOutParametersToDriver(request);
2601          return;
2602        }
2603        throw new BadJDBCApiUsageException(
2604            "Unhandled stored procedure call in " + request);
2605      }
2606    }
2607
2608    // Send query id to driver for failover
2609
sendToDriver(sp.getId());
2610
2611    // Execute the query
2612
ExecuteUpdateResult result = vdb.callableStatementExecuteUpdate(sp);
2613    checkForConcurrentAbort(sp);
2614    // Send SQL Warnings if any
2615
sendToDriver(result.getStatementWarnings());
2616    // Send result back
2617
sendToDriver(result.getUpdateCount());
2618
2619    if (returnsOutParameters)
2620      sendNamedAndOutParametersToDriver(sp);
2621  }
2622
2623  /**
2624   * @param sp Stored procedure if called from statementExecute(), otherwise
2625   * null.
2626   * @param returnsOutParameters true if the call must return out/named
2627   * parameters
2628   */

2629  private void callableStatementExecute(StoredProcedure sp,
2630      boolean returnsOutParameters) throws IOException JavaDoc, SQLException JavaDoc
2631  {
2632    if (logger.isDebugEnabled())
2633      logger.debug("CallableStatementExecute command");
2634
2635    if (sp == null)
2636    {
2637      AbstractRequest request;
2638      try
2639      {
2640        request = decodeRequestWithResultSetParametersFromStream(false);
2641      }
2642      catch (BadJDBCApiUsageException e)
2643      {
2644        throw new SQLException JavaDoc(e.getMessage());
2645      }
2646      if (request == null)
2647        throw new ProtocolException("Failed to decode stored procedure");
2648      try
2649      {
2650        // Fetch the query from the socket
2651
sp = (StoredProcedure) request;
2652
2653        // Parse the query first to update the semantic information
2654
vdb.getRequestManager().getParsingFromCacheOrParse(sp);
2655
2656        // If procedure is read-only, we don't log lazy begin
2657
if (!sp.isReadOnly())
2658          logTransactionBegin(sp);
2659      }
2660      catch (ClassCastException JavaDoc e)
2661      {
2662        if (logger.isInfoEnabled())
2663          logger
2664              .info("CallableStatement.execute() did not detect a stored procedure ("
2665                  + request + ") remapping the call to Statement.execute()");
2666        statementExecute(request);
2667        if (returnsOutParameters)
2668          sendNamedAndOutParametersToDriver(request);
2669        return;
2670      }
2671    }
2672
2673    setRequestParametersAndTransactionStarted(sp);
2674
2675    // Send query id to driver for failover
2676
sendToDriver(sp.getId());
2677
2678    // Execute the query
2679
ExecuteResult result = vdb.callableStatementExecute(sp);
2680    checkForConcurrentAbort(sp);
2681
2682    // Send SQL Warnings if any
2683
sendToDriver(result.getStatementWarnings());
2684
2685    for (Iterator JavaDoc iter = result.getResults().iterator(); iter.hasNext();)
2686    {
2687      Object JavaDoc r = iter.next();
2688
2689      if (r instanceof Integer JavaDoc)
2690      {
2691        sendToDriver(false);
2692        sendToDriver(((Integer JavaDoc) r).intValue());
2693      }
2694      else if (r instanceof ControllerResultSet)
2695      {
2696        sendToDriver(true);
2697        sendToDriver((ControllerResultSet) r);
2698      }
2699      else
2700        logger.error("Unexepected result " + r
2701            + " in callableStatementExecute for request " + sp);
2702    }
2703    if (returnsOutParameters)
2704      sendNamedAndOutParametersToDriver(sp);
2705  }
2706
2707  private void sendNamedAndOutParametersToDriver(AbstractRequest request)
2708      throws IOException JavaDoc, ProtocolException
2709  {
2710    if (request instanceof StoredProcedure)
2711    {
2712      try
2713      {
2714        StoredProcedure proc = (StoredProcedure) request;
2715        // First send the out parameters
2716
List JavaDoc outParamIndexes = proc.getOutParameterIndexes();
2717        if (outParamIndexes != null)
2718        {
2719          // Now send each param (index, then serializer and serialized object)
2720
for (Iterator JavaDoc iter = outParamIndexes.iterator(); iter.hasNext();)
2721          {
2722            Integer JavaDoc index = (Integer JavaDoc) iter.next();
2723            sendToDriver(index.intValue());
2724            Object JavaDoc object = proc.getOutParameterValue(index);
2725            sendObjectToDriver(object);
2726          }
2727        }
2728        sendToDriver(0);
2729
2730        // Fetch the named parameters
2731
List JavaDoc namedParamNames = proc.getNamedParameterNames();
2732        if (namedParamNames != null)
2733        {
2734          for (Iterator JavaDoc iter = namedParamNames.iterator(); iter.hasNext();)
2735          {
2736            // Send param name first
2737
String JavaDoc paramName = (String JavaDoc) iter.next();
2738            sendToDriver(paramName);
2739            // Now send value (serializer first then serialized object)
2740
Object JavaDoc object = proc.getNamedParameterValue(paramName);
2741            sendObjectToDriver(object);
2742          }
2743        }
2744        sendToDriver("0");
2745      }
2746      catch (NotImplementedException e)
2747      {
2748        String JavaDoc msg = "Unable to serialize parameter result for request "
2749            + request;
2750        logger.error(msg, e);
2751        throw new ProtocolException(msg);
2752      }
2753    }
2754    else
2755    // Not a stored procedure (remapped call)
2756
{
2757      // No out parameter
2758
sendToDriver(0);
2759      // No named parameter
2760
sendToDriver("0");
2761    }
2762  }
2763
2764  /**
2765   * Send an object to the driver (first tag then serialized object).
2766   *
2767   * @param object object to send
2768   * @throws IOException if an error occurs with the socket
2769   * @throws NotImplementedException if the object cannot be serialized
2770   */

2771  private void sendObjectToDriver(Object JavaDoc object) throws IOException JavaDoc,
2772      NotImplementedException
2773  {
2774    if (object == null)
2775    { // Special tag for null objects (nothing to send)
2776
TypeTag.JAVA_NULL.sendToStream(out);
2777    }
2778    else
2779    { // Regular object
2780
Serializer s = SQLDataSerialization.getSerializer(object);
2781      s.getTypeTag().sendToStream(out);
2782      s.sendToStream(object, out);
2783    }
2784  }
2785
2786  /**
2787   * Retrieve the result from the request result failover cache for the given
2788   * request id. If the result is not found, the scheduler is checked in case
2789   * the query is currently executing. If the query is executing, we wait until
2790   * it has completed and then return the result. If no result is found, null is
2791   * returned.
2792   *
2793   * @param requestId the request unique identifier
2794   * @return the request result or null if not found
2795   */

2796  private Serializable JavaDoc getResultForRequestId(long requestId)
2797  {
2798    waitForWritesFlushed(requestId);
2799
2800    Serializable JavaDoc result = ((DistributedVirtualDatabase) vdb)
2801        .getRequestResultFailoverCache().retrieve(requestId);
2802
2803    if (result == null)
2804    { // Check if query is not currently executing
2805
AbstractScheduler scheduler = vdb.getRequestManager().getScheduler();
2806      if (scheduler.isActiveRequest(requestId))
2807      {
2808        // Wait for request completion and then retrieve the result from the
2809
// failover cache.
2810
scheduler.waitForRequestCompletion(requestId);
2811        result = ((DistributedVirtualDatabase) vdb)
2812            .getRequestResultFailoverCache().retrieve(requestId);
2813      }
2814    }
2815    return result;
2816  }
2817
2818  /**
2819   * Retrieve the result of a stored procedure that returns multiple results,
2820   * out parameters and named parameters. Returns null to the driver if the
2821   * result has not been found.
2822   *
2823   * @throws IOException if an error occurs with the socket
2824   * @throws SQLException if an error occurs while retrieving the result
2825   */

2826  private void retrieveExecuteResultWithParameters() throws IOException JavaDoc,
2827      SQLException JavaDoc
2828  {
2829    if (logger.isDebugEnabled())
2830      logger.debug("Retrieve execute result with parameters command");
2831
2832    long requestId = in.readLong();
2833
2834    if (vdb.isDistributed())
2835    {
2836      Serializable JavaDoc result = getResultForRequestId(requestId);
2837
2838      if (result != null)
2839      {
2840        // Cache hit
2841
if (result instanceof StoredProcedureCallResult)
2842        {
2843          StoredProcedureCallResult spResult = (StoredProcedureCallResult) result;
2844
2845          // re-send statement warnings
2846
sendToDriver(((ExecuteResult) spResult.getResult())
2847              .getStatementWarnings());
2848          // Send results first
2849
for (Iterator JavaDoc iter = ((ExecuteResult) spResult.getResult())
2850              .getResults().iterator(); iter.hasNext();)
2851          {
2852            Object JavaDoc element = iter.next();
2853            if (element instanceof Integer JavaDoc)
2854            {
2855              sendToDriver(false);
2856              sendToDriver(((Integer JavaDoc) element).intValue());
2857            }
2858            else if (element instanceof ControllerResultSet)
2859            {
2860              sendToDriver(true);
2861              sendToDriver((ControllerResultSet) element);
2862            }
2863            else
2864              logger.error("Unexpected result " + element
2865                  + " in statementExecute for request " + requestId);
2866          }
2867
2868          // Send parameters
2869
sendNamedAndOutParametersToDriver(spResult.getStoredProcedure());
2870        }
2871        else
2872          throw new SQLException JavaDoc(
2873              "Expected StoredProcedureCallResult for request " + requestId
2874                  + " failover but got " + result);
2875      }
2876      else
2877      {
2878        // No cache hit
2879
sendToDriver((SQLWarning JavaDoc) null);
2880        sendToDriver(true);
2881        sendToDriver((ControllerResultSet) null);
2882      }
2883    }
2884    else
2885    {
2886      throw new SQLException JavaDoc(
2887          "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2888    }
2889  }
2890
2891  /**
2892   * Retrieve the result of a stored procedure that returns an int, out
2893   * parameters and named parameters. Returns -1 to the driver if the result has
2894   * not been found.
2895   *
2896   * @throws IOException if an error occurs with the socket
2897   * @throws SQLException if an error occurs while retrieving the result
2898   */

2899  private void retrieveExecuteUpdateResultWithParameters() throws IOException JavaDoc,
2900      SQLException JavaDoc
2901  {
2902    if (logger.isDebugEnabled())
2903      logger.debug("Retrieve execute update with parameters command");
2904
2905    long requestId = in.readLong();
2906
2907    if (vdb.isDistributed())
2908    {
2909      Serializable JavaDoc result = getResultForRequestId(requestId);
2910
2911      if (result != null)
2912      {
2913        // Cache hit
2914
if (result instanceof StoredProcedureCallResult)
2915        {
2916          StoredProcedureCallResult spResult = (StoredProcedureCallResult) result;
2917          // Send warnings
2918
ExecuteUpdateResult r = (ExecuteUpdateResult) spResult.getResult();
2919          // re-send statement warnings
2920
sendToDriver(r.getStatementWarnings());
2921          // Send udpate count
2922
sendToDriver(r.getUpdateCount());
2923          // Send parameters
2924
sendNamedAndOutParametersToDriver(spResult.getStoredProcedure());
2925        }
2926        else
2927          throw new SQLException JavaDoc(
2928              "Expected StoredProcedureCallResult for request " + requestId
2929                  + " failover but got " + result);
2930      }
2931      else
2932      {
2933        // No cache hit
2934
sendToDriver((SQLWarning JavaDoc) null);
2935        sendToDriver(-1);
2936      }
2937    }
2938    else
2939    {
2940      throw new SQLException JavaDoc(
2941          "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2942    }
2943  }
2944
2945  /**
2946   * Retrieve the result of a stored procedure that returns a ResultSet, out
2947   * parameters and named parameters. Returns null to the driver if the result
2948   * has not been found.
2949   *
2950   * @throws IOException if an error occurs with the socket
2951   * @throws SQLException if an error occurs while retrieving the result
2952   */

2953  private void retrieveExecuteQueryResultWithParameters() throws IOException JavaDoc,
2954      SQLException JavaDoc
2955  {
2956    if (logger.isDebugEnabled())
2957      logger.debug("Retrieve execute update with parameters command");
2958
2959    long requestId = in.readLong();
2960
2961    if (vdb.isDistributed())
2962    {
2963      Serializable JavaDoc result = getResultForRequestId(requestId);
2964
2965      if (result != null)
2966      {
2967        // Cache hit
2968
if (result instanceof StoredProcedureCallResult)
2969        {
2970          StoredProcedureCallResult spResult = (StoredProcedureCallResult) result;
2971          // re-send statement warnings
2972
sendToDriver(((ControllerResultSet) spResult.getResult())
2973              .getStatementWarnings());
2974          // Send ResultSet
2975
sendToDriver((ControllerResultSet) spResult.getResult());
2976          // Send parameters
2977
sendNamedAndOutParametersToDriver(spResult.getStoredProcedure());
2978        }
2979        else
2980          throw new SQLException JavaDoc(
2981              "Expected StoredProcedureCallResult for request " + requestId
2982                  + " failover but got " + result);
2983      }
2984      else
2985      {
2986        // No cache hit
2987
sendToDriver((SQLWarning JavaDoc) null);
2988        sendToDriver((ControllerResultSet) null);
2989      }
2990    }
2991    else
2992    {
2993      throw new SQLException JavaDoc(
2994          "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2995    }
2996  }
2997
2998  /**
2999   * Retrieve the result of a write request that returns an int. Returns -1 to
3000   * the driver if the result has not been found.
3001   *
3002   * @throws IOException if an error occurs with the socket
3003   * @throws SQLException if an error occurs while retrieving the result
3004   */

3005  private void retrieveExecuteUpdateResult() throws IOException JavaDoc, SQLException JavaDoc
3006  {
3007    if (logger.isDebugEnabled())
3008      logger.debug("Retrieve execute update result command");
3009
3010    long requestId = in.readLong();
3011
3012    if (vdb.isDistributed())
3013    {
3014      // Result will always be null since result is only stored in the recovery
3015
// log but this call ensures that the query execution has completed.
3016
getResultForRequestId(requestId);
3017
3018      // We don't cache the warnings for write queries
3019
sendToDriver(new SQLWarning JavaDoc(
3020          Translate
3021              .get(
3022                  "virtualdatabase.distributed.write.failover.lost.warnings", requestId))); //$NON-NLS-1$
3023
sendToDriver(vdb.getRequestManager().getRecoveryLog()
3024          .getUpdateCountResultForQuery(requestId));
3025    }
3026    else
3027    {
3028      throw new SQLException JavaDoc(
3029          "Transparent failover for statements that return an update count is only supported in distributed configurations.");
3030    }
3031  }
3032
3033  /**
3034   * Retrieve the result of a request that returns multiple results. Returns
3035   * null to the driver if the result has not been found.
3036   *
3037   * @throws IOException if an error occurs with the socket
3038   * @throws SQLException if an error occurs while retrieving the result
3039   */

3040  private void retrieveExecuteResult() throws IOException JavaDoc, SQLException JavaDoc
3041  {
3042    if (logger.isDebugEnabled())
3043      logger.debug("Retrieve execute result command");
3044
3045    long requestId = in.readLong();
3046
3047    if (vdb.isDistributed())
3048    {
3049      Serializable JavaDoc result = getResultForRequestId(requestId);
3050
3051      if (result != null)
3052      {
3053        // Cache hit
3054
// re-send warnings
3055
SQLWarning JavaDoc cachedWarns = ((ExecuteResult) result)
3056            .getStatementWarnings();
3057        sendToDriver(cachedWarns);
3058        // and result
3059
for (Iterator JavaDoc iter = ((ExecuteResult) result).getResults().iterator(); iter
3060            .hasNext();)
3061        {
3062          Object JavaDoc element = iter.next();
3063          if (element instanceof Integer JavaDoc)
3064          {
3065            sendToDriver(false);
3066            sendToDriver(((Integer JavaDoc) element).intValue());
3067          }
3068          else if (element instanceof ControllerResultSet)
3069          {
3070            sendToDriver(true);
3071            sendToDriver((ControllerResultSet) element);
3072          }
3073          else
3074            logger.error("Unexpected result " + element
3075                + " in statementExecute for request " + requestId);
3076        }
3077      }
3078      else
3079      {
3080        try
3081        {
3082          // The query may have been remapped
3083
int updateCount = vdb.getRequestManager().getRecoveryLog()
3084              .getUpdateCountResultForQuery(requestId);
3085          sendToDriver((SQLWarning JavaDoc) null);
3086          if (updateCount != -1)
3087          { // Build a response with the update count and add 'no more result'
3088
sendToDriver(false);
3089            sendToDriver(updateCount);
3090            sendToDriver(false);
3091            sendToDriver(-1);
3092          }
3093          else
3094          { // Not found
3095
sendToDriver(true);
3096            sendToDriver((ControllerResultSet) null);
3097          }
3098        }
3099        catch (SQLException JavaDoc ex)
3100        { // No cache hit
3101
sendToDriver((SQLWarning JavaDoc) null);
3102          sendToDriver(true);
3103          sendToDriver((ControllerResultSet) null);
3104        }
3105      }
3106    }
3107    else
3108    {
3109      throw new SQLException JavaDoc(
3110          "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
3111    }
3112  }
3113
3114  /**
3115   * Retrieve the result of a write request that returns an int and generated
3116   * keys. Returns -1 to the driver if the result has not been found.
3117   *
3118   * @throws IOException if an error occurs with the socket
3119   * @throws SQLException if an error occurs while retrieving the result
3120   */

3121  private void retrieveExecuteUpdateWithKeysResult() throws IOException JavaDoc,
3122      SQLException JavaDoc
3123  {
3124    if (logger.isDebugEnabled())
3125      logger.debug("Retrieve execute update with keys result command");
3126
3127    long requestId = in.readLong();
3128
3129    if (vdb.isDistributed())
3130    {
3131      Serializable JavaDoc result = getResultForRequestId(requestId);
3132
3133      if (result != null)
3134      {
3135        // Cache hit
3136
sendToDriver(((GeneratedKeysResult) result).getStatementWarnings());
3137        sendToDriver(((GeneratedKeysResult) result).getUpdateCount());
3138        sendToDriver(((GeneratedKeysResult) result).getControllerResultSet());
3139      }
3140      else
3141      {
3142        // No cache hit
3143
sendToDriver((SQLWarning JavaDoc) null);
3144        sendToDriver(-1);
3145      }
3146    }
3147    else
3148    {
3149      throw new SQLException JavaDoc(
3150          "Transparent failover for statements that return generated keys is only supported in distributed configurations.");
3151    }
3152  }
3153
3154  /**
3155   * Retrieve the result of a request that returns a ResultSet. Returns null to
3156   * the driver if the result has not been found.
3157   *
3158   * @throws IOException if an error occurs with the socket
3159   * @throws SQLException if an error occurs while retrieving the result
3160   */

3161  private void retrieveExecuteQueryResult() throws IOException JavaDoc, SQLException JavaDoc
3162  {
3163    if (logger.isDebugEnabled())
3164      logger.debug("Retrieve execute query result command");
3165
3166    long requestId = in.readLong();
3167
3168    if (vdb.isDistributed())
3169    {
3170      Serializable JavaDoc result = getResultForRequestId(requestId);
3171
3172      if (result != null)
3173      {
3174        // Cache hit
3175
sendToDriver(((ControllerResultSet) result).getStatementWarnings());
3176        sendToDriver((ControllerResultSet) result);
3177      }
3178      else
3179      {
3180        // No cache hit
3181
sendToDriver((SQLWarning JavaDoc) null);
3182        sendToDriver((ControllerResultSet) null);
3183      }
3184    }
3185    else
3186    {
3187      throw new SQLException JavaDoc(
3188          "Transparent failover for statements that return a ResultSet is only supported in distributed configurations.");
3189    }
3190  }
3191
3192  /**
3193   * Retrieve the result of a transaction commit. If the transaction was not
3194   * commited, we commit it and acknowledge the driver back in all cases.
3195   *
3196   * @throws IOException if an error occurs with the socket
3197   * @throws SQLException if an error occurs while retrieving the result
3198   */

3199  private void retrieveCommitResult() throws IOException JavaDoc, SQLException JavaDoc
3200  {
3201    if (logger.isDebugEnabled())
3202      logger.debug("Retrieve commit command");
3203
3204    waitForWritesFlushed(currentTid);
3205
3206    if (transactionHasAborted)
3207    {
3208      if (logger.isWarnEnabled())
3209      {
3210        logger.warn("Transaction " + currentTid + " was aborted by database");
3211      }
3212      return;
3213    }
3214
3215    boolean retry;
3216    do
3217    {
3218      retry = false;
3219      String JavaDoc commitStatus = vdb.getRequestManager().getRecoveryLog()
3220          .getCommitStatusForTransaction(currentTid);
3221
3222      if (LogEntry.MISSING.equals(commitStatus))
3223      {
3224        if (writeQueryExecutedInThisTransaction)
3225        {
3226          // Transaction was not commited yet, let's commit
3227
commit();
3228        }
3229        else
3230        {
3231          // If this was a read-only transaction, it was never started on this
3232
// controller and therefore there is no need to commit. It is ok to
3233
// let the user believe that the transaction successfully commited
3234
// since it was read-only and had no effect on the database anyway.
3235
// We have to tell the client that the transaction was committed
3236
// successfully anyway, otherwise the application will hang waiting
3237
// for this answer.
3238
sendToDriver(currentTid);
3239          resetTransactionState();
3240          return;
3241        }
3242      }
3243      else if (LogEntry.SUCCESS.equals(commitStatus))
3244      {
3245        // Transaction was already commited, acknowledge the transaction id
3246
sendToDriver(currentTid);
3247
3248        resetTransactionState();
3249      }
3250      else if (LogEntry.FAILED.equals(commitStatus))
3251      {
3252        logger.warn("Commit of transaction " + currentTid + " failed");
3253        // commit failed
3254
throw new SQLException JavaDoc("Commit of transaction " + currentTid
3255            + " failed");
3256      }
3257      else
3258      {
3259        /*
3260         * Status is executing or unknown, if status is executing and we have
3261         * enabled backends locally, we have to wait for the final status to be
3262         * updated in the recovery log
3263         */

3264
3265        retry = LogEntry.EXECUTING.equals(commitStatus)
3266            && (vdb.getRequestManager().getLoadBalancer()
3267                .getNumberOfEnabledBackends() > 0);
3268        if (!retry)
3269          throw new SQLException JavaDoc("Commit of transaction " + currentTid
3270              + " is in unknown or executing state");
3271      }
3272    }
3273    while (retry);
3274
3275  }
3276
3277  /**
3278   * Retrieve the result of a transaction rollback. If the transaction was not
3279   * rollbacked, we rollback and acknowledge the driver back in all cases.
3280   *
3281   * @throws IOException if an error occurs with the socket
3282   * @throws SQLException if an error occurs while retrieving the result
3283   */

3284  private void retrieveRollbackResult() throws IOException JavaDoc, SQLException JavaDoc
3285  {
3286    if (logger.isDebugEnabled())
3287      logger.debug("Retrieve rollback command");
3288
3289    waitForWritesFlushed(currentTid);
3290
3291    if (!transactionHasAborted)
3292    {
3293      String JavaDoc rollbackStatus = vdb.getRequestManager().getRecoveryLog()
3294          .getRollbackStatusForTransaction(currentTid);
3295
3296      if (LogEntry.MISSING.equals(rollbackStatus))
3297        // Transaction was not rollbacked yet, let's do it
3298
rollback();
3299      else if (LogEntry.SUCCESS.equals(rollbackStatus)
3300          || LogEntry.FAILED.equals(rollbackStatus))
3301      {
3302        // Transaction was already rollbacked, acknowledge the transaction id
3303
sendToDriver(currentTid);
3304        resetTransactionState();
3305      }
3306      else
3307      { // UNKOWN OR EXECUTING state
3308
if (vdb.isDistributed())
3309        {
3310          ((DistributedRequestManager) vdb.getRequestManager())
3311              .cleanupRollbackFromOtherController(currentTid);
3312          sendToDriver(currentTid);
3313          resetTransactionState();
3314        }
3315        else
3316        {
3317          // rollback cannot fail locally, so notify right away even if the
3318
// rollback has not fully completed
3319
sendToDriver(currentTid);
3320          resetTransactionState();
3321        }
3322      }
3323    }
3324    else if (logger.isWarnEnabled())
3325    {
3326      logger.warn("Transaction " + currentTid + " was aborted by database");
3327    }
3328  }
3329
3330  private void waitForWritesFlushed(long requestIdOrTransactionId)
3331  {
3332    // In non-distributed configuration, there is no failover cache, so there
3333
// is no need to wait
3334
if (!vdb.isDistributed())
3335      return;
3336
3337    DistributedVirtualDatabase dvdb = (DistributedVirtualDatabase) vdb;
3338    HashMap JavaDoc writesFlushed = dvdb.getWritesFlushed();
3339    Long JavaDoc controllerIdKey = new Long JavaDoc(requestIdOrTransactionId
3340        & DistributedRequestManager.CONTROLLER_ID_BIT_MASK);
3341
3342    // Set "writes flushed" flag to false and wait for writes being flushed by
3343
// the clean-up thread
3344
synchronized (writesFlushed)
3345    {
3346      if (!writesFlushed.containsKey(controllerIdKey))
3347        writesFlushed.put(controllerIdKey, Boolean.FALSE);
3348      while (!((Boolean JavaDoc) writesFlushed.get(controllerIdKey)).booleanValue())
3349      {
3350        try
3351        {
3352          // Wait for notification from controller failure clean-up thread
3353
if (logger.isDebugEnabled())
3354          {
3355            logger
3356                .debug("Will wait for writes to be flushed for failed controller "
3357                    + controllerIdKey);
3358          }
3359          writesFlushed.wait();
3360        }
3361        catch (InterruptedException JavaDoc e)
3362        {
3363          // Ignore exception
3364
}
3365      }
3366    }
3367  }
3368
3369  /**
3370   * Serialize a ControllerResultSet answer, prefixed with the appropriate
3371   * TypeTag. Note that this will be deserialized in a DriverResultSet.
3372   *
3373   * @param crs the resultset to send
3374   * @throws IOException stream error
3375   */

3376  private void sendToDriver(ControllerResultSet crs) throws IOException JavaDoc
3377  {
3378    /**
3379     * If a (buggy) backend was returning a null ResultSet, we would have failed
3380     * much earlier in
3381     * {@link ControllerResultSet#ControllerResultSet(AbstractRequest, java.sql.ResultSet, MetadataCache, Statement, boolean)
3382     */

3383    /*
3384     * So we can safely use "null" as a special value during transparent
3385     * failover when the controller's request cache hasn't found a result for a
3386     * given request.
3387     */

3388    if (null == crs)
3389    {
3390      TypeTag.NULL_RESULTSET.sendToStream(out);
3391      out.flush();
3392      return;
3393    }
3394
3395    try
3396    {
3397      crs.initSerializers();
3398    }
3399    catch (NotImplementedException nie)
3400    { // we don't know how to serialize something
3401
sendToDriver(nie);
3402      return;
3403    }
3404
3405    TypeTag.RESULTSET.sendToStream(out);
3406    crs.sendToStream(out);
3407  }
3408
3409  /**
3410   * Send a protocol String, prefixed with the appropriate TypeTag
3411   */

3412  private void sendToDriver(String JavaDoc str) throws IOException JavaDoc
3413  {
3414    TypeTag.NOT_EXCEPTION.sendToStream(out);
3415    out.writeLongUTF(str);
3416    out.flush();
3417  }
3418
3419  /**
3420   * Send a protocol boolean, prefixed with the appropriate TypeTag
3421   */

3422  private void sendToDriver(boolean b) throws IOException JavaDoc
3423  {
3424    TypeTag.NOT_EXCEPTION.sendToStream(out);
3425    out.writeBoolean(b);
3426    out.flush();
3427  }
3428
3429  /**
3430   * Send a protocol int, prefixed with the appropriate TypeTag
3431   */

3432  private void sendToDriver(int i) throws IOException JavaDoc
3433  {
3434    TypeTag.NOT_EXCEPTION.sendToStream(out);
3435    out.writeInt(i);
3436    out.flush();
3437  }
3438
3439  /**
3440   * Send a protocol long, prefixed with the appropriate TypeTag
3441   */

3442  private void sendToDriver(long l) throws IOException JavaDoc
3443  {
3444    TypeTag.NOT_EXCEPTION.sendToStream(out);
3445    out.writeLong(l);
3446    out.flush();
3447  }
3448
3449  private void sendToDriver(SQLWarning JavaDoc s) throws IOException JavaDoc
3450  {
3451    if (s != null)
3452    {
3453      sendToDriver(true);
3454      TypeTag.BACKEND_EXCEPTION.sendToStream(out);
3455      new BackendDriverException(s).sendToStream(out);
3456    }
3457    else
3458      sendToDriver(false);
3459  }
3460
3461  private void sendToDriver(Exception JavaDoc e) throws IOException JavaDoc
3462  {
3463    TypeTag.EXCEPTION.sendToStream(out);
3464    // This is the place where we convert Exceptions to something
3465
// serializable and that the driver can understand
3466
// So this is the place where it's possible to trap all unknown exceptions
3467

3468    if (e instanceof SQLException JavaDoc)
3469    { // we assume that an SQLexception comes from the backend
3470

3471      // since this is currently false because some ControllerCoreExceptions
3472
// subclass SQLException, here are a few workarounds
3473
if (e instanceof NoMoreBackendException
3474          || e instanceof NoMoreControllerException
3475          || e instanceof NotImplementedException)
3476      {
3477        TypeTag.CORE_EXCEPTION.sendToStream(out);
3478        new ControllerCoreException(e).sendToStream(out);
3479        return;
3480      }
3481
3482      // non-workaround, regular SQLException from backend
3483
TypeTag.BACKEND_EXCEPTION.sendToStream(out);
3484      new BackendDriverException(e).sendToStream(out);
3485      return;
3486    }
3487
3488    // else we assume this is an exception from the core (currently...?)
3489
TypeTag.CORE_EXCEPTION.sendToStream(out);
3490    new ControllerCoreException(e).sendToStream(out);
3491    return;
3492
3493  }
3494
3495  /**
3496   * Implements streaming: send the next ResultSet chunk to driver, pulling it
3497   * from ControllerResultSet. The driver decides of the chunk size at each
3498   * call. Note that virtualdatabase streaming is independent from backend
3499   * streaming (which may not be supported). They even could be configured with
3500   * two different fetchSize -s (it's not currently the case).
3501   * <p>
3502   * This is a real issue: in case of a low fetchsize hint ignored by the driver
3503   * of the backend, then the whole backend resultset stays in the memory on the
3504   * controller. And we probably cannot know how many rows did it pulled out.
3505   *
3506   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#executeStatementExecuteQueryOnBackend(SelectRequest,
3507   * org.continuent.sequoia.controller.backend.DatabaseBackend,
3508   * org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread,
3509   * java.sql.Connection,
3510   * org.continuent.sequoia.controller.cache.metadata.MetadataCache)
3511   * @see ControllerResultSet#fetchData(int)
3512   */

3513  private void fetchNextResultSetRows() throws IOException JavaDoc, SQLException JavaDoc
3514  {
3515    if (logger.isDebugEnabled())
3516      logger.debug("FetchNextResultSetRows command");
3517
3518    String JavaDoc cursorName = in.readLongUTF();
3519    int fetchSize = in.readInt();
3520    ControllerResultSet crs = (ControllerResultSet) streamedResultSets
3521        .get(cursorName);
3522    if (crs == null)
3523    {
3524      sendToDriver(new SQLException JavaDoc(
3525          "No valid ControllerResultSet to fetch data from"));
3526    }
3527    else
3528    {
3529      // refresh ControllerResultSet with a new chunk of rows
3530
crs.fetchData(fetchSize);
3531
3532      // send it
3533
TypeTag.NOT_EXCEPTION.sendToStream(out);
3534      crs.sendRowsToStream(out);
3535
3536      // At this point we could probably data.clear() already sent as a memory
3537
// optimization, but still in doubt about others using it we leave it as
3538
// is.
3539

3540      if (!crs.hasMoreData())
3541        streamedResultSets.remove(cursorName);
3542    }
3543  }
3544
3545  //
3546
// Public API
3547
//
3548

3549  /**
3550   * Return the current transaction id (should be 0 if not in a transaction).
3551   *
3552   * @return the current transaction id
3553   */

3554  public long getCurrentTransactionId()
3555  {
3556    return currentTid;
3557  }
3558
3559  /**
3560   * Get time active
3561   *
3562   * @return time active since started
3563   */

3564  public long getTimeActive()
3565  {
3566    return ((System.currentTimeMillis() - in.getDateCreated()) / 1000);
3567  }
3568
3569  /**
3570   * @return Returns the login of the current user.
3571   */

3572  public String JavaDoc getUser()
3573  {
3574    if (user == null)
3575    {
3576      return "No user connected";
3577    }
3578    return user.getLogin();
3579  }
3580
3581  //
3582
// Public API
3583
//
3584

3585  /**
3586   * Notify the abort of the given transaction which should match the current
3587   * transaction id of this thread else an exception will be thrown.
3588   *
3589   * @param tid the transaction identifier to abort
3590   * @throws SQLException if the tid does not correspond to the current
3591   * transaction id of this thread or if the abort throws a
3592   * SQLException
3593   */

3594  public void notifyAbort(long tid) throws SQLException JavaDoc
3595  {
3596    synchronized (this)
3597    {
3598      if ((!transactionStarted) || (currentTid != tid))
3599        throw new SQLException JavaDoc("Cannot abort transaction " + tid
3600            + " since current worker thread is assigned to transaction "
3601            + currentTid);
3602
3603      transactionHasAborted = true;
3604    }
3605  }
3606
3607  /**
3608   * Retrieve general information on this client
3609   *
3610   * @return an array of string
3611   */

3612  public String JavaDoc[] retrieveClientData()
3613  {
3614    String JavaDoc[] data = new String JavaDoc[4];
3615    data[0] = in.getSocket().getInetAddress().getHostName();
3616    data[1] = in.getSocket().getInetAddress().getHostAddress();
3617    data[2] = String
3618        .valueOf(((System.currentTimeMillis() - in.getDateCreated()) / 1000));
3619    return data;
3620  }
3621
3622  /**
3623   * Shutdown this thread by setting <code>isKilled</code> value to true. This
3624   * gives time to check for needed rollback transactions
3625   */

3626  public void shutdown()
3627  {
3628    // Tell this thread to stop working gently.
3629
// This will cancel transaction if needed
3630
this.isKilled = true;
3631    try
3632    {
3633      if (waitForCommand)
3634      {
3635        // close only the streams if we're not in the middle of a request
3636
in.close();
3637        out.close();
3638      }
3639    }
3640    catch (IOException JavaDoc e)
3641    {
3642      // ignore, only the input stream should be close
3643
// for this thread to end
3644
}
3645  }
3646
3647}
Popular Tags