KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > h2 > server > TcpServerThread


/*
 * Copyright 2004-2006 H2 Group. Licensed under the H2 License, Version 1.0 (http://h2database.com/html/license.html).
 * Initial Developer: H2 Group
 */

package org.h2.server;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.Socket;
import java.sql.SQLException;

import org.h2.command.Command;
import org.h2.engine.ConnectionInfo;
import org.h2.engine.Constants;
import org.h2.engine.Engine;
import org.h2.engine.Session;
import org.h2.engine.SessionRemote;
import org.h2.expression.Parameter;
import org.h2.message.Message;
import org.h2.result.LocalResult;
import org.h2.result.ResultColumn;
import org.h2.util.ObjectArray;
import org.h2.util.SmallMap;
import org.h2.value.Transfer;
import org.h2.value.Value;

28 public class TcpServerThread implements Runnable JavaDoc {
29     private TcpServer server;
30     private Session session;
31     private boolean stop;
32     private Thread JavaDoc thread;
33     private Transfer transfer;
34     private Command commit;
35     private SmallMap cache = new SmallMap(Constants.SERVER_CACHED_OBJECTS);
36
37     public TcpServerThread(Socket JavaDoc socket, TcpServer server) {
38         this.server = server;
39         transfer = new Transfer(null);
40         transfer.setSocket(socket);
41     }
42
43     public void run() {
44         try {
45             transfer.init();
46             server.log("Connect");
47             // TODO server: should support a list of allowed databases and a list of allowed clients
48
try {
49                 int version = transfer.readInt();
50                 if(!server.allow(transfer.getSocket())) {
51                     throw Message.getSQLException(Message.REMOTE_CONNECTION_NOT_ALLOWED);
52                 }
53                 if(version != Constants.TCP_DRIVER_VERSION) {
54                     throw Message.getSQLException(Message.DRIVER_VERSION_ERROR_2,
55                             new String JavaDoc[] { "" + version, "" + Constants.TCP_DRIVER_VERSION }, null);
56                 }
57                 String JavaDoc db = transfer.readString();
58                 String JavaDoc originalURL = transfer.readString();
59                 String JavaDoc baseDir = server.getBaseDir();
60                 ConnectionInfo ci = new ConnectionInfo(db);
61                 if(baseDir != null) {
62                     ci.setBaseDir(baseDir);
63                 }
64                 if(server.getIfExists()) {
65                     ci.setProperty("IFEXISTS", "TRUE");
66                 }
67                 ci.setOriginalURL(originalURL);
68                 ci.setUserName(transfer.readString());
69                 ci.setUserPasswordHash(transfer.readBytes());
70                 ci.setFilePasswordHash(transfer.readBytes());
71                 int len = transfer.readInt();
72                 for(int i=0; i<len; i++) {
73                     ci.setProperty(transfer.readString(), transfer.readString());
74                 }
75                 Engine engine = Engine.getInstance();
76                 session = engine.getSession(ci);
77                 transfer.setSession(session);
78                 transfer.writeInt(SessionRemote.STATUS_OK).flush();
79                 server.log("Connected");
80             } catch(Throwable JavaDoc e) {
81                 sendError(e);
82                 stop = true;
83             }
84             while (!stop) {
85                 try {
86                     process();
87                 } catch(Throwable JavaDoc e) {
88                     sendError(e);
89                 }
90             }
91             server.log("Disconnect");
92         } catch(Throwable JavaDoc e) {
93             server.logError(e);
94         } finally {
95             close();
96         }
97     }
98
99     private void closeSession() {
100         if(session != null) {
101             try {
102                 Command rollback = session.prepareLocal("ROLLBACK");
103                 rollback.executeUpdate();
104                 session.close();
105             } catch(Exception JavaDoc e) {
106                 server.logError(e);
107             } finally {
108                 session = null;
109             }
110         }
111     }
112
113     public void close() {
114         try {
115             stop = true;
116             closeSession();
117             transfer.close();
118             server.log("Close");
119         } catch(Exception JavaDoc e) {
120             server.logError(e);
121         }
122         server.remove(this);
123     }
124
125     private void sendError(Throwable JavaDoc e) {
126         try {
127             SQLException JavaDoc s = Message.convert(e);
128             StringWriter JavaDoc writer = new StringWriter JavaDoc();
129             e.printStackTrace(new PrintWriter JavaDoc(writer));
130             String JavaDoc trace = writer.toString();
131             transfer.writeInt(SessionRemote.STATUS_ERROR).
132                 writeString(s.getSQLState()).
133                 writeString(e.getMessage()).
134                 writeInt(s.getErrorCode()).
135                 writeString(trace).
136                 flush();
137         } catch(IOException JavaDoc e2) {
138             server.logError(e2);
139             // if writing the error does not work, close the connection
140
stop = true;
141         }
142     }
143
144     private void setParameters(Command command) throws IOException JavaDoc, SQLException JavaDoc {
145         int len = transfer.readInt();
146         ObjectArray params = command.getParameters();
147         for(int i=0; i<len; i++) {
148             Parameter p = (Parameter) params.get(i);
149             p.setValue(transfer.readValue());
150         }
151     }
152
153     private void process() throws IOException JavaDoc, SQLException JavaDoc {
154         int operation = transfer.readInt();
155         switch(operation) {
156         case SessionRemote.SESSION_PREPARE: {
157             int id = transfer.readInt();
158             String JavaDoc sql = transfer.readString();
159             Command command = session.prepareLocal(sql);
160             boolean readonly = command.isReadOnly();
161             cache.addObject(id, command);
162             boolean isQuery = command.isQuery();
163             int paramCount = command.getParameters().size();
164             transfer.writeInt(SessionRemote.STATUS_OK).writeBoolean(isQuery).writeBoolean(readonly).writeInt(paramCount).flush();
165             break;
166         }
167         case SessionRemote.SESSION_CLOSE: {
168             closeSession();
169             transfer.writeInt(SessionRemote.STATUS_OK).flush();
170             close();
171             break;
172         }
173         case SessionRemote.COMMAND_COMMIT: {
174             if (commit == null) {
175                 commit = session.prepareLocal("COMMIT");
176             }
177             commit.executeUpdate();
178             transfer.writeInt(SessionRemote.STATUS_OK).flush();
179             break;
180         }
181         case SessionRemote.COMMAND_EXECUTE_QUERY: {
182             int id = transfer.readInt();
183             int objectId = transfer.readInt();
184             int maxRows = transfer.readInt();
185             int readRows = transfer.readInt();
186             Command command = (Command)cache.getObject(id, false);
187             setParameters(command);
188             LocalResult result = command.executeQueryLocal(maxRows);
189             cache.addObject(objectId, result);
190             int columnCount = result.getVisibleColumnCount();
191             transfer.writeInt(SessionRemote.STATUS_OK).writeInt(columnCount);
192             int rowCount = result.getRowCount();
193             transfer.writeInt(rowCount);
194             for(int i=0; i<columnCount; i++) {
195                 ResultColumn.writeColumn(transfer, result, i);
196             }
197             if(rowCount<readRows) {
198                 for(int i=0; i<=rowCount; i++) {
199                     sendRow(result);
200                 }
201             }
202             transfer.flush();
203             break;
204         }
205         case SessionRemote.COMMAND_EXECUTE_UPDATE: {
206             int id = transfer.readInt();
207             Command command = (Command)cache.getObject(id, false);
208             setParameters(command);
209             int updateCount = command.executeUpdate();
210             int status = SessionRemote.STATUS_OK;
211             if(session.isClosed()) {
212                 status = SessionRemote.STATUS_CLOSED;
213             }
214             transfer.writeInt(status).writeInt(updateCount).writeBoolean(session.getAutoCommit());
215             transfer.flush();
216             break;
217         }
218         case SessionRemote.COMMAND_CLOSE: {
219             int id = transfer.readInt();
220             Command command = (Command)cache.getObject(id, true);
221             if(command != null) {
222                 command.close();
223                 cache.freeObject(id);
224             }
225             break;
226         }
227         case SessionRemote.RESULT_FETCH_ROW: {
228             int id = transfer.readInt();
229             LocalResult result = (LocalResult)cache.getObject(id, false);
230             transfer.writeInt(SessionRemote.STATUS_OK);
231             sendRow(result);
232             transfer.flush();
233             break;
234         }
235         case SessionRemote.RESULT_RESET: {
236             int id = transfer.readInt();
237             LocalResult result = (LocalResult)cache.getObject(id, false);
238             result.reset();
239             break;
240         }
241         case SessionRemote.RESULT_CLOSE: {
242             int id = transfer.readInt();
243             LocalResult result = (LocalResult)cache.getObject(id, true);
244             if(result != null) {
245                 result.close();
246                 cache.freeObject(id);
247             }
248             break;
249         }
250         case SessionRemote.CHANGE_ID: {
251             int oldId = transfer.readInt();
252             int newId = transfer.readInt();
253             Object JavaDoc obj = cache.getObject(oldId, false);
254             cache.freeObject(oldId);
255             cache.addObject(newId, obj);
256             break;
257         }
258         default:
259             server.logInternalError("Unknown operation: " + operation);
260             server.log("Unknown operation: " + operation);
261             closeSession();
262             close();
263         }
264     }
265
266     private void sendRow(LocalResult result) throws IOException JavaDoc, SQLException JavaDoc {
267         boolean n = result.next();
268         transfer.writeBoolean(n);
269         if(n) {
270             Value[] v = result.currentRow();
271             for(int i=0; i<result.getVisibleColumnCount(); i++) {
272                 transfer.writeValue(v[i]);
273             }
274         }
275     }
276
277     public void setThread(Thread JavaDoc thread) {
278         this.thread = thread;
279     }
280
281     public Thread JavaDoc getThread() {
282         return thread;
283     }
284
285 }
286
Popular Tags