1 21 package com.db4o.cs; 22 23 import java.io.*; 24 import com.db4o.*; 25 import com.db4o.cs.messages.*; 26 import com.db4o.foundation.*; 27 import com.db4o.foundation.network.*; 28 29 public final class YapServerThread extends Thread { 30 31 private String i_clientName; 32 33 private boolean i_loggedin; 34 private long i_lastClientMessage; 35 private final YapFile i_mainStream; 36 37 private Transaction i_mainTrans; 38 private int i_pingAttempts = 0; 39 private int i_nullMessages; 40 private boolean i_rollbackOnClose = true; 41 private boolean i_sendCloseMessage = true; 42 43 private final YapServer i_server; 44 45 private YapSocket i_socket; 46 private YapFile i_substituteStream; 47 private Transaction i_substituteTrans; 48 49 private Hashtable4 _queryResults; 50 51 private Config4Impl i_config; 52 53 final int i_threadID; 54 55 YapServerThread( 56 YapServer aServer, 57 YapFile aStream, 58 YapSocket aSocket, 59 int aThreadID, 60 boolean loggedIn) 61 throws Exception { 62 63 64 i_loggedin = loggedIn; 65 66 i_lastClientMessage = System.currentTimeMillis(); i_server = aServer; 68 i_config = (Config4Impl)i_server.configure(); 69 i_mainStream = aStream; 70 i_threadID = aThreadID; 71 setName("db4o message server " + aThreadID); 72 i_mainTrans = aStream.newTransaction(); 73 try { 74 i_socket = aSocket; 75 i_socket.setSoTimeout(((Config4Impl)aServer.configure()).timeoutServerSocket()); 76 77 81 } catch (Exception e) { 82 i_socket.close(); 83 throw (e); 84 } 85 } 86 87 public void close() { 88 closeSubstituteStream(); 89 try { 90 if (i_sendCloseMessage) { 91 write(Msg.CLOSE); 92 } 93 } catch (Exception e) { 94 if (Debug.atHome) { 95 e.printStackTrace(); 96 } 97 98 } 99 if (i_mainStream != null && i_mainTrans != null) { 100 i_mainTrans.close(i_rollbackOnClose); 101 } 102 try { 103 i_socket.close(); 104 } catch (Exception e) { 105 if (Debug.atHome) { 106 e.printStackTrace(); 107 } 108 } 109 i_socket = null; 110 try { 111 i_server.removeThread(this); 112 } catch (Exception e) { 113 if (Debug.atHome) { 114 e.printStackTrace(); 115 } 116 } 117 } 118 119 private void closeSubstituteStream() { 120 if (i_substituteStream != null) { 121 if (i_substituteTrans != null) { 122 i_substituteTrans.close(i_rollbackOnClose); 123 i_substituteTrans = null; 124 } 125 try { 126 i_substituteStream.close(); 127 128 } catch (Exception e) { 129 if (Debug.atHome) { 130 e.printStackTrace(); 131 } 132 } 133 i_substituteStream = null; 134 } 135 } 136 137 private final YapFile getStream() { 138 if (i_substituteStream != null) { 139 return i_substituteStream; 140 } 141 return i_mainStream; 142 } 143 144 Transaction getTransaction() { 145 if (i_substituteTrans != null) { 146 return i_substituteTrans; 147 } 148 return i_mainTrans; 149 } 150 151 public void run() { 152 while (i_socket != null) { 153 try { 154 if(! messageProcessor()){ 155 break; 156 } 157 } catch (Exception e) { 158 if (i_mainStream == null || i_mainStream.isClosed()) { 159 break; 160 } 161 if(! i_socket.isConnected()){ 162 break; 163 } 164 if (Deploy.debug) { 165 e.printStackTrace(); 166 } 167 i_nullMessages++; 168 } 169 170 172 if (i_nullMessages > 20 || pingClientTimeoutReached()) { 173 if (i_pingAttempts > 5) { 174 getStream().logMsg(33, i_clientName); 176 break; 177 } 178 if (null == i_socket) break; 179 write(Msg.PING); 180 i_pingAttempts++; 181 } 182 } 183 close(); 184 } 185 186 private boolean pingClientTimeoutReached() { 187 return (System.currentTimeMillis() - i_lastClientMessage > i_config.timeoutPingClients()); 188 } 189 190 private boolean messageProcessor() throws IOException{ 191 192 Msg message = Msg.readMessage(getTransaction(), i_socket); 193 if(message == null){ 194 i_nullMessages ++; 195 return true; 196 } 197 198 i_lastClientMessage = System.currentTimeMillis(); 199 i_nullMessages = 0; 200 i_pingAttempts = 0; 201 if (! i_loggedin) { 202 if (Msg.LOGIN.equals(message)) { 203 String userName = ((MsgD) message).readString(); 204 String password = ((MsgD) message).readString(); 205 i_mainStream.showInternalClasses(true); 206 User found = i_server.getUser(userName); 207 i_mainStream.showInternalClasses(false); 208 if (found != null) { 209 if (found.password.equals(password)) { 210 i_clientName = userName; 211 i_mainStream.logMsg(32, i_clientName); 212 int blockSize = i_mainStream.blockSize(); 213 int encrypt = i_mainStream.i_handlers.i_encrypt ? 1 : 0; 214 write(Msg.LOGIN_OK.getWriterForInts(getTransaction(), new int[] {blockSize, encrypt})); 215 i_loggedin= true; 216 setName("db4o server socket for client " + i_clientName); 217 } else { 218 write(Msg.FAILED); 219 return false; 220 } 221 } else { 222 write(Msg.FAILED); 223 return false; 224 } 225 } 226 return true; 227 } 228 229 if (message.processAtServer(this)) { 230 return true; 231 } 232 233 if (Msg.PING.equals(message)) { 234 writeOK(); 235 return true; 236 } 237 238 if(Msg.OBJECTSET_FINALIZED.equals(message)){ 239 int queryResultID = ((MsgD) message).readInt(); 240 queryResultFinalized(queryResultID); 241 return true; 242 } 243 244 if (Msg.CLOSE.equals(message)) { 245 write(Msg.CLOSE); 246 getTransaction().commit(); 247 i_sendCloseMessage = false; 248 getStream().logMsg(34, i_clientName); 249 return false; 250 } 251 252 if (Msg.IDENTITY.equals(message)) { 253 respondInt((int)getStream().getID(getStream().identity())); 254 return true; 255 } 256 257 if (Msg.CURRENT_VERSION.equals(message)){ 258 long ver = 0; 259 synchronized(getStream()){ 260 ver = getStream().currentVersion(); 261 } 262 write(Msg.ID_LIST.getWriterForLong(getTransaction(), ver)); 263 return true; 264 } 265 266 if (Msg.RAISE_VERSION.equals(message)) { 267 long minimumVersion = ((MsgD)message).readLong(); 268 YapStream stream = getStream(); 269 synchronized(stream){ 270 stream.raiseVersion(minimumVersion); 271 } 272 return true; 273 } 274 275 if (Msg.GET_THREAD_ID.equals(message)) { 276 respondInt(i_threadID); 277 return true; 278 } 279 280 if (Msg.SWITCH_TO_FILE.equals(message)) { 281 switchToFile(message); 282 return true; 283 } 284 285 if (Msg.SWITCH_TO_MAIN_FILE.equals(message)) { 286 switchToMainFile(); 287 return true; 288 } 289 290 if (Msg.USE_TRANSACTION.equals(message)) { 291 useTransaction(message); 292 return true; 293 } 294 295 return true; 296 } 297 298 private void writeOK() { 299 write(Msg.OK); 300 } 301 302 private void queryResultFinalized(int queryResultID) { 303 _queryResults.remove(queryResultID); 304 } 305 306 public void mapQueryResultToID(LazyClientObjectSetStub stub, int queryResultID) { 307 if(_queryResults == null){ 308 _queryResults = new Hashtable4(); 309 } 310 _queryResults.put(queryResultID, stub); 311 } 312 313 public LazyClientObjectSetStub queryResultForID(int queryResultID){ 314 return (LazyClientObjectSetStub) _queryResults.get(queryResultID); 315 } 316 317 private void switchToFile(Msg message) { 318 synchronized (i_mainStream.i_lock) { 319 String fileName = ((MsgD) message).readString(); 320 try { 321 closeSubstituteStream(); 322 i_substituteStream = (YapFile) Db4o.openFile(fileName); 323 i_substituteTrans = i_substituteStream.newTransaction(); 324 i_substituteStream.configImpl().setMessageRecipient(i_mainStream.configImpl().messageRecipient()); 325 writeOK(); 326 } catch (Exception e) { 327 if (Debug.atHome) { 328 System.out.println("Msg.SWITCH_TO_FILE failed."); 329 e.printStackTrace(); 330 } 331 closeSubstituteStream(); 332 write(Msg.ERROR); 333 } 334 } 335 } 336 337 private void switchToMainFile() { 338 synchronized (i_mainStream.i_lock) { 339 closeSubstituteStream(); 340 writeOK(); 341 } 342 } 343 344 private void useTransaction(Msg message) { 345 int threadID = ((MsgD) message).readInt(); 346 YapServerThread transactionThread = i_server.findThread(threadID); 347 if (transactionThread != null) { 348 Transaction transToUse = transactionThread.getTransaction(); 349 if (i_substituteTrans != null) { 350 i_substituteTrans = transToUse; 351 } else { 352 i_mainTrans = transToUse; 353 } 354 i_rollbackOnClose = false; 355 } 356 } 357 358 private void respondInt(int response){ 359 write(Msg.ID_LIST.getWriterForInt(getTransaction(), response)); 360 } 361 362 public void write(Msg msg){ 363 msg.write(getStream(), i_socket); 364 } 365 366 public YapSocket socket(){ 367 return i_socket; 368 } 369 370 371 } | Popular Tags |