KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > requestplayer > ClientThread


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2005 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  *
22  * Initial developer(s): Emmanuel Cecchet.
23  * Contributor(s): Julie Marguerite.
24  */

25
26 package org.objectweb.cjdbc.requestplayer;
27
28 import java.sql.Connection JavaDoc;
29 import java.sql.ResultSet JavaDoc;
30 import java.sql.SQLException JavaDoc;
31 import java.sql.Statement JavaDoc;
32
33 import org.objectweb.cjdbc.common.util.Stats;
34
35 /**
36  * C-JDBC client emulator worker thread. Reads SQL requests in a file and
37  * forwards them to the cache. If the cache returns no reply, this class
38  * forwards the request to the database. Then it returns the reply and updates
39  * the cache if needed.
40  *
41  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
42  * @author <a HREF="mailto:julie.marguerite@inria.fr">Julie Marguerite </a>
43  * @version 1.0
44  */

45 public class ClientThread extends Thread JavaDoc
46 {
47   /** Debug on standard output. */
48   private static final boolean DEBUG = false;
49
50   /** Number of read requests. */
51   private Stats selectStats = null;
52
53   /** Number of unknown requests. */
54   private Stats unknownStats = null;
55
56   /** Number of update requests. */
57   private Stats updateStats = null;
58
59   /** Number of insert requests. */
60   private Stats insertStats = null;
61
62   /** Number of delete requests. */
63   private Stats deleteStats = null;
64
65   /** Number of transaction begin. */
66   private Stats beginStats = null;
67
68   /** Number of transaction commit. */
69   private Stats commitStats = null;
70
71   /** Number of transaction rollback. */
72   private Stats rollbackStats = null;
73
74   /** Statistics about get connection from driver */
75   private Stats getConnectionStats = null;
76
77   /** Statistics about closing a connection */
78   private Stats closeStats = null;
79
80   /** Statistics about getting request from the log file */
81   private Stats getRequestStats = null;
82
83   private Connection JavaDoc conn = null;
84
85   private ClientEmulator father;
86   private int threadId;
87
88   /** Type of connection management: standard, fixed or pooling */
89   private int connectionType;
90
91   /**
92    * Creates a new <code>ClientThread</code> instance.
93    *
94    * @param threadId thread id
95    * @param father father client emulator
96    * @param connectionType connection type
97    */

98   public ClientThread(int threadId, ClientEmulator father, int connectionType)
99   {
100     super("ClientThread" + threadId);
101
102     // Init the pointers to the stats
103
selectStats = father.getSelectStats();
104     unknownStats = father.getUnknownStats();
105     updateStats = father.getUpdateStats();
106     insertStats = father.getInsertStats();
107     deleteStats = father.getDeleteStats();
108     beginStats = father.getBeginStats();
109     commitStats = father.getCommitStats();
110     rollbackStats = father.getRollbackStats();
111     getRequestStats = father.getGetRequestStats();
112     getConnectionStats = father.getGetConnectionStats();
113     closeStats = father.getCloseStats();
114
115     this.father = father;
116     this.threadId = threadId;
117     this.connectionType = connectionType;
118
119     if (this.connectionType == RequestPlayerProperties.FIXED_CONNECTION)
120     {
121       // Get a new connection to the virtual database
122
conn = father.getConnection();
123     }
124   }
125
126   /**
127    * @see java.lang.Runnable#run()
128    */

129   public void run()
130   {
131     String JavaDoc request = null;
132     int tid = 0; // current transaction id
133

134     if (DEBUG)
135       System.out.println(threadId + ": Starting");
136
137     // Get SQL requests from the trace file and send them to the DB
138
while (true)
139     {
140       long startg = System.currentTimeMillis();
141       request = father.parallelGetNextSQLRequest(tid);
142       long endg = System.currentTimeMillis();
143       getRequestStats.incrementCount();
144       getRequestStats.updateTime(endg - startg);
145
146       if (request == null)
147       { // Could be the end of file or a transaction whose
148
// commit/rollback was never logged in the trace file
149
if (tid != 0)
150         {
151           System.out.println(threadId
152               + ": Warning! Rollbacking unterminated transaction " + tid);
153           request = "R";
154         }
155         else
156           break; // No more requests
157
}
158
159       try
160       {
161         switch (request.charAt(0))
162         {
163           case 'B' :
164             // Begin
165
if (DEBUG)
166               System.out.println(threadId + ": " + request);
167             long startb = System.currentTimeMillis();
168             if (connectionType != RequestPlayerProperties.FIXED_CONNECTION)
169             {
170               // Get a new connection to the virtual database
171
conn = getConnection();
172             }
173             conn.setAutoCommit(false);
174             long endb = System.currentTimeMillis();
175             beginStats.incrementCount();
176             beginStats.updateTime(endb - startb);
177             tid = new Integer JavaDoc(request.substring(2)).intValue();
178             break;
179           case 'C' :
180             // Commit
181
if (DEBUG)
182               System.out.println(threadId + ": " + request);
183             long startc = System.currentTimeMillis();
184             conn.commit();
185             long endc = System.currentTimeMillis();
186             commitStats.incrementCount();
187             commitStats.updateTime(endc - startc);
188             tid = 0;
189             if (connectionType != RequestPlayerProperties.FIXED_CONNECTION)
190             { // Close the connection
191
closeConnection();
192             }
193             break;
194           case 'R' :
195             // Rollback
196
if (DEBUG)
197               System.out.println(threadId + ": " + request);
198             long startr = System.currentTimeMillis();
199             conn.rollback();
200             long endr = System.currentTimeMillis();
201             rollbackStats.incrementCount();
202             rollbackStats.updateTime(endr - startr);
203             tid = 0;
204             if (connectionType != RequestPlayerProperties.FIXED_CONNECTION)
205             { // Close the connection
206
closeConnection();
207             }
208             break;
209           case 'S' :
210             // Select
211
if (tid == 0
212                 && (connectionType != RequestPlayerProperties.FIXED_CONNECTION))
213             {
214               // Get a new connection to the virtual database
215
conn = getConnection();
216               // Execute the request
217
execReadRequest(request);
218               // Close the connection
219
closeConnection();
220             }
221             else
222             {
223               execReadRequest(request);
224             }
225             break;
226           case 'W' :
227             // Write
228
if (tid == 0
229                 && (connectionType != RequestPlayerProperties.FIXED_CONNECTION))
230             {
231               // Get a new connection to the virtual database
232
conn = getConnection();
233               // Execute the request
234
execWriteRequest(request);
235               // Close the connection
236
closeConnection();
237             }
238             else
239             {
240               execWriteRequest(request);
241             }
242
243             break;
244           default :
245             System.err.println(threadId + ": Error! Unsupported request "
246                 + request);
247             break;
248         }
249       }
250       catch (Exception JavaDoc e)
251       {
252         System.err.println(threadId
253             + ": An error occured while executing SQL request ("
254             + e.getMessage() + ")");
255         if (request.charAt(0) != 'S' && request.charAt(0) != 'W')
256         { // Reset the tid for begin/commit/rollback
257
if (tid != 0)
258           {
259             try
260             {
261               conn.rollback();
262             }
263             catch (Exception JavaDoc ignore)
264             {
265             }
266             father.ignoreTid(tid);
267             tid = 0;
268           }
269         }
270         if (connectionType != RequestPlayerProperties.FIXED_CONNECTION)
271           // Close the connection
272
closeConnection();
273       }
274     }
275
276     if (connectionType == RequestPlayerProperties.FIXED_CONNECTION)
277       father.closeConnection(conn);
278
279     // if (DEBUG)
280
System.out.println(threadId + ": Ending.");
281   }
282
283   /**
284    * Executes a write request.
285    *
286    * @param req request to execute
287    */

288   private void execWriteRequest(String JavaDoc req)
289   {
290     Statement JavaDoc stmt = null;
291
292     String JavaDoc request = req.substring(2);
293     if (DEBUG)
294       System.out.println(threadId + ": " + request.substring(0, 5));
295     long startw = System.currentTimeMillis();
296     try
297     {
298       stmt = conn.createStatement();
299       stmt.setQueryTimeout(father.getTimeout());
300       stmt.executeUpdate(request);
301       stmt.close();
302     }
303     catch (SQLException JavaDoc e)
304     {
305       if ((request.charAt(0) == 'i') || (request.charAt(0) == 'I')) // insert
306
{
307         insertStats.incrementError();
308       }
309       else if ((request.charAt(0) == 'u') || (request.charAt(0) == 'U')) // update
310
{
311         updateStats.incrementError();
312       }
313       else if ((request.charAt(0) == 'd') || (request.charAt(0) == 'D')) // delete
314
{
315         deleteStats.incrementError();
316       }
317       else
318       {
319         unknownStats.incrementError();
320       }
321       System.err.println(threadId + ": Failed to execute request: " + request
322           + "(" + e + ")");
323       return;
324     }
325     long endw = System.currentTimeMillis();
326     if ((request.charAt(0) == 'i') || (request.charAt(0) == 'I')) // insert
327
{
328       insertStats.incrementCount();
329       insertStats.updateTime(endw - startw);
330     }
331     else if ((request.charAt(0) == 'u') || (request.charAt(0) == 'U')) // update
332
{
333       updateStats.incrementCount();
334       updateStats.updateTime(endw - startw);
335     }
336     else if ((request.charAt(0) == 'd') || (request.charAt(0) == 'D')) // delete
337
{
338       deleteStats.incrementCount();
339       deleteStats.updateTime(endw - startw);
340     }
341     else
342     {
343       unknownStats.incrementCount();
344       unknownStats.updateTime(endw - startw);
345     }
346   }
347
348   /**
349    * Executes a select request.
350    *
351    * @param req request to execute
352    */

353   private void execReadRequest(String JavaDoc req)
354   {
355     Statement JavaDoc stmt = null;
356     ResultSet JavaDoc dbReply = null; // The reply from the database
357
String JavaDoc request = req.substring(2);
358     if (DEBUG)
359       System.out.println(threadId + ": " + request.substring(0, 5));
360     long startr = System.currentTimeMillis();
361     try
362     {
363       stmt = conn.createStatement();
364       stmt.setQueryTimeout(father.getTimeout());
365       dbReply = stmt.executeQuery(request);
366       // Parse the result if any
367
if (dbReply != null)
368         dbReply.next(); // Fetch only the first row
369
stmt.close();
370     }
371     catch (SQLException JavaDoc e)
372     {
373       selectStats.incrementError();
374       System.err.println(threadId + ": Failed to execute request: " + request
375           + "(" + e + ")");
376     }
377     long endr = System.currentTimeMillis();
378     selectStats.incrementCount();
379     selectStats.updateTime(endr - startr);
380   }
381
382   /**
383    * Closes the connection to the database.
384    */

385   private void closeConnection()
386   {
387     long start = System.currentTimeMillis();
388     if (connectionType == RequestPlayerProperties.STANDARD_CONNECTION)
389     {
390       father.closeConnection(conn);
391     }
392     else if (connectionType == RequestPlayerProperties.POOLING_CONNECTION)
393     {
394       father.releaseConnectionToPool(conn);
395     }
396     long end = System.currentTimeMillis();
397     closeStats.incrementCount();
398     closeStats.updateTime(end - start);
399   }
400
401   /**
402    * Gets a new connection to the database.
403    *
404    * @return Connection
405    */

406   private Connection JavaDoc getConnection()
407   {
408     Connection JavaDoc c = null;
409     long start = System.currentTimeMillis();
410     if (connectionType == RequestPlayerProperties.STANDARD_CONNECTION)
411     {
412       c = father.getConnection();
413     }
414     else if (connectionType == RequestPlayerProperties.POOLING_CONNECTION)
415     {
416       c = father.getConnectionFromPool();
417     }
418     long end = System.currentTimeMillis();
419     getConnectionStats.incrementCount();
420     getConnectionStats.updateTime(end - start);
421     return c;
422   }
423 }
Popular Tags