package org.h2.server;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.Socket;
import java.sql.SQLException;

import org.h2.command.Command;
import org.h2.engine.ConnectionInfo;
import org.h2.engine.Constants;
import org.h2.engine.Engine;
import org.h2.engine.Session;
import org.h2.engine.SessionRemote;
import org.h2.expression.Parameter;
import org.h2.message.Message;
import org.h2.result.LocalResult;
import org.h2.result.ResultColumn;
import org.h2.util.ObjectArray;
import org.h2.util.SmallMap;
import org.h2.value.Transfer;
import org.h2.value.Value;

/**
 * Serves one client connection of the TCP server.
 * <p>
 * {@link #run()} first performs the connection handshake (driver version
 * check, database name, credentials and properties), opens a {@link Session}
 * through the {@link Engine}, and then repeatedly reads an operation code
 * from the {@link Transfer} and executes it until the connection is stopped.
 * Prepared commands and open results are kept in a per-connection cache and
 * addressed by the ids the client sends.
 */
public class TcpServerThread implements Runnable {

    private TcpServer server;
    private Session session;
    private boolean stop;
    private Thread thread;
    private Transfer transfer;
    /** Lazily prepared COMMIT command, reused for every COMMAND_COMMIT. */
    private Command commit;
    private SmallMap cache = new SmallMap(Constants.SERVER_CACHED_OBJECTS);

    /**
     * Creates the handler for an accepted client socket.
     *
     * @param socket the client socket
     * @param server the server that owns this connection
     */
    public TcpServerThread(Socket socket, TcpServer server) {
        this.server = server;
        // The transfer has no session until the handshake has succeeded.
        transfer = new Transfer(null);
        transfer.setSocket(socket);
    }

    /**
     * Performs the handshake and then processes client requests until the
     * connection is stopped. The connection is always closed on exit.
     */
    public void run() {
        try {
            transfer.init();
            server.log("Connect");
            try {
                int version = transfer.readInt();
                if (!server.allow(transfer.getSocket())) {
                    throw Message.getSQLException(Message.REMOTE_CONNECTION_NOT_ALLOWED);
                }
                if (version != Constants.TCP_DRIVER_VERSION) {
                    throw Message.getSQLException(Message.DRIVER_VERSION_ERROR_2,
                            new String[] { "" + version, "" + Constants.TCP_DRIVER_VERSION }, null);
                }
                String db = transfer.readString();
                String originalURL = transfer.readString();
                String baseDir = server.getBaseDir();
                ConnectionInfo ci = new ConnectionInfo(db);
                if (baseDir != null) {
                    ci.setBaseDir(baseDir);
                }
                if (server.getIfExists()) {
                    ci.setProperty("IFEXISTS", "TRUE");
                }
                ci.setOriginalURL(originalURL);
                ci.setUserName(transfer.readString());
                ci.setUserPasswordHash(transfer.readBytes());
                ci.setFilePasswordHash(transfer.readBytes());
                int len = transfer.readInt();
                for (int i = 0; i < len; i++) {
                    // The key is read from the stream before the value.
                    String key = transfer.readString();
                    String value = transfer.readString();
                    ci.setProperty(key, value);
                }
                Engine engine = Engine.getInstance();
                session = engine.getSession(ci);
                transfer.setSession(session);
                transfer.writeInt(SessionRemote.STATUS_OK).flush();
                server.log("Connected");
            } catch (Throwable e) {
                // A failed handshake is reported to the client and ends the
                // connection without entering the request loop.
                sendError(e);
                stop = true;
            }
            while (!stop) {
                try {
                    process();
                } catch (Throwable e) {
                    // sendError sets 'stop' itself if the error cannot be
                    // written, which ends the loop for a broken connection.
                    sendError(e);
                }
            }
            server.log("Disconnect");
        } catch (Throwable e) {
            server.logError(e);
        } finally {
            close();
        }
    }

    /**
     * Rolls back and closes the session, if one is open.
     * <p>
     * A failure of the rollback is logged but does not prevent the session
     * from being closed; otherwise the reference would be dropped while the
     * session is still open.
     */
    private void closeSession() {
        if (session != null) {
            try {
                try {
                    Command rollback = session.prepareLocal("ROLLBACK");
                    rollback.executeUpdate();
                } catch (Exception e) {
                    server.logError(e);
                }
                session.close();
            } catch (Exception e) {
                server.logError(e);
            } finally {
                session = null;
            }
        }
    }

    /**
     * Stops the request loop, closes the session and the transfer, and
     * removes this handler from the server. Errors while closing are logged.
     */
    public void close() {
        try {
            stop = true;
            closeSession();
            transfer.close();
            server.log("Close");
        } catch (Exception e) {
            server.logError(e);
        } finally {
            // Always deregister, even if closing failed unexpectedly.
            server.remove(this);
        }
    }

    /**
     * Writes an error response for the given throwable: the error status,
     * SQL state, message, error code and stack trace.
     *
     * @param e the error to report to the client
     */
    private void sendError(Throwable e) {
        try {
            SQLException s = Message.convert(e);
            StringWriter writer = new StringWriter();
            e.printStackTrace(new PrintWriter(writer));
            String trace = writer.toString();
            // NOTE(review): the message is taken from the original throwable,
            // while state and error code come from the converted exception.
            transfer.writeInt(SessionRemote.STATUS_ERROR).
                    writeString(s.getSQLState()).
                    writeString(e.getMessage()).
                    writeInt(s.getErrorCode()).
                    writeString(trace).
                    flush();
        } catch (IOException e2) {
            server.logError(e2);
            // If the error cannot be written, stop serving this connection.
            stop = true;
        }
    }

    /**
     * Reads the parameter count and the parameter values from the transfer
     * and assigns them, in order, to the parameters of the command.
     *
     * @param command the command whose parameters are set
     * @throws IOException if reading from the transfer fails
     * @throws SQLException if a value cannot be read or set
     */
    private void setParameters(Command command) throws IOException, SQLException {
        int len = transfer.readInt();
        ObjectArray params = command.getParameters();
        for (int i = 0; i < len; i++) {
            Parameter p = (Parameter) params.get(i);
            p.setValue(transfer.readValue());
        }
    }

    /**
     * Reads one operation code from the transfer and executes it.
     *
     * @throws IOException if reading or writing the transfer fails
     * @throws SQLException if the operation fails
     */
    private void process() throws IOException, SQLException {
        int operation = transfer.readInt();
        switch (operation) {
        case SessionRemote.SESSION_PREPARE: {
            int id = transfer.readInt();
            String sql = transfer.readString();
            Command command = session.prepareLocal(sql);
            boolean readonly = command.isReadOnly();
            cache.addObject(id, command);
            boolean isQuery = command.isQuery();
            int paramCount = command.getParameters().size();
            transfer.writeInt(SessionRemote.STATUS_OK).
                    writeBoolean(isQuery).
                    writeBoolean(readonly).
                    writeInt(paramCount).
                    flush();
            break;
        }
        case SessionRemote.SESSION_CLOSE: {
            // The session is closed before the reply is written; close()
            // then stops the loop and closes the transfer.
            closeSession();
            transfer.writeInt(SessionRemote.STATUS_OK).flush();
            close();
            break;
        }
        case SessionRemote.COMMAND_COMMIT: {
            if (commit == null) {
                commit = session.prepareLocal("COMMIT");
            }
            commit.executeUpdate();
            transfer.writeInt(SessionRemote.STATUS_OK).flush();
            break;
        }
        case SessionRemote.COMMAND_EXECUTE_QUERY: {
            int id = transfer.readInt();
            int objectId = transfer.readInt();
            int maxRows = transfer.readInt();
            int readRows = transfer.readInt();
            Command command = (Command) cache.getObject(id, false);
            setParameters(command);
            LocalResult result = command.executeQueryLocal(maxRows);
            cache.addObject(objectId, result);
            int columnCount = result.getVisibleColumnCount();
            transfer.writeInt(SessionRemote.STATUS_OK).writeInt(columnCount);
            int rowCount = result.getRowCount();
            transfer.writeInt(rowCount);
            for (int i = 0; i < columnCount; i++) {
                ResultColumn.writeColumn(transfer, result, i);
            }
            if (rowCount < readRows) {
                // sendRow is called rowCount + 1 times on purpose: the extra
                // call presumably writes the 'no more rows' marker that the
                // client reads to - confirm against the client before
                // changing this bound.
                for (int i = 0; i <= rowCount; i++) {
                    sendRow(result);
                }
            }
            transfer.flush();
            break;
        }
        case SessionRemote.COMMAND_EXECUTE_UPDATE: {
            int id = transfer.readInt();
            Command command = (Command) cache.getObject(id, false);
            setParameters(command);
            int updateCount = command.executeUpdate();
            int status = SessionRemote.STATUS_OK;
            if (session.isClosed()) {
                status = SessionRemote.STATUS_CLOSED;
            }
            transfer.writeInt(status).writeInt(updateCount).writeBoolean(session.getAutoCommit());
            transfer.flush();
            break;
        }
        case SessionRemote.COMMAND_CLOSE: {
            // No response is written for this operation.
            int id = transfer.readInt();
            Command command = (Command) cache.getObject(id, true);
            if (command != null) {
                command.close();
                cache.freeObject(id);
            }
            break;
        }
        case SessionRemote.RESULT_FETCH_ROW: {
            int id = transfer.readInt();
            LocalResult result = (LocalResult) cache.getObject(id, false);
            transfer.writeInt(SessionRemote.STATUS_OK);
            sendRow(result);
            transfer.flush();
            break;
        }
        case SessionRemote.RESULT_RESET: {
            // No response is written for this operation.
            int id = transfer.readInt();
            LocalResult result = (LocalResult) cache.getObject(id, false);
            result.reset();
            break;
        }
        case SessionRemote.RESULT_CLOSE: {
            // No response is written for this operation.
            int id = transfer.readInt();
            LocalResult result = (LocalResult) cache.getObject(id, true);
            if (result != null) {
                result.close();
                cache.freeObject(id);
            }
            break;
        }
        case SessionRemote.CHANGE_ID: {
            // Re-registers a cached object under a new id; no response.
            int oldId = transfer.readInt();
            int newId = transfer.readInt();
            Object obj = cache.getObject(oldId, false);
            cache.freeObject(oldId);
            cache.addObject(newId, obj);
            break;
        }
        default:
            server.logInternalError("Unknown operation: " + operation);
            server.log("Unknown operation: " + operation);
            closeSession();
            close();
        }
    }

    /**
     * Advances the result and writes whether a row is available, followed by
     * the values of the visible columns of that row if there is one.
     *
     * @param result the result to read the next row from
     * @throws IOException if writing to the transfer fails
     * @throws SQLException if the row cannot be read or written
     */
    private void sendRow(LocalResult result) throws IOException, SQLException {
        boolean n = result.next();
        transfer.writeBoolean(n);
        if (n) {
            Value[] v = result.currentRow();
            for (int i = 0; i < result.getVisibleColumnCount(); i++) {
                transfer.writeValue(v[i]);
            }
        }
    }

    /**
     * Remembers the thread that runs this handler.
     *
     * @param thread the thread
     */
    public void setThread(Thread thread) {
        this.thread = thread;
    }

    /**
     * Returns the thread set with {@link #setThread(Thread)}.
     *
     * @return the thread, or null if none was set
     */
    public Thread getThread() {
        return thread;
    }

}