KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > db4o > cs > YapServerThread


1 /* Copyright (C) 2004 - 2006 db4objects Inc. http://www.db4o.com
2
3 This file is part of the db4o open source object database.
4
5 db4o is free software; you can redistribute it and/or modify it under
6 the terms of version 2 of the GNU General Public License as published
7 by the Free Software Foundation and as clarified by db4objects' GPL
8 interpretation policy, available at
9 http://www.db4o.com/about/company/legalpolicies/gplinterpretation/
10 Alternatively you can write to db4objects, Inc., 1900 S Norfolk Street,
11 Suite 350, San Mateo, CA 94403, USA.
12
13 db4o is distributed in the hope that it will be useful, but WITHOUT ANY
14 WARRANTY; without even the implied warranty of MERCHANTABILITY or
15 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 for more details.
17
18 You should have received a copy of the GNU General Public License along
19 with this program; if not, write to the Free Software Foundation, Inc.,
20 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */

21 package com.db4o.cs;
22
23 import java.io.*;
24 import com.db4o.*;
25 import com.db4o.cs.messages.*;
26 import com.db4o.foundation.*;
27 import com.db4o.foundation.network.*;
28
29 public final class YapServerThread extends Thread JavaDoc {
30
31     private String JavaDoc i_clientName;
32
33     private boolean i_loggedin;
34     private long i_lastClientMessage;
35     private final YapFile i_mainStream;
36
37     private Transaction i_mainTrans;
38     private int i_pingAttempts = 0;
39     private int i_nullMessages;
40     private boolean i_rollbackOnClose = true;
41     private boolean i_sendCloseMessage = true;
42
43     private final YapServer i_server;
44
45     private YapSocket i_socket;
46     private YapFile i_substituteStream;
47     private Transaction i_substituteTrans;
48     
49     private Hashtable4 _queryResults;
50     
51     private Config4Impl i_config;
52
53     final int i_threadID;
54
/**
 * Creates the handler thread for a single client connection.
 *
 * @param aServer   the owning server; its configuration is read here
 * @param aStream   the main database file; a new transaction is opened on it
 * @param aSocket   the connection to the client
 * @param aThreadID identifier of this thread, also used in the thread name
 * @param loggedIn  whether the client is to be treated as already authenticated
 * @throws Exception if applying the socket timeout fails; the socket is closed
 *                   (best effort) before the exception is rethrown
 */
YapServerThread(
    YapServer aServer,
    YapFile aStream,
    YapSocket aSocket,
    int aThreadID,
    boolean loggedIn)
    throws Exception {

    i_loggedin = loggedIn;

    // Start the idle clock now so the client is not pinged right from the start.
    i_lastClientMessage = System.currentTimeMillis();

    i_server = aServer;
    i_config = (Config4Impl) i_server.configure();
    i_mainStream = aStream;
    i_threadID = aThreadID;
    setName("db4o message server " + aThreadID);
    i_mainTrans = aStream.newTransaction();
    i_socket = aSocket;
    try {
        i_socket.setSoTimeout(i_config.timeoutServerSocket());

        // TODO: Experiment with packetsize and noDelay
        // (candidates: i_socket.setSendBufferSize(100), i_socket.setTcpNoDelay(true))
    } catch (Exception e) {
        // Release the socket, but never let the cleanup hide the original failure:
        // a null socket or a failing close() must not replace 'e'.
        if (i_socket != null) {
            try {
                i_socket.close();
            } catch (Exception closeFailure) {
                if (Debug.atHome) {
                    closeFailure.printStackTrace();
                }
            }
        }
        throw e;
    }
}
86
87     public void close() {
88         closeSubstituteStream();
89         try {
90             if (i_sendCloseMessage) {
91                 write(Msg.CLOSE);
92             }
93         } catch (Exception JavaDoc e) {
94             if (Debug.atHome) {
95                 e.printStackTrace();
96             }
97
98         }
99         if (i_mainStream != null && i_mainTrans != null) {
100             i_mainTrans.close(i_rollbackOnClose);
101         }
102         try {
103             i_socket.close();
104         } catch (Exception JavaDoc e) {
105             if (Debug.atHome) {
106                 e.printStackTrace();
107             }
108         }
109         i_socket = null;
110         try {
111             i_server.removeThread(this);
112         } catch (Exception JavaDoc e) {
113             if (Debug.atHome) {
114                 e.printStackTrace();
115             }
116         }
117     }
118
119     private void closeSubstituteStream() {
120         if (i_substituteStream != null) {
121             if (i_substituteTrans != null) {
122                 i_substituteTrans.close(i_rollbackOnClose);
123                 i_substituteTrans = null;
124             }
125             try {
126                 i_substituteStream.close();
127
128             } catch (Exception JavaDoc e) {
129                 if (Debug.atHome) {
130                     e.printStackTrace();
131                 }
132             }
133             i_substituteStream = null;
134         }
135     }
136
137     private final YapFile getStream() {
138         if (i_substituteStream != null) {
139             return i_substituteStream;
140         }
141         return i_mainStream;
142     }
143
144     Transaction getTransaction() {
145         if (i_substituteTrans != null) {
146             return i_substituteTrans;
147         }
148         return i_mainTrans;
149     }
150
151     public void run() {
152         while (i_socket != null) {
153             try {
154                 if(! messageProcessor()){
155                     break;
156                 }
157             } catch (Exception JavaDoc e) {
158                 if (i_mainStream == null || i_mainStream.isClosed()) {
159                     break;
160                 }
161                 if(! i_socket.isConnected()){
162                     break;
163                 }
164                 if (Deploy.debug) {
165                     e.printStackTrace();
166                 }
167                 i_nullMessages++;
168             }
169             
170             // TODO: Optimize - this doesn't need to be in the loop of executing statements
171

172             if (i_nullMessages > 20 || pingClientTimeoutReached()) {
173                 if (i_pingAttempts > 5) {
174                     //
175
getStream().logMsg(33, i_clientName);
176                     break;
177                 }
178                 if (null == i_socket) break;
179                 write(Msg.PING);
180                 i_pingAttempts++;
181             }
182         }
183         close();
184     }
185
186     private boolean pingClientTimeoutReached() {
187         return (System.currentTimeMillis() - i_lastClientMessage > i_config.timeoutPingClients());
188     }
189     
190     private boolean messageProcessor() throws IOException{
191         
192         Msg message = Msg.readMessage(getTransaction(), i_socket);
193         if(message == null){
194             i_nullMessages ++;
195             return true;
196         }
197         
198         i_lastClientMessage = System.currentTimeMillis();
199         i_nullMessages = 0;
200         i_pingAttempts = 0;
201         if (! i_loggedin) {
202             if (Msg.LOGIN.equals(message)) {
203                 String JavaDoc userName = ((MsgD) message).readString();
204                 String JavaDoc password = ((MsgD) message).readString();
205                 i_mainStream.showInternalClasses(true);
206                 User found = i_server.getUser(userName);
207                 i_mainStream.showInternalClasses(false);
208                 if (found != null) {
209                     if (found.password.equals(password)) {
210                         i_clientName = userName;
211                         i_mainStream.logMsg(32, i_clientName);
212                         int blockSize = i_mainStream.blockSize();
213                         int encrypt = i_mainStream.i_handlers.i_encrypt ? 1 : 0;
214                         write(Msg.LOGIN_OK.getWriterForInts(getTransaction(), new int[] {blockSize, encrypt}));
215                         i_loggedin= true;
216                         setName("db4o server socket for client " + i_clientName);
217                     } else {
218                         write(Msg.FAILED);
219                         return false;
220                     }
221                 } else {
222                     write(Msg.FAILED);
223                     return false;
224                 }
225             }
226             return true;
227         }
228         
229         if (message.processAtServer(this)) {
230             return true;
231         }
232         
233         if (Msg.PING.equals(message)) {
234             writeOK();
235             return true;
236         }
237         
238         if(Msg.OBJECTSET_FINALIZED.equals(message)){
239             int queryResultID = ((MsgD) message).readInt();
240             queryResultFinalized(queryResultID);
241             return true;
242         }
243         
244         if (Msg.CLOSE.equals(message)) {
245             write(Msg.CLOSE);
246             getTransaction().commit();
247             i_sendCloseMessage = false;
248             getStream().logMsg(34, i_clientName);
249             return false;
250         }
251         
252         if (Msg.IDENTITY.equals(message)) {
253             respondInt((int)getStream().getID(getStream().identity()));
254             return true;
255         }
256         
257         if (Msg.CURRENT_VERSION.equals(message)){
258             long ver = 0;
259             synchronized(getStream()){
260                 ver = getStream().currentVersion();
261             }
262             write(Msg.ID_LIST.getWriterForLong(getTransaction(), ver));
263             return true;
264         }
265         
266         if (Msg.RAISE_VERSION.equals(message)) {
267             long minimumVersion = ((MsgD)message).readLong();
268             YapStream stream = getStream();
269             synchronized(stream){
270                 stream.raiseVersion(minimumVersion);
271             }
272             return true;
273         }
274         
275         if (Msg.GET_THREAD_ID.equals(message)) {
276             respondInt(i_threadID);
277             return true;
278         }
279         
280         if (Msg.SWITCH_TO_FILE.equals(message)) {
281             switchToFile(message);
282             return true;
283         }
284         
285         if (Msg.SWITCH_TO_MAIN_FILE.equals(message)) {
286             switchToMainFile();
287             return true;
288         }
289         
290         if (Msg.USE_TRANSACTION.equals(message)) {
291             useTransaction(message);
292             return true;
293         }
294         
295         return true;
296     }
297
298     private void writeOK() {
299         write(Msg.OK);
300     }
301
302     private void queryResultFinalized(int queryResultID) {
303         _queryResults.remove(queryResultID);
304     }
305
306     public void mapQueryResultToID(LazyClientObjectSetStub stub, int queryResultID) {
307         if(_queryResults == null){
308             _queryResults = new Hashtable4();
309         }
310         _queryResults.put(queryResultID, stub);
311     }
312     
313     public LazyClientObjectSetStub queryResultForID(int queryResultID){
314         return (LazyClientObjectSetStub) _queryResults.get(queryResultID);
315     }
316
317     private void switchToFile(Msg message) {
318         synchronized (i_mainStream.i_lock) {
319             String JavaDoc fileName = ((MsgD) message).readString();
320             try {
321                 closeSubstituteStream();
322                 i_substituteStream = (YapFile) Db4o.openFile(fileName);
323                 i_substituteTrans = i_substituteStream.newTransaction();
324                 i_substituteStream.configImpl().setMessageRecipient(i_mainStream.configImpl().messageRecipient());
325                 writeOK();
326             } catch (Exception JavaDoc e) {
327                 if (Debug.atHome) {
328                     System.out.println("Msg.SWITCH_TO_FILE failed.");
329                     e.printStackTrace();
330                 }
331                 closeSubstituteStream();
332                 write(Msg.ERROR);
333             }
334         }
335     }
336
337     private void switchToMainFile() {
338         synchronized (i_mainStream.i_lock) {
339             closeSubstituteStream();
340             writeOK();
341         }
342     }
343
344     private void useTransaction(Msg message) {
345         int threadID = ((MsgD) message).readInt();
346         YapServerThread transactionThread = i_server.findThread(threadID);
347         if (transactionThread != null) {
348             Transaction transToUse = transactionThread.getTransaction();
349             if (i_substituteTrans != null) {
350                 i_substituteTrans = transToUse;
351             } else {
352                 i_mainTrans = transToUse;
353             }
354             i_rollbackOnClose = false;
355         }
356     }
357     
358     private void respondInt(int response){
359         write(Msg.ID_LIST.getWriterForInt(getTransaction(), response));
360     }
361     
362     public void write(Msg msg){
363         msg.write(getStream(), i_socket);
364     }
365     
366     public YapSocket socket(){
367         return i_socket;
368     }
369     
370     
371 }
Popular Tags